2015-07-30 02:39:30 -04:00
|
|
|
from datetime import timedelta
|
2015-07-15 13:12:32 -04:00
|
|
|
from dateutil.parser import parse
|
2015-09-09 13:37:59 -04:00
|
|
|
from flask import flash
|
|
|
|
from flask.ext.appbuilder import Model
|
|
|
|
from flask.ext.appbuilder.models.mixins import AuditMixin
|
|
|
|
from pandas import read_sql_query
|
2015-08-05 20:36:33 -04:00
|
|
|
from pydruid import client
|
|
|
|
from pydruid.utils.filters import Dimension, Filter
|
2015-09-09 13:37:59 -04:00
|
|
|
from sqlalchemy import (
|
|
|
|
Column, Integer, String, ForeignKey, Text, Boolean, DateTime)
|
|
|
|
from sqlalchemy import Table as sqlaTable
|
2015-09-13 22:07:54 -04:00
|
|
|
from sqlalchemy import create_engine, MetaData, desc, select, and_, Table
|
2015-09-09 13:37:59 -04:00
|
|
|
from sqlalchemy.orm import relationship
|
2015-09-09 15:48:20 -04:00
|
|
|
from sqlalchemy.sql import table, literal_column, text
|
2015-09-17 21:06:03 -04:00
|
|
|
from flask import request
|
2015-07-15 13:12:32 -04:00
|
|
|
|
2015-08-06 03:00:17 -04:00
|
|
|
from copy import deepcopy, copy
|
2015-08-07 19:25:19 -04:00
|
|
|
from collections import namedtuple
|
|
|
|
from datetime import datetime
|
2015-07-30 02:39:30 -04:00
|
|
|
import json
|
2015-08-12 13:28:50 -04:00
|
|
|
import sqlparse
|
2015-07-30 02:39:30 -04:00
|
|
|
import requests
|
2015-08-07 19:25:19 -04:00
|
|
|
import textwrap
|
2015-07-15 13:12:32 -04:00
|
|
|
|
2015-09-17 21:06:03 -04:00
|
|
|
from panoramix import db, get_session, config, utils
|
|
|
|
from panoramix.viz import viz_types
|
2015-08-03 11:34:58 -04:00
|
|
|
|
2015-08-07 19:25:19 -04:00
|
|
|
# Lightweight container bundling a query's result dataframe, the query
# text that produced it, and the wall-clock duration.
# Fix: the typename was the copy-pasted string 'namedtuple'; it now
# matches the variable so reprs read "QueryResult(df=..., ...)".
QueryResult = namedtuple('QueryResult', ['df', 'query', 'duration'])
|
|
|
|
|
|
|
|
|
2015-09-11 18:32:42 -04:00
|
|
|
class Slice(Model, AuditMixin):

    """A slice is essentially a report or a view on data"""

    __tablename__ = 'slices'
    id = Column(Integer, primary_key=True)
    slice_name = Column(String(250))
    druid_datasource_id = Column(Integer, ForeignKey('datasources.id'))
    table_id = Column(Integer, ForeignKey('tables.id'))
    # which of the two datasource kinds this slice points at
    datasource_type = Column(String(200))
    datasource_name = Column(String(2000))
    viz_type = Column(String(250))
    # JSON-encoded form data driving the visualization
    params = Column(Text)

    table = relationship(
        'Table', foreign_keys=[table_id], backref='slices')
    druid_datasource = relationship(
        'Datasource', foreign_keys=[druid_datasource_id], backref='slices')

    def __repr__(self):
        return self.slice_name

    @property
    def datasource(self):
        """Whichever datasource is set: the SQL table or the Druid one."""
        return self.table or self.druid_datasource

    @property
    @utils.memoized
    def viz(self):
        """Instantiate (once) the viz object configured by ``params``."""
        form_data = json.loads(self.params)
        return viz_types[self.viz_type](
            self.datasource,
            form_data=form_data)

    @property
    def datasource_id(self):
        ds = self.datasource
        return ds.id if ds else None

    @property
    def slice_url(self):
        """URL to the slice endpoint, with ``params`` as the query string."""
        form_data = json.loads(self.params)
        from werkzeug.urls import Href
        href = Href(
            "/panoramix/{self.datasource_type}/"
            "{self.datasource_id}/".format(self=self))
        return href(form_data)

    @property
    def slice_link(self):
        url = self.slice_url
        return '<a href="{url}">{self.slice_name}</a>'.format(**locals())

    @property
    def js_files(self):
        # imported here to dodge a circular import with panoramix.viz
        from panoramix.viz import viz_types
        return viz_types[self.viz_type].js_files

    @property
    def css_files(self):
        # imported here to dodge a circular import with panoramix.viz
        from panoramix.viz import viz_types
        return viz_types[self.viz_type].css_files

    def get_viz(self):
        # placeholder, intentionally does nothing yet
        pass
|
|
|
|
|
2015-09-13 22:07:54 -04:00
|
|
|
|
|
|
|
# Association table for the many-to-many link between dashboards and slices.
dashboard_slices = Table(
    'dashboard_slices', Model.metadata,
    Column('id', Integer, primary_key=True),
    Column('dashboard_id', Integer, ForeignKey('dashboards.id')),
    Column('slice_id', Integer, ForeignKey('slices.id')),
)
|
|
|
|
|
|
|
|
|
|
|
|
class Dashboard(Model, AuditMixin):

    """A dash to slash"""

    __tablename__ = 'dashboards'
    id = Column(Integer, primary_key=True)
    dashboard_title = Column(String(500))
    # JSON blob describing the layout/positions of the slices
    position_json = Column(Text)
    slices = relationship(
        'Slice', secondary=dashboard_slices, backref='dashboards')

    def __repr__(self):
        return self.dashboard_title

    def dashboard_link(self):
        """HTML anchor pointing at this dashboard's page."""
        url = "/panoramix/dashboard/{}/".format(self.id)
        return '<a href="{url}">{self.dashboard_title}</a>'.format(**locals())

    @property
    def js_files(self):
        """JS files needed by all slices, deduped, first-seen order kept."""
        seen = []
        for slc in self.slices:
            for f in slc.js_files:
                if f not in seen:
                    seen.append(f)
        return seen

    @property
    def css_files(self):
        """Distinct CSS files across all slices (order not guaranteed)."""
        files = []
        for slc in self.slices:
            files += slc.css_files
        return list(set(files))
|
|
|
|
|
2015-09-11 18:32:42 -04:00
|
|
|
|
2015-08-03 11:34:58 -04:00
|
|
|
class Queryable(object):

    """Mixin for models that own a set of columns, exposing sorted
    column-name helpers used by the UI."""

    @property
    def column_names(self):
        """All column names, alphabetically."""
        return sorted(c.column_name for c in self.columns)

    @property
    def groupby_column_names(self):
        """Names of columns flagged as groupable, alphabetically."""
        return sorted(c.column_name for c in self.columns if c.groupby)

    @property
    def filterable_column_names(self):
        """Names of columns flagged as filterable, alphabetically."""
        return sorted(c.column_name for c in self.columns if c.filterable)
|
|
|
|
|
2015-09-09 13:37:59 -04:00
|
|
|
|
2015-08-03 11:34:58 -04:00
|
|
|
class Database(Model, AuditMixin):

    """A SQL database reachable through a SQLAlchemy URI."""

    __tablename__ = 'dbs'
    id = Column(Integer, primary_key=True)
    database_name = Column(String(250), unique=True)
    sqlalchemy_uri = Column(String(1024))

    def __repr__(self):
        return self.database_name

    def get_sqla_engine(self):
        """Build a fresh engine from the stored URI."""
        return create_engine(self.sqlalchemy_uri)

    def get_table(self, table_name):
        """Reflect ``table_name`` live from the target database."""
        meta = MetaData()
        return sqlaTable(
            table_name, meta,
            autoload=True,
            autoload_with=self.get_sqla_engine())
|
|
|
|
|
2015-08-03 11:34:58 -04:00
|
|
|
|
2015-08-06 17:49:18 -04:00
|
|
|
class Table(Model, Queryable, AuditMixin):

    """A SQL table living in a ``Database``, queryable via SQLAlchemy."""

    type = "table"

    __tablename__ = 'tables'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(255), unique=True)
    main_datetime_column_id = Column(Integer, ForeignKey('table_columns.id'))
    main_datetime_column = relationship(
        'TableColumn', foreign_keys=[main_datetime_column_id])
    default_endpoint = Column(Text)
    database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)
    database = relationship(
        'Database', backref='tables', foreign_keys=[database_id])

    baselink = "tableview"

    def __repr__(self):
        return self.table_name

    @property
    def name(self):
        return self.table_name

    @property
    def table_link(self):
        """HTML anchor pointing at this table's explore page."""
        url = "/panoramix/table/{}/".format(self.id)
        return '<a href="{url}">{self.table_name}</a>'.format(**locals())

    @property
    def metrics_combo(self):
        # (name, label) pairs for select widgets, sorted by label
        return sorted(
            [
                (m.metric_name, m.verbose_name)
                for m in self.metrics],
            key=lambda x: x[1])

    def query_bkp(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=15,
            row_limit=None,
            extras=None):
        """
        Unused, legacy way of querying by building a SQL string without
        using the sqlalchemy expression API (new approach which supports
        all dialects)
        """
        from pandas import read_sql_query
        qry_start_dttm = datetime.now()
        # Fix: `filter` defaults to None but was iterated unconditionally,
        # raising TypeError when callers omit it.
        filter = filter or []
        metrics_exprs = [
            "{} AS {}".format(m.expression, m.metric_name)
            for m in self.metrics if m.metric_name in metrics]
        from_dttm_iso = from_dttm.isoformat()
        to_dttm_iso = to_dttm.isoformat()

        if metrics:
            # expression of the first requested metric drives the ordering
            main_metric_expr = [
                m.expression for m in self.metrics
                if m.metric_name == metrics[0]][0]
        else:
            main_metric_expr = "COUNT(*)"

        select_exprs = []
        groupby_exprs = []

        if groupby:
            select_exprs = copy(groupby)
            groupby_exprs = [s for s in groupby]
            inner_groupby_exprs = [s for s in groupby]
        select_exprs += metrics_exprs
        if granularity != "all":
            select_exprs += ['ds as timestamp']
            groupby_exprs += ['ds']

        select_exprs = ",\n".join(select_exprs)
        groupby_exprs = ",\n".join(groupby_exprs)

        # time-range predicates; placeholders are filled via locals() below
        where_clause = [
            "ds >= '{from_dttm_iso}'",
            "ds < '{to_dttm_iso}'"
        ]
        for col, op, eq in filter:
            if op in ('in', 'not in'):
                l = ["'{}'".format(s) for s in eq.split(",")]
                l = ", ".join(l)
                op = op.upper()
                where_clause.append(
                    "{col} {op} ({l})".format(**locals())
                )
        where_clause = " AND\n".join(where_clause).format(**locals())
        on_clause = " AND ".join(["{g} = __{g}".format(g=g) for g in groupby])
        limiting_join = ""
        if timeseries_limit and groupby:
            # two-phase pattern: inner query picks the top groups, outer
            # query joins back to restrict the timeseries to them
            inner_select = ", ".join([
                "{g} as __{g}".format(g=g) for g in inner_groupby_exprs])
            inner_groupby_exprs = ", ".join(inner_groupby_exprs)
            limiting_join = (
                "JOIN ( \n"
                " SELECT {inner_select} \n"
                " FROM {self.table_name} \n"
                " WHERE \n"
                " {where_clause}\n"
                " GROUP BY {inner_groupby_exprs}\n"
                " ORDER BY {main_metric_expr} DESC\n"
                " LIMIT {timeseries_limit}\n"
                ") z ON {on_clause}\n"
            ).format(**locals())

        sql = (
            "SELECT\n"
            " {select_exprs}\n"
            "FROM {self.table_name}\n"
            "{limiting_join}"
            "WHERE\n"
            " {where_clause}\n"
            "GROUP BY\n"
            " {groupby_exprs}\n"
        ).format(**locals())
        df = read_sql_query(
            sql=sql,
            con=self.database.get_sqla_engine()
        )
        # Fix: removed a stray `textwrap.dedent(sql)` whose result was
        # discarded (a no-op statement).

        return QueryResult(
            df=df, duration=datetime.now() - qry_start_dttm, query=sql)

    def query(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=15, row_limit=None,
            extras=None):
        """Run a group-by query against the table and return a QueryResult.

        Built with the SQLAlchemy expression API so it works across
        dialects. `timeseries_limit` caps the number of series via a
        self-join subquery on the top groups.
        """
        qry_start_dttm = datetime.now()
        # Fix: tolerate the default `filter=None` (was iterated directly)
        filter = filter or []
        timestamp = literal_column(
            self.main_datetime_column.column_name).label('timestamp')
        metrics_exprs = [
            literal_column(m.expression).label(m.metric_name)
            for m in self.metrics if m.metric_name in metrics]

        if metrics:
            main_metric_expr = literal_column([
                m.expression for m in self.metrics
                if m.metric_name == metrics[0]][0])
        else:
            main_metric_expr = literal_column("COUNT(*)")

        select_exprs = []
        groupby_exprs = []

        if groupby:
            select_exprs = [literal_column(s) for s in groupby]
            groupby_exprs = [literal_column(s) for s in groupby]
            inner_groupby_exprs = [
                literal_column(s).label('__' + s) for s in groupby]
        if granularity != "all":
            select_exprs += [timestamp]
            groupby_exprs += [timestamp]

        select_exprs += metrics_exprs
        qry = select(select_exprs)
        from_clause = table(self.table_name)
        qry = qry.group_by(*groupby_exprs)

        where_clause_and = [
            timestamp >= from_dttm.isoformat(),
            timestamp < to_dttm.isoformat(),
        ]
        for col, op, eq in filter:
            if op in ('in', 'not in'):
                values = eq.split(",")
                cond = literal_column(col).in_(values)
                if op == 'not in':
                    cond = ~cond
                where_clause_and.append(cond)
        if extras and 'where' in extras:
            # free-form WHERE snippet supplied by the caller
            where_clause_and += [text(extras['where'])]
        qry = qry.where(and_(*where_clause_and))
        qry = qry.order_by(desc(main_metric_expr))
        qry = qry.limit(row_limit)

        if timeseries_limit and groupby:
            # subquery selecting the top groups; outer query joins to it
            subq = select(inner_groupby_exprs)
            subq = subq.select_from(table(self.table_name))
            subq = subq.where(and_(*where_clause_and))
            subq = subq.group_by(*inner_groupby_exprs)
            subq = subq.order_by(desc(main_metric_expr))
            subq = subq.limit(timeseries_limit)
            on_clause = []
            for gb in groupby:
                on_clause.append(
                    literal_column(gb) == literal_column("__" + gb))

            from_clause = from_clause.join(subq.alias(), and_(*on_clause))

        qry = qry.select_from(from_clause)

        engine = self.database.get_sqla_engine()
        sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True}))
        df = read_sql_query(
            sql=sql,
            con=engine
        )
        sql = sqlparse.format(sql, reindent=True)
        return QueryResult(
            df=df, duration=datetime.now() - qry_start_dttm, query=sql)

    def fetch_metadata(self):
        """Reflect the physical table and sync columns/metrics to the db.

        Creates TableColumn rows for new columns, flags string columns as
        groupable/filterable, derives sum/max/min/count-distinct metrics
        from column flags, and picks a default datetime column.
        """
        try:
            table = self.database.get_table(self.table_name)
        except Exception as e:
            flash(str(e))
            flash(
                "Table doesn't seem to exist in the specified database, "
                "couldn't fetch column information", "danger")
            return

        TC = TableColumn
        M = SqlMetric
        metrics = []
        any_date_col = None
        for col in table.columns:
            try:
                datatype = str(col.type)
            except Exception:
                # some dialects fail to stringify exotic types
                datatype = "UNKNOWN"
            dbcol = (
                db.session
                .query(TC)
                .filter(TC.table == self)
                .filter(TC.column_name == col.name)
                .first()
            )
            db.session.flush()
            if not dbcol:
                dbcol = TableColumn(column_name=col.name)

            if (
                    str(datatype).startswith('VARCHAR') or
                    str(datatype).startswith('STRING')):
                dbcol.groupby = True
                dbcol.filterable = True
            db.session.merge(self)
            self.columns.append(dbcol)

            # first date-ish column becomes the default time column
            if not any_date_col and 'date' in datatype.lower():
                any_date_col = dbcol

            if dbcol.sum:
                metrics.append(M(
                    metric_name='sum__' + dbcol.column_name,
                    verbose_name='sum__' + dbcol.column_name,
                    metric_type='sum',
                    expression="SUM({})".format(dbcol.column_name)
                ))
            if dbcol.max:
                metrics.append(M(
                    metric_name='max__' + dbcol.column_name,
                    verbose_name='max__' + dbcol.column_name,
                    metric_type='max',
                    expression="MAX({})".format(dbcol.column_name)
                ))
            if dbcol.min:
                metrics.append(M(
                    metric_name='min__' + dbcol.column_name,
                    verbose_name='min__' + dbcol.column_name,
                    metric_type='min',
                    expression="MIN({})".format(dbcol.column_name)
                ))
            if dbcol.count_distinct:
                metrics.append(M(
                    metric_name='count_distinct__' + dbcol.column_name,
                    verbose_name='count_distinct__' + dbcol.column_name,
                    metric_type='count_distinct',
                    expression="COUNT(DISTINCT {})".format(dbcol.column_name)
                ))
            dbcol.type = datatype
            db.session.merge(self)
            db.session.commit()

        metrics.append(M(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            expression="COUNT(*)"
        ))
        for metric in metrics:
            m = (
                db.session.query(M)
                .filter(M.metric_name == metric.metric_name)
                .filter(M.table == self)
                .first()
            )
            metric.table = self
            if not m:
                db.session.add(metric)
            db.session.commit()
        if not self.main_datetime_column:
            self.main_datetime_column = any_date_col
|
|
|
|
|
|
|
|
|
2015-08-06 17:49:18 -04:00
|
|
|
class SqlMetric(Model, AuditMixin):

    """A metric on a SQL ``Table``, defined by a SQL expression."""

    __tablename__ = 'sql_metrics'
    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    # e.g. 'sum', 'max', 'min', 'count_distinct', 'count'
    metric_type = Column(String(32))
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship(
        'Table', backref='metrics', foreign_keys=[table_id])
    # SQL expression computing the metric (e.g. "SUM(amount)")
    expression = Column(Text)
    description = Column(Text)
|
|
|
|
|
|
|
|
|
2015-08-03 11:34:58 -04:00
|
|
|
class TableColumn(Model, AuditMixin):

    """A column of a SQL ``Table``, with flags driving metric derivation."""

    __tablename__ = 'table_columns'
    id = Column(Integer, primary_key=True)
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship('Table', backref='columns', foreign_keys=[table_id])
    column_name = Column(String(256))
    is_dttm = Column(Boolean, default=True)
    is_active = Column(Boolean, default=True)
    type = Column(String(32), default='')
    groupby = Column(Boolean, default=False)
    # the aggregate flags below make fetch_metadata derive matching metrics
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text, default='')

    def __repr__(self):
        return self.column_name

    @property
    def isnum(self):
        """Whether the stored type string is one of the numeric types."""
        return self.type in ('LONG', 'DOUBLE', 'FLOAT')
|
|
|
|
|
2015-09-09 13:37:59 -04:00
|
|
|
|
2015-07-29 20:33:37 -04:00
|
|
|
class Cluster(Model, AuditMixin):

    """A Druid cluster, addressed by its coordinator and broker."""

    __tablename__ = 'clusters'
    id = Column(Integer, primary_key=True)
    cluster_name = Column(String(250), unique=True)
    coordinator_host = Column(String(256))
    coordinator_port = Column(Integer)
    coordinator_endpoint = Column(String(256))
    broker_host = Column(String(256))
    broker_port = Column(Integer)
    broker_endpoint = Column(String(256))
    metadata_last_refreshed = Column(DateTime)

    def __repr__(self):
        return self.cluster_name

    def get_pydruid_client(self):
        """Build a PyDruid client pointed at this cluster's broker."""
        return client.PyDruid(
            "http://{0}:{1}/".format(self.broker_host, self.broker_port),
            self.broker_endpoint)

    def refresh_datasources(self):
        """Fetch the datasource list from the coordinator and sync each."""
        endpoint = (
            "http://{self.coordinator_host}:{self.coordinator_port}/"
            "{self.coordinator_endpoint}/datasources"
        ).format(self=self)

        datasources = json.loads(requests.get(endpoint).text)
        for datasource in datasources:
            Datasource.sync_to_db(datasource, self)
|
|
|
|
|
2015-07-29 20:33:37 -04:00
|
|
|
|
2015-08-03 11:34:58 -04:00
|
|
|
class Datasource(Model, AuditMixin, Queryable):

    """A Druid datasource, queryable through a cluster's broker."""

    type = "datasource"

    baselink = "datasourcemodelview"

    __tablename__ = 'datasources'
    id = Column(Integer, primary_key=True)
    datasource_name = Column(String(250), unique=True)
    is_featured = Column(Boolean, default=False)
    is_hidden = Column(Boolean, default=False)
    description = Column(Text)
    default_endpoint = Column(Text)
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    owner = relationship('User', backref='datasources', foreign_keys=[user_id])
    cluster_name = Column(
        String(255), ForeignKey('clusters.cluster_name'))
    cluster = relationship(
        'Cluster', backref='datasources', foreign_keys=[cluster_name])

    @property
    def metrics_combo(self):
        # (name, label) pairs for select widgets, sorted by label
        return sorted(
            [(m.metric_name, m.verbose_name) for m in self.metrics],
            key=lambda x: x[1])

    @property
    def name(self):
        return self.datasource_name

    def __repr__(self):
        return self.datasource_name

    @property
    def datasource_link(self):
        """HTML anchor pointing at this datasource's explore page."""
        url = "/panoramix/datasource/{}/".format(self.datasource_name)
        return '<a href="{url}">{self.datasource_name}</a>'.format(**locals())

    def get_metric_obj(self, metric_name):
        """Return the parsed JSON aggregation for the named metric."""
        return [
            m.json_obj for m in self.metrics
            if m.metric_name == metric_name
        ][0]

    def latest_metadata(self):
        """Fetch column metadata from the most recent segment, if any."""
        client = self.cluster.get_pydruid_client()
        results = client.time_boundary(datasource=self.datasource_name)
        if not results:
            return
        max_time = results[0]['result']['minTime']
        max_time = parse(max_time)
        # query a 2-second window around the boundary timestamp
        intervals = (max_time - timedelta(seconds=1)).isoformat() + '/'
        intervals += (max_time + timedelta(seconds=1)).isoformat()
        segment_metadata = client.segment_metadata(
            datasource=self.datasource_name,
            intervals=intervals)
        if segment_metadata:
            return segment_metadata[-1]['columns']

    def generate_metrics(self):
        """Derive metrics for every known column."""
        for col in self.columns:
            col.generate_metrics()

    @classmethod
    def sync_to_db(cls, name, cluster):
        """Create/refresh the Datasource row and its columns from Druid."""
        session = get_session()
        datasource = session.query(cls).filter_by(datasource_name=name).first()
        if not datasource:
            datasource = cls(datasource_name=name)
            session.add(datasource)
        datasource.cluster = cluster

        cols = datasource.latest_metadata()
        if not cols:
            return
        for col in cols:
            col_obj = (
                session
                .query(Column)
                .filter_by(datasource_name=name, column_name=col)
                .first()
            )
            datatype = cols[col]['type']
            if not col_obj:
                col_obj = Column(datasource_name=name, column_name=col)
                session.add(col_obj)
            if datatype == "STRING":
                col_obj.groupby = True
                col_obj.filterable = True
            if col_obj:
                col_obj.type = cols[col]['type']
                col_obj.datasource = datasource
            col_obj.generate_metrics()

    def query(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=None,
            row_limit=None,
            extras=None):
        """Run a Druid groupby query and return a QueryResult.

        When ``timeseries_limit`` is set, a first "granularity: all" query
        picks the top groups, whose values are then OR-ed into the filter
        of the real (phase 2) query.
        """
        qry_start_dttm = datetime.now()

        # Fix: `filter` defaults to None but was iterated unconditionally,
        # raising TypeError when callers omit it.
        filter = filter or []

        # add tzinfo to native datetime with config
        from_dttm = from_dttm.replace(tzinfo=config.DRUID_TZ)
        to_dttm = to_dttm.replace(tzinfo=config.DRUID_TZ)

        query_str = ""
        aggregations = {
            m.metric_name: m.json_obj
            for m in self.metrics if m.metric_name in metrics
        }
        if not isinstance(granularity, basestring):
            # numeric granularity is interpreted as a duration in millis
            granularity = {"type": "duration", "duration": granularity}

        qry = dict(
            datasource=self.datasource_name,
            dimensions=groupby,
            aggregations=aggregations,
            granularity=granularity,
            intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
        )
        filters = None
        for col, op, eq in filter:
            cond = None
            if op == '==':
                cond = Dimension(col) == eq
            elif op == '!=':
                cond = ~(Dimension(col) == eq)
            elif op in ('in', 'not in'):
                fields = []
                splitted = eq.split(',')
                if len(splitted) > 1:
                    for s in eq.split(','):
                        s = s.strip()
                        fields.append(Filter.build_filter(Dimension(col) == s))
                    cond = Filter(type="or", fields=fields)
                else:
                    cond = Dimension(col) == eq
                if op == 'not in':
                    cond = ~cond
            if filters:
                # AND the new condition onto what we have so far
                filters = Filter(type="and", fields=[
                    Filter.build_filter(cond),
                    Filter.build_filter(filters)
                ])
            else:
                filters = cond

        if filters:
            qry['filter'] = filters

        client = self.cluster.get_pydruid_client()
        orig_filters = filters
        if timeseries_limit and is_timeseries:
            # Limit on the number of timeseries, doing a two-phases query
            pre_qry = deepcopy(qry)
            pre_qry['granularity'] = "all"
            pre_qry['limit_spec'] = {
                "type": "default",
                "limit": timeseries_limit,
                "columns": [{
                    "dimension": metrics[0] if metrics else self.metrics[0],
                    "direction": "descending",
                }],
            }
            client.groupby(**pre_qry)
            query_str += "// Two phase query\n// Phase 1\n"
            query_str += json.dumps(client.query_dict, indent=2) + "\n"
            query_str += "//\nPhase 2 (built based on phase one's results)\n"
            df = client.export_pandas()
            if df is not None and not df.empty:
                dims = qry['dimensions']
                filters = []
                for index, row in df.iterrows():
                    fields = []
                    for dim in dims:
                        f = Filter.build_filter(Dimension(dim) == row[dim])
                        fields.append(f)
                    if len(fields) > 1:
                        filt = Filter(type="and", fields=fields)
                        filters.append(Filter.build_filter(filt))
                    elif fields:
                        filters.append(fields[0])

                if filters:
                    ff = Filter(type="or", fields=filters)
                    if not orig_filters:
                        qry['filter'] = ff
                    else:
                        qry['filter'] = Filter(type="and", fields=[
                            Filter.build_filter(ff),
                            Filter.build_filter(orig_filters)])
                qry['limit_spec'] = None
        if row_limit:
            qry['limit_spec'] = {
                "type": "default",
                "limit": row_limit,
                "columns": [{
                    "dimension": metrics[0] if metrics else self.metrics[0],
                    "direction": "descending",
                }],
            }
        client.groupby(**qry)
        query_str += json.dumps(client.query_dict, indent=2)
        df = client.export_pandas()
        return QueryResult(
            df=df,
            query=query_str,
            duration=datetime.now() - qry_start_dttm)
|
2015-07-15 13:12:32 -04:00
|
|
|
|
|
|
|
|
2015-08-07 19:25:19 -04:00
|
|
|
class Metric(Model):
    """A Druid metric attached to a Datasource.

    The Druid aggregation spec is stored as a JSON blob in the ``json``
    column; ``json_obj`` exposes it deserialized.
    """
    __tablename__ = 'metrics'
    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    datasource_name = Column(
        String(250),
        ForeignKey('datasources.datasource_name'))
    datasource = relationship('Datasource', backref='metrics')
    json = Column(Text)
    description = Column(Text)

    @property
    def json_obj(self):
        """Return the ``json`` column parsed as a dict.

        Falls back to an empty dict when the column is NULL (TypeError)
        or contains malformed JSON (ValueError). A bare ``except:`` here
        would also swallow KeyboardInterrupt/SystemExit, so catch only
        what ``json.loads`` actually raises.
        """
        try:
            obj = json.loads(self.json)
        except (TypeError, ValueError):
            obj = {}
        return obj
2015-07-15 13:12:32 -04:00
|
|
|
|
|
|
|
class Column(Model, AuditMixin):
    """A column of a Druid datasource.

    Boolean flags (``sum``, ``min``, ``max``, ``count_distinct``, ...)
    declare which Druid metrics should be auto-generated for the column.

    NOTE: the class name shadows ``sqlalchemy.Column`` at module level
    once the class body has executed; inside the body, ``Column(...)``
    still resolves to SQLAlchemy's, which is what the attribute
    definitions below rely on.
    """
    __tablename__ = 'columns'
    id = Column(Integer, primary_key=True)
    datasource_name = Column(
        String(250),
        ForeignKey('datasources.datasource_name'))
    datasource = relationship('Datasource', backref='columns')
    column_name = Column(String(256))
    is_active = Column(Boolean, default=True)
    type = Column(String(32))
    groupby = Column(Boolean, default=False)
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text)

    def __repr__(self):
        return self.column_name

    @property
    def isnum(self):
        """True when the Druid column type is numeric."""
        return self.type in ('LONG', 'DOUBLE', 'FLOAT')

    def _simple_agg_metric(self, agg, corrected_type):
        """Build the Metric for one simple aggregate ('sum'/'min'/'max').

        Produces a Druid aggregator of type e.g. 'doubleSum' / 'longMin'
        named '<agg>__<column_name>'.
        """
        mt = corrected_type.lower() + agg.capitalize()
        name = agg + '__' + self.column_name
        return Metric(
            metric_name=name,
            metric_type=agg,
            verbose_name='{}({})'.format(agg.upper(), self.column_name),
            json=json.dumps({
                'type': mt, 'name': name, 'fieldName': self.column_name})
        )

    def generate_metrics(self):
        """Create the Metric rows implied by this column's flags.

        Builds a candidate list (COUNT(*) plus whatever the flags call
        for) and persists only those not already present for this
        datasource.
        """
        M = Metric
        metrics = [Metric(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            json=json.dumps({'type': 'count', 'name': 'count'})
        )]
        # Somehow we need to reassign this for UDAFs
        if self.type in ('DOUBLE', 'FLOAT'):
            corrected_type = 'DOUBLE'
        else:
            corrected_type = self.type

        # sum/min/max only make sense on numeric columns; one helper
        # replaces three previously copy-pasted construction blocks.
        for agg in ('sum', 'min', 'max'):
            if getattr(self, agg) and self.isnum:
                metrics.append(self._simple_agg_metric(agg, corrected_type))

        if self.count_distinct:
            name = 'count_distinct__' + self.column_name
            metrics.append(Metric(
                metric_name=name,
                verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                metric_type='count_distinct',
                json=json.dumps({
                    'type': 'cardinality',
                    'name': name,
                    'fieldNames': [self.column_name]})
            ))
        session = get_session()
        for metric in metrics:
            # NOTE(review): Cluster is filtered without being joined to
            # Metric, which relies on an implicit cross join — verify
            # this query matches the intended uniqueness check.
            m = (
                session.query(M)
                .filter(M.metric_name == metric.metric_name)
                .filter(M.datasource_name == self.datasource_name)
                .filter(Cluster.cluster_name == self.datasource.cluster_name)
                .first()
            )
            metric.datasource_name = self.datasource_name
            if not m:
                session.add(metric)
                session.commit()