fix: Stop query in SQL Lab with impala engine (#22635)

This commit is contained in:
wanghong1314 2023-01-10 22:52:07 +08:00 committed by GitHub
parent 159dcd7e62
commit 8bf6d80155
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 3 deletions

View File

@ -1131,8 +1131,8 @@ BLUEPRINTS: List[Blueprint] = []
TRACKING_URL_TRANSFORMER = lambda url: url TRACKING_URL_TRANSFORMER = lambda url: url
# Interval between consecutive polls when using Hive Engine # customize the polling time of each engine
HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds()) DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {}
# Interval between consecutive polls when using Presto Engine # Interval between consecutive polls when using Presto Engine
# See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression # See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression

View File

@ -375,7 +375,15 @@ class HiveEngineSpec(PrestoEngineSpec):
last_log_line = len(log_lines) last_log_line = len(log_lines)
if needs_commit: if needs_commit:
session.commit() session.commit()
time.sleep(current_app.config["HIVE_POLL_INTERVAL"]) if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"):
logger.warning(
"HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL_SECONDS instead"
)
else:
sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get(
cls.engine, 5
)
time.sleep(sleep_interval)
polled = cursor.poll() polled = cursor.poll()
@classmethod @classmethod

View File

@ -14,14 +14,25 @@
# KIND, either express or implied. See the License for the # KIND, either express or implied. See the License for the
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
import logging
import re
import time
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from flask import current_app
from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import Session
from superset.constants import QUERY_EARLY_CANCEL_KEY
from superset.db_engine_specs.base import BaseEngineSpec from superset.db_engine_specs.base import BaseEngineSpec
from superset.models.sql_lab import Query
from superset.utils import core as utils from superset.utils import core as utils
logger = logging.getLogger(__name__)
# Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547)
QUERY_PROGRESS_REGEX = re.compile(r"Query.*: (?P<query_progress>[0-9]+)%")
class ImpalaEngineSpec(BaseEngineSpec): class ImpalaEngineSpec(BaseEngineSpec):
"""Engine spec for Cloudera's Impala""" """Engine spec for Cloudera's Impala"""
@ -63,3 +74,82 @@ class ImpalaEngineSpec(BaseEngineSpec):
if not row[0].startswith("_") if not row[0].startswith("_")
] ]
return schemas return schemas
@classmethod
def has_implicit_cancel(cls) -> bool:
    """
    Return True if the live cursor handles the implicit cancelation of the query,
    False otherwise.

    This engine spec always returns True.

    :return: Whether the live cursor implicitly cancels the query
    :see: handle_cursor
    """
    return True
@classmethod
def execute(
    cls,
    cursor: Any,
    query: str,
    **kwargs: Any,  # pylint: disable=unused-argument
) -> None:
    """
    Submit the query through the cursor's ``execute_async`` method.

    :param cursor: Cursor object exposing ``execute_async``
    :param query: SQL text handed to the cursor
    :param kwargs: Accepted for interface compatibility; not used here
    :raises Exception: The exception produced by
        ``cls.get_dbapi_mapped_exception`` for whatever the cursor raised,
        with the original exception attached as its cause
    """
    try:
        cursor.execute_async(query)
    except Exception as ex:
        # Chain explicitly so the driver's original error stays attached as
        # ``__cause__`` of the mapped exception.
        raise cls.get_dbapi_mapped_exception(ex) from ex
@classmethod
def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
    """
    Poll a running query, honoring stop requests and recording progress.

    While ``cursor.status()`` reports an unfinished state, each iteration:

    1. reloads the ``Query`` row from the session;
    2. if the early-cancel flag is set in ``query.extra``, cancels and closes
       the operation and the cursor, then stops polling;
    3. otherwise reads the cursor log, extracts the completion percentage
       and commits it when it is higher than the stored ``query.progress``;
    4. sleeps for the interval configured for this engine in
       ``DB_POLL_INTERVAL_SECONDS`` (5 when the engine has no entry).

    Polling is best-effort: any exception ends the loop and is logged
    instead of being propagated.

    :param cursor: Cursor on which the query was started
    :param query: The query model being tracked
    :param session: Session used to reload and update ``query``
    """
    query_id = query.id
    unfinished_states = (
        "INITIALIZED_STATE",
        "RUNNING_STATE",
    )

    try:
        status = cursor.status()
        while status in unfinished_states:
            # Reload the row so that a cancel flag written by stop_query in
            # views/core.py is reflected here.
            session.refresh(query)
            query = session.query(Query).filter_by(id=query_id).one()

            # A stop was requested: cancel the operation, release the cursor
            # and stop polling.
            if query.extra.get(QUERY_EARLY_CANCEL_KEY):
                cursor.cancel_operation()
                cursor.close_operation()
                cursor.close()
                break

            # Progress is parsed from the cursor log; a failed log fetch only
            # skips the progress update for this iteration.
            try:
                log = cursor.get_log() or ""
            except Exception:  # pylint: disable=broad-except
                logger.warning("Call to GetLog() failed", exc_info=True)
                log = ""

            match = QUERY_PROGRESS_REGEX.match(log) if log else None
            if match:
                progress = int(match.group("query_progress"))
                logger.debug("Query %s: Progress total: %s", query_id, progress)
                # Only ever move the stored progress forward.
                if progress > query.progress:
                    query.progress = progress
                    session.commit()

            sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get(
                cls.engine, 5
            )
            time.sleep(sleep_interval)
            status = cursor.status()
    except Exception:  # pylint: disable=broad-except
        # Keep the best-effort behavior, but record the traceback so the
        # reason polling stopped can be diagnosed.
        logger.debug("Call to status() failed ", exc_info=True)

View File

@ -67,6 +67,7 @@ from superset.connectors.sqla.models import (
SqlMetric, SqlMetric,
TableColumn, TableColumn,
) )
from superset.constants import QUERY_EARLY_CANCEL_KEY
from superset.dashboards.commands.importers.v0 import ImportDashboardsCommand from superset.dashboards.commands.importers.v0 import ImportDashboardsCommand
from superset.dashboards.dao import DashboardDAO from superset.dashboards.dao import DashboardDAO
from superset.dashboards.permalink.commands.get import GetDashboardPermalinkCommand from superset.dashboards.permalink.commands.get import GetDashboardPermalinkCommand
@ -2318,6 +2319,9 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
raise SupersetCancelQueryException("Could not cancel query") raise SupersetCancelQueryException("Could not cancel query")
query.status = QueryStatus.STOPPED query.status = QueryStatus.STOPPED
# Also flag the stop request in the query's extra JSON: the status column is
# updated from multiple places (the SQLAlchemy session is not thread-safe),
# so the status alone cannot be relied on to signal the stop
query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True)
query.end_time = now_as_float() query.end_time = now_as_float()
db.session.commit() db.session.commit()