mirror of https://github.com/apache/superset.git
More improvements to SQL Lab (#1104)
* Handling timeouts * Fixing timer on non-utc server * Allowing async with results * [bugfix] database is not selected * Making sure the session is up and running * Cleaning up query results and query objects * Picking a groupby and metric field on visualize flow * Showing local time in query history * Using pull-left pull-right instead of grid layout for table metdata Long column name were looking weird and icons were wrapping oddly * Linting * Eliminating east buttons under the sql editor * Sort database dropdown by name * Linting * Allowing non-SELECT statements to run * Adding a db config * Making sqla checkout check cross-db
This commit is contained in:
parent
8081080709
commit
e8088d5c9a
|
@ -31,3 +31,4 @@ exclude_paths:
|
|||
- "caravel/assets/vendor/"
|
||||
- "caravel/assets/node_modules/"
|
||||
- "caravel/assets/javascripts/dist/"
|
||||
- "caravel/migrations"
|
||||
|
|
|
@ -10,6 +10,7 @@ from logging.handlers import TimedRotatingFileHandler
|
|||
|
||||
from flask import Flask, redirect
|
||||
from flask_appbuilder import SQLA, AppBuilder, IndexView
|
||||
from sqlalchemy import event, exc
|
||||
from flask_appbuilder.baseviews import expose
|
||||
from flask_cache import Cache
|
||||
from flask_migrate import Migrate
|
||||
|
@ -27,6 +28,35 @@ if not app.debug:
|
|||
|
||||
db = SQLA(app)
|
||||
|
||||
|
||||
@event.listens_for(db.engine, 'checkout')
|
||||
def checkout(dbapi_con, con_record, con_proxy):
|
||||
"""
|
||||
Making sure the connection is live, and preventing against:
|
||||
'MySQL server has gone away'
|
||||
|
||||
Copied from:
|
||||
http://stackoverflow.com/questions/30630120/mysql-keeps-losing-connection-during-celery-tasks
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
if hasattr(dbapi_con, 'ping'):
|
||||
dbapi_con.ping(False)
|
||||
else:
|
||||
cursor = dbapi_con.cursor()
|
||||
cursor.execute("SELECT 1")
|
||||
except TypeError:
|
||||
app.logger.debug('MySQL connection died. Restoring...')
|
||||
dbapi_con.ping()
|
||||
except dbapi_con.OperationalError as e:
|
||||
app.logger.warning(e)
|
||||
if e.args[0] in (2006, 2013, 2014, 2045, 2055):
|
||||
raise exc.DisconnectionError()
|
||||
else:
|
||||
raise
|
||||
return db
|
||||
|
||||
|
||||
cache = Cache(app, config=app.config.get('CACHE_CONFIG'))
|
||||
|
||||
migrate = Migrate(app, db, directory=APP_DIR + "/migrations")
|
||||
|
|
|
@ -41,7 +41,7 @@ class QueryTable extends React.Component {
|
|||
if (q.endDttm) {
|
||||
q.duration = fDuration(q.startDttm, q.endDttm);
|
||||
}
|
||||
q.started = moment.utc(q.startDttm).format('HH:mm:ss');
|
||||
q.started = moment(q.startDttm).format('HH:mm:ss');
|
||||
const source = (q.ctas) ? q.executedSql : q.sql;
|
||||
q.sql = (
|
||||
<SqlShrink sql={source} />
|
||||
|
|
|
@ -60,7 +60,7 @@ class ResultSet extends React.Component {
|
|||
</div>
|
||||
);
|
||||
}
|
||||
if (results && results.data.length > 0) {
|
||||
if (results && results.data && results.data.length > 0) {
|
||||
return (
|
||||
<div>
|
||||
<VisualizeModal
|
||||
|
|
|
@ -9,9 +9,7 @@ import {
|
|||
InputGroup,
|
||||
Form,
|
||||
FormControl,
|
||||
DropdownButton,
|
||||
Label,
|
||||
MenuItem,
|
||||
OverlayTrigger,
|
||||
Row,
|
||||
Tooltip,
|
||||
|
@ -27,7 +25,6 @@ import { connect } from 'react-redux';
|
|||
import * as Actions from '../actions';
|
||||
|
||||
import shortid from 'shortid';
|
||||
import ButtonWithTooltip from './ButtonWithTooltip';
|
||||
import SouthPane from './SouthPane';
|
||||
import Timer from './Timer';
|
||||
|
||||
|
@ -52,8 +49,8 @@ class SqlEditor extends React.Component {
|
|||
this.startQuery();
|
||||
}
|
||||
}
|
||||
runQuery() {
|
||||
this.startQuery();
|
||||
runQuery(runAsync = false) {
|
||||
this.startQuery(runAsync);
|
||||
}
|
||||
startQuery(runAsync = false, ctas = false) {
|
||||
const that = this;
|
||||
|
@ -76,10 +73,10 @@ class SqlEditor extends React.Component {
|
|||
|
||||
const sqlJsonUrl = '/caravel/sql_json/';
|
||||
const sqlJsonRequest = {
|
||||
async: runAsync,
|
||||
client_id: query.id,
|
||||
database_id: this.props.queryEditor.dbId,
|
||||
json: true,
|
||||
runAsync,
|
||||
schema: this.props.queryEditor.schema,
|
||||
select_as_cta: ctas,
|
||||
sql: this.props.queryEditor.sql,
|
||||
|
@ -149,17 +146,36 @@ class SqlEditor extends React.Component {
|
|||
}
|
||||
|
||||
render() {
|
||||
let runButtons = (
|
||||
<ButtonGroup bsSize="small" className="inline m-r-5 pull-left">
|
||||
let runButtons = [];
|
||||
if (this.props.database && this.props.database.allow_run_sync) {
|
||||
runButtons.push(
|
||||
<Button
|
||||
bsSize="small"
|
||||
bsStyle="primary"
|
||||
style={{ width: '100px' }}
|
||||
onClick={this.runQuery.bind(this)}
|
||||
onClick={this.runQuery.bind(this, false)}
|
||||
disabled={!(this.props.queryEditor.dbId)}
|
||||
>
|
||||
<i className="fa fa-table" /> Run Query
|
||||
</Button>
|
||||
);
|
||||
}
|
||||
if (this.props.database && this.props.database.allow_run_async) {
|
||||
runButtons.push(
|
||||
<Button
|
||||
bsSize="small"
|
||||
bsStyle="primary"
|
||||
style={{ width: '100px' }}
|
||||
onClick={this.runQuery.bind(this, true)}
|
||||
disabled={!(this.props.queryEditor.dbId)}
|
||||
>
|
||||
<i className="fa fa-table" /> Run Async
|
||||
</Button>
|
||||
);
|
||||
}
|
||||
runButtons = (
|
||||
<ButtonGroup bsSize="small" className="inline m-r-5 pull-left">
|
||||
{runButtons}
|
||||
</ButtonGroup>
|
||||
);
|
||||
if (this.props.latestQuery && this.props.latestQuery.state === 'running') {
|
||||
|
@ -176,35 +192,6 @@ class SqlEditor extends React.Component {
|
|||
</ButtonGroup>
|
||||
);
|
||||
}
|
||||
const rightButtons = (
|
||||
<ButtonGroup className="inlineblock">
|
||||
<ButtonWithTooltip
|
||||
tooltip="Save this query in your workspace"
|
||||
placement="left"
|
||||
bsSize="small"
|
||||
onClick={this.addWorkspaceQuery.bind(this)}
|
||||
>
|
||||
<i className="fa fa-save" />
|
||||
</ButtonWithTooltip>
|
||||
<DropdownButton
|
||||
id="ddbtn-export"
|
||||
pullRight
|
||||
bsSize="small"
|
||||
title={<i className="fa fa-file-o" />}
|
||||
>
|
||||
<MenuItem
|
||||
onClick={this.notImplemented}
|
||||
>
|
||||
<i className="fa fa-file-text-o" /> export to .csv
|
||||
</MenuItem>
|
||||
<MenuItem
|
||||
onClick={this.notImplemented}
|
||||
>
|
||||
<i className="fa fa-file-code-o" /> export to .json
|
||||
</MenuItem>
|
||||
</DropdownButton>
|
||||
</ButtonGroup>
|
||||
);
|
||||
let limitWarning = null;
|
||||
const rowLimit = 1000;
|
||||
if (this.props.latestQuery && this.props.latestQuery.rows === rowLimit) {
|
||||
|
@ -256,7 +243,6 @@ class SqlEditor extends React.Component {
|
|||
<div className="pull-right">
|
||||
{limitWarning}
|
||||
<Timer query={this.props.latestQuery} />
|
||||
{rightButtons}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
|
|
@ -77,7 +77,12 @@ class SqlEditorTopToolbar extends React.Component {
|
|||
}
|
||||
fetchDatabaseOptions() {
|
||||
this.setState({ databaseLoading: true });
|
||||
const url = '/databaseasync/api/read?_flt_0_expose_in_sqllab=1';
|
||||
const url = (
|
||||
'/databaseasync/api/read?' +
|
||||
'_flt_0_expose_in_sqllab=1&' +
|
||||
'_oc_DatabaseAsync=database_name&' +
|
||||
'_od_DatabaseAsync=asc'
|
||||
);
|
||||
$.get(url, (data) => {
|
||||
const options = data.result.map((db) => ({ value: db.id, label: db.database_name }));
|
||||
this.props.actions.setDatabases(data.result);
|
||||
|
|
|
@ -63,12 +63,12 @@ class TableElement extends React.Component {
|
|||
metadata = (
|
||||
<div>
|
||||
{this.props.table.columns.map((col) => (
|
||||
<div className="row">
|
||||
<div className="col-sm-8">
|
||||
<div className="m-l-5">{col.name}</div>
|
||||
<div className="clearfix">
|
||||
<div className="pull-left m-l-10">
|
||||
{col.name}
|
||||
</div>
|
||||
<div className="col-sm-4">
|
||||
<div className="pull-right text-muted"><small>{col.type}</small></div>
|
||||
<div className="pull-right text-muted">
|
||||
<small> {col.type}</small>
|
||||
</div>
|
||||
</div>
|
||||
))}
|
||||
|
@ -88,11 +88,11 @@ class TableElement extends React.Component {
|
|||
}
|
||||
return (
|
||||
<div>
|
||||
<div className="row">
|
||||
<div className="col-sm-9 m-b-10">
|
||||
<div className="clearfix">
|
||||
<div className="pull-left">
|
||||
{buttonToggle}
|
||||
</div>
|
||||
<div className="col-sm-3">
|
||||
<div className="pull-right">
|
||||
<ButtonGroup className="ws-el-controls pull-right">
|
||||
<Link
|
||||
className="fa fa-pencil pull-left m-l-2"
|
||||
|
|
|
@ -27,8 +27,8 @@ class Timer extends React.Component {
|
|||
}
|
||||
stopwatch() {
|
||||
if (this.props && this.props.query) {
|
||||
const since = this.props.query.endDttm || now();
|
||||
const clockStr = fDuration(this.props.query.startDttm, since);
|
||||
const endDttm = this.props.query.endDttm || now();
|
||||
const clockStr = fDuration(this.props.query.startDttm, endDttm);
|
||||
this.setState({ clockStr });
|
||||
if (this.props.query.state !== 'running') {
|
||||
this.stopTimer();
|
||||
|
|
|
@ -239,6 +239,3 @@ div.tablePopover:hover {
|
|||
.SouthPane .tab-content {
|
||||
padding-top: 10px;
|
||||
}
|
||||
.SqlEditor textarea {
|
||||
display: none;
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ function addToObject(state, arrKey, obj) {
|
|||
|
||||
function alterInObject(state, arrKey, obj, alterations) {
|
||||
const newObject = Object.assign({}, state[arrKey]);
|
||||
newObject[obj.id] = (Object.assign({}, newObject[obj.id], alterations));
|
||||
newObject[obj.id] = Object.assign({}, newObject[obj.id], alterations);
|
||||
return Object.assign({}, state, { [arrKey]: newObject });
|
||||
}
|
||||
|
||||
|
@ -65,6 +65,16 @@ function removeFromArr(state, arrKey, obj, idKey = 'id') {
|
|||
return Object.assign({}, state, { [arrKey]: newArr });
|
||||
}
|
||||
|
||||
function getFromArr(arr, id) {
|
||||
let obj;
|
||||
arr.forEach((o) => {
|
||||
if (o.id === id) {
|
||||
obj = o;
|
||||
}
|
||||
});
|
||||
return obj;
|
||||
}
|
||||
|
||||
function addToArr(state, arrKey, obj) {
|
||||
const newObj = Object.assign({}, obj);
|
||||
if (!newObj.id) {
|
||||
|
@ -87,9 +97,16 @@ export const sqlLabReducer = function (state, action) {
|
|||
let newState = removeFromArr(state, 'queryEditors', action.queryEditor);
|
||||
// List of remaining queryEditor ids
|
||||
const qeIds = newState.queryEditors.map((qe) => qe.id);
|
||||
let th = state.tabHistory.slice();
|
||||
th = th.filter((id) => qeIds.includes(id));
|
||||
newState = Object.assign({}, newState, { tabHistory: th });
|
||||
const queries = {};
|
||||
Object.keys(state.queries).forEach((k) => {
|
||||
const query = state.queries[k];
|
||||
if (qeIds.includes(query.sqlEditorId)) {
|
||||
queries[k] = query;
|
||||
}
|
||||
});
|
||||
let tabHistory = state.tabHistory.slice();
|
||||
tabHistory = tabHistory.filter((id) => qeIds.includes(id));
|
||||
newState = Object.assign({}, newState, { tabHistory, queries });
|
||||
return newState;
|
||||
},
|
||||
[actions.REMOVE_QUERY]() {
|
||||
|
@ -113,7 +130,14 @@ export const sqlLabReducer = function (state, action) {
|
|||
return removeFromArr(state, 'tables', action.table);
|
||||
},
|
||||
[actions.START_QUERY]() {
|
||||
const newState = addToObject(state, 'queries', action.query);
|
||||
const qe = getFromArr(state.queryEditors, action.query.sqlEditorId);
|
||||
let newState = Object.assign({}, state);
|
||||
if (qe.latestQueryId) {
|
||||
const q = Object.assign({}, state.queries[qe.latestQueryId], { results: null });
|
||||
const queries = Object.assign({}, state.queries, { [q.id]: q });
|
||||
newState = Object.assign({}, state, { queries });
|
||||
}
|
||||
newState = addToObject(newState, 'queries', action.query);
|
||||
const sqlEditor = { id: action.query.sqlEditorId };
|
||||
return alterInArr(newState, 'queryEditors', sqlEditor, { latestQueryId: action.query.id });
|
||||
},
|
||||
|
@ -121,12 +145,16 @@ export const sqlLabReducer = function (state, action) {
|
|||
return alterInObject(state, 'queries', action.query, { state: 'stopped' });
|
||||
},
|
||||
[actions.QUERY_SUCCESS]() {
|
||||
let rows;
|
||||
if (action.results.data) {
|
||||
rows = action.results.data.length;
|
||||
}
|
||||
const alts = {
|
||||
state: 'success',
|
||||
results: action.results,
|
||||
rows: action.results.data.length,
|
||||
progress: 100,
|
||||
endDttm: now(),
|
||||
progress: 100,
|
||||
results: action.results,
|
||||
rows,
|
||||
state: 'success',
|
||||
};
|
||||
return alterInObject(state, 'queries', action.query, alts);
|
||||
},
|
||||
|
@ -158,12 +186,6 @@ export const sqlLabReducer = function (state, action) {
|
|||
[actions.QUERY_EDITOR_SET_AUTORUN]() {
|
||||
return alterInArr(state, 'queryEditors', action.queryEditor, { autorun: action.autorun });
|
||||
},
|
||||
[actions.ADD_WORKSPACE_QUERY]() {
|
||||
return addToArr(state, 'workspaceQueries', action.query);
|
||||
},
|
||||
[actions.REMOVE_WORKSPACE_QUERY]() {
|
||||
return removeFromArr(state, 'workspaceQueries', action.query);
|
||||
},
|
||||
[actions.ADD_ALERT]() {
|
||||
return addToArr(state, 'alerts', action.alert);
|
||||
},
|
||||
|
|
|
@ -8,7 +8,6 @@ g.caravel path {
|
|||
}
|
||||
|
||||
text.nv-axislabel {
|
||||
// font-weight: bold;
|
||||
font-size: 14px;
|
||||
}
|
||||
|
||||
|
|
|
@ -213,6 +213,9 @@ SQL_CELERY_RESULTS_DB_FILE_PATH = os.path.join(DATA_DIR, 'celery_results.sqlite'
|
|||
# The db id here results in selecting this one as a default in SQL Lab
|
||||
DEFAULT_DB_ID = None
|
||||
|
||||
# Timeout duration for SQL Lab synchronous queries
|
||||
SQLLAB_TIMEOUT = 30
|
||||
|
||||
try:
|
||||
from caravel_config import * # noqa
|
||||
except ImportError:
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
"""allow_run_sync_async
|
||||
|
||||
Revision ID: 4500485bde7d
|
||||
Revises: 41f6a59a61f2
|
||||
Create Date: 2016-09-12 23:33:14.789632
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '4500485bde7d'
|
||||
down_revision = '41f6a59a61f2'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('dbs', sa.Column('allow_run_async', sa.Boolean(), nullable=True))
|
||||
op.add_column('dbs', sa.Column('allow_run_sync', sa.Boolean(), nullable=True))
|
||||
|
||||
|
||||
def downgrade():
|
||||
try:
|
||||
op.drop_column('dbs', 'allow_run_sync')
|
||||
op.drop_column('dbs', 'allow_run_async')
|
||||
except Exception:
|
||||
pass
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
"""allow_dml
|
||||
|
||||
Revision ID: 65903709c321
|
||||
Revises: 4500485bde7d
|
||||
Create Date: 2016-09-15 08:48:27.284752
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '65903709c321'
|
||||
down_revision = '4500485bde7d'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('dbs', sa.Column('allow_dml', sa.Boolean(), nullable=True))
|
||||
|
||||
|
||||
def downgrade():
|
||||
try:
|
||||
op.drop_column('dbs', 'allow_dml')
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
pass
|
|
@ -416,7 +416,10 @@ class Database(Model, AuditMixinNullable):
|
|||
cache_timeout = Column(Integer)
|
||||
select_as_create_table_as = Column(Boolean, default=False)
|
||||
expose_in_sqllab = Column(Boolean, default=False)
|
||||
allow_run_sync = Column(Boolean, default=True)
|
||||
allow_run_async = Column(Boolean, default=False)
|
||||
allow_ctas = Column(Boolean, default=False)
|
||||
allow_dml = Column(Boolean, default=False)
|
||||
force_ctas_schema = Column(String(250))
|
||||
extra = Column(Text, default=textwrap.dedent("""\
|
||||
{
|
||||
|
|
|
@ -2,8 +2,10 @@ import celery
|
|||
from datetime import datetime
|
||||
import pandas as pd
|
||||
import logging
|
||||
from caravel import app, db, models, utils
|
||||
import time
|
||||
import json
|
||||
|
||||
from caravel import app, cache, db, models, utils
|
||||
|
||||
QueryStatus = models.QueryStatus
|
||||
|
||||
|
@ -44,92 +46,106 @@ def create_table_as(sql, table_name, schema=None, override=False):
|
|||
@celery_app.task
|
||||
def get_sql_results(query_id, return_results=True):
|
||||
"""Executes the sql query returns the results."""
|
||||
db.session.commit() # HACK
|
||||
query = db.session.query(models.Query).filter_by(id=query_id).one()
|
||||
session = db.session()
|
||||
session.commit() # HACK
|
||||
query = session.query(models.Query).filter_by(id=query_id).one()
|
||||
database = query.database
|
||||
executed_sql = query.sql.strip().strip(';')
|
||||
|
||||
|
||||
def handle_error(msg):
|
||||
"""Local method handling error while processing the SQL"""
|
||||
query.error_message = msg
|
||||
query.status = QueryStatus.FAILED
|
||||
query.tmp_table_name = None
|
||||
session.commit()
|
||||
raise Exception(query.error_message)
|
||||
|
||||
# Limit enforced only for retrieving the data, not for the CTA queries.
|
||||
if is_query_select(executed_sql):
|
||||
if query.select_as_cta:
|
||||
if not query.tmp_table_name:
|
||||
start_dttm = datetime.fromtimestamp(query.start_time)
|
||||
query.tmp_table_name = 'tmp_{}_table_{}'.format(
|
||||
query.user_id,
|
||||
start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
|
||||
executed_sql = create_table_as(
|
||||
executed_sql, query.tmp_table_name, database.force_ctas_schema)
|
||||
query.select_as_cta_used = True
|
||||
elif query.limit:
|
||||
executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
|
||||
query.limit_used = True
|
||||
engine = database.get_sqla_engine(schema=query.schema)
|
||||
try:
|
||||
query.executed_sql = executed_sql
|
||||
logging.info("Running query: \n{}".format(executed_sql))
|
||||
result_proxy = engine.execute(query.executed_sql, schema=query.schema)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
query.error_message = utils.error_msg_from_exception(e)
|
||||
query.status = QueryStatus.FAILED
|
||||
query.tmp_table_name = None
|
||||
db.session.commit()
|
||||
raise Exception(query.error_message)
|
||||
is_select = is_query_select(executed_sql);
|
||||
if not is_select and not database.allow_dml:
|
||||
handle_error(
|
||||
"Only `SELECT` statements are allowed against this database")
|
||||
if query.select_as_cta:
|
||||
if not is_select:
|
||||
handle_error(
|
||||
"Only `SELECT` statements can be used with the CREATE TABLE "
|
||||
"feature.")
|
||||
if not query.tmp_table_name:
|
||||
start_dttm = datetime.fromtimestamp(query.start_time)
|
||||
query.tmp_table_name = 'tmp_{}_table_{}'.format(
|
||||
query.user_id,
|
||||
start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
|
||||
executed_sql = create_table_as(
|
||||
executed_sql, query.tmp_table_name, database.force_ctas_schema)
|
||||
query.select_as_cta_used = True
|
||||
elif query.limit and is_select:
|
||||
executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
|
||||
query.limit_used = True
|
||||
engine = database.get_sqla_engine(schema=query.schema)
|
||||
try:
|
||||
query.executed_sql = executed_sql
|
||||
logging.info("Running query: \n{}".format(executed_sql))
|
||||
result_proxy = engine.execute(query.executed_sql, schema=query.schema)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
handle_error(utils.error_msg_from_exception(e))
|
||||
|
||||
cursor = result_proxy.cursor
|
||||
query.status = QueryStatus.RUNNING
|
||||
db.session.flush()
|
||||
if database.backend == 'presto':
|
||||
cursor = result_proxy.cursor
|
||||
query.status = QueryStatus.RUNNING
|
||||
session.flush()
|
||||
if database.backend == 'presto':
|
||||
polled = cursor.poll()
|
||||
# poll returns dict -- JSON status information or ``None``
|
||||
# if the query is done
|
||||
# https://github.com/dropbox/PyHive/blob/
|
||||
# b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178
|
||||
while polled:
|
||||
# Update the object and wait for the kill signal.
|
||||
stats = polled.get('stats', {})
|
||||
if stats:
|
||||
completed_splits = float(stats.get('completedSplits'))
|
||||
total_splits = float(stats.get('totalSplits'))
|
||||
if total_splits and completed_splits:
|
||||
progress = 100 * (completed_splits / total_splits)
|
||||
if progress > query.progress:
|
||||
query.progress = progress
|
||||
session.commit()
|
||||
time.sleep(1)
|
||||
polled = cursor.poll()
|
||||
# poll returns dict -- JSON status information or ``None``
|
||||
# if the query is done
|
||||
# https://github.com/dropbox/PyHive/blob/
|
||||
# b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178
|
||||
while polled:
|
||||
# Update the object and wait for the kill signal.
|
||||
stats = polled.get('stats', {})
|
||||
if stats:
|
||||
completed_splits = float(stats.get('completedSplits'))
|
||||
total_splits = float(stats.get('totalSplits'))
|
||||
if total_splits and completed_splits:
|
||||
progress = 100 * (completed_splits / total_splits)
|
||||
if progress > query.progress:
|
||||
query.progress = progress
|
||||
db.session.commit()
|
||||
time.sleep(1)
|
||||
polled = cursor.poll()
|
||||
|
||||
columns = None
|
||||
data = None
|
||||
if result_proxy.cursor:
|
||||
columns = [col[0] for col in result_proxy.cursor.description]
|
||||
data = result_proxy.fetchall()
|
||||
df = pd.DataFrame(data, columns=columns)
|
||||
df = df.where((pd.notnull(df)), None)
|
||||
# TODO consider generating tuples instead of dicts to send
|
||||
# less data through the wire. The command bellow does that,
|
||||
# but we'd need to align on the client side.
|
||||
# data = df.values.tolist()
|
||||
data = df.to_dict(orient='records')
|
||||
columns = None
|
||||
data = None
|
||||
if result_proxy.cursor:
|
||||
columns = [col[0] for col in result_proxy.cursor.description]
|
||||
data = result_proxy.fetchall()
|
||||
df = pd.DataFrame(data, columns=columns)
|
||||
df = df.where((pd.notnull(df)), None)
|
||||
# TODO consider generating tuples instead of dicts to send
|
||||
# less data through the wire. The command bellow does that,
|
||||
# but we'd need to align on the client side.
|
||||
# data = df.values.tolist()
|
||||
data = df.to_dict(orient='records')
|
||||
|
||||
query.rows = result_proxy.rowcount
|
||||
query.progress = 100
|
||||
query.status = QueryStatus.SUCCESS
|
||||
if query.rows == -1 and data:
|
||||
# Presto doesn't provide result_proxy.row_count
|
||||
query.rows = len(data)
|
||||
query.rows = result_proxy.rowcount
|
||||
query.progress = 100
|
||||
query.status = QueryStatus.SUCCESS
|
||||
if query.rows == -1 and data:
|
||||
# Presto doesn't provide result_proxy.row_count
|
||||
query.rows = len(data)
|
||||
|
||||
# CTAs queries result in 1 cell having the # of the added rows.
|
||||
if query.select_as_cta:
|
||||
query.select_sql = '{}'.format(database.select_star(
|
||||
query.tmp_table_name, limit=query.limit))
|
||||
# CTAs queries result in 1 cell having the # of the added rows.
|
||||
if query.select_as_cta:
|
||||
query.select_sql = '{}'.format(database.select_star(
|
||||
query.tmp_table_name, limit=query.limit))
|
||||
|
||||
query.end_time = utils.now_as_float()
|
||||
db.session.commit()
|
||||
query.end_time = utils.now_as_float()
|
||||
session.commit()
|
||||
|
||||
payload = {
|
||||
'query_id': query.id,
|
||||
'status': query.status,
|
||||
'data': [],
|
||||
}
|
||||
if query.status == models.QueryStatus.SUCCESS:
|
||||
payload['data'] = data
|
||||
|
@ -138,3 +154,9 @@ def get_sql_results(query_id, return_results=True):
|
|||
payload['error'] = query.error_message
|
||||
if return_results:
|
||||
return payload
|
||||
'''
|
||||
# Hack testing using a kv store for results
|
||||
key = "query_id={}".format(query.id)
|
||||
logging.info("Storing results in key=[{}]".format(key))
|
||||
cache.set(key, json.dumps(payload, default=utils.json_iso_dttm_ser))
|
||||
'''
|
||||
|
|
|
@ -4,13 +4,14 @@ from __future__ import division
|
|||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from builtins import object
|
||||
from datetime import date, datetime
|
||||
import decimal
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import numpy
|
||||
import time
|
||||
import signal
|
||||
import uuid
|
||||
|
||||
import parsedatetime
|
||||
|
@ -30,6 +31,10 @@ class CaravelException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class CaravelTimeoutException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CaravelSecurityException(CaravelException):
|
||||
pass
|
||||
|
||||
|
@ -345,7 +350,7 @@ def datetime_to_epoch(dttm):
|
|||
|
||||
|
||||
def now_as_float():
|
||||
return datetime_to_epoch(datetime.now())
|
||||
return datetime_to_epoch(datetime.utcnow())
|
||||
|
||||
|
||||
def json_int_dttm_ser(obj):
|
||||
|
@ -414,3 +419,31 @@ def generic_find_constraint_name(table, columns, referenced, db):
|
|||
fk.referred_table.name == referenced and
|
||||
set(fk.column_keys) == columns):
|
||||
return fk.name
|
||||
|
||||
|
||||
class timeout(object):
|
||||
"""
|
||||
To be used in a ``with`` block and timeout its content.
|
||||
"""
|
||||
def __init__(self, seconds=1, error_message='Timeout'):
|
||||
self.seconds = seconds
|
||||
self.error_message = error_message
|
||||
|
||||
def handle_timeout(self, signum, frame):
|
||||
logging.error("Process timed out")
|
||||
raise CaravelTimeoutException(self.error_message)
|
||||
|
||||
def __enter__(self):
|
||||
try:
|
||||
signal.signal(signal.SIGALRM, self.handle_timeout)
|
||||
signal.alarm(self.seconds)
|
||||
except ValueError as e:
|
||||
logging.warning("timeout can't be used in the current context")
|
||||
logging.exception(e)
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
try:
|
||||
signal.alarm(0)
|
||||
except ValueError as e:
|
||||
logging.warning("timeout can't be used in the current context")
|
||||
logging.exception(e)
|
||||
|
|
|
@ -33,7 +33,7 @@ from wtforms.validators import ValidationError
|
|||
|
||||
import caravel
|
||||
from caravel import (
|
||||
appbuilder, db, models, viz, utils, app, sm, ascii_art, sql_lab
|
||||
appbuilder, cache, db, models, viz, utils, app, sm, ascii_art, sql_lab
|
||||
)
|
||||
|
||||
config = app.config
|
||||
|
@ -456,7 +456,8 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
|
|||
list_columns = ['database_name', 'creator', 'changed_on_']
|
||||
add_columns = [
|
||||
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra',
|
||||
'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema']
|
||||
'expose_in_sqllab', 'allow_run_sync', 'allow_run_async',
|
||||
'allow_ctas', 'allow_dml', 'force_ctas_schema']
|
||||
search_exclude_columns = ('password',)
|
||||
edit_columns = add_columns
|
||||
show_columns = [
|
||||
|
@ -479,7 +480,19 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
|
|||
"to structure your URI here: "
|
||||
"http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html"),
|
||||
'expose_in_sqllab': _("Expose this DB in SQL Lab"),
|
||||
'allow_run_sync': _(
|
||||
"Allow users to run synchronous queries, this is the default "
|
||||
"and should work well for queries that can be executed "
|
||||
"within a web request scope (<~1 minute)"),
|
||||
'allow_run_async': _(
|
||||
"Allow users to run queries, against an async backend. "
|
||||
"This assumes that you have a Celery worker setup as well "
|
||||
"as a results backend."),
|
||||
'allow_ctas': _("Allow CREATE TABLE AS option in SQL Lab"),
|
||||
'allow_dml': _(
|
||||
"Allow users to run non-SELECT statements "
|
||||
"(UPDATE, DELETE, CREATE, ...) "
|
||||
"in SQL Lab"),
|
||||
'force_ctas_schema': _(
|
||||
"When allowing CREATE TABLE AS option in SQL Lab, "
|
||||
"this option forces the table to be created in this schema"),
|
||||
|
@ -496,6 +509,7 @@ class DatabaseView(CaravelModelView, DeleteMixin): # noqa
|
|||
label_columns = {
|
||||
'expose_in_sqllab': _("Expose in SQL Lab"),
|
||||
'allow_ctas': _("Allow CREATE TABLE AS"),
|
||||
'allow_dml': _("Allow DML"),
|
||||
'force_ctas_schema': _("CTAS Schema"),
|
||||
'database_name': _("Database"),
|
||||
'creator': _("Creator"),
|
||||
|
@ -525,7 +539,8 @@ appbuilder.add_view(
|
|||
class DatabaseAsync(DatabaseView):
|
||||
list_columns = [
|
||||
'id', 'database_name',
|
||||
'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema'
|
||||
'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema',
|
||||
'allow_run_async', 'allow_run_sync', 'allow_dml',
|
||||
]
|
||||
|
||||
appbuilder.add_view_no_menu(DatabaseAsync)
|
||||
|
@ -1453,15 +1468,19 @@ class Caravel(BaseCaravelView):
|
|||
table.sql = data.get('sql')
|
||||
db.session.add(table)
|
||||
cols = []
|
||||
dims = []
|
||||
metrics = []
|
||||
for column_name, config in data.get('columns').items():
|
||||
is_dim = config.get('is_dim', False)
|
||||
cols.append(models.TableColumn(
|
||||
col = models.TableColumn(
|
||||
column_name=column_name,
|
||||
filterable=is_dim,
|
||||
groupby=is_dim,
|
||||
is_dttm=config.get('is_date', False),
|
||||
))
|
||||
)
|
||||
cols.append(col)
|
||||
if is_dim:
|
||||
dims.append(col)
|
||||
agg = config.get('agg')
|
||||
if agg:
|
||||
metrics.append(models.SqlMetric(
|
||||
|
@ -1476,8 +1495,15 @@ class Caravel(BaseCaravelView):
|
|||
table.columns = cols
|
||||
table.metrics = metrics
|
||||
db.session.commit()
|
||||
url = '/caravel/explore/table/{table.id}/?viz_type={viz_type}'
|
||||
return redirect(url.format(**locals()))
|
||||
params = {
|
||||
'viz_type': viz_type,
|
||||
'groupby': dims[0].column_name if dims else '',
|
||||
'metrics': metrics[0].metric_name if metrics else '',
|
||||
'metric': metrics[0].metric_name if metrics else '',
|
||||
}
|
||||
params = "&".join([k + '=' + v for k, v in params.items()])
|
||||
url = '/caravel/explore/table/{table.id}/?{params}'.format(**locals())
|
||||
return redirect(url)
|
||||
|
||||
@has_access
|
||||
@expose("/sql/<database_id>/")
|
||||
|
@ -1558,12 +1584,22 @@ class Caravel(BaseCaravelView):
|
|||
def theme(self):
|
||||
return self.render_template('caravel/theme.html')
|
||||
|
||||
@has_access
|
||||
@has_access_api
|
||||
@expose("/cached_key/<key>/")
|
||||
@log_this
|
||||
def cached_key(self, key):
|
||||
"""Returns a key from the cache"""
|
||||
resp = cache.get(key)
|
||||
if resp:
|
||||
return resp
|
||||
return "nope"
|
||||
|
||||
@has_access_api
|
||||
@expose("/sql_json/", methods=['POST', 'GET'])
|
||||
@log_this
|
||||
def sql_json(self):
|
||||
"""Runs arbitrary sql and returns and json"""
|
||||
async = request.form.get('async') == 'true'
|
||||
async = request.form.get('runAsync') == 'true'
|
||||
sql = request.form.get('sql')
|
||||
database_id = request.form.get('database_id')
|
||||
|
||||
|
@ -1600,7 +1636,7 @@ class Caravel(BaseCaravelView):
|
|||
# Async request.
|
||||
if async:
|
||||
# Ignore the celery future object and the request may time out.
|
||||
sql_lab.get_sql_results.delay(query_id)
|
||||
sql_lab.get_sql_results.delay(query_id, return_results=False)
|
||||
return Response(
|
||||
json.dumps({'query': query.to_dict()},
|
||||
default=utils.json_int_dttm_ser,
|
||||
|
@ -1610,7 +1646,15 @@ class Caravel(BaseCaravelView):
|
|||
|
||||
# Sync request.
|
||||
try:
|
||||
data = sql_lab.get_sql_results(query_id)
|
||||
SQLLAB_TIMEOUT = config.get("SQLLAB_TIMEOUT")
|
||||
with utils.timeout(
|
||||
seconds=SQLLAB_TIMEOUT,
|
||||
error_message=(
|
||||
"The query exceeded the {SQLLAB_TIMEOUT} seconds "
|
||||
"timeout. You may want to run your query as a "
|
||||
"`CREATE TABLE AS` to prevent timeouts."
|
||||
).format(**locals())):
|
||||
data = sql_lab.get_sql_results(query_id, return_results=True)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
return Response(
|
||||
|
|
|
@ -294,6 +294,25 @@ Upgrading should be as straightforward as running::
|
|||
pip install caravel --upgrade
|
||||
caravel db upgrade
|
||||
|
||||
SQL Lab
|
||||
-------
|
||||
SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
|
||||
databases out there. By default, queries are run in a web request, and
|
||||
may eventually timeout as queries exceed the maximum duration of a web
|
||||
request in your environment, whether it'd be a reverse proxy or the Caravel
|
||||
server itself.
|
||||
|
||||
In the modern analytics world, it's not uncommon to run large queries that
|
||||
run for minutes or hours.
|
||||
To enable support for long running queries that
|
||||
execute beyond the typical web request's timeout (30-60 seconds), it is
|
||||
necessary to deploy an asynchronous backend, which consist of one or many
|
||||
Caravel worker, which is implemented as a Celery worker, and a Celery
|
||||
broker for which we recommend using Redis or RabbitMQ.
|
||||
|
||||
It's also preferable to setup an async result backend as a key value store
|
||||
that can hold the long-running query results for a period of time. More
|
||||
details to come as to how to set this up here soon.
|
||||
|
||||
Making your own build
|
||||
---------------------
|
||||
|
|
|
@ -226,7 +226,8 @@ class CeleryTestCase(CaravelTestCase):
|
|||
sql_where = "SELECT name FROM ab_role WHERE name='Admin'"
|
||||
result1 = self.run_sql(
|
||||
1, sql_where, async='true', tmp_table='tmp_async_1', cta='true')
|
||||
self.assertEqual(QueryStatus.PENDING, result1['query']['state'])
|
||||
assert result1['query']['state'] in (
|
||||
QueryStatus.PENDING, QueryStatus.RUNNING, QueryStatus.SUCCESS)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
|
Loading…
Reference in New Issue