Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stop query functionality. #2387

Merged
merged 2 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions superset/assets/javascripts/SqlLab/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ export function runQuery(query) {
};
}

export function postStopQuery(query) {
return function (dispatch) {
const stopQueryUrl = '/superset/stop_query/';
const stopQueryRequestData = { client_id: query.id };
$.ajax({
type: 'POST',
dataType: 'json',
url: stopQueryUrl,
data: stopQueryRequestData,
success() {
if (!query.runAsync) {
dispatch(stopQuery(query));
}
},
});
};
}

export function setDatabases(databases) {
return { type: SET_DATABASES, databases };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class SqlEditor extends React.PureComponent {
this.props.actions.setActiveSouthPaneTab('Results');
}
stopQuery() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename this to cancelQuery as well to match the action name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

named it postStopQuery

this.props.actions.stopQuery(this.props.latestQuery);
this.props.actions.postStopQuery(this.props.latestQuery);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the stopQuery action used elsewhere? if not we should remove it in superset/assets/javascripts/SqlLab/actions.js as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postStopQuery uses it

}
createTableAs() {
this.startQuery(true, true);
Expand Down
19 changes: 18 additions & 1 deletion superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sqlalchemy import select
from sqlalchemy.sql import text
from superset.utils import SupersetTemplateException
from superset.utils import QueryStatus
from flask_babel import lazy_gettext as _

Grain = namedtuple('Grain', 'name label function')
Expand Down Expand Up @@ -272,6 +273,12 @@ class PrestoEngineSpec(BaseEngineSpec):
"date_add('day', 1, CAST({col} AS TIMESTAMP))))"),
)

@classmethod
def patch(cls):
from pyhive import presto
from superset.db_engines import presto as patched_presto
presto.Cursor.cancel = patched_presto.cancel

@classmethod
def sql_preprocessor(cls, sql):
return sql.replace('%', '%%')
Expand Down Expand Up @@ -342,6 +349,12 @@ def handle_cursor(cls, cursor, query, session):
while polled:
# Update the object and wait for the kill signal.
stats = polled.get('stats', {})

query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

if stats:
completed_splits = float(stats.get('completedSplits'))
total_splits = float(stats.get('totalSplits'))
Expand Down Expand Up @@ -566,13 +579,17 @@ def progress(cls, logs):
def handle_cursor(cls, cursor, query, session):
"""Updates progress information"""
from pyhive import hive
print("PATCHED TCLIService {}".format(hive.TCLIService.__file__))
unfinished_states = (
hive.ttypes.TOperationState.INITIALIZED_STATE,
hive.ttypes.TOperationState.RUNNING_STATE,
)
polled = cursor.poll()
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id)
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

resp = cursor.fetch_logs()
if resp and resp.log:
progress = cls.progress(resp.log)
Expand Down
19 changes: 19 additions & 0 deletions superset/db_engines/presto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from pyhive import presto


# TODO(bogdan): Remove this when new pyhive release will be available.
def cancel(self):
if self._state == self._STATE_NONE:
raise presto.ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, \
"Should be finished if nextUri is None"
return

response = presto.requests.delete(self._nextUri)
if response.status_code != presto.requests.codes.no_content:
fmt = "Unexpected status code after cancel {}\n{}"
raise presto.OperationalError(
fmt.format(response.status_code, response.content))
self._state = self._STATE_FINISHED
return
7 changes: 7 additions & 0 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ def handle_error(msg):
conn.commit()
conn.close()

if query.status == utils.QueryStatus.STOPPED:
return json.dumps({
'query_id': query.id,
'status': query.status,
'query': query.to_dict(),
}, default=utils.json_iso_dttm_ser)

column_names = (
[col[0] for col in cursor.description] if cursor.description else [])
column_names = dedup(column_names)
Expand Down
2 changes: 1 addition & 1 deletion superset/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ class QueryStatus(object):

"""Enum-type class for query statuses"""

CANCELLED = 'cancelled'
STOPPED = 'stopped'
FAILED = 'failed'
PENDING = 'pending'
RUNNING = 'running'
Expand Down
14 changes: 14 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,20 @@ def results(self, key):
return json_success(
json.dumps(payload_json, default=utils.json_iso_dttm_ser))

@has_access_api
@expose("/stop_query/", methods=['POST'])
@log_this
def stop_query(self):
client_id = request.form.get('client_id')
query = db.session.query(models.Query).filter_by(
client_id=client_id).one()
if query.user_id != g.user.id:
return json_error_response(
"Only original author can stop the query.")
query.status = utils.QueryStatus.STOPPED
db.session.commit()
return Response(201)

@has_access_api
@expose("/sql_json/", methods=['POST', 'GET'])
@log_this
Expand Down