2016-08-30 00:55:31 -04:00
|
|
|
import celery
|
|
|
|
from datetime import datetime
|
|
|
|
import pandas as pd
|
|
|
|
import logging
|
2016-09-23 14:14:38 -04:00
|
|
|
import numpy
|
2016-08-30 00:55:31 -04:00
|
|
|
import time
|
2016-09-19 18:28:10 -04:00
|
|
|
|
2016-09-23 14:14:38 -04:00
|
|
|
from caravel import app, db, models, utils, dataframe
|
2016-08-30 00:55:31 -04:00
|
|
|
|
|
|
|
# Shorthand alias for the query-state constants declared on the models module
# (e.g. RUNNING / SUCCESS / FAILED — see models.QueryStatus).
QueryStatus = models.QueryStatus

# Celery application for async SQL execution; configuration is taken from the
# Flask app's CELERY_CONFIG object (None disables custom config).
celery_app = celery.Celery(config_source=app.config.get('CELERY_CONFIG'))
|
|
|
|
|
|
|
|
|
|
|
|
def is_query_select(sql):
    """Return True if *sql* looks like a single SELECT statement.

    The check is case-insensitive and ignores leading whitespace, so an
    indented or newline-prefixed query is still recognized. It does not
    parse the statement; anything starting with the SELECT keyword passes.

    :param sql: string, sql statement to inspect
    :return: boolean, True when the statement starts with SELECT
    """
    return sql.lstrip().upper().startswith('SELECT')
|
|
|
|
|
2016-09-11 10:39:07 -04:00
|
|
|
def create_table_as(sql, table_name, schema=None, override=False):
    """Reformats the query into the create table as query.

    Works only for the single select SQL statements, in all other cases
    the sql query is not modified.
    :param sql: string, sql query that will be executed
    :param table_name: string, will contain the results of the query execution
    :param schema: string, optional schema used to qualify ``table_name``
    :param override: boolean, table table_name will be dropped if true
    :return: string, create table as query
    :raises ValueError: if ``sql`` is not a SELECT statement
    """
    # TODO(bkyryliuk): enforce that all the columns have names. Presto requires it
    # for the CTA operation.
    # TODO(bkyryliuk): drop table if allowed, check the namespace and
    # the permissions.
    # TODO raise if multi-statement
    if schema:
        table_name = schema + '.' + table_name
    if not is_query_select(sql):
        # ValueError is still caught by callers handling bare Exception.
        raise ValueError("Could not generate CREATE TABLE statement")
    # Format with explicit arguments rather than ``format(**locals())`` so a
    # brace sequence inside the user's SQL cannot be misinterpreted as a
    # placeholder for some unrelated local variable.
    exec_sql = ''
    if override:
        exec_sql = 'DROP TABLE IF EXISTS {table_name};\n'.format(
            table_name=table_name)
    exec_sql += "CREATE TABLE {table_name} AS \n{sql}".format(
        table_name=table_name, sql=sql)
    return exec_sql
|
|
|
|
|
|
|
|
|
|
|
|
@celery_app.task
def get_sql_results(query_id, return_results=True):
    """Executes the sql query returns the results.

    Loads the ``models.Query`` row identified by *query_id*, runs its SQL
    against the query's database (optionally rewritten as CREATE TABLE AS or
    limit-wrapped), tracks progress/status on the row, and — when
    *return_results* is true — returns a JSON-serializable payload with the
    fetched data or the error message.
    """
    session = db.session()
    # NOTE(review): commit before reading so this worker sees rows committed
    # by the web process after this session was opened — flagged HACK by the
    # original author.
    session.commit()  # HACK
    query = session.query(models.Query).filter_by(id=query_id).one()
    database = query.database
    # Drop surrounding whitespace and a trailing semicolon; the SQL is
    # otherwise executed as submitted.
    executed_sql = query.sql.strip().strip(';')

    def handle_error(msg):
        """Local method handling error while processing the SQL"""
        # Persist the failure on the Query row, then abort the task by
        # raising; nothing after a handle_error() call is reached.
        query.error_message = msg
        query.status = QueryStatus.FAILED
        query.tmp_table_name = None
        session.commit()
        raise Exception(query.error_message)

    # Limit enforced only for retrieving the data, not for the CTA queries.
    is_select = is_query_select(executed_sql);
    if not is_select and not database.allow_dml:
        handle_error(
            "Only `SELECT` statements are allowed against this database")
    if query.select_as_cta:
        if not is_select:
            handle_error(
                "Only `SELECT` statements can be used with the CREATE TABLE "
                "feature.")
        if not query.tmp_table_name:
            # Derive a unique temp table name from the user id and the
            # query's start timestamp.
            start_dttm = datetime.fromtimestamp(query.start_time)
            query.tmp_table_name = 'tmp_{}_table_{}'.format(
                query.user_id,
                start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
        executed_sql = create_table_as(
            executed_sql, query.tmp_table_name, database.force_ctas_schema)
        query.select_as_cta_used = True
    elif query.limit and is_select:
        # Plain SELECT without CTA: wrap with the configured row limit.
        executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
        query.limit_used = True
    engine = database.get_sqla_engine(schema=query.schema)
    try:
        # Record the (possibly rewritten) SQL that is actually executed.
        query.executed_sql = executed_sql
        logging.info("Running query: \n{}".format(executed_sql))
        result_proxy = engine.execute(query.executed_sql, schema=query.schema)
    except Exception as e:
        logging.exception(e)
        handle_error(utils.error_msg_from_exception(e))

    cursor = result_proxy.cursor
    query.status = QueryStatus.RUNNING
    session.flush()
    if database.backend == 'presto':
        polled = cursor.poll()
        # poll returns dict -- JSON status information or ``None``
        # if the query is done
        # https://github.com/dropbox/PyHive/blob/
        # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178
        while polled:
            # Update the object and wait for the kill signal.
            stats = polled.get('stats', {})
            if stats:
                completed_splits = float(stats.get('completedSplits'))
                total_splits = float(stats.get('totalSplits'))
                if total_splits and completed_splits:
                    # Progress only moves forward, even if Presto reports
                    # a temporarily smaller ratio.
                    progress = 100 * (completed_splits / total_splits)
                    if progress > query.progress:
                        query.progress = progress
                    session.commit()
            time.sleep(1)
            polled = cursor.poll()

    cdf = None
    if result_proxy.cursor:
        # Materialize the full result set into a CaravelDataFrame wrapper.
        column_names = [col[0] for col in result_proxy.cursor.description]
        data = result_proxy.fetchall()
        cdf = dataframe.CaravelDataFrame(
            pd.DataFrame(data, columns=column_names))
        # TODO consider generating tuples instead of dicts to send
        # less data through the wire. The command bellow does that,
        # but we'd need to align on the client side.
        # data = df.values.tolist()

    query.rows = result_proxy.rowcount
    query.progress = 100
    query.status = QueryStatus.SUCCESS
    if query.rows == -1 and cdf:
        # Presto doesn't provide result_proxy.row_count
        query.rows = cdf.size
    if query.select_as_cta:
        # Store the SELECT used to read back the CTA table's contents.
        query.select_sql = '{}'.format(database.select_star(
            query.tmp_table_name, limit=query.limit))
    query.end_time = utils.now_as_float()
    session.commit()

    if return_results:
        payload = {
            'query_id': query.id,
            'status': query.status,
            'data': [],
        }
        if query.status == models.QueryStatus.SUCCESS:
            payload['data'] = cdf.data if cdf else []
            payload['columns'] = cdf.columns_dict if cdf else []
        else:
            payload['error'] = query.error_message
        return payload
    '''
    # Hack testing using a kv store for results
    key = "query_id={}".format(query.id)
    logging.info("Storing results in key=[{}]".format(key))
    cache.set(key, json.dumps(payload, default=utils.json_iso_dttm_ser))
    '''
|