refactor sql_json view endpoint: separate getting and checking existing query to ad hoc methods (#16449)

This commit is contained in:
ofekisr 2021-08-31 19:55:45 +03:00 committed by GitHub
parent c5a5cf7db9
commit 68c2a6d43a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 26 deletions

View File

@ -2564,6 +2564,14 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
execution_context = SqlJsonExecutionContext(request.json)
return self.sql_json_exec(execution_context, request.json, log_params)
@classmethod
def is_query_handled(cls, query: Optional[Query]) -> bool:
return query is not None and query.status in [
QueryStatus.RUNNING,
QueryStatus.PENDING,
QueryStatus.TIMED_OUT,
]
def sql_json_exec( # pylint: disable=too-many-statements,too-many-locals
self,
execution_context: SqlJsonExecutionContext,
@ -2574,35 +2582,16 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
session = db.session()
# check to see if this query is already running
query = (
session.query(Query)
.filter_by(
client_id=execution_context.client_id,
user_id=execution_context.user_id,
sql_editor_id=execution_context.sql_editor_id,
)
.one_or_none()
)
if query is not None and query.status in [
QueryStatus.RUNNING,
QueryStatus.PENDING,
QueryStatus.TIMED_OUT,
]:
query = self._get_existing_query(execution_context, session)
if self.is_query_handled(query):
# return the existing query
payload = json.dumps(
{"query": query.to_dict()}, default=utils.json_int_dttm_ser
{"query": query.to_dict()}, default=utils.json_int_dttm_ser # type: ignore
)
return json_success(payload)
mydb = session.query(Database).get(execution_context.database_id)
if not mydb:
raise SupersetGenericErrorException(
__(
"The database referenced in this query was not found. Please "
"contact an administrator for further assistance or try again."
)
)
mydb = self._get_the_query_db(execution_context, session)
# Set tmp_schema_name for CTA
# TODO(bkyryliuk): consider parsing, splitting tmp_schema_name from
@ -2708,9 +2697,9 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
mydb.db_engine_spec.get_limit_from_sql(rendered_query),
execution_context.limit,
]
if limits[0] is None or limits[0] > limits[1]:
if limits[0] is None or limits[0] > limits[1]: # type: ignore
query.limiting_factor = LimitingFactor.DROPDOWN
elif limits[1] > limits[0]:
elif limits[1] > limits[0]: # type: ignore
query.limiting_factor = LimitingFactor.QUERY
else: # limits[0] == limits[1]
query.limiting_factor = LimitingFactor.QUERY_AND_DROPDOWN
@ -2734,6 +2723,39 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
session, rendered_query, query, expand_data, log_params
)
@classmethod
def _get_the_query_db(
cls, execution_context: SqlJsonExecutionContext, session: Session
) -> Database:
mydb = session.query(Database).get(execution_context.database_id)
cls._validate_query_db(mydb)
return mydb
@classmethod
def _validate_query_db(cls, database: Optional[Database]) -> None:
if not database:
raise SupersetGenericErrorException(
__(
"The database referenced in this query was not found. Please "
"contact an administrator for further assistance or try again."
)
)
@classmethod
def _get_existing_query(
cls, execution_context: SqlJsonExecutionContext, session: Session
) -> Optional[Query]:
query = (
session.query(Query)
.filter_by(
client_id=execution_context.client_id,
user_id=execution_context.user_id,
sql_editor_id=execution_context.sql_editor_id,
)
.one_or_none()
)
return query
@has_access
@event_logger.log_this
@expose("/csv/<client_id>")