chore: Migrate /superset/stop_query/ to API v1 (#22624)

This commit is contained in:
Diego Medina 2023-01-16 11:07:52 -03:00 committed by GitHub
parent 80b31130b4
commit 3ed288d4ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1189 additions and 446 deletions

File diff suppressed because it is too large Load Diff

View File

@ -450,9 +450,9 @@ export function validateQuery(queryEditor, sql) {
export function postStopQuery(query) { export function postStopQuery(query) {
return function (dispatch) { return function (dispatch) {
return SupersetClient.post({ return SupersetClient.post({
endpoint: '/superset/stop_query/', endpoint: '/api/v1/query/stop',
postPayload: { client_id: query.id }, body: JSON.stringify({ client_id: query.id }),
stringify: false, headers: { 'Content-Type': 'application/json' },
}) })
.then(() => dispatch(stopQuery(query))) .then(() => dispatch(stopQuery(query)))
.then(() => dispatch(addSuccessToast(t('Query was stopped.')))) .then(() => dispatch(addSuccessToast(t('Query was stopped.'))))

View File

@ -317,11 +317,15 @@ describe('async actions', () => {
}); });
describe('postStopQuery', () => { describe('postStopQuery', () => {
const stopQueryEndpoint = 'glob:*/superset/stop_query/*'; const stopQueryEndpoint = 'glob:*/api/v1/query/stop';
fetchMock.post(stopQueryEndpoint, {}); fetchMock.post(stopQueryEndpoint, {});
const baseQuery = {
...query,
id: 'test_foo',
};
const makeRequest = () => { const makeRequest = () => {
const request = actions.postStopQuery(query); const request = actions.postStopQuery(baseQuery);
return request(dispatch); return request(dispatch);
}; };
@ -346,7 +350,8 @@ describe('async actions', () => {
return makeRequest().then(() => { return makeRequest().then(() => {
const call = fetchMock.calls(stopQueryEndpoint)[0]; const call = fetchMock.calls(stopQueryEndpoint)[0];
expect(call[1].body.get('client_id')).toBe(query.id); const body = JSON.parse(call[1].body);
expect(body.client_id).toBe(baseQuery.id);
}); });
}); });
}); });

View File

@ -140,6 +140,7 @@ MODEL_API_RW_METHOD_PERMISSION_MAP = {
"get_data": "read", "get_data": "read",
"samples": "read", "samples": "read",
"delete_ssh_tunnel": "write", "delete_ssh_tunnel": "write",
"stop_query": "read",
} }
EXTRA_FORM_DATA_APPEND_KEYS = { EXTRA_FORM_DATA_APPEND_KEYS = {

View File

@ -266,3 +266,7 @@ class InvalidPayloadSchemaError(SupersetErrorException):
class SupersetCancelQueryException(SupersetException): class SupersetCancelQueryException(SupersetException):
status = 422 status = 422
class QueryNotFoundException(SupersetException):
status = 404

View File

@ -16,14 +16,29 @@
# under the License. # under the License.
import logging import logging
import backoff
from flask_appbuilder.api import expose, protect, request, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_appbuilder.models.sqla.interface import SQLAInterface
from superset import db, event_logger
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.databases.filters import DatabaseFilter from superset.databases.filters import DatabaseFilter
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query from superset.models.sql_lab import Query
from superset.queries.dao import QueryDAO
from superset.queries.filters import QueryFilter from superset.queries.filters import QueryFilter
from superset.queries.schemas import openapi_spec_methods_override, QuerySchema from superset.queries.schemas import (
from superset.views.base_api import BaseSupersetModelRestApi, RelatedFieldFilter openapi_spec_methods_override,
QuerySchema,
StopQuerySchema,
)
from superset.superset_typing import FlaskResponse
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
requires_json,
statsd_metrics,
)
from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -43,6 +58,7 @@ class QueryRestApi(BaseSupersetModelRestApi):
RouteMethod.GET_LIST, RouteMethod.GET_LIST,
RouteMethod.RELATED, RouteMethod.RELATED,
RouteMethod.DISTINCT, RouteMethod.DISTINCT,
"stop_query",
} }
list_columns = [ list_columns = [
@ -95,9 +111,11 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_filters = [["id", QueryFilter, lambda: []]] base_filters = [["id", QueryFilter, lambda: []]]
base_order = ("changed_on", "desc") base_order = ("changed_on", "desc")
list_model_schema = QuerySchema() list_model_schema = QuerySchema()
stop_query_schema = StopQuerySchema()
openapi_spec_tag = "Queries" openapi_spec_tag = "Queries"
openapi_spec_methods = openapi_spec_methods_override openapi_spec_methods = openapi_spec_methods_override
openapi_spec_component_schemas = (StopQuerySchema,)
order_columns = [ order_columns = [
"changed_on", "changed_on",
@ -123,3 +141,59 @@ class QueryRestApi(BaseSupersetModelRestApi):
base_related_field_filters = {"database": [["id", DatabaseFilter, lambda: []]]} base_related_field_filters = {"database": [["id", DatabaseFilter, lambda: []]]}
allowed_rel_fields = {"database", "user"} allowed_rel_fields = {"database", "user"}
allowed_distinct_fields = {"status"} allowed_distinct_fields = {"status"}
@expose("/stop", methods=["POST"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".stop_query",
log_to_statsd=False,
)
@backoff.on_exception(
backoff.constant,
Exception,
interval=1,
on_backoff=lambda details: db.session.rollback(),
on_giveup=lambda details: db.session.rollback(),
max_tries=5,
)
@requires_json
def stop_query(self) -> FlaskResponse:
"""Manually stop a query with client_id
---
post:
summary: Manually stop a query with client_id
requestBody:
description: Stop query schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/StopQuerySchema'
responses:
200:
description: Query stopped
content:
application/json:
schema:
type: object
properties:
result:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
try:
body = self.stop_query_schema.load(request.json)
QueryDAO.stop_query(body["client_id"])
return self.response(200, result="OK")
except SupersetException as ex:
return self.response(ex.status, message=ex.message)

View File

@ -18,10 +18,14 @@ import logging
from datetime import datetime from datetime import datetime
from typing import Any, Dict from typing import Any, Dict
from superset import sql_lab
from superset.common.db_query_status import QueryStatus
from superset.dao.base import BaseDAO from superset.dao.base import BaseDAO
from superset.exceptions import QueryNotFoundException, SupersetCancelQueryException
from superset.extensions import db from superset.extensions import db
from superset.models.sql_lab import Query, SavedQuery from superset.models.sql_lab import Query, SavedQuery
from superset.queries.filters import QueryFilter from superset.queries.filters import QueryFilter
from superset.utils.dates import now_as_float
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -56,3 +60,26 @@ class QueryDAO(BaseDAO):
columns = payload.get("columns", {}) columns = payload.get("columns", {})
db.session.add(query) db.session.add(query)
query.set_extra_json_key("columns", columns) query.set_extra_json_key("columns", columns)
@staticmethod
def stop_query(client_id: str) -> None:
query = db.session.query(Query).filter_by(client_id=client_id).one_or_none()
if not query:
raise QueryNotFoundException(f"Query with client_id {client_id} not found")
if query.status in [
QueryStatus.FAILED,
QueryStatus.SUCCESS,
QueryStatus.TIMED_OUT,
]:
logger.warning(
"Query with client_id could not be stopped: query already complete",
)
return
if not sql_lab.cancel_query(query):
raise SupersetCancelQueryException("Could not cancel query")
query.status = QueryStatus.STOPPED
query.end_time = now_as_float()
db.session.commit()

View File

@ -67,3 +67,11 @@ class QuerySchema(Schema):
# pylint: disable=no-self-use # pylint: disable=no-self-use
def get_sql_tables(self, obj: Query) -> List[Table]: def get_sql_tables(self, obj: Query) -> List[Table]:
return obj.sql_tables return obj.sql_tables
class StopQuerySchema(Schema):
"""
Schema for the stop_query API call.
"""
client_id = fields.String()

View File

@ -2298,6 +2298,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods
on_giveup=lambda details: db.session.rollback(), on_giveup=lambda details: db.session.rollback(),
max_tries=5, max_tries=5,
) )
@deprecated()
def stop_query(self) -> FlaskResponse: def stop_query(self) -> FlaskResponse:
client_id = request.form.get("client_id") client_id = request.form.get("client_id")
query = db.session.query(Query).filter_by(client_id=client_id).one() query = db.session.query(Query).filter_by(client_id=client_id).one()

View File

@ -17,6 +17,7 @@
# isort:skip_file # isort:skip_file
"""Unit tests for Superset""" """Unit tests for Superset"""
from datetime import datetime, timedelta from datetime import datetime, timedelta
from unittest import mock
import json import json
import random import random
import string import string
@ -392,3 +393,54 @@ class TestQueryApi(SupersetTestCase):
# rollback changes # rollback changes
db.session.delete(query) db.session.delete(query)
db.session.commit() db.session.commit()
@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query_not_found(
self, mock_superset_db_session, mock_sql_lab_cancel_query
):
"""
Handles stop query when the DB engine spec does not
have a cancel query method (with invalid client_id).
"""
form_data = {"client_id": "foo2"}
query_mock = mock.Mock()
query_mock.return_value = None
self.login(username="admin")
mock_superset_db_session.query().filter_by().one_or_none = query_mock
mock_sql_lab_cancel_query.return_value = True
rv = self.client.post(
"/api/v1/query/stop",
data=json.dumps(form_data),
content_type="application/json",
)
assert rv.status_code == 404
data = json.loads(rv.data.decode("utf-8"))
assert data["message"] == "Query with client_id foo2 not found"
@mock.patch("superset.sql_lab.cancel_query")
@mock.patch("superset.views.core.db.session")
def test_stop_query(self, mock_superset_db_session, mock_sql_lab_cancel_query):
"""
Handles stop query when the DB engine spec does not
have a cancel query method.
"""
form_data = {"client_id": "foo"}
query_mock = mock.Mock()
query_mock.client_id = "foo"
query_mock.status = QueryStatus.RUNNING
self.login(username="admin")
mock_superset_db_session.query().filter_by().one_or_none().return_value = (
query_mock
)
mock_sql_lab_cancel_query.return_value = True
rv = self.client.post(
"/api/v1/query/stop",
data=json.dumps(form_data),
content_type="application/json",
)
assert rv.status_code == 200
data = json.loads(rv.data.decode("utf-8"))
assert data["result"] == "OK"

View File

@ -15,11 +15,14 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
import json import json
from typing import Iterator from typing import Any, Iterator
import pytest import pytest
from pytest_mock import MockFixture
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
from superset.exceptions import QueryNotFoundException, SupersetCancelQueryException
def test_query_dao_save_metadata(session: Session) -> None: def test_query_dao_save_metadata(session: Session) -> None:
from superset.models.core import Database from superset.models.core import Database
@ -53,3 +56,163 @@ def test_query_dao_save_metadata(session: Session) -> None:
query = session.query(Query).one() query = session.query(Query).one()
QueryDAO.save_metadata(query=query, payload={"columns": []}) QueryDAO.save_metadata(query=query, payload={"columns": []})
assert query.extra.get("columns", None) == [] assert query.extra.get("columns", None) == []
def test_query_dao_stop_query_not_found(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query
engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member
db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)
session.add(db)
session.add(query_obj)
mocker.patch("superset.sql_lab.cancel_query", return_value=False)
from superset.queries.dao import QueryDAO
with pytest.raises(QueryNotFoundException):
QueryDAO.stop_query("foo2")
query = session.query(Query).one()
assert query.status == QueryStatus.RUNNING
def test_query_dao_stop_query_not_running(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query
engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member
db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.FAILED,
)
session.add(db)
session.add(query_obj)
from superset.queries.dao import QueryDAO
QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.FAILED
def test_query_dao_stop_query_failed(
mocker: MockFixture, app: Any, session: Session
) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query
engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member
db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)
session.add(db)
session.add(query_obj)
mocker.patch("superset.sql_lab.cancel_query", return_value=False)
from superset.queries.dao import QueryDAO
with pytest.raises(SupersetCancelQueryException):
QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.RUNNING
def test_query_dao_stop_query(mocker: MockFixture, app: Any, session: Session) -> None:
from superset.common.db_query_status import QueryStatus
from superset.models.core import Database
from superset.models.sql_lab import Query
engine = session.get_bind()
Query.metadata.create_all(engine) # pylint: disable=no-member
db = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
query_obj = Query(
client_id="foo",
database=db,
tab_name="test_tab",
sql_editor_id="test_editor_id",
sql="select * from bar",
select_sql="select * from bar",
executed_sql="select * from bar",
limit=100,
select_as_cta=False,
rows=100,
error_message="none",
results_key="abc",
status=QueryStatus.RUNNING,
)
session.add(db)
session.add(query_obj)
mocker.patch("superset.sql_lab.cancel_query", return_value=True)
from superset.queries.dao import QueryDAO
QueryDAO.stop_query(query_obj.client_id)
query = session.query(Query).one()
assert query.status == QueryStatus.STOPPED