From d7da3cd1a5b707721b6cd475c040f01098e834f5 Mon Sep 17 00:00:00 2001 From: wanghong1314 <498213175@qq.com> Date: Sun, 8 Jan 2023 00:36:13 +0800 Subject: [PATCH 1/3] fix: Stop query in SQL Lab with impala engine (#20950) --- docker/pythonpath_dev/superset_config.py | 4 ++ superset/config.py | 6 +- superset/db_engine_specs/hive.py | 10 ++- superset/db_engine_specs/impala.py | 90 ++++++++++++++++++++++++ superset/views/core.py | 4 ++ 5 files changed, 111 insertions(+), 3 deletions(-) diff --git a/docker/pythonpath_dev/superset_config.py b/docker/pythonpath_dev/superset_config.py index 84c1dc58ab502..beed3378e48db 100644 --- a/docker/pythonpath_dev/superset_config.py +++ b/docker/pythonpath_dev/superset_config.py @@ -106,6 +106,10 @@ class CeleryConfig(object): WEBDRIVER_BASEURL = "http://superset:8088/" # The base URL for the email report hyperlinks. WEBDRIVER_BASEURL_USER_FRIENDLY = WEBDRIVER_BASEURL +# customize the polling time of each engine. The default time is 5 seconds +DB_POLL_INTERVAL_SECONDS = { + "hive": int(timedelta(seconds=5).total_seconds()), +} SQLLAB_CTAS_NO_LIMIT = True diff --git a/superset/config.py b/superset/config.py index 5cdf1c7cb3eb8..c0d306c11be3d 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1129,8 +1129,10 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name TRACKING_URL_TRANSFORMER = lambda url: url -# Interval between consecutive polls when using Hive Engine -HIVE_POLL_INTERVAL = int(timedelta(seconds=5).total_seconds()) +# customize the polling time of each engine. The default time is 5 seconds +DB_POLL_INTERVAL_SECONDS = { + "hive": int(timedelta(seconds=5).total_seconds()), +} # Interval between consecutive polls when using Presto Engine # See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py index c69908976728b..ebe89d9c644c1 100644 --- a/superset/db_engine_specs/hive.py +++ b/superset/db_engine_specs/hive.py @@ -375,7 +375,15 @@ def handle_cursor( # pylint: disable=too-many-locals last_log_line = len(log_lines) if needs_commit: session.commit() - time.sleep(current_app.config["HIVE_POLL_INTERVAL"]) + if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"): + logger.warning( + "HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL instead" + ) + else: + sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( + cls.engine, 5 + ) + time.sleep(sleep_interval) polled = cursor.poll() @classmethod diff --git a/superset/db_engine_specs/impala.py b/superset/db_engine_specs/impala.py index 048588c046fd4..b39aec3786248 100644 --- a/superset/db_engine_specs/impala.py +++ b/superset/db_engine_specs/impala.py @@ -14,20 +14,31 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import logging +import re +import time from datetime import datetime from typing import Any, Dict, List, Optional +from flask import current_app from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.orm import Session +from superset.constants import QUERY_EARLY_CANCEL_KEY from superset.db_engine_specs.base import BaseEngineSpec +from superset.models.sql_lab import Query from superset.utils import core as utils +logger = logging.getLogger(__name__) + class ImpalaEngineSpec(BaseEngineSpec): """Engine spec for Cloudera's Impala""" engine = "impala" engine_name = "Apache Impala" + # Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547) + query_progress_r = re.compile(r".*Query.*: (?P[0-9]+)%.*") _time_grain_expressions = { None: "{col}", @@ -63,3 +74,82 @@ def get_schema_names(cls, inspector: Inspector) -> List[str]: if not row[0].startswith("_") ] return schemas + + @classmethod + def has_implicit_cancel(cls) -> bool: + """ + Return True if the live cursor handles the implicit cancelation of the query, + False otherise. + + :return: Whether the live cursor implicitly cancels the query + :see: handle_cursor + """ + + return True + + @classmethod + def execute( + cls, + cursor: Any, + query: str, + **kwargs: Any, # pylint: disable=unused-argument + ) -> None: + try: + cursor.execute_async(query) + except Exception as ex: + raise cls.get_dbapi_mapped_exception(ex) + + @classmethod + def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: + """Stop query and updates progress information""" + + query_id = query.id + unfinished_states = ( + "INITIALIZED_STATE", + "RUNNING_STATE", + ) + + try: + status = cursor.status() + while status in unfinished_states: + session.refresh(query) + query = session.query(Query).filter_by(id=query_id).one() + # if query cancelation was requested prior to the handle_cursor call, but + # the query was still executed + # modified in stop_query in views / core.py is reflected here. + # stop query + if query.extra.get(QUERY_EARLY_CANCEL_KEY): + cursor.cancel_operation() + cursor.close_operation() + cursor.close() + break + + # updates progress info by log + try: + log = cursor.get_log() or "" + except Exception: # pylint: disable=broad-except + logger.warning("Call to GetLog() failed") + log = "" + + if log: + match = cls.query_progress_r.match(log) + if match: + progress = int(match.groupdict()["query_progress"]) + logger.debug( + "Query %s: Progress total: %s", str(query_id), str(progress) + ) + needs_commit = False + if progress > query.progress: + query.progress = progress + needs_commit = True + + if needs_commit: + session.commit() + sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( + cls.engine, 5 + ) + time.sleep(sleep_interval) + status = cursor.status() + except Exception: # pylint: disable=broad-except + logger.debug("Call to status() failed ") + return diff --git a/superset/views/core.py b/superset/views/core.py index 534f8f667d707..3f207f1b33ebb 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -67,6 +67,7 @@ SqlMetric, TableColumn, ) +from superset.constants import QUERY_EARLY_CANCEL_KEY from superset.dashboards.commands.importers.v0 import ImportDashboardsCommand from superset.dashboards.dao import DashboardDAO from superset.dashboards.permalink.commands.get import GetDashboardPermalinkCommand @@ -2313,6 +2314,9 @@ def stop_query(self) -> FlaskResponse: raise SupersetCancelQueryException("Could not cancel query") query.status = QueryStatus.STOPPED + # Add the stop identity attribute because the sqlalchemy thread is unsafe + # because of multiple updates to the status in the query table + query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True) query.end_time = now_as_float() db.session.commit() From b28b39d92cf41fd7218b8ed4ae8be9688b65f1cf Mon Sep 17 00:00:00 2001 From: wanghong1314 <498213175@qq.com> Date: Tue, 10 Jan 2023 19:01:25 +0800 Subject: [PATCH 2/3] fix: standardized code --- docker/pythonpath_dev/superset_config.py | 4 ---- superset/config.py | 4 +--- superset/db_engine_specs/hive.py | 2 +- superset/db_engine_specs/impala.py | 6 +++--- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/docker/pythonpath_dev/superset_config.py b/docker/pythonpath_dev/superset_config.py index beed3378e48db..84c1dc58ab502 100644 --- a/docker/pythonpath_dev/superset_config.py +++ b/docker/pythonpath_dev/superset_config.py @@ -106,10 +106,6 @@ class CeleryConfig(object): WEBDRIVER_BASEURL = "http://superset:8088/" # The base URL for the email report hyperlinks. WEBDRIVER_BASEURL_USER_FRIENDLY = WEBDRIVER_BASEURL -# customize the polling time of each engine. The default time is 5 seconds -DB_POLL_INTERVAL_SECONDS = { - "hive": int(timedelta(seconds=5).total_seconds()), -} SQLLAB_CTAS_NO_LIMIT = True diff --git a/superset/config.py b/superset/config.py index c0d306c11be3d..894fb5a7eb1a8 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1130,9 +1130,7 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name # customize the polling time of each engine. The default time is 5 seconds -DB_POLL_INTERVAL_SECONDS = { - "hive": int(timedelta(seconds=5).total_seconds()), -} +DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {} # Interval between consecutive polls when using Presto Engine # See here: https://github.com/dropbox/PyHive/blob/8eb0aeab8ca300f3024655419b93dad926c1a351/pyhive/presto.py#L93 # pylint: disable=line-too-long,useless-suppression diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py index ebe89d9c644c1..1d27978e9d23c 100644 --- a/superset/db_engine_specs/hive.py +++ b/superset/db_engine_specs/hive.py @@ -377,7 +377,7 @@ def handle_cursor( # pylint: disable=too-many-locals session.commit() if sleep_interval := current_app.config.get("HIVE_POLL_INTERVAL"): logger.warning( - "HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL instead" + "HIVE_POLL_INTERVAL is deprecated and will be removed in 3.0. Please use DB_POLL_INTERVAL_SECONDS instead" ) else: sleep_interval = current_app.config["DB_POLL_INTERVAL_SECONDS"].get( diff --git a/superset/db_engine_specs/impala.py b/superset/db_engine_specs/impala.py index b39aec3786248..177a9728fe0f8 100644 --- a/superset/db_engine_specs/impala.py +++ b/superset/db_engine_specs/impala.py @@ -30,6 +30,8 @@ from superset.utils import core as utils logger = logging.getLogger(__name__) +# Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547) +QUERY_PROGRESS_REGEX = re.compile(r"Query.*: (?P[0-9]+)%") class ImpalaEngineSpec(BaseEngineSpec): @@ -37,8 +39,6 @@ class ImpalaEngineSpec(BaseEngineSpec): engine = "impala" engine_name = "Apache Impala" - # Query 5543ffdf692b7d02:f78a944000000000: 3% Complete (17 out of 547) - query_progress_r = re.compile(r".*Query.*: (?P[0-9]+)%.*") _time_grain_expressions = { None: "{col}", @@ -132,7 +132,7 @@ def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: log = "" if log: - match = cls.query_progress_r.match(log) + match = QUERY_PROGRESS_REGEX.match(log) if match: progress = int(match.groupdict()["query_progress"]) logger.debug( From c2bf101553470ae5e3b11eafc4fe9075f3a6a032 Mon Sep 17 00:00:00 2001 From: wanghong1314 <498213175@qq.com> Date: Tue, 10 Jan 2023 19:28:41 +0800 Subject: [PATCH 3/3] Update config.py --- superset/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/superset/config.py b/superset/config.py index fca04fabf511c..a3fc3df00ef53 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1131,7 +1131,7 @@ def CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC( # pylint: disable=invalid-name TRACKING_URL_TRANSFORMER = lambda url: url -# customize the polling time of each engine. The default time is 5 seconds +# customize the polling time of each engine DB_POLL_INTERVAL_SECONDS: Dict[str, int] = {} # Interval between consecutive polls when using Presto Engine