2016-03-29 00:55:58 -04:00
|
|
|
"""A collection of ORM sqlalchemy models for Caravel"""
|
2016-04-07 11:39:08 -04:00
|
|
|
from __future__ import absolute_import
|
|
|
|
from __future__ import division
|
|
|
|
from __future__ import print_function
|
|
|
|
from __future__ import unicode_literals
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
import functools
|
|
|
|
import json
|
|
|
|
import logging
|
2016-04-04 19:13:08 -04:00
|
|
|
import textwrap
|
2016-04-08 02:01:40 -04:00
|
|
|
from collections import namedtuple
|
|
|
|
from copy import deepcopy, copy
|
|
|
|
from datetime import timedelta, datetime, date
|
2016-03-18 02:44:58 -04:00
|
|
|
|
2016-04-08 02:01:40 -04:00
|
|
|
import humanize
|
|
|
|
import pandas as pd
|
|
|
|
import requests
|
|
|
|
import sqlalchemy as sqla
|
2016-08-30 00:55:31 -04:00
|
|
|
from sqlalchemy.engine.url import make_url
|
2016-04-08 02:01:40 -04:00
|
|
|
import sqlparse
|
2016-03-18 02:44:58 -04:00
|
|
|
from dateutil.parser import parse
|
2016-05-02 13:50:23 -04:00
|
|
|
|
2016-04-20 20:36:37 -04:00
|
|
|
from flask import request, g
|
2016-06-06 00:37:03 -04:00
|
|
|
from flask_appbuilder import Model
|
|
|
|
from flask_appbuilder.models.mixins import AuditMixin
|
|
|
|
from flask_appbuilder.models.decorators import renders
|
2016-06-27 23:10:40 -04:00
|
|
|
from flask_babel import lazy_gettext as _
|
2016-05-02 13:50:23 -04:00
|
|
|
|
|
|
|
from pydruid.client import PyDruid
|
2016-03-18 02:44:58 -04:00
|
|
|
from pydruid.utils.filters import Dimension, Filter
|
2016-05-02 13:00:39 -04:00
|
|
|
from pydruid.utils.postaggregator import Postaggregator
|
2016-07-01 17:45:04 -04:00
|
|
|
from pydruid.utils.having import Aggregation
|
2016-04-08 02:01:40 -04:00
|
|
|
from six import string_types
|
2016-08-30 00:55:31 -04:00
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
from sqlalchemy import (
|
2016-08-30 00:55:31 -04:00
|
|
|
Column, Integer, String, ForeignKey, Text, Boolean,
|
|
|
|
DateTime, Date, Table, Numeric,
|
|
|
|
create_engine, MetaData, desc, asc, select, and_, func
|
|
|
|
)
|
2016-04-08 02:01:40 -04:00
|
|
|
from sqlalchemy.ext.declarative import declared_attr
|
2016-03-18 02:44:58 -04:00
|
|
|
from sqlalchemy.orm import relationship
|
|
|
|
from sqlalchemy.sql import table, literal_column, text, column
|
2016-08-30 00:55:31 -04:00
|
|
|
from sqlalchemy.sql.expression import TextAsFrom
|
2016-03-18 02:44:58 -04:00
|
|
|
from sqlalchemy_utils import EncryptedType
|
|
|
|
|
2016-09-06 16:58:09 -04:00
|
|
|
from werkzeug.datastructures import ImmutableMultiDict
|
|
|
|
|
2016-06-10 18:49:33 -04:00
|
|
|
import caravel
|
|
|
|
from caravel import app, db, get_session, utils, sm
|
2016-03-29 00:55:58 -04:00
|
|
|
from caravel.viz import viz_types
|
2016-06-21 12:43:10 -04:00
|
|
|
from caravel.utils import flasher, MetricPermException, DimSelector
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
# Flask application config object, shared by every model in this module.
config = app.config
|
|
|
|
|
|
|
|
# Result of running a datasource query: the data frame, the query text that
# produced it, and how long it took.
# FIX: the namedtuple's typename was the literal string 'namedtuple', which
# made reprs and pickling identify the wrong class name.
QueryResult = namedtuple('QueryResult', ['df', 'query', 'duration'])
|
|
|
|
|
|
|
|
|
2016-05-02 13:00:39 -04:00
|
|
|
class JavascriptPostAggregator(Postaggregator):

    """Druid post-aggregator backed by a user-supplied JavaScript function."""

    def __init__(self, name, field_names, function):
        # Build the raw post-aggregation spec in the shape Druid expects.
        self.post_aggregator = dict(
            type='javascript',
            fieldNames=field_names,
            name=name,
            function=function,
        )
        self.name = name
|
|
|
|
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
class AuditMixinNullable(AuditMixin):

    """Altering the AuditMixin to use nullable fields

    Allows creating objects programmatically outside of CRUD
    """

    created_on = Column(DateTime, default=datetime.now, nullable=True)
    changed_on = Column(
        DateTime, default=datetime.now,
        onupdate=datetime.now, nullable=True)

    @declared_attr
    def created_by_fk(cls):  # noqa
        # FK to the Flask-AppBuilder user table; nullable so objects can be
        # created outside a request context.
        return Column(Integer, ForeignKey('ab_user.id'),
                      default=cls.get_user_id, nullable=True)

    @declared_attr
    def changed_by_fk(cls):  # noqa
        return Column(
            Integer, ForeignKey('ab_user.id'),
            default=cls.get_user_id, onupdate=cls.get_user_id, nullable=True)

    @renders('created_on')
    def creator(self):  # noqa
        """Display name of the creating user ('' when unset)."""
        return '{}'.format(self.created_by or '')

    @property
    def changed_by_(self):
        """Display name of the last user to modify the row ('' when unset)."""
        return '{}'.format(self.changed_by or '')

    @renders('changed_on')
    def changed_on_(self):
        """`changed_on` wrapped in a non-wrapping span for list views."""
        return '<span class="no-wrap">{}</span>'.format(self.changed_on)

    @renders('changed_on')
    def modified(self):
        """Humanized 'x minutes ago' rendering of `changed_on`."""
        s = humanize.naturaltime(datetime.now() - self.changed_on)
        # FIX: the closing tag was '</nobr>', mismatched with the opening
        # '<span>', producing invalid HTML in list views.
        return '<span class="no-wrap">{}</span>'.format(s)

    @property
    def icons(self):
        # Datasource edit link; assumes the subclass exposes
        # `datasource_edit_url` and `datasource` (e.g. Slice) — TODO confirm.
        return """
        <a
                href="{self.datasource_edit_url}"
                data-toggle="tooltip"
                title="{self.datasource}">
            <i class="fa fa-database"></i>
        </a>
        """.format(**locals())
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class Url(Model, AuditMixinNullable):

    """Used for the short url feature"""

    __tablename__ = 'url'
    id = Column(Integer, primary_key=True)
    # Full target URL that the short link redirects to.
    url = Column(Text)
|
|
|
|
|
|
|
|
|
|
|
|
class CssTemplate(Model, AuditMixinNullable):

    """CSS templates for dashboards"""

    __tablename__ = 'css_templates'
    id = Column(Integer, primary_key=True)
    # Human-readable name shown when picking a template.
    template_name = Column(String(250))
    # Raw CSS applied on top of a dashboard.
    css = Column(Text, default='')
|
|
|
|
|
|
|
|
|
2016-04-26 19:44:51 -04:00
|
|
|
# Many-to-many association between slices and their owner users.
slice_user = Table('slice_user', Model.metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('ab_user.id')),
    Column('slice_id', Integer, ForeignKey('slices.id'))
)
|
|
|
|
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
class Slice(Model, AuditMixinNullable):

    """A slice is essentially a report or a view on data"""

    __tablename__ = 'slices'
    id = Column(Integer, primary_key=True)
    slice_name = Column(String(250))
    # A slice points at exactly one datasource: either a Druid datasource
    # or a SQLA table (see `datasource` below).
    druid_datasource_id = Column(Integer, ForeignKey('datasources.id'))
    table_id = Column(Integer, ForeignKey('tables.id'))
    datasource_type = Column(String(200))
    datasource_name = Column(String(2000))
    viz_type = Column(String(250))
    # JSON-encoded visualization form data.
    params = Column(Text)
    description = Column(Text)
    cache_timeout = Column(Integer)
    # Permission string; kept in sync by the set_perm event listener below.
    perm = Column(String(2000))

    table = relationship(
        'SqlaTable', foreign_keys=[table_id], backref='slices')
    druid_datasource = relationship(
        'DruidDatasource', foreign_keys=[druid_datasource_id], backref='slices')
    owners = relationship("User", secondary=slice_user)

    def __repr__(self):
        return self.slice_name

    @property
    def datasource(self):
        # Only one of the two relationships is expected to be set.
        return self.table or self.druid_datasource

    @renders('datasource_name')
    def datasource_link(self):
        """HTML link to whichever datasource backs this slice."""
        if self.table:
            return self.table.link
        elif self.druid_datasource:
            return self.druid_datasource.link

    @property
    def datasource_edit_url(self):
        """Edit URL of whichever datasource backs this slice."""
        if self.table:
            return self.table.url
        elif self.druid_datasource:
            return self.druid_datasource.url

    @property
    @utils.memoized
    def viz(self):
        """Instantiate (once, memoized) the viz object from stored params."""
        d = json.loads(self.params)
        viz_class = viz_types[self.viz_type]
        return viz_class(self.datasource, form_data=d)

    @property
    def description_markeddown(self):
        # Description rendered from markdown to HTML.
        return utils.markdown(self.description)

    @property
    def datasource_id(self):
        return self.table_id or self.druid_datasource_id

    @property
    def data(self):
        """Data used to render slice in templates"""
        d = {}
        self.token = ''
        try:
            d = self.viz.data
            self.token = d.get('token')
        except Exception as e:
            # Surface viz errors to the frontend instead of failing the page.
            d['error'] = str(e)
        d['slice_id'] = self.id
        d['slice_name'] = self.slice_name
        d['description'] = self.description
        d['slice_url'] = self.slice_url
        d['edit_url'] = self.edit_url
        d['description_markeddown'] = self.description_markeddown
        return d

    @property
    def json_data(self):
        return json.dumps(self.data)

    @property
    def slice_url(self):
        """Defines the url to access the slice"""
        try:
            slice_params = json.loads(self.params)
        except Exception as e:
            # Bad stored params: log and fall back to a minimal query string.
            logging.exception(e)
            slice_params = {}
        slice_params['slice_id'] = self.id
        slice_params['json'] = "false"
        slice_params['slice_name'] = self.slice_name
        from werkzeug.urls import Href
        href = Href(
            "/caravel/explore/{obj.datasource_type}/"
            "{obj.datasource_id}/".format(obj=self))
        return href(slice_params)

    @property
    def edit_url(self):
        return "/slicemodelview/edit/{}".format(self.id)

    @property
    def slice_link(self):
        """HTML anchor pointing at the explore view for this slice."""
        url = self.slice_url
        return '<a href="{url}">{obj.slice_name}</a>'.format(
            url=url, obj=self)

    def get_viz(self, url_params_multidict=None):
        """Creates :py:class:viz.BaseViz object from the url_params_multidict.

        :param werkzeug.datastructures.MultiDict url_params_multidict:
            Contains the visualization params, they override the self.params
            stored in the database
        :return: object of the 'viz_type' type that is taken from the
            url_params_multidict or self.params.
        :rtype: :py:class:viz.BaseViz
        """
        slice_params = json.loads(self.params)  # {}
        slice_params['slice_id'] = self.id
        slice_params['json'] = "false"
        slice_params['slice_name'] = self.slice_name
        slice_params['viz_type'] = self.viz_type if self.viz_type else "table"
        if url_params_multidict:
            slice_params.update(url_params_multidict)
        immutable_slice_params = ImmutableMultiDict(slice_params)
        return viz_types[immutable_slice_params.get('viz_type')](
            self.datasource,
            form_data=immutable_slice_params,
            slice_=self
        )
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
2016-04-26 19:44:51 -04:00
|
|
|
def set_perm(mapper, connection, target):  # noqa
    """SQLA event handler keeping Slice.perm in sync with its datasource.

    FIX: previously, a slice with neither table_id nor druid_datasource_id
    raised NameError (src_class unbound), and a dangling id raised
    AttributeError on `ds.perm`. Both cases are now guarded.
    """
    src_class = None
    id_ = None
    if target.table_id:
        src_class = SqlaTable
        id_ = target.table_id
    elif target.druid_datasource_id:
        src_class = DruidDatasource
        id_ = target.druid_datasource_id
    if src_class is None:
        # No datasource reference on this slice; nothing to derive perm from.
        return
    ds = db.session.query(src_class).filter_by(id=int(id_)).first()
    if ds:
        target.perm = ds.perm
|
|
|
|
|
|
|
|
# Keep Slice.perm in sync with its datasource on every write.
sqla.event.listen(Slice, 'before_insert', set_perm)
sqla.event.listen(Slice, 'before_update', set_perm)
|
|
|
|
|
|
|
|
|
2016-03-16 23:25:41 -04:00
|
|
|
# Many-to-many association between dashboards and the slices they display.
dashboard_slices = Table(
    'dashboard_slices', Model.metadata,
    Column('id', Integer, primary_key=True),
    Column('dashboard_id', Integer, ForeignKey('dashboards.id')),
    Column('slice_id', Integer, ForeignKey('slices.id')),
)
|
|
|
|
|
2016-04-26 19:44:51 -04:00
|
|
|
# Many-to-many association between dashboards and their owner users.
dashboard_user = Table(
    'dashboard_user', Model.metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('ab_user.id')),
    Column('dashboard_id', Integer, ForeignKey('dashboards.id'))
)
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class Dashboard(Model, AuditMixinNullable):

    """The dashboard object!"""

    __tablename__ = 'dashboards'
    id = Column(Integer, primary_key=True)
    dashboard_title = Column(String(500))
    # JSON layout of the dashboard grid.
    position_json = Column(Text)
    description = Column(Text)
    css = Column(Text)
    # JSON blob of extra dashboard metadata; parsed by metadata_dejson.
    json_metadata = Column(Text)
    # Optional URL slug, used in place of the id when set.
    slug = Column(String(255), unique=True)
    slices = relationship(
        'Slice', secondary=dashboard_slices, backref='dashboards')
    owners = relationship("User", secondary=dashboard_user)

    def __repr__(self):
        return self.dashboard_title

    @property
    def table_names(self):
        """Comma-joined set of datasource names across this dashboard's slices."""
        return ", ".join({"{}".format(s.datasource) for s in self.slices})

    @property
    def url(self):
        return "/caravel/dashboard/{}/".format(self.slug or self.id)

    @property
    def metadata_dejson(self):
        """json_metadata parsed into a dict ({} when unset)."""
        if self.json_metadata:
            return json.loads(self.json_metadata)
        else:
            return {}

    @property
    def sqla_metadata(self):
        # FIX: MetaData.reflect() returns None, so this property previously
        # always evaluated to None. Reflect in place, then return the
        # populated MetaData object.
        # NOTE(review): get_sqla_engine is not defined on Dashboard — this
        # property looks like it belongs on Database; confirm callers.
        metadata = MetaData(bind=self.get_sqla_engine())
        metadata.reflect()
        return metadata

    def dashboard_link(self):
        """HTML anchor linking to this dashboard."""
        return '<a href="{obj.url}">{obj.dashboard_title}</a>'.format(obj=self)

    @property
    def json_data(self):
        """JSON payload consumed by the dashboard frontend."""
        d = {
            'id': self.id,
            'metadata': self.metadata_dejson,
            'dashboard_title': self.dashboard_title,
            'slug': self.slug,
            'slices': [slc.data for slc in self.slices],
            'position_json': json.loads(self.position_json) if self.position_json else [],
        }
        return json.dumps(d)
|
|
|
|
|
|
|
|
|
|
|
|
class Queryable(object):

    """A common interface to objects that are queryable (tables and datasources)"""

    @property
    def column_names(self):
        """All column names, alphabetically sorted."""
        return sorted(c.column_name for c in self.columns)

    @property
    def main_dttm_col(self):
        """Default datetime column name; subclasses may override."""
        return "timestamp"

    @property
    def groupby_column_names(self):
        """Sorted names of columns flagged as groupable."""
        return sorted(c.column_name for c in self.columns if c.groupby)

    @property
    def filterable_column_names(self):
        """Sorted names of columns flagged as filterable."""
        return sorted(c.column_name for c in self.columns if c.filterable)

    @property
    def dttm_cols(self):
        """Datetime columns; none at this level, subclasses override."""
        return []

    @property
    def url(self):
        """Edit URL for this datasource's CRUD view."""
        return '/{}/edit/{}'.format(self.baselink, self.id)

    @property
    def explore_url(self):
        """Explore URL; a configured default endpoint wins over the generic one."""
        if not self.default_endpoint:
            return "/caravel/explore/{obj.type}/{obj.id}/".format(obj=self)
        return self.default_endpoint
|
|
|
|
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
class Database(Model, AuditMixinNullable):

    """An ORM object that stores Database related information"""

    __tablename__ = 'dbs'
    id = Column(Integer, primary_key=True)
    database_name = Column(String(250), unique=True)
    # Connection URI; the real password lives in `password` and is
    # re-injected by sqlalchemy_uri_decrypted.
    sqlalchemy_uri = Column(String(1024))
    # Stored encrypted with the application SECRET_KEY.
    password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
    cache_timeout = Column(Integer)
    select_as_create_table_as = Column(Boolean, default=False)
    expose_in_sqllab = Column(Boolean, default=False)
    allow_ctas = Column(Boolean, default=False)
    force_ctas_schema = Column(String(250))
    # JSON blob of extra engine/metadata parameters; parsed by get_extra().
    extra = Column(Text, default=textwrap.dedent("""\
    {
        "metadata_params": {},
        "engine_params": {}
    }
    """))

    def __repr__(self):
        return self.database_name

    def get_sqla_engine(self, schema=None):
        """Create a SQLAlchemy engine, optionally pointed at a schema.

        For presto the schema is appended to the catalog part of the URL's
        database; for other backends the database is replaced outright.
        """
        extra = self.get_extra()
        params = extra.get('engine_params', {})
        url = make_url(self.sqlalchemy_uri_decrypted)
        backend = url.get_backend_name()
        if backend == 'presto' and schema:
            if '/' in url.database:
                url.database = url.database.split('/')[0] + '/' + schema
            else:
                url.database += '/' + schema
        elif schema:
            url.database = schema
        return create_engine(url, **params)

    def get_df(self, sql, schema):
        """Run ``sql`` against this database and return a pandas DataFrame."""
        eng = self.get_sqla_engine(schema=schema)
        cur = eng.execute(sql, schema=schema)
        cols = [col[0] for col in cur.cursor.description]
        df = pd.DataFrame(cur.fetchall(), columns=cols)
        return df

    def compile_sqla_query(self, qry, schema=None):
        """Render a SQLAlchemy query to SQL text in this database's dialect."""
        eng = self.get_sqla_engine(schema=schema)
        compiled = qry.compile(eng, compile_kwargs={"literal_binds": True})
        return '{}'.format(compiled)

    def select_star(self, table_name, schema=None, limit=1000):
        """Generates a ``select *`` statement in the proper dialect"""
        qry = select('*').select_from(text(table_name))
        if limit:
            qry = qry.limit(limit)
        return self.compile_sqla_query(qry)

    def wrap_sql_limit(self, sql, limit=1000):
        """Wrap arbitrary SQL in an outer SELECT so a row limit applies."""
        qry = (
            select('*')
            .select_from(TextAsFrom(text(sql), ['*'])
            .alias('inner_qry')).limit(limit)
        )
        return self.compile_sqla_query(qry)

    def safe_sqlalchemy_uri(self):
        # The stored URI already has the password masked.
        return self.sqlalchemy_uri

    @property
    def inspector(self):
        # Fresh inspector bound to a newly created engine.
        engine = self.get_sqla_engine()
        return sqla.inspect(engine)

    def all_table_names(self, schema=None):
        return sorted(self.inspector.get_table_names(schema))

    def all_view_names(self, schema=None):
        # Best effort: some dialects don't implement view reflection.
        views = []
        try:
            views = self.inspector.get_view_names(schema)
        except Exception as e:
            pass
        return views

    def all_schema_names(self):
        return sorted(self.inspector.get_schema_names())

    def grains(self):
        """Defines time granularity database-specific expressions.

        The idea here is to make it easy for users to change the time grain
        form a datetime (maybe the source grain is arbitrary timestamps, daily
        or 5 minutes increments) to another, "truncated" datetime. Since
        each database has slightly different but similar datetime functions,
        this allows a mapping between database engines and actual functions.
        """
        Grain = namedtuple('Grain', 'name label function')
        db_time_grains = {
            'presto': (
                Grain('Time Column', _('Time Column'), '{col}'),
                Grain('second', _('second'),
                      "date_trunc('second', CAST({col} AS TIMESTAMP))"),
                Grain('minute', _('minute'),
                      "date_trunc('minute', CAST({col} AS TIMESTAMP))"),
                Grain('hour', _('hour'),
                      "date_trunc('hour', CAST({col} AS TIMESTAMP))"),
                Grain('day', _('day'),
                      "date_trunc('day', CAST({col} AS TIMESTAMP))"),
                Grain('week', _('week'),
                      "date_trunc('week', CAST({col} AS TIMESTAMP))"),
                Grain('month', _('month'),
                      "date_trunc('month', CAST({col} AS TIMESTAMP))"),
                Grain('quarter', _('quarter'),
                      "date_trunc('quarter', CAST({col} AS TIMESTAMP))"),
                Grain("week_ending_saturday", _('week_ending_saturday'),
                      "date_add('day', 5, date_trunc('week', date_add('day', 1, "
                      "CAST({col} AS TIMESTAMP))))"),
                Grain("week_start_sunday", _('week_start_sunday'),
                      "date_add('day', -1, date_trunc('week', "
                      "date_add('day', 1, CAST({col} AS TIMESTAMP))))"),
            ),
            'mysql': (
                Grain('Time Column', _('Time Column'), '{col}'),
                Grain("second", _('second'), "DATE_ADD(DATE({col}), "
                      "INTERVAL (HOUR({col})*60*60 + MINUTE({col})*60"
                      " + SECOND({col})) SECOND)"),
                Grain("minute", _('minute'), "DATE_ADD(DATE({col}), "
                      "INTERVAL (HOUR({col})*60 + MINUTE({col})) MINUTE)"),
                Grain("hour", _('hour'), "DATE_ADD(DATE({col}), "
                      "INTERVAL HOUR({col}) HOUR)"),
                Grain('day', _('day'), 'DATE({col})'),
                Grain("week", _('week'), "DATE(DATE_SUB({col}, "
                      "INTERVAL DAYOFWEEK({col}) - 1 DAY))"),
                Grain("month", _('month'), "DATE(DATE_SUB({col}, "
                      "INTERVAL DAYOFMONTH({col}) - 1 DAY))"),
            ),
            'sqlite': (
                Grain('Time Column', _('Time Column'), '{col}'),
                Grain('day', _('day'), 'DATE({col})'),
                Grain("week", _('week'),
                      "DATE({col}, -strftime('%w', {col}) || ' days')"),
                Grain("month", _('month'),
                      "DATE({col}, -strftime('%d', {col}) || ' days')"),
            ),
            'postgresql': (
                Grain("Time Column", _('Time Column'), "{col}"),
                Grain("second", _('second'), "DATE_TRUNC('second', {col})"),
                Grain("minute", _('minute'), "DATE_TRUNC('minute', {col})"),
                Grain("hour", _('hour'), "DATE_TRUNC('hour', {col})"),
                Grain("day", _('day'), "DATE_TRUNC('day', {col})"),
                Grain("week", _('week'), "DATE_TRUNC('week', {col})"),
                Grain("month", _('month'), "DATE_TRUNC('month', {col})"),
                Grain("year", _('year'), "DATE_TRUNC('year', {col})"),
            ),
            'mssql': (
                Grain("Time Column", _('Time Column'), "{col}"),
                Grain("second", _('second'), "DATEADD(second, "
                      "DATEDIFF(second, '2000-01-01', {col}), '2000-01-01')"),
                Grain("minute", _('minute'), "DATEADD(minute, "
                      "DATEDIFF(minute, 0, {col}), 0)"),
                Grain("5 minute", _('5 minute'), "DATEADD(minute, "
                      "DATEDIFF(minute, 0, {col}) / 5 * 5, 0)"),
                Grain("half hour", _('half hour'), "DATEADD(minute, "
                      "DATEDIFF(minute, 0, {col}) / 30 * 30, 0)"),
                Grain("hour", _('hour'), "DATEADD(hour, "
                      "DATEDIFF(hour, 0, {col}), 0)"),
                Grain("day", _('day'), "DATEADD(day, "
                      "DATEDIFF(day, 0, {col}), 0)"),
                Grain("week", _('week'), "DATEADD(week, "
                      "DATEDIFF(week, 0, {col}), 0)"),
                Grain("month", _('month'), "DATEADD(month, "
                      "DATEDIFF(month, 0, {col}), 0)"),
                Grain("quarter", _('quarter'), "DATEADD(quarter, "
                      "DATEDIFF(quarter, 0, {col}), 0)"),
                Grain("year", _('year'), "DATEADD(year, "
                      "DATEDIFF(year, 0, {col}), 0)"),
            ),
        }
        # Dialects sharing PostgreSQL's date functions.
        db_time_grains['redshift'] = db_time_grains['postgresql']
        db_time_grains['vertica'] = db_time_grains['postgresql']
        # Match on the URI prefix; returns None for unknown backends.
        for db_type, grains in db_time_grains.items():
            if self.sqlalchemy_uri.startswith(db_type):
                return grains

    def grains_dict(self):
        """Grains keyed by name for quick lookup."""
        return {grain.name: grain for grain in self.grains()}

    def get_extra(self):
        """Parse the JSON `extra` column; errors are logged, not raised."""
        extra = {}
        if self.extra:
            try:
                extra = json.loads(self.extra)
            except Exception as e:
                logging.error(e)
        return extra

    def get_table(self, table_name, schema=None):
        """Reflect a Table object from the live database."""
        extra = self.get_extra()
        meta = MetaData(**extra.get('metadata_params', {}))
        return Table(
            table_name, meta,
            schema=schema or None,
            autoload=True,
            autoload_with=self.get_sqla_engine())

    def get_columns(self, table_name, schema=None):
        return self.inspector.get_columns(table_name, schema)

    @property
    def sqlalchemy_uri_decrypted(self):
        # Re-inject the decrypted password into the stored URI.
        conn = sqla.engine.url.make_url(self.sqlalchemy_uri)
        conn.password = self.password
        return str(conn)

    @property
    def sql_url(self):
        return '/caravel/sql/{}/'.format(self.id)

    @property
    def perm(self):
        """Permission string identifying this database."""
        return (
            "[{obj.database_name}].(id:{obj.id})").format(obj=self)
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class SqlaTable(Model, Queryable, AuditMixinNullable):

    """An ORM object for SqlAlchemy table references"""

    type = "table"

    __tablename__ = 'tables'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(250))
    # Default datetime column used when a query's granularity column
    # isn't one of this table's dttm_cols.
    main_dttm_col = Column(String(250))
    description = Column(Text)
    default_endpoint = Column(Text)
    database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)
    is_featured = Column(Boolean, default=False)
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    owner = relationship('User', backref='tables', foreign_keys=[user_id])
    database = relationship(
        'Database', backref='tables', foreign_keys=[database_id])
    offset = Column(Integer, default=0)
    cache_timeout = Column(Integer)
    schema = Column(String(255))
    sql = Column(Text)
    table_columns = relationship("TableColumn", back_populates="table")

    baselink = "tablemodelview"

    # A physical table is unique per (database, schema, name).
    __table_args__ = (
        sqla.UniqueConstraint(
            'database_id', 'schema', 'table_name',
            name='_customer_location_uc'),)
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
    def __repr__(self):
        """Represent the table by its name."""
        return self.table_name
|
|
|
|
|
|
|
|
    @property
    def description_markeddown(self):
        # Description rendered from markdown to HTML.
        return utils.markdown(self.description)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def link(self):
|
|
|
|
return '<a href="{self.url}">{self.table_name}</a>'.format(**locals())
|
|
|
|
|
|
|
|
    @property
    def perm(self):
        """Permission string identifying this table within its database."""
        return (
            "[{obj.database}].[{obj.table_name}]"
            "(id:{obj.id})").format(obj=self)
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
    @property
    def full_name(self):
        """Human-readable '[database].[table]' identifier."""
        return "[{obj.database}].[{obj.table_name}]".format(obj=self)
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
@property
|
|
|
|
def dttm_cols(self):
|
|
|
|
l = [c.column_name for c in self.columns if c.is_dttm]
|
|
|
|
if self.main_dttm_col not in l:
|
|
|
|
l.append(self.main_dttm_col)
|
|
|
|
return l
|
|
|
|
|
2016-06-24 01:43:52 -04:00
|
|
|
    @property
    def num_cols(self):
        """Names of numeric columns (per each column's `isnum` flag)."""
        return [c.column_name for c in self.columns if c.isnum]
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
@property
|
|
|
|
def any_dttm_col(self):
|
|
|
|
cols = self.dttm_cols
|
|
|
|
if cols:
|
|
|
|
return cols[0]
|
|
|
|
|
|
|
|
    @property
    def html(self):
        """Bootstrap-styled HTML table listing column names and types."""
        t = ((c.column_name, c.type) for c in self.columns)
        df = pd.DataFrame(t)
        df.columns = ['field', 'type']
        return df.to_html(
            index=False,
            classes=(
                "dataframe table table-striped table-bordered "
                "table-condensed"))
|
|
|
|
|
|
|
|
    @property
    def name(self):
        # Generic-name alias used by the Queryable interface.
        return self.table_name
|
|
|
|
|
2016-06-30 01:20:25 -04:00
|
|
|
    @renders('table_name')
    def table_link(self):
        """HTML anchor to this table's explore view."""
        return '<a href="{obj.explore_url}">{obj.table_name}</a>'.format(obj=self)
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
@property
|
|
|
|
def metrics_combo(self):
|
|
|
|
return sorted(
|
|
|
|
[
|
|
|
|
(m.metric_name, m.verbose_name or m.metric_name)
|
|
|
|
for m in self.metrics],
|
|
|
|
key=lambda x: x[1])
|
|
|
|
|
|
|
|
    @property
    def sql_url(self):
        """SQL-lab URL preselecting this table on its database."""
        return self.database.sql_url + "?table_name=" + str(self.table_name)
|
|
|
|
|
2016-06-28 00:33:44 -04:00
|
|
|
def get_col(self, col_name):
|
|
|
|
columns = self.table_columns
|
|
|
|
for col in columns:
|
|
|
|
if col_name == col.column_name:
|
|
|
|
return col
|
|
|
|
|
2016-03-16 23:25:41 -04:00
|
|
|
def query(  # sqla
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        filter=None,  # noqa
        is_timeseries=True,
        timeseries_limit=15, row_limit=None,
        inner_from_dttm=None, inner_to_dttm=None,
        orderby=None,
        extras=None,
        columns=None):
    """Querying any sqla table from this common interface

    Builds a SQLAlchemy select from the visualization parameters,
    compiles it with literal binds against this table's engine, runs it
    through pandas and returns a QueryResult (dataframe + duration +
    formatted SQL).

    :param groupby: list of column names to group by
    :param metrics: list of metric names (keys into self.metrics)
    :param granularity: name of the datetime column to filter/group time on
    :param from_dttm: datetime, start of the outer time range
    :param to_dttm: datetime, end of the outer time range
    :param filter: iterable of (column, op, value) triples; only
        'in'/'not in' ops are handled here
    :param is_timeseries: when True, the time grain expression is added
        to both select and group by
    :param timeseries_limit: max number of series; triggers a two-phase
        (subquery + join) query when combined with groupby
    :param row_limit: LIMIT applied to the outer query
    :param inner_from_dttm: optional tighter time range for the inner
        (series-limiting) subquery
    :param inner_to_dttm: see inner_from_dttm
    :param orderby: list of (column, ascending) pairs, used only when
        there is no groupby
    :param extras: dict; reads 'time_grain_sqla', 'where', 'having'
    :param columns: raw column list; when set, selects columns verbatim
        with no grouping and no metrics
    """
    # For backward compatibility
    if granularity not in self.dttm_cols:
        granularity = self.main_dttm_col

    cols = {col.column_name: col for col in self.columns}
    metrics_dict = {m.metric_name: m for m in self.metrics}
    qry_start_dttm = datetime.now()

    # A timeseries chart without a datetime column cannot be built.
    if not granularity and is_timeseries:
        raise Exception(_(
            "Datetime column not provided as part table configuration "
            "and is required by this type of chart"))

    metrics_exprs = [metrics_dict.get(m).sqla_col for m in metrics]

    # The first metric drives ordering; fall back to COUNT(*) when the
    # caller requested no metrics at all.
    if metrics:
        main_metric_expr = metrics_exprs[0]
    else:
        main_metric_expr = literal_column("COUNT(*)").label("ccount")

    select_exprs = []
    groupby_exprs = []

    if groupby:
        select_exprs = []
        inner_select_exprs = []
        inner_groupby_exprs = []
        for s in groupby:
            col = cols[s]
            outer = col.sqla_col
            # Inner aliases get a '__' suffix so the outer query can
            # join back onto the series-limiting subquery.
            inner = col.sqla_col.label(col.column_name + '__')

            groupby_exprs.append(outer)
            select_exprs.append(outer)
            inner_groupby_exprs.append(inner)
            inner_select_exprs.append(inner)
    elif columns:
        # Raw column mode: no grouping, no metrics.
        for s in columns:
            select_exprs.append(cols[s].sqla_col)
        metrics_exprs = []

    if granularity:
        dttm_col = cols[granularity]
        dttm_expr = dttm_col.sqla_col.label('timestamp')
        timestamp = dttm_expr

        # Transforming time grain into an expression based on configuration
        time_grain_sqla = extras.get('time_grain_sqla')
        if time_grain_sqla:
            udf = self.database.grains_dict().get(time_grain_sqla, '{col}')
            timestamp_grain = literal_column(
                udf.function.format(col=dttm_expr)).label('timestamp')
        else:
            timestamp_grain = timestamp

        if is_timeseries:
            select_exprs += [timestamp_grain]
            groupby_exprs += [timestamp_grain]

        # Time bounds rendered as database-specific SQL literals.
        outer_from = text(dttm_col.dttm_sql_literal(from_dttm))
        outer_to = text(dttm_col.dttm_sql_literal(to_dttm))

        time_filter = [
            timestamp >= outer_from,
            timestamp <= outer_to,
        ]
        # Inner (subquery) range defaults to the outer range; a shallow
        # copy is enough since elements are only replaced, not mutated.
        inner_time_filter = copy(time_filter)
        if inner_from_dttm:
            inner_time_filter[0] = timestamp >= text(
                dttm_col.dttm_sql_literal(inner_from_dttm))
        if inner_to_dttm:
            inner_time_filter[1] = timestamp <= text(
                dttm_col.dttm_sql_literal(inner_to_dttm))
    else:
        inner_time_filter = []

    select_exprs += metrics_exprs
    qry = select(select_exprs)

    tbl = table(self.table_name)
    if self.schema:
        tbl.schema = self.schema

    # Supporting arbitrary SQL statements in place of tables
    if self.sql:
        tbl = text('(' + self.sql + ') as expr_qry ')

    if not columns:
        qry = qry.group_by(*groupby_exprs)

    where_clause_and = []
    having_clause_and = []
    for col, op, eq in filter:
        col_obj = cols[col]
        if op in ('in', 'not in'):
            # Values arrive as a single comma-separated string.
            values = eq.split(",")
            cond = col_obj.sqla_col.in_(values)
            if op == 'not in':
                cond = ~cond
            where_clause_and.append(cond)
    # Free-form WHERE / HAVING snippets supplied by the user.
    if extras and 'where' in extras:
        where_clause_and += [text(extras['where'])]
    if extras and 'having' in extras:
        having_clause_and += [text(extras['having'])]
    if granularity:
        qry = qry.where(and_(*(time_filter + where_clause_and)))
    else:
        qry = qry.where(and_(*where_clause_and))
    qry = qry.having(and_(*having_clause_and))
    if groupby:
        qry = qry.order_by(desc(main_metric_expr))
    elif orderby:
        for col, ascending in orderby:
            direction = asc if ascending else desc
            qry = qry.order_by(direction(col))

    qry = qry.limit(row_limit)

    if timeseries_limit and groupby:
        # Two-phase query: a subquery picks the top-N series by the main
        # metric over the (inner) time range, and the outer query joins
        # onto it so only those series are returned.
        # some sql dialects require for order by expressions
        # to also be in the select clause
        inner_select_exprs += [main_metric_expr]
        subq = select(inner_select_exprs)
        subq = subq.select_from(tbl)
        subq = subq.where(and_(*(where_clause_and + inner_time_filter)))
        subq = subq.group_by(*inner_groupby_exprs)
        subq = subq.order_by(desc(main_metric_expr))
        subq = subq.limit(timeseries_limit)
        on_clause = []
        for i, gb in enumerate(groupby):
            on_clause.append(
                groupby_exprs[i] == column(gb + '__'))

        tbl = tbl.join(subq.alias(), and_(*on_clause))

    qry = qry.select_from(tbl)

    engine = self.database.get_sqla_engine()
    # literal_binds inlines parameter values so the SQL is runnable
    # as-is (and displayable to the user).
    sql = "{}".format(
        qry.compile(
            engine, compile_kwargs={"literal_binds": True},),
    )
    df = pd.read_sql_query(
        sql=sql,
        con=engine
    )
    sql = sqlparse.format(sql, reindent=True)
    return QueryResult(
        df=df, duration=datetime.now() - qry_start_dttm, query=sql)
|
|
|
|
def fetch_metadata(self):
    """Fetches the metadata for the table and merges it in

    Reflects the physical table through the database connection,
    creates/updates a TableColumn per physical column, and derives a set
    of default SqlMetric objects (sum/max/min/count-distinct per eligible
    column plus an overall COUNT(*)). Commits as it goes.
    """
    try:
        table = self.database.get_table(self.table_name, schema=self.schema)
    except Exception as e:
        # Surface the failure to the user instead of raising: the table
        # may have been dropped or the connection may be misconfigured.
        flasher(str(e))
        flasher(
            "Table doesn't seem to exist in the specified database, "
            "couldn't fetch column information", "danger")
        return

    TC = TableColumn  # noqa shortcut to class
    M = SqlMetric  # noqa
    metrics = []
    any_date_col = None
    for col in table.columns:
        try:
            datatype = "{}".format(col.type).upper()
        except Exception as e:
            # Some dialects raise when stringifying exotic types.
            datatype = "UNKNOWN"
            logging.error(
                "Unrecognized data type in {}.{}".format(table, col.name))
            logging.exception(e)
        dbcol = (
            db.session
            .query(TC)
            .filter(TC.table == self)
            .filter(TC.column_name == col.name)
            .first()
        )
        db.session.flush()
        if not dbcol:
            # New column: infer sensible defaults from the datatype.
            dbcol = TableColumn(column_name=col.name, type=datatype)
            dbcol.groupby = dbcol.is_string
            dbcol.filterable = dbcol.is_string
            dbcol.sum = dbcol.isnum
            dbcol.is_dttm = dbcol.is_time

        db.session.merge(self)
        self.columns.append(dbcol)

        # Remember the first time-typed column as a candidate main
        # datetime column.
        if not any_date_col and dbcol.is_time:
            any_date_col = col.name

        # Quote the column name per the metadata-db dialect so the
        # generated metric expressions are valid SQL.
        quoted = "{}".format(
            column(dbcol.column_name).compile(dialect=db.engine.dialect))
        if dbcol.sum:
            metrics.append(M(
                metric_name='sum__' + dbcol.column_name,
                verbose_name='sum__' + dbcol.column_name,
                metric_type='sum',
                expression="SUM({})".format(quoted)
            ))
        if dbcol.max:
            metrics.append(M(
                metric_name='max__' + dbcol.column_name,
                verbose_name='max__' + dbcol.column_name,
                metric_type='max',
                expression="MAX({})".format(quoted)
            ))
        if dbcol.min:
            metrics.append(M(
                metric_name='min__' + dbcol.column_name,
                verbose_name='min__' + dbcol.column_name,
                metric_type='min',
                expression="MIN({})".format(quoted)
            ))
        if dbcol.count_distinct:
            metrics.append(M(
                metric_name='count_distinct__' + dbcol.column_name,
                verbose_name='count_distinct__' + dbcol.column_name,
                metric_type='count_distinct',
                expression="COUNT(DISTINCT {})".format(quoted)
            ))
        dbcol.type = datatype
        db.session.merge(self)
        db.session.commit()

    # Always provide a row-count metric.
    metrics.append(M(
        metric_name='count',
        verbose_name='COUNT(*)',
        metric_type='count',
        expression="COUNT(*)"
    ))
    for metric in metrics:
        # Only add metrics that don't already exist for this table.
        m = (
            db.session.query(M)
            .filter(M.metric_name == metric.metric_name)
            .filter(M.table_id == self.id)
            .first()
        )
        metric.table_id = self.id
        if not m:
            db.session.add(metric)
        db.session.commit()
    if not self.main_dttm_col:
        self.main_dttm_col = any_date_col
|
|
|
|
|
|
|
|
class SqlMetric(Model, AuditMixinNullable):

    """ORM object for metrics, each table can have multiple metrics"""

    __tablename__ = 'sql_metrics'
    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    # e.g. 'sum', 'max', 'count_distinct', ... (free-form string)
    metric_type = Column(String(32))
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship(
        'SqlaTable', backref='metrics', foreign_keys=[table_id])
    # Raw SQL expression this metric evaluates to, e.g. "SUM(col)"
    expression = Column(Text)
    description = Column(Text)
    # When True, access requires the 'metric_access' permission
    is_restricted = Column(Boolean, default=False, nullable=True)
    # d3 number format string used when rendering this metric
    d3format = Column(String(128))

    @property
    def sqla_col(self):
        """SQLAlchemy column: the raw expression labeled with the metric name."""
        name = self.metric_name
        return literal_column(self.expression).label(name)

    @property
    def perm(self):
        """Permission string for this metric, or None when unattached."""
        return (
            "{parent_name}.[{obj.metric_name}](id:{obj.id})"
        ).format(obj=self,
                 parent_name=self.table.full_name) if self.table else None
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class TableColumn(Model, AuditMixinNullable):

    """ORM object for table columns, each table can have multiple columns"""

    __tablename__ = 'table_columns'
    id = Column(Integer, primary_key=True)
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship(
        'SqlaTable', backref='columns', foreign_keys=[table_id])
    column_name = Column(String(255))
    verbose_name = Column(String(1024))
    # Whether this column holds datetimes
    is_dttm = Column(Boolean, default=False)
    is_active = Column(Boolean, default=True)
    # Upper-cased SQL type string, e.g. 'VARCHAR', 'BIGINT'
    type = Column(String(32), default='')
    # Feature flags controlling which UI operations allow this column
    groupby = Column(Boolean, default=False)
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    # Optional SQL expression used in place of the raw column
    expression = Column(Text, default='')
    description = Column(Text, default='')
    # strftime pattern (or 'epoch_s'/'epoch_ms') describing stored datetimes
    python_date_format = Column(String(255))
    # SQL template with a '{}' placeholder used to render datetime literals
    database_expression = Column(String(255))

    # Substrings used to classify the `type` string into broad categories.
    num_types = ('DOUBLE', 'FLOAT', 'INT', 'BIGINT', 'LONG')
    date_types = ('DATE', 'TIME')
    str_types = ('VARCHAR', 'STRING', 'CHAR')

    def __repr__(self):
        return self.column_name

    @property
    def isnum(self):
        # Substring match against the type string, e.g. 'BIGINT' matches 'INT'
        return any([t in self.type.upper() for t in self.num_types])

    @property
    def is_time(self):
        return any([t in self.type.upper() for t in self.date_types])

    @property
    def is_string(self):
        return any([t in self.type.upper() for t in self.str_types])

    @property
    def sqla_col(self):
        """SQLAlchemy column: the raw column, or its expression, labeled."""
        name = self.column_name
        if not self.expression:
            col = column(self.column_name).label(name)
        else:
            col = literal_column(self.expression).label(name)
        return col

    def dttm_sql_literal(self, dttm):
        """Convert datetime object to string

        If database_expression is empty, the internal dttm
        will be parsed as the string with the pattern that
        the user inputted (python_date_format)
        If database_expression is not empty, the internal dttm
        will be parsed as the sql sentence for the database to convert
        """
        tf = self.python_date_format or '%Y-%m-%d %H:%M:%S.%f'
        if self.database_expression:
            return self.database_expression.format(dttm.strftime('%Y-%m-%d %H:%M:%S'))
        elif tf == 'epoch_s':
            # Seconds since the Unix epoch, as a bare number literal
            return str((dttm - datetime(1970, 1, 1)).total_seconds())
        elif tf == 'epoch_ms':
            return str((dttm - datetime(1970, 1, 1)).total_seconds()*1000.0)
        else:
            default = "'{}'".format(dttm.strftime(tf))
            iso = dttm.isoformat()
            # Per-dialect overrides keyed by sqlalchemy_uri prefix
            d = {
                'mssql': "CONVERT(DATETIME, '{}', 126)".format(iso),  # untested
                'mysql': default,
                'oracle':
                    """TO_TIMESTAMP('{}', 'YYYY-MM-DD"T"HH24:MI:SS.ff6')""".format(
                        dttm.isoformat()),
                'presto': default,
                'sqlite': default,
            }
            for k, v in d.items():
                if self.table.database.sqlalchemy_uri.startswith(k):
                    return v
            return default
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class DruidCluster(Model, AuditMixinNullable):

    """ORM object referencing the Druid clusters"""

    __tablename__ = 'clusters'
    id = Column(Integer, primary_key=True)
    cluster_name = Column(String(250), unique=True)
    # Coordinator: serves datasource/segment metadata
    coordinator_host = Column(String(255))
    coordinator_port = Column(Integer)
    coordinator_endpoint = Column(
        String(255), default='druid/coordinator/v1/metadata')
    # Broker: answers queries
    broker_host = Column(String(255))
    broker_port = Column(Integer)
    broker_endpoint = Column(String(255), default='druid/v2')
    metadata_last_refreshed = Column(DateTime)

    def __repr__(self):
        return self.cluster_name

    def get_pydruid_client(self):
        """Return a PyDruid client pointed at this cluster's broker."""
        cli = PyDruid(
            "http://{0}:{1}/".format(self.broker_host, self.broker_port),
            self.broker_endpoint)
        return cli

    def get_datasources(self):
        """Fetch the list of datasource names from the coordinator."""
        endpoint = (
            "http://{obj.coordinator_host}:{obj.coordinator_port}/"
            "{obj.coordinator_endpoint}/datasources"
        ).format(obj=self)

        return json.loads(requests.get(endpoint).text)

    def get_druid_version(self):
        """Fetch the Druid version string from the coordinator /status."""
        endpoint = (
            "http://{obj.coordinator_host}:{obj.coordinator_port}/status"
        ).format(obj=self)
        return json.loads(requests.get(endpoint).text)['version']

    def refresh_datasources(self):
        """Sync every non-blacklisted datasource of this cluster to the db."""
        self.druid_version = self.get_druid_version()
        for datasource in self.get_datasources():
            if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
                DruidDatasource.sync_to_db(datasource, self)
|
|
|
|
|
|
|
class DruidDatasource(Model, AuditMixinNullable, Queryable):
|
|
|
|
|
|
|
|
"""ORM object referencing Druid datasources (tables)"""
|
|
|
|
|
|
|
|
type = "druid"
|
|
|
|
|
2016-05-02 13:00:39 -04:00
|
|
|
baselink = "druiddatasourcemodelview"
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
__tablename__ = 'datasources'
|
|
|
|
id = Column(Integer, primary_key=True)
|
2016-05-12 13:27:38 -04:00
|
|
|
datasource_name = Column(String(255), unique=True)
|
2016-03-18 02:44:58 -04:00
|
|
|
is_featured = Column(Boolean, default=False)
|
|
|
|
is_hidden = Column(Boolean, default=False)
|
|
|
|
description = Column(Text)
|
|
|
|
default_endpoint = Column(Text)
|
|
|
|
user_id = Column(Integer, ForeignKey('ab_user.id'))
|
|
|
|
owner = relationship('User', backref='datasources', foreign_keys=[user_id])
|
|
|
|
cluster_name = Column(
|
|
|
|
String(250), ForeignKey('clusters.cluster_name'))
|
|
|
|
cluster = relationship(
|
|
|
|
'DruidCluster', backref='datasources', foreign_keys=[cluster_name])
|
|
|
|
offset = Column(Integer, default=0)
|
2016-03-16 23:25:41 -04:00
|
|
|
cache_timeout = Column(Integer)
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
@property
def metrics_combo(self):
    """(metric_name, verbose_name) pairs sorted by verbose name."""
    pairs = [(m.metric_name, m.verbose_name) for m in self.metrics]
    return sorted(pairs, key=lambda pair: pair[1])
2016-06-24 01:43:52 -04:00
|
|
|
@property
def num_cols(self):
    """Names of the numeric columns of this datasource."""
    names = []
    for col in self.columns:
        if col.isnum:
            names.append(col.column_name)
    return names
2016-03-18 02:44:58 -04:00
|
|
|
@property
def name(self):
    """Generic datasource-name accessor; aliases ``datasource_name``."""
    return self.datasource_name
|
|
|
@property
def perm(self):
    """Permission string: ``[cluster].[datasource](id:N)``."""
    template = "[{o.cluster_name}].[{o.datasource_name}](id:{o.id})"
    return template.format(o=self)
|
|
|
@property
def link(self):
    """Anchor tag pointing at this datasource's page."""
    return '<a href="{url}">{label}</a>'.format(
        url=self.url, label=self.datasource_name)
|
|
|
@property
def full_name(self):
    """Fully qualified name: ``[cluster_name].[datasource_name]``."""
    return "[{}].[{}]".format(self.cluster_name, self.datasource_name)
|
|
|
def __repr__(self):
    """Represent the datasource by its name."""
    return self.datasource_name
2016-06-30 01:20:25 -04:00
|
|
|
@renders('datasource_name')
def datasource_link(self):
    """Anchor tag linking to the explore view for this datasource."""
    href = "/caravel/explore/{}/{}/".format(self.type, self.id)
    return '<a href="{}">{}</a>'.format(href, self.datasource_name)
|
|
|
|
def get_metric_obj(self, metric_name):
    """Return the ``json_obj`` of the metric named ``metric_name``.

    Raises IndexError when no metric with that name exists.
    """
    matches = [
        metric.json_obj for metric in self.metrics
        if metric.metric_name == metric_name
    ]
    return matches[0]
2016-07-01 17:44:25 -04:00
|
|
|
@staticmethod
def version_higher(v1, v2):
    """is v1 higher than v2

    >>> DruidDatasource.version_higher('0.8.2', '0.9.1')
    False
    >>> DruidDatasource.version_higher('0.8.2', '0.6.1')
    True
    >>> DruidDatasource.version_higher('0.8.2', '0.8.2')
    False
    >>> DruidDatasource.version_higher('0.8.2', '0.9.BETA')
    False
    >>> DruidDatasource.version_higher('0.8.2', '0.9')
    False
    """
    def to_int(part):
        # Non-numeric segments (e.g. 'BETA') count as 0.
        try:
            return int(part)
        except (TypeError, ValueError):
            return 0

    # Pad/truncate both versions to exactly (major, minor, patch);
    # list comparison is lexicographic, matching the original chained
    # major/minor/patch comparison exactly.
    left = ([to_int(p) for p in v1.split('.')] + [0, 0, 0])[:3]
    right = ([to_int(p) for p in v2.split('.')] + [0, 0, 0])[:3]
    return left > right
2016-03-18 02:44:58 -04:00
|
|
|
def latest_metadata(self):
    """Returns segment metadata from the latest segment

    Queries the broker's timeBoundary to find the datasource's max time,
    then asks for segmentMetadata over (roughly) the last 7 days and
    returns the column description of the most recent segment. Returns
    None when the datasource has no data.
    """
    client = self.cluster.get_pydruid_client()
    results = client.time_boundary(datasource=self.datasource_name)
    if not results:
        return
    max_time = results[0]['result']['maxTime']
    max_time = parse(max_time)
    # Query segmentMetadata for 7 days back. However, due to a bug,
    # we need to set this interval to more than 1 day ago to exclude
    # realtime segments, which trigged a bug (fixed in druid 0.8.2).
    # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
    start = (0 if self.version_higher(self.cluster.druid_version, '0.8.2') else 1)
    intervals = (max_time - timedelta(days=7)).isoformat() + '/'
    intervals += (max_time - timedelta(days=start)).isoformat()
    segment_metadata = client.segment_metadata(
        datasource=self.datasource_name,
        intervals=intervals)
    if segment_metadata:
        # Last entry corresponds to the most recent segment.
        return segment_metadata[-1]['columns']
|
|
|
def generate_metrics(self):
    """Have every column of this datasource (re)generate its metrics."""
    for datasource_column in self.columns:
        datasource_column.generate_metrics()
|
|
|
@classmethod
def sync_to_db(cls, name, cluster):
    """Fetches metadata for that datasource and merges the Caravel db

    Creates the DruidDatasource row if needed, attaches it to the given
    cluster, then upserts a DruidColumn per column reported by the latest
    segment's metadata, deriving sensible flags from the Druid type and
    regenerating each column's metrics.
    """
    logging.info("Syncing Druid datasource [{}]".format(name))
    session = get_session()
    datasource = session.query(cls).filter_by(datasource_name=name).first()
    if not datasource:
        datasource = cls(datasource_name=name)
        session.add(datasource)
        flasher("Adding new datasource [{}]".format(name), "success")
    else:
        flasher("Refreshing datasource [{}]".format(name), "info")
    # Flush so the datasource gets its identity before columns reference it.
    session.flush()
    datasource.cluster = cluster
    session.flush()

    cols = datasource.latest_metadata()
    if not cols:
        # No segments / no data: nothing to sync.
        return
    for col in cols:
        col_obj = (
            session
            .query(DruidColumn)
            .filter_by(datasource_name=name, column_name=col)
            .first()
        )
        datatype = cols[col]['type']
        if not col_obj:
            col_obj = DruidColumn(datasource_name=name, column_name=col)
            session.add(col_obj)
        # STRING dimensions can be grouped and filtered on.
        if datatype == "STRING":
            col_obj.groupby = True
            col_obj.filterable = True
        # Sketch types support approximate count-distinct.
        if datatype == "hyperUnique" or datatype == "thetaSketch":
            col_obj.count_distinct = True
        if col_obj:
            col_obj.type = cols[col]['type']
        session.flush()
        col_obj.datasource = datasource
        col_obj.generate_metrics()
        session.flush()
|
2016-04-20 18:08:10 -04:00
|
|
|
def query(  # druid
        self, groupby, metrics,
        granularity,
        from_dttm, to_dttm,
        filter=None,  # noqa
        is_timeseries=True,
        timeseries_limit=None,
        row_limit=None,
        inner_from_dttm=None, inner_to_dttm=None,
        orderby=None,
        extras=None,  # noqa
        select=None,  # noqa
        columns=None, ):
    """Runs a query against Druid and returns a dataframe.

    This query interface is common to SqlAlchemy and Druid

    Translates the visualization parameters into a pydruid groupby
    query (optionally a two-phase query when ``timeseries_limit`` is
    set), executes it against the cluster's broker, post-processes the
    resulting dataframe, and returns a QueryResult.
    """
    # TODO refactor into using a TBD Query object
    qry_start_dttm = datetime.now()

    inner_from_dttm = inner_from_dttm or from_dttm
    inner_to_dttm = inner_to_dttm or to_dttm

    # add tzinfo to native datetime with config
    from_dttm = from_dttm.replace(tzinfo=config.get("DRUID_TZ"))
    to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))

    query_str = ""
    metrics_dict = {m.metric_name: m for m in self.metrics}
    all_metrics = []
    post_aggs = {}

    def recursive_get_fields(_conf):
        # Collect the aggregation field names a post-aggregator
        # configuration depends on (recursing into arithmetic nodes).
        _fields = _conf.get('fields', [])
        field_names = []
        for _f in _fields:
            _type = _f.get('type')
            if _type in ['fieldAccess', 'hyperUniqueCardinality']:
                field_names.append(_f.get('fieldName'))
            elif _type == 'arithmetic':
                field_names += recursive_get_fields(_f)

        return list(set(field_names))

    # Split requested metrics into plain aggregations and
    # post-aggregations (which in turn pull in their input aggregations).
    for metric_name in metrics:
        metric = metrics_dict[metric_name]
        if metric.metric_type != 'postagg':
            all_metrics.append(metric_name)
        else:
            conf = metric.json_obj
            all_metrics += recursive_get_fields(conf)
            all_metrics += conf.get('fieldNames', [])
            if conf.get('type') == 'javascript':
                post_aggs[metric_name] = JavascriptPostAggregator(
                    name=conf.get('name'),
                    field_names=conf.get('fieldNames'),
                    function=conf.get('function'))
            else:
                post_aggs[metric_name] = Postaggregator(
                    conf.get('fn', "/"),
                    conf.get('fields', []),
                    conf.get('name', ''))

    aggregations = {
        m.metric_name: m.json_obj
        for m in self.metrics
        if m.metric_name in all_metrics
    }

    # Enforce metric-level access control before hitting the cluster.
    rejected_metrics = [
        m.metric_name for m in self.metrics
        if m.is_restricted and
        m.metric_name in aggregations.keys() and
        not sm.has_access('metric_access', m.perm)
    ]

    if rejected_metrics:
        raise MetricPermException(
            "Access to the metrics denied: " + ', '.join(rejected_metrics)
        )

    # Human-readable granularity (e.g. "1 day") becomes a duration in ms.
    granularity = granularity or "all"
    if granularity != "all":
        granularity = utils.parse_human_timedelta(
            granularity).total_seconds() * 1000
    if not isinstance(granularity, string_types):
        granularity = {"type": "duration", "duration": granularity}
        origin = extras.get('druid_time_origin')
        if origin:
            dttm = utils.parse_human_datetime(origin)
            granularity['origin'] = dttm.isoformat()

    qry = dict(
        datasource=self.datasource_name,
        dimensions=groupby,
        aggregations=aggregations,
        granularity=granularity,
        post_aggregations=post_aggs,
        intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
    )

    filters = self.get_filters(filter)
    if filters:
        qry['filter'] = filters

    having_filters = self.get_having_filters(extras.get('having_druid'))
    if having_filters:
        qry['having'] = having_filters

    client = self.cluster.get_pydruid_client()
    orig_filters = filters
    if timeseries_limit and is_timeseries:
        # Limit on the number of timeseries, doing a two-phases query
        # Phase 1: an 'all'-granularity groupby finds the top-N series;
        # Phase 2 then restricts the real query to those series.
        pre_qry = deepcopy(qry)
        pre_qry['granularity'] = "all"
        pre_qry['limit_spec'] = {
            "type": "default",
            "limit": timeseries_limit,
            'intervals': (
                inner_from_dttm.isoformat() + '/' +
                inner_to_dttm.isoformat()),
            "columns": [{
                "dimension": metrics[0] if metrics else self.metrics[0],
                "direction": "descending",
            }],
        }
        client.groupby(**pre_qry)
        query_str += "// Two phase query\n// Phase 1\n"
        query_str += json.dumps(
            client.query_builder.last_query.query_dict, indent=2) + "\n"
        query_str += "//\nPhase 2 (built based on phase one's results)\n"
        df = client.export_pandas()
        if df is not None and not df.empty:
            dims = qry['dimensions']
            filters = []
            for unused, row in df.iterrows():
                # One AND-filter per returned dimension combination.
                fields = []
                for dim in dims:
                    f = Dimension(dim) == row[dim]
                    fields.append(f)
                if len(fields) > 1:
                    filt = Filter(type="and", fields=fields)
                    filters.append(filt)
                elif fields:
                    filters.append(fields[0])

            if filters:
                ff = Filter(type="or", fields=filters)
                if not orig_filters:
                    qry['filter'] = ff
                else:
                    qry['filter'] = Filter(type="and", fields=[
                        ff,
                        orig_filters])
            qry['limit_spec'] = None
    if row_limit:
        qry['limit_spec'] = {
            "type": "default",
            "limit": row_limit,
            "columns": [{
                "dimension": metrics[0] if metrics else self.metrics[0],
                "direction": "descending",
            }],
        }
    client.groupby(**qry)
    query_str += json.dumps(
        client.query_builder.last_query.query_dict, indent=2)
    df = client.export_pandas()
    if df is None or df.size == 0:
        raise Exception(_("No data was returned."))

    # Non-timeseries 'all'-granularity results don't need a timestamp.
    if (
            not is_timeseries and
            granularity == "all" and
            'timestamp' in df.columns):
        del df['timestamp']

    # Reordering columns
    cols = []
    if 'timestamp' in df.columns:
        cols += ['timestamp']
    cols += [col for col in groupby if col in df.columns]
    cols += [col for col in metrics if col in df.columns]
    df = df[cols]
    return QueryResult(
        df=df,
        query=query_str,
        duration=datetime.now() - qry_start_dttm)
2016-06-21 12:43:10 -04:00
|
|
|
@staticmethod
def get_filters(raw_filters):
    """Convert Caravel filter tuples into a single pydruid ``Filter``.

    :param raw_filters: iterable of ``(col, op, eq)`` tuples where ``op``
        is one of ``==``, ``!=``, ``in``, ``not in`` or ``regex`` and
        ``eq`` is the comparison value (a comma-separated string for the
        ``in`` / ``not in`` operators)
    :returns: all clauses ANDed into one pydruid ``Filter``, or ``None``
        when ``raw_filters`` is empty
    """
    filters = None
    for col, op, eq in raw_filters:
        cond = None
        if op == '==':
            cond = Dimension(col) == eq
        elif op == '!=':
            cond = ~(Dimension(col) == eq)
        elif op in ('in', 'not in'):
            # ``eq`` is a comma-separated list; a single value degrades
            # to a plain equality test (no stripping in that case, to
            # preserve historical behavior).
            values = eq.split(',')
            if len(values) > 1:
                fields = [Dimension(col) == s.strip() for s in values]
                cond = Filter(type="or", fields=fields)
            else:
                cond = Dimension(col) == eq
            if op == 'not in':
                cond = ~cond
        elif op == 'regex':
            cond = Filter(type="regex", pattern=eq, dimension=col)
        if cond is None:
            # Unknown operator: skip it rather than ANDing a None into
            # the filter tree, which would produce a broken query.
            continue
        if filters:
            filters = Filter(type="and", fields=[
                cond,
                filters
            ])
        else:
            filters = cond
    return filters
|
|
|
|
|
2016-07-01 17:45:04 -04:00
|
|
|
def _get_having_obj(self, col, op, eq):
    """Build one pydruid "having" clause for ``(col, op, eq)``.

    Equality against a known dimension column is expressed with a
    DimSelector; every other supported comparison targets the
    aggregation named ``col``. Unsupported operators yield ``None``.
    """
    if op == '==':
        if col in self.column_names:
            return DimSelector(dimension=col, value=eq)
        return Aggregation(col) == eq
    if op == '>':
        return Aggregation(col) > eq
    if op == '<':
        return Aggregation(col) < eq
    return None
|
|
|
|
|
2016-06-21 12:43:10 -04:00
|
|
|
def get_having_filters(self, raw_filters):
    """Convert ``(col, op, eq)`` tuples into one combined having filter.

    ``==``, ``>`` and ``<`` are delegated directly to
    :meth:`_get_having_obj`; ``!=``, ``>=`` and ``<=`` are expressed as
    the negation of their complementary operator. Clauses are ANDed
    together; unknown operators are skipped.
    """
    filters = None
    # Operators implemented by negating the complementary comparison.
    reversed_op_map = {
        '!=': '==',
        '>=': '<',
        '<=': '>'
    }

    for col, op, eq in raw_filters:
        cond = None
        if op in ('==', '>', '<'):
            cond = self._get_having_obj(col, op, eq)
        elif op in reversed_op_map:
            cond = ~self._get_having_obj(col, reversed_op_map[op], eq)
        if cond is None:
            # Unrecognized operator: skip it instead of evaluating
            # ``filters & None`` below, which would raise.
            continue

        if filters:
            filters = filters & cond
        else:
            filters = cond
    return filters
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
|
|
|
class Log(Model):

    """ORM object used to log Caravel actions to the database"""

    __tablename__ = 'logs'

    id = Column(Integer, primary_key=True)
    action = Column(String(512))
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    dashboard_id = Column(Integer)
    slice_id = Column(Integer)
    json = Column(Text)
    user = relationship('User', backref='logs', foreign_keys=[user_id])
    dttm = Column(DateTime, default=func.now())
    # Pass the callable, not its result: ``date.today()`` would be
    # evaluated once at import time and every row would be stamped with
    # the process start date instead of the insertion date.
    dt = Column(Date, default=date.today)

    @classmethod
    def log_this(cls, f):
        """Decorator to log user actions"""
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            user_id = None
            if g.user:
                user_id = g.user.get_id()
            d = request.args.to_dict()
            d.update(kwargs)
            slice_id = d.get('slice_id', 0)
            slice_id = int(slice_id) if slice_id else 0
            params = ""
            try:
                params = json.dumps(d)
            except Exception:
                # Best effort only: logging must never block the
                # decorated action, but don't swallow SystemExit et al.
                pass
            log = cls(
                action=f.__name__,
                json=params,
                dashboard_id=d.get('dashboard_id') or None,
                slice_id=slice_id,
                user_id=user_id)
            db.session.add(log)
            db.session.commit()
            return f(*args, **kwargs)
        return wrapper
|
|
|
|
|
|
|
|
|
2016-03-24 17:50:19 -04:00
|
|
|
class DruidMetric(Model, AuditMixinNullable):

    """ORM object referencing Druid metrics for a datasource"""

    __tablename__ = 'metrics'
    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    # Human-readable label shown in the UI in place of metric_name
    verbose_name = Column(String(1024))
    # Druid aggregator type (e.g. 'count', 'sum', ...) — see generate_metrics
    metric_type = Column(String(32))
    datasource_name = Column(
        String(255),
        ForeignKey('datasources.datasource_name'))
    # Setting enable_typechecks=False disables polymorphic inheritance.
    datasource = relationship('DruidDatasource', backref='metrics',
                              enable_typechecks=False)
    # JSON-serialized Druid aggregation spec; parsed by json_obj below
    json = Column(Text)
    description = Column(Text)
    # When True the metric requires an explicit permission (see perm)
    is_restricted = Column(Boolean, default=False, nullable=True)
    # d3.js number format string used when rendering this metric
    d3format = Column(String(128))

    @property
    def json_obj(self):
        """Return the ``json`` column parsed into a dict, or ``{}`` on any
        parse failure (including a NULL/None value)."""
        try:
            obj = json.loads(self.json)
        except Exception:
            obj = {}
        return obj

    @property
    def perm(self):
        """Permission string identifying this metric, of the form
        ``<datasource full name>.[<metric_name>](id:<id>)``; ``None`` when
        the metric is not attached to a datasource."""
        return (
            "{parent_name}.[{obj.metric_name}](id:{obj.id})"
        ).format(obj=self,
                 parent_name=self.datasource.full_name
                 ) if self.datasource else None
|
|
|
|
|
2016-03-18 02:44:58 -04:00
|
|
|
|
2016-03-24 17:50:19 -04:00
|
|
|
class DruidColumn(Model, AuditMixinNullable):

    """ORM model for storing Druid datasource column metadata"""

    __tablename__ = 'columns'
    id = Column(Integer, primary_key=True)
    datasource_name = Column(
        String(255),
        ForeignKey('datasources.datasource_name'))
    # Setting enable_typechecks=False disables polymorphic inheritance.
    datasource = relationship('DruidDatasource', backref='columns',
                              enable_typechecks=False)
    column_name = Column(String(255))
    is_active = Column(Boolean, default=True)
    # Druid column type string (e.g. 'LONG', 'DOUBLE'); drives isnum below
    type = Column(String(32))
    # The boolean flags below control which aggregate metrics
    # generate_metrics() auto-creates for this column.
    groupby = Column(Boolean, default=False)
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text)

    def __repr__(self):
        return self.column_name

    @property
    def isnum(self):
        """Whether the Druid column type is one of the numeric types."""
        return self.type in ('LONG', 'DOUBLE', 'FLOAT', 'INT')

    def generate_metrics(self):
        """Generate metrics based on the column metadata

        Builds DruidMetric rows (count, and sum/min/max/count-distinct
        according to the boolean flags above), then persists any that do
        not already exist for this datasource.
        """
        M = DruidMetric  # noqa
        metrics = []
        # A plain row count is always generated.
        metrics.append(DruidMetric(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            json=json.dumps({'type': 'count', 'name': 'count'})
        ))
        # Somehow we need to reassign this for UDAFs
        if self.type in ('DOUBLE', 'FLOAT'):
            corrected_type = 'DOUBLE'
        else:
            corrected_type = self.type

        if self.sum and self.isnum:
            # Druid aggregator names are typed, e.g. 'doubleSum'/'longSum'.
            mt = corrected_type.lower() + 'Sum'
            name = 'sum__' + self.column_name
            metrics.append(DruidMetric(
                metric_name=name,
                metric_type='sum',
                verbose_name='SUM({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name})
            ))
        if self.min and self.isnum:
            mt = corrected_type.lower() + 'Min'
            name = 'min__' + self.column_name
            metrics.append(DruidMetric(
                metric_name=name,
                metric_type='min',
                verbose_name='MIN({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name})
            ))
        if self.max and self.isnum:
            mt = corrected_type.lower() + 'Max'
            name = 'max__' + self.column_name
            metrics.append(DruidMetric(
                metric_name=name,
                metric_type='max',
                verbose_name='MAX({})'.format(self.column_name),
                json=json.dumps({
                    'type': mt, 'name': name, 'fieldName': self.column_name})
            ))
        if self.count_distinct:
            name = 'count_distinct__' + self.column_name
            # Sketch-typed columns get an exact-typed aggregator; everything
            # else falls back to Druid's 'cardinality' aggregator.
            if self.type == 'hyperUnique' or self.type == 'thetaSketch':
                metrics.append(DruidMetric(
                    metric_name=name,
                    verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                    metric_type=self.type,
                    json=json.dumps({
                        'type': self.type,
                        'name': name,
                        'fieldName': self.column_name
                    })
                ))
            else:
                # NOTE(review): 'mt' is assigned but unused in this branch
                # (the spec below hard-codes 'cardinality') — looks vestigial.
                mt = 'count_distinct'
                metrics.append(DruidMetric(
                    metric_name=name,
                    verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                    metric_type='count_distinct',
                    json=json.dumps({
                        'type': 'cardinality',
                        'name': name,
                        'fieldNames': [self.column_name]})
                ))
        session = get_session()
        new_metrics = []
        # Insert only metrics that are not already present for this
        # datasource.
        # NOTE(review): the DruidCluster.cluster_name filter below has no
        # explicit join to M — presumably relies on an implicit join;
        # verify it doesn't produce a cross product.
        for metric in metrics:
            m = (
                session.query(M)
                .filter(M.metric_name == metric.metric_name)
                .filter(M.datasource_name == self.datasource_name)
                .filter(DruidCluster.cluster_name == self.datasource.cluster_name)
                .first()
            )
            metric.datasource_name = self.datasource_name
            if not m:
                new_metrics.append(metric)
                session.add(metric)
                session.flush()

        # Register permissions for the metrics created above.
        utils.init_metrics_perm(caravel, new_metrics)
|
|
|
|
|
2016-03-13 21:16:23 -04:00
|
|
|
|
|
|
|
class FavStar(Model):

    """Records that a user has "starred" (favorited) an object.

    The target is referenced generically through ``class_name`` +
    ``obj_id`` rather than a foreign key, so any model type can be
    starred.
    """

    __tablename__ = 'favstar'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    # Name of the starred object's model class
    class_name = Column(String(50))
    # Primary key of the starred object within that class
    obj_id = Column(Integer)
    dttm = Column(DateTime, default=func.now())
|
2016-08-30 00:55:31 -04:00
|
|
|
|
|
|
|
|
|
|
|
class QueryStatus:

    """Enum-like holder of the lifecycle states a ``Query`` row can be in
    (used as the default/values of ``Query.status``)."""

    CANCELLED = 'cancelled'
    FAILED = 'failed'
    PENDING = 'pending'
    RUNNING = 'running'
    SCHEDULED = 'scheduled'
    SUCCESS = 'success'
    TIMED_OUT = 'timed_out'
|
|
|
|
|
|
|
|
|
|
|
|
class Query(Model):

    """ORM model for SQL query"""

    __tablename__ = 'query'
    id = Column(Integer, primary_key=True)
    # Client-generated identifier, exposed as 'id' in to_dict()
    client_id = Column(String(11))

    database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)

    # Store the tmp table into the DB only if the user asks for it.
    tmp_table_name = Column(String(256))
    user_id = Column(
        Integer, ForeignKey('ab_user.id'), nullable=True)
    # One of the QueryStatus constants
    status = Column(String(16), default=QueryStatus.PENDING)
    name = Column(String(256))
    tab_name = Column(String(256))
    sql_editor_id = Column(String(256))
    schema = Column(String(256))
    # The SQL as submitted by the user
    sql = Column(Text)
    # Query to retrieve the results,
    # used only in case of select_as_cta_used is true.
    select_sql = Column(Text)
    # The SQL actually sent to the database (after rewriting, e.g. CTA)
    executed_sql = Column(Text)
    # Could be configured in the caravel config.
    limit = Column(Integer)
    limit_used = Column(Boolean, default=False)
    select_as_cta = Column(Boolean)
    select_as_cta_used = Column(Boolean, default=False)

    progress = Column(Integer, default=0)  # 1..100
    # # of rows in the result set or rows modified.
    rows = Column(Integer)
    error_message = Column(Text)

    # Using Numeric in place of DateTime for sub-second precision
    # stored as seconds since epoch, allowing for milliseconds
    start_time = Column(Numeric(precision=3))
    end_time = Column(Numeric(precision=3))
    changed_on = Column(
        DateTime, default=datetime.now, onupdate=datetime.now, nullable=True)

    database = relationship(
        'Database', foreign_keys=[database_id], backref='queries')

    __table_args__ = (
        sqla.Index('ti_user_id_changed_on', user_id, changed_on),
    )

    def to_dict(self):
        """Serialize this query row into the camelCase dict consumed by
        the frontend. Assumes ``status`` is non-null (it defaults to
        ``QueryStatus.PENDING``)."""
        return {
            'changedOn': self.changed_on,
            'dbId': self.database_id,
            'endDttm': self.end_time,
            'errorMessage': self.error_message,
            'executedSql': self.executed_sql,
            'id': self.client_id,
            'limit': self.limit,
            'progress': self.progress,
            'rows': self.rows,
            'schema': self.schema,
            'ctas': self.select_as_cta,
            'serverId': self.id,
            'sql': self.sql,
            'sqlEditorId': self.sql_editor_id,
            'startDttm': self.start_time,
            'state': self.status.lower(),
            'tab': self.tab_name,
            'tempTable': self.tmp_table_name,
            'userId': self.user_id,
        }
|