diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 37600b861c..692a39587c 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -364,6 +364,7 @@ class PrestoEngineSpec(BaseEngineSpec): @classmethod def handle_cursor(cls, cursor, query, session): """Updates progress information""" + logging.info('Polling the cursor for progress') polled = cursor.poll() # poll returns dict -- JSON status information or ``None`` # if the query is done @@ -383,10 +384,14 @@ class PrestoEngineSpec(BaseEngineSpec): total_splits = float(stats.get('totalSplits')) if total_splits and completed_splits: progress = 100 * (completed_splits / total_splits) + logging.info( + 'Query progress: {} / {} ' + 'splits'.format(completed_splits, total_splits)) if progress > query.progress: query.progress = progress session.commit() time.sleep(1) + logging.info('Polling the cursor for progress') polled = cursor.poll() @classmethod diff --git a/superset/migrations/versions/a6c18f869a4e_query_start_running_time.py b/superset/migrations/versions/a6c18f869a4e_query_start_running_time.py new file mode 100644 index 0000000000..3be8ae06b2 --- /dev/null +++ b/superset/migrations/versions/a6c18f869a4e_query_start_running_time.py @@ -0,0 +1,26 @@ +"""query.start_running_time + +Revision ID: a6c18f869a4e +Revises: 979c03af3341 +Create Date: 2017-03-28 11:28:41.387182 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'a6c18f869a4e' +down_revision = '979c03af3341' + + + +def upgrade(): + op.add_column( + 'query', + sa.Column('start_running_time', + sa.Numeric(precision=20, scale=6), + nullable=True)) + + +def downgrade(): + op.drop_column('query', 'start_running_time') diff --git a/superset/models/core.py b/superset/models/core.py index c31267f577..5889c7576f 100644 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -830,6 +830,7 @@ class Query(Model): # Using Numeric in place of DateTime for sub-second precision # stored as seconds since epoch, allowing for milliseconds start_time = Column(Numeric(precision=3)) + start_running_time = Column(Numeric(precision=3)) end_time = Column(Numeric(precision=3)) changed_on = Column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=True) diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 29cfc03609..d4ff8873a2 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -104,12 +104,17 @@ def get_sql_results(self, query_id, return_results=True, store_results=False): handle_error(msg) query.executed_sql = executed_sql - logging.info("Running query: \n{}".format(executed_sql)) + query.status = QueryStatus.RUNNING + query.start_running_time = utils.now_as_float() + session.merge(query) + session.commit() + logging.info("Set query to 'running'") + engine = database.get_sqla_engine(schema=query.schema) conn = engine.raw_connection() cursor = conn.cursor() + logging.info("Running query: \n{}".format(executed_sql)) try: - print(query.executed_sql) logging.info(query.executed_sql) cursor.execute( query.executed_sql, **db_engine_spec.cursor_execute_kwargs) @@ -118,8 +123,6 @@ def get_sql_results(self, query_id, return_results=True, store_results=False): conn.close() handle_error(db_engine_spec.extract_error_message(e)) - query.status = QueryStatus.RUNNING - session.flush() try: logging.info("Handling cursor") db_engine_spec.handle_cursor(cursor, query, session) @@ -156,6 +159,7 @@ def get_sql_results(self, query_id, return_results=True, store_results=False): schema=database.force_ctas_schema )) query.end_time = utils.now_as_float() + session.merge(query) session.flush() payload = { @@ -173,7 +177,7 @@ def get_sql_results(self, query_id, return_results=True, store_results=False): results_backend.set(key, zlib.compress(payload)) query.results_key = key - session.flush() + session.merge(query) session.commit() if return_results: