Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load async sql lab results early for Presto #4834

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion superset/assets/src/SqlLab/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const START_QUERY = 'START_QUERY';
export const STOP_QUERY = 'STOP_QUERY';
export const REQUEST_QUERY_RESULTS = 'REQUEST_QUERY_RESULTS';
export const QUERY_SUCCESS = 'QUERY_SUCCESS';
export const PREFETCH_SUCCESS = 'PREFETCH_SUCCESS';
export const QUERY_FAILED = 'QUERY_FAILED';
export const CLEAR_QUERY_RESULTS = 'CLEAR_QUERY_RESULTS';
export const REMOVE_DATA_PREVIEW = 'REMOVE_DATA_PREVIEW';
Expand Down Expand Up @@ -78,6 +79,10 @@ export function querySuccess(query, results) {
return { type: QUERY_SUCCESS, query, results };
}

/**
 * Action creator dispatched when partial (prefetched) results arrive for an
 * async query, before the query has fully completed.
 *
 * @param {object} query - the query object the results belong to
 * @param {object} results - the prefetched results payload from the backend
 * @returns {object} a PREFETCH_SUCCESS action
 */
export function prefetchSuccess(query, results) {
  return {
    type: PREFETCH_SUCCESS,
    query,
    results,
  };
}

/**
 * Action creator dispatched when a query fails.
 *
 * @param {object} query - the query object that failed
 * @param {string} msg - human-readable error message to surface in the UI
 * @returns {object} a QUERY_FAILED action
 */
export function queryFailed(query, msg) {
  return {
    type: QUERY_FAILED,
    query,
    msg,
  };
}
Expand Down Expand Up @@ -107,7 +112,11 @@ export function fetchQueryResults(query) {
dataType: 'json',
url: sqlJsonUrl,
success(results) {
dispatch(querySuccess(query, results));
if (results.status === 'prefetched') {
dispatch(prefetchSuccess(query, results));
} else {
dispatch(querySuccess(query, results));
}
},
error(err) {
let msg = t('Failed at retrieving results from the results backend');
Expand Down
3 changes: 2 additions & 1 deletion superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class QueryAutoRefresh extends React.PureComponent {
const now = new Date().getTime();
return Object.values(queries)
.some(
q => ['running', 'started', 'pending', 'fetching'].indexOf(q.state) >= 0 &&
q => ['running', 'started', 'pending', 'fetching', 'prefetched'].indexOf(
q.state) >= 0 &&
now - q.startDttm < MAX_QUERY_AGE_TO_POLL,
);
}
Expand Down
60 changes: 58 additions & 2 deletions superset/assets/src/SqlLab/components/ResultSet.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const propTypes = {
visualize: PropTypes.bool,
cache: PropTypes.bool,
height: PropTypes.number.isRequired,
has_prefetched: PropTypes.bool,
};
const defaultProps = {
search: true,
Expand All @@ -26,6 +27,7 @@ const defaultProps = {
csv: true,
actions: {},
cache: false,
has_prefetched: false,
};

const SEARCH_HEIGHT = 46;
Expand Down Expand Up @@ -61,7 +63,7 @@ export default class ResultSet extends React.PureComponent {
getControls() {
if (this.props.search || this.props.visualize || this.props.csv) {
let csvButton;
if (this.props.csv) {
if (this.props.csv && this.props.query.state === 'success') {
csvButton = (
<Button bsSize="small" href={'/superset/csv/' + this.props.query.id}>
<i className="fa fa-file-text-o" /> {t('.CSV')}
Expand Down Expand Up @@ -214,14 +216,68 @@ export default class ResultSet extends React.PureComponent {
}
let progressBar;
let trackingUrl;
if (query.progress > 0 && query.state === 'running') {
if (query.progress > 0 && (query.state === 'running' || query.state === 'prefetched')) {
progressBar = (
<ProgressBar
striped
now={query.progress}
label={`${query.progress}%`}
/>);
}

if (query.state === 'prefetched') {
const results = query.results;
let data;
if (this.props.cache && query.cached) {
data = this.state.data;
} else if (results && results.data) {
data = results.data;
}
if (!this.props.has_prefetched) {
if (data && data.length > 0) {
return (
<div>
<div>
{progressBar}
</div>
<VisualizeModal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole section is copy pasted from earlier in this very method. This should be refactored into its own component, or at least a renderDataSection method or something like that, that would receive different props/params as needed.

show={this.state.showModal}
query={this.props.query}
onHide={this.hideModal.bind(this)}
/>
{this.getControls.bind(this)()}
{sql}
<FilterableTable
data={data}
orderedColumnKeys={results.columns.map(col => col.name)}
height={height}
filterText={this.state.searchText}
/>
</div>
);
} else if (data && data.length === 0) {
return <Alert bsStyle="warning">The query returned no data</Alert>;
}
} else {
return (
<div>
<div>
<QueryStateLabel query={query} />
{progressBar}
</div>
<VisualizeModal
show={this.state.showModal}
query={this.props.query}
onHide={this.hideModal.bind(this)}
/>
{this.getControls.bind(this)()}
{sql}
</div>
);
}
}


if (query.trackingUrl) {
trackingUrl = (
<Button
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const defaultProps = {
export default function RunQueryActionButton(props) {
const runBtnText = props.selectedText ? t('Run Selected Query') : t('Run Query');
const btnStyle = props.selectedText ? 'warning' : 'primary';
const shouldShowStopBtn = ['running', 'pending'].indexOf(props.queryState) > -1;
const shouldShowStopBtn = ['running', 'pending', 'prefetched'].indexOf(props.queryState) > -1;

const commonBtnProps = {
bsSize: 'small',
Expand Down
17 changes: 17 additions & 0 deletions superset/assets/src/SqlLab/reducers.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,22 @@ export const sqlLabReducer = function (state, action) {
[actions.REQUEST_QUERY_RESULTS]() {
return alterInObject(state, 'queries', action.query, { state: 'fetching' });
},
[actions.PREFETCH_SUCCESS]() {
let rows;
if (action.results.data) {
rows = action.results.data.length;
}
const alts = {
results: action.results,
rows,
state: 'prefetched',
errorMessage: null,
cached: false,
csv: false,
has_prefetched: true,
};
return alterInObject(state, 'queries', action.query, alts);
},
[actions.QUERY_SUCCESS]() {
let rows;
if (action.results.data) {
Expand All @@ -174,6 +190,7 @@ export const sqlLabReducer = function (state, action) {
state: action.query.state,
errorMessage: null,
cached: false,
csv: true,
};
return alterInObject(state, 'queries', action.query, alts);
},
Expand Down
8 changes: 7 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@
MAPBOX_API_KEY = os.environ.get('MAPBOX_API_KEY', '')

# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000000
SQL_MAX_ROW = 10000
DISPLAY_SQL_MAX_ROW = 1000

# Maximum number of tables/views displayed in the dropdown window in SQL Lab.
Expand Down Expand Up @@ -294,6 +294,12 @@ class CeleryConfig(object):
# Timeout duration for SQL Lab synchronous queries
SQLLAB_TIMEOUT = 30

# When set to true, results from asynchronous sql lab are prefetched
PREFETCH_ASYNC = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a db-level param?


# How many rows to prefetch from asynchronous queries
PREFETCH_ROWS = 100

# SQLLAB_DEFAULT_DBID
SQLLAB_DEFAULT_DBID = None

Expand Down
44 changes: 39 additions & 5 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

from collections import defaultdict, namedtuple
import inspect
import json
import logging
import os
import re
import textwrap
import time
import uuid

import boto3
from flask import g
Expand All @@ -40,7 +42,7 @@
import unicodecsv
from werkzeug.utils import secure_filename

from superset import app, cache_util, conf, db, utils
from superset import app, cache_util, conf, db, results_backend, utils
from superset.exceptions import SupersetTemplateException
from superset.utils import QueryStatus

Expand Down Expand Up @@ -72,8 +74,8 @@ class BaseEngineSpec(object):
inner_joins = True

@classmethod
def fetch_data(cls, cursor, limit):
if cls.limit_method == LimitMethod.FETCH_MANY:
def fetch_data(cls, cursor, limit, prefetch=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does fetch_data need a new arg or does it just need to be called with a limit?

if cls.limit_method == LimitMethod.FETCH_MANY or prefetch:
return cursor.fetchmany(limit)
return cursor.fetchall()

Expand Down Expand Up @@ -726,7 +728,29 @@ def extra_table_metadata(cls, database, table_name, schema_name):
}

@classmethod
def handle_cursor(cls, cursor, query, session):
def prefetch_results(cls, cursor, query, cache_timeout, session, limit):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this method is not covered by tests

data = cursor.fetchmany(limit)
column_names = cls.get_normalized_column_names(cursor.description)
cdf = utils.convert_results_to_df(column_names, data)
payload = dict(query_id=query.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much of the logic here is not specific to Presto and should probably live in the base class or outside this module. Maybe something like cache_prefetched_data(data).

payload.update({
'status': utils.QueryStatus.PREFETCHED,
'data': cdf.data if cdf.data else [],
'columns': cdf.columns if cdf.columns else [],
'query': query.to_dict(),
})

json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
key = '{}'.format(uuid.uuid4())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to format this as a string as it's already a string.

prefetch_key = key
results_backend.set(
prefetch_key, utils.zlib_compress(json_payload), cache_timeout)
query.status = utils.QueryStatus.PREFETCHED
query.results_key = key
session.commit()

@classmethod
def handle_cursor(cls, cursor, query, session, cache_timeout=0):
"""Updates progress information"""
logging.info('Polling the cursor for progress')
polled = cursor.poll()
Expand All @@ -737,12 +761,20 @@ def handle_cursor(cls, cursor, query, session):
while polled:
# Update the object and wait for the kill signal.
stats = polled.get('stats', {})

query = session.query(type(query)).filter_by(id=query.id).one()
if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]:
cursor.cancel()
break

if (
config.get('PREFETCH_ASYNC') and
(not query.has_loaded_early)
):
query.has_loaded_early = True
limit = config.get('PREFETCH_ROWS')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefetch_count would be a better name than limit as limit has different meaning around limiting the query itself.

PrestoEngineSpec.prefetch_results(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that, since fetch_many is synchronous, it will prevent publishing any query % progress until some rows are fetched? I think we may be losing or obfuscating progress here. Say a query with a large groupby returning a small result set that takes 5 minutes to scan will show 0% until it can return the small result set. Now I also wonder how the UI does if/when the prefetch and final result occur right at around the same moment (large scan query with small result set), does it flicker, does it look ok?

cursor, query, cache_timeout, session, limit)

if stats:
completed_splits = float(stats.get('completedSplits'))
total_splits = float(stats.get('totalSplits'))
Expand Down Expand Up @@ -1079,6 +1111,8 @@ def handle_cursor(cls, cursor, query, session):
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break
if hive.ttypes.TOperationState.RUNNING_STATE == polled.operationState:
BaseEngineSpec.fetch_data(cursor, 100, prefetch=True)

log = cursor.fetch_logs() or ''
if log:
Expand Down
Loading