From be897ac521425ed05539602b9547c23beab5b5c9 Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Wed, 28 Mar 2018 14:44:01 -0700 Subject: [PATCH 1/3] Rename no_reload (#4703) --- superset/config.py | 33 ++++++++++++++++++++------------- superset/db_engine_specs.py | 7 ++++++- superset/sql_lab.py | 2 +- superset/views/core.py | 1 + 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/superset/config.py b/superset/config.py index 530b1268962ac..3d7ac6a1ce287 100644 --- a/superset/config.py +++ b/superset/config.py @@ -58,8 +58,8 @@ SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h' # noqa # The SQLAlchemy connection string. -SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db') -# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp' +#SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db') +SQLALCHEMY_DATABASE_URI = 'mysql://root@localhost/superset_development' # SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp' # In order to hook up a custom password store for all SQLACHEMY connections @@ -247,7 +247,7 @@ BACKUP_COUNT = 30 # Set this API key to enable Mapbox visualizations -MAPBOX_API_KEY = os.environ.get('MAPBOX_API_KEY', '') +MAPBOX_API_KEY = '' # Maximum number of rows returned in the SQL editor SQL_MAX_ROW = 1000000 @@ -264,19 +264,23 @@ # Default celery config is to use SQLA as a broker, in a production setting # you'll want to use a proper broker as specified here: # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html -""" + # Example: class CeleryConfig(object): - BROKER_URL = 'sqla+sqlite:///celerydb.sqlite' - CELERY_IMPORTS = ('superset.sql_lab', ) - CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite' - CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}} - CELERYD_LOG_LEVEL = 'DEBUG' - CELERYD_PREFETCH_MULTIPLIER = 1 - CELERY_ACKS_LATE = True + BROKER_URL = 'redis://localhost:6379' + CELERY_IMPORTS = ('superset.sql_lab', ) + CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}} + CELERYD_LOG_LEVEL = 'DEBUG' + CELERYD_PREFETCH_MULTIPLIER = 1 + CELERYD_CONCURRENCY = 16 + CELERY_ACKS_LATE = True + STATSD_HOST = 'localhost' + STATSD_PORT = 8125 CELERY_CONFIG = CeleryConfig """ CELERY_CONFIG = None +""" SQL_CELERY_DB_FILE_PATH = os.path.join(DATA_DIR, 'celerydb.sqlite') SQL_CELERY_RESULTS_DB_FILE_PATH = os.path.join(DATA_DIR, 'celery_results.sqlite') @@ -304,7 +308,10 @@ class CeleryConfig(object): # An instantiated derivative of werkzeug.contrib.cache.BaseCache # if enabled, it can be used to store the results of long-running queries # in SQL Lab by using the "Run Async" button/feature -RESULTS_BACKEND = None +from s3cache.s3cache import S3Cache +S3_CACHE_BUCKET = 'airbnb-superset' +S3_CACHE_KEY_PREFIX = 'sql_lab_result' +RESULTS_BACKEND = S3Cache(S3_CACHE_BUCKET, S3_CACHE_KEY_PREFIX) # The S3 bucket where you want to store your external hive tables created # from CSV files. For example, 'companyname-superset' @@ -433,4 +440,4 @@ class CeleryConfig(object): print('Loaded your LOCAL configuration at [{}]'.format( superset_config.__file__)) except ImportError: - pass + pass \ No newline at end of file diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index cbcc570934345..f989aaed38658 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -737,7 +737,9 @@ def handle_cursor(cls, cursor, query, session): while polled: # Update the object and wait for the kill signal. 
stats = polled.get('stats', {}) - + print(stats) + print(dir(stats)) + print("????") query = session.query(type(query)).filter_by(id=query.id).one() if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]: cursor.cancel() @@ -1071,6 +1073,9 @@ def handle_cursor(cls, cursor, query, session): hive.ttypes.TOperationState.RUNNING_STATE, ) polled = cursor.poll() + print("~~~~~~~~~~~~~~~") + print(polled) + print(dir(polled)) last_log_line = 0 tracking_url = None job_id = None diff --git a/superset/sql_lab.py b/superset/sql_lab.py index c9f07ae906c7b..a9dbe1cd654c1 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -128,6 +128,7 @@ def get_sql_results( except Exception as e: logging.exception(e) stats_logger.incr('error_sqllab_unhandled') + sesh = get_session(not ctask.request.called_directly) query = get_query(query_id, session) query.error_message = str(e) query.status = QueryStatus.FAILED @@ -141,7 +142,6 @@ def execute_sql( user_name=None, session=None, ): """Executes the sql query returns the results.""" - query = get_query(query_id, session) payload = dict(query_id=query_id) diff --git a/superset/views/core.py b/superset/views/core.py index b4a1689a9165b..c677802affe30 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -2538,6 +2538,7 @@ def fetch_datasource_metadata(self): @expose('/queries/') def queries(self, last_updated_ms): """Get the updated queries.""" + print("frontend is checking!!~~~~~~") stats_logger.incr('queries') if not g.user.get_id(): return json_error_response( From 0d3dc521d9486935a66f5fecfb890d2ff61dd9fd Mon Sep 17 00:00:00 2001 From: Timi Fasubaa Date: Mon, 2 Apr 2018 01:26:07 -0700 Subject: [PATCH 2/3] prefetch asyncronous query results from presto --- superset/assets/src/SqlLab/actions.js | 19 ++++-- .../SqlLab/components/QueryAutoRefresh.jsx | 5 +- .../src/SqlLab/components/ResultSet.jsx | 68 ++++++++++++++++++- .../components/RunQueryActionButton.jsx | 2 +- superset/assets/src/SqlLab/reducers.js | 17 +++++ superset/assets/yarn.lock | 12 ++++ superset/config.py | 36 +++++----- superset/db_engine_specs.py | 43 +++++++++--- superset/sql_lab.py | 36 +++++++++- superset/utils.py | 51 ++++++++++++++ superset/views/core.py | 17 +++-- tests/sqllab_tests.py | 7 +- 12 files changed, 265 insertions(+), 48 deletions(-) diff --git a/superset/assets/src/SqlLab/actions.js b/superset/assets/src/SqlLab/actions.js index 644947023bcb8..71d021b91bdc6 100644 --- a/superset/assets/src/SqlLab/actions.js +++ b/superset/assets/src/SqlLab/actions.js @@ -36,6 +36,7 @@ export const START_QUERY = 'START_QUERY'; export const STOP_QUERY = 'STOP_QUERY'; export const REQUEST_QUERY_RESULTS = 'REQUEST_QUERY_RESULTS'; export const QUERY_SUCCESS = 'QUERY_SUCCESS'; +export const PREFETCH_SUCCESS = 'PREFETCH_SUCCESS'; export const QUERY_FAILED = 'QUERY_FAILED'; export const CLEAR_QUERY_RESULTS = 'CLEAR_QUERY_RESULTS'; export const REMOVE_DATA_PREVIEW = 'REMOVE_DATA_PREVIEW'; @@ -77,7 +78,9 @@ export function startQuery(query) { export function querySuccess(query, results) { return { type: QUERY_SUCCESS, query, results }; } - +export function prefetchSuccess(query, results) { + return { type: PREFETCH_SUCCESS, query, results }; +} export function queryFailed(query, msg) { return { type: QUERY_FAILED, query, msg }; } @@ -100,14 +103,22 @@ export function requestQueryResults(query) { export function fetchQueryResults(query) { return function (dispatch) { - dispatch(requestQueryResults(query)); - const sqlJsonUrl = `/superset/results/${query.resultsKey}/`; + 
dispatch(requestQueryResults(query)); + let sqlJsonUrl = `/superset/results/${query.resultsKey}`; + + if (query.state === 'prefetched') { + sqlJsonUrl += '_prefetch'; + } $.ajax({ type: 'GET', dataType: 'json', url: sqlJsonUrl, success(results) { - dispatch(querySuccess(query, results)); + if (results.status === "prefetched") { + dispatch(prefetchSuccess(query, results)); + } else { + dispatch(querySuccess(query, results)); + } }, error(err) { let msg = t('Failed at retrieving results from the results backend'); diff --git a/superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx b/superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx index 55e06cc1467b2..11c5d235f3e26 100644 --- a/superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx +++ b/superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx @@ -23,7 +23,8 @@ class QueryAutoRefresh extends React.PureComponent { const now = new Date().getTime(); return Object.values(queries) .some( - q => ['running', 'started', 'pending', 'fetching'].indexOf(q.state) >= 0 && + q => ['running', 'started', 'pending', 'fetching', 'prefetched'].indexOf( + q.state) >= 0 && now - q.startDttm < MAX_QUERY_AGE_TO_POLL, ); } @@ -39,7 +40,7 @@ class QueryAutoRefresh extends React.PureComponent { stopwatch() { // only poll /superset/queries/ if there are started or running queries if (this.shouldCheckForQueries()) { - const url = `/superset/queries/${this.props.queriesLastUpdate - QUERY_UPDATE_BUFFER_MS}`; + const url = `/superset/queries/${this.props.queriesLastUpdate - QUERY_UPDATE_BUFFER_MS}`; $.getJSON(url, (data) => { if (Object.keys(data).length > 0) { this.props.actions.refreshQueries(data); diff --git a/superset/assets/src/SqlLab/components/ResultSet.jsx b/superset/assets/src/SqlLab/components/ResultSet.jsx index f36a1640c79c3..e4b8d699e25f1 100644 --- a/superset/assets/src/SqlLab/components/ResultSet.jsx +++ b/superset/assets/src/SqlLab/components/ResultSet.jsx @@ -18,6 +18,7 @@ const propTypes = { visualize: PropTypes.bool, cache: PropTypes.bool, height: PropTypes.number.isRequired, + has_prefetched: PropTypes.bool, }; const defaultProps = { search: true, @@ -26,6 +27,7 @@ const defaultProps = { csv: true, actions: {}, cache: false, + has_prefetched: false, }; const SEARCH_HEIGHT = 46; @@ -53,6 +55,7 @@ export default class ResultSet extends React.PureComponent { this.clearQueryResults(nextProps.query), ); } + if (nextProps.query.resultsKey && nextProps.query.resultsKey !== this.props.query.resultsKey) { this.fetchResults(nextProps.query); @@ -61,12 +64,18 @@ export default class ResultSet extends React.PureComponent { getControls() { if (this.props.search || this.props.visualize || this.props.csv) { let csvButton; - if (this.props.csv) { + let next; + if (this.props.csv && this.props.query.state === 'success') { csvButton = ( ); + next = ( + + ); } let visualizeButton; if (this.props.visualize) { @@ -97,6 +106,7 @@ export default class ResultSet extends React.PureComponent { {visualizeButton} {csvButton} + {next}
@@ -214,7 +224,7 @@ export default class ResultSet extends React.PureComponent {
     }
     let progressBar;
     let trackingUrl;
-    if (query.progress > 0 && query.state === 'running') {
+    if (query.progress > 0 && (query.state === 'running' || query.state === 'prefetched')) {
       progressBar = (
         );
     }
+
+    if (query.state === 'prefetched') {
+      const results = query.results;
+      let data;
+      if (this.props.cache && query.cached) {
+        data = this.state.data;
+      } else if (results && results.data) {
+        data = results.data;
+      }
+      if (!this.props.has_prefetched) {
+        if (data && data.length > 0) {
+          return (
+
+            {progressBar}
+
+            {this.getControls.bind(this)()}
+            {sql}
+            col.name)}
+              height={height}
+              filterText={this.state.searchText}
+            />
+          );
+        } else if (data && data.length === 0) {
+          return The query returned no data;
+        }
+      } else {
+        return (
+
+          {progressBar}
+
+          {this.getControls.bind(this)()}
+          {sql}
+        );
+      }
+    }
+
     if (query.trackingUrl) {
       trackingUrl = (
         );
-      next = (
-
-      );
     }
     let visualizeButton;
     if (this.props.visualize) {
@@ -106,7 +99,6 @@ export default class ResultSet extends React.PureComponent {
           {visualizeButton}
           {csvButton}
-          {next}
diff --git a/superset/assets/yarn.lock b/superset/assets/yarn.lock index 92a0cdc7adeb3..5ebc447b92592 100644 --- a/superset/assets/yarn.lock +++ b/superset/assets/yarn.lock @@ -5287,14 +5287,6 @@ lodash._basecreate@^3.0.0: version "3.0.3" resolved "https://registry.yarnpkg.com/lodash._basecreate/-/lodash._basecreate-3.0.3.tgz#1bc661614daa7fc311b7d03bf16806a0213cf821" -lodash._baseisequal@^3.0.0: - version "3.0.7" - resolved "https://registry.yarnpkg.com/lodash._baseisequal/-/lodash._baseisequal-3.0.7.tgz#d8025f76339d29342767dcc887ce5cb95a5b51f1" - dependencies: - lodash.isarray "^3.0.0" - lodash.istypedarray "^3.0.0" - lodash.keys "^3.0.0" - lodash._baseuniq@~4.6.0: version "4.6.0" resolved "https://registry.yarnpkg.com/lodash._baseuniq/-/lodash._baseuniq-4.6.0.tgz#0ebb44e456814af7905c6212fa2c9b2d51b841e8" @@ -5302,10 +5294,6 @@ lodash._baseuniq@~4.6.0: lodash._createset "~4.0.0" lodash._root "~3.0.0" -lodash._bindcallback@^3.0.0: - version "3.0.1" - resolved "https://registry.yarnpkg.com/lodash._bindcallback/-/lodash._bindcallback-3.0.1.tgz#e531c27644cf8b57a99e17ed95b35c748789392e" - lodash._createset@~4.0.0: version "4.0.3" resolved "https://registry.yarnpkg.com/lodash._createset/-/lodash._createset-4.0.3.tgz#0f4659fbb09d75194fa9e2b88a6644d363c9fe26" diff --git a/superset/config.py b/superset/config.py index ce5566d44a019..48e445c103c4d 100644 --- a/superset/config.py +++ b/superset/config.py @@ -250,7 +250,7 @@ MAPBOX_API_KEY = os.environ.get('MAPBOX_API_KEY', '') # Maximum number of rows returned in the SQL editor -SQL_MAX_ROW = 1000000 +SQL_MAX_ROW = 10000 DISPLAY_SQL_MAX_ROW = 1000 # Maximum number of tables/views displayed in the dropdown window in SQL Lab. @@ -295,7 +295,10 @@ class CeleryConfig(object): SQLLAB_TIMEOUT = 30 # When set to true, results from asynchronous sql lab are prefetched -PREFETCH_PRESTO = True +PREFETCH_ASYNC = True + +# Howmany rows to prefetch from asyncronous queries +PREFETCH_ROWS = 100 # SQLLAB_DEFAULT_DBID SQLLAB_DEFAULT_DBID = None diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index 5850c0863495c..a6fa370c1a3ab 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -74,8 +74,8 @@ class BaseEngineSpec(object): inner_joins = True @classmethod - def fetch_data(cls, cursor, limit): - if cls.limit_method == LimitMethod.FETCH_MANY: + def fetch_data(cls, cursor, limit, prefetch=False): + if cls.limit_method == LimitMethod.FETCH_MANY or prefetch: return cursor.fetchmany(limit) return cursor.fetchall() @@ -728,9 +728,10 @@ def extra_table_metadata(cls, database, table_name, schema_name): } @classmethod - def prefetch_results(cls, cursor, query, cache_timeout, session, limit=1000): + def prefetch_results(cls, cursor, query, cache_timeout, session, limit): data = cursor.fetchmany(limit) - cdf = utils.convert_results_to_df(cursor.description, data) + column_names = cls.get_normalized_column_names(cursor.description) + cdf = utils.convert_results_to_df(column_names, data) payload = dict(query_id=query.id) payload.update({ 'status': utils.QueryStatus.PREFETCHED, @@ -741,7 +742,7 @@ def prefetch_results(cls, cursor, query, cache_timeout, session, limit=1000): json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser) key = '{}'.format(uuid.uuid4()) - prefetch_key = key + '_prefetch' + prefetch_key = key results_backend.set( prefetch_key, utils.zlib_compress(json_payload), cache_timeout) query.status = utils.QueryStatus.PREFETCHED @@ -760,20 +761,19 @@ def handle_cursor(cls, cursor, query, session, 
cache_timeout=0): while polled: # Update the object and wait for the kill signal. stats = polled.get('stats', {}) - processed_rows = stats['processedRows'] - query = session.query(type(query)).filter_by(id=query.id).one() if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]: cursor.cancel() break if ( - config.get('PREFETCH_PRESTO') and - processed_rows > 1000 and - not query.has_loaded_early + config.get('PREFETCH_ASYNC') and + (not query.has_loaded_early) ): query.has_loaded_early = True - PrestoEngineSpec.prefetch_results(cursor, query, cache_timeout, session) + limit = config.get('PREFETCH_ROWS') + PrestoEngineSpec.prefetch_results( + cursor, query, cache_timeout, session, limit) if stats: completed_splits = float(stats.get('completedSplits')) @@ -1111,6 +1111,8 @@ def handle_cursor(cls, cursor, query, session): if query.status == QueryStatus.STOPPED: cursor.cancel() break + if hive.ttypes.TOperationState.RUNNING_STATE == polled.operationState: + BaseEngineSpec.fetch_data(cursor, 100, prefetch=True) log = cursor.fetch_logs() or '' if log: diff --git a/superset/sql_lab.py b/superset/sql_lab.py index e965b075a53ec..b51e39e138815 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -9,22 +9,12 @@ import uuid from celery.exceptions import SoftTimeLimitExceeded -<<<<<<< HEAD from contextlib2 import contextmanager -import numpy as np -import pandas as pd -======= ->>>>>>> prefetch asyncronous query results from presto import sqlalchemy from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import NullPool -<<<<<<< HEAD -from superset import app, dataframe, db, results_backend, security_manager, utils -======= from superset import app, db, results_backend, security_manager, utils -from superset.db_engine_specs import LimitMethod ->>>>>>> prefetch asyncronous query results from presto from superset.models.sql_lab import Query from superset.sql_parse import SupersetQuery from superset.utils import get_celery_app, QueryStatus @@ -39,27 +29,6 @@ class SqlLabException(Exception): pass -def dedup(l, suffix='__'): - """De-duplicates a list of string by suffixing a counter - - Always returns the same number of entries as provided, and always returns - unique values. 
- - >>> print(','.join(dedup(['foo', 'bar', 'bar', 'bar']))) - foo,bar,bar__1,bar__2 - """ - new_l = [] - seen = {} - for s in l: - if s in seen: - seen[s] += 1 - s += suffix + str(seen[s]) - else: - seen[s] = 0 - new_l.append(s) - return new_l - - def get_query(query_id, session, retry_count=5): """attemps to get the query and retry if it cannot""" query = None @@ -104,32 +73,12 @@ def session_scope(nullpool): session.close() -def convert_results_to_df(column_names, data): - """Convert raw query results to a DataFrame.""" - column_names = dedup(column_names) - - # check whether the result set has any nested dict columns - if data: - first_row = data[0] - has_dict_col = any([isinstance(c, dict) for c in first_row]) - df_data = list(data) if has_dict_col else np.array(data, dtype=object) - else: - df_data = [] - - cdf = dataframe.SupersetDataFrame( - pd.DataFrame(df_data, columns=column_names)) - - return cdf - - @celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT) def get_sql_results( ctask, query_id, rendered_query, return_results=True, store_results=False, user_name=None): """Executes the sql query returns the results.""" -<<<<<<< HEAD with session_scope(not ctask.request.called_directly) as session: - try: return execute_sql( ctask, query_id, rendered_query, return_results, store_results, user_name, @@ -137,7 +86,6 @@ def get_sql_results( except Exception as e: logging.exception(e) stats_logger.incr('error_sqllab_unhandled') - sesh = get_session(not ctask.request.called_directly) query = get_query(query_id, session) query.error_message = str(e) query.status = QueryStatus.FAILED @@ -176,9 +124,6 @@ def handle_error(msg): if store_results and not results_backend: return handle_error("Results backend isn't configured.") - cache_timeout = database.cache_timeout - if cache_timeout is None: - cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0) # Limit enforced only for retrieving the data, not for the CTA queries. 
superset_query = SupersetQuery(rendered_query) @@ -233,7 +178,7 @@ def handle_error(msg): logging.info('Handling cursor') db_engine_spec.handle_cursor(cursor, query, session) logging.info('Fetching data: {}'.format(query.to_dict())) - data = db_engine_spec.fetch_data(cursor, query) + data = db_engine_spec.fetch_data(cursor, query.limit) except SoftTimeLimitExceeded as e: logging.exception(e) if conn is not None: @@ -255,12 +200,6 @@ def handle_error(msg): conn.close() if query.status == utils.QueryStatus.STOPPED: -<<<<<<< HEAD - return handle_error('The query has been stopped') - - cdf = convert_results_to_df(column_names, data) - -======= return json.dumps( { 'query_id': query.id, @@ -269,8 +208,13 @@ def handle_error(msg): }, default=utils.json_iso_dttm_ser) - cdf = utils.convert_results_to_df(cursor_description, data) ->>>>>>> prefetch asyncronous query results from presto + if query.has_loaded_early: + preloaded_data = results_backend.get(query.results_key) + preloaded_data = json.loads( + utils.zlib_decompress_to_string(preloaded_data))['data'] + preloaded_data = [list(row.values()) for row in preloaded_data] + data = preloaded_data + data + cdf = utils.convert_results_to_df(column_names, data) query.rows = cdf.size query.progress = 100 query.status = QueryStatus.SUCCESS @@ -285,12 +229,11 @@ def handle_error(msg): query.end_time = utils.now_as_float() session.merge(query) session.flush() - new_key = query.results_key if cdf.data else utils.prefetch_key(query.results_key) payload.update({ 'status': query.status, 'data': cdf.data if cdf.data else [], - 'results_key': new_key + 'results_key': query.results_key, 'columns': cdf.columns if cdf.columns else [], 'query': query.to_dict(), }) diff --git a/superset/utils.py b/superset/utils.py index 8b0669560e0fd..cf6027dd655db 100644 --- a/superset/utils.py +++ b/superset/utils.py @@ -177,7 +177,7 @@ def convert_results_to_df(cursor_description, data): import pandas as pd """Convert raw query results to a DataFrame.""" column_names = ( - [col[0] for col in cursor_description] if cursor_description else []) + [col for col in cursor_description] if cursor_description else []) column_names = dedup(column_names) # check whether the result set has any nested dict columns @@ -188,9 +188,9 @@ def convert_results_to_df(cursor_description, data): else: df_data = [] + pdf = pd.DataFrame(df_data, columns=column_names) cdf = dataframe.SupersetDataFrame( - pd.DataFrame(df_data, columns=column_names)) - + pdf) return cdf @@ -927,9 +927,3 @@ def split_adhoc_filters_into_base_filters(fd): fd['having_filters'] = simple_having_filters fd['filters'] = simple_where_filters del fd['adhoc_filters'] - - - -def prefetch_key(key): - # given a key, this returns the location of the prefetched key - return key + "_prefetch" diff --git a/superset/views/core.py b/superset/views/core.py index 6a3d8dd9d28b0..b4a1689a9165b 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -2318,6 +2318,7 @@ def results(self, key): """Serves a key off of the results backend""" if not results_backend: return json_error_response("Results backend isn't configured") + blob = results_backend.get(key) if not blob: return json_error_response( @@ -2325,8 +2326,7 @@ def results(self, key): 'You may want to re-run the query.', status=410, ) - if key.endswith('_prefetch'): # hack to not break when requesting prefetch - key = key[:-9] + query = db.session.query(Query).filter_by(results_key=key).one() rejected_tables = security_manager.rejected_datasources( query.sql, query.database, 
            query.schema)
@@ -2492,7 +2492,6 @@ def csv(self, client_id):
         if rejected_tables:
             flash(get_datasource_access_error_msg('{}'.format(rejected_tables)))
             return redirect('/')
-        prefetched_blob = None
         blob = None
         if results_backend and query.results_key:
             logging.info(
@@ -2503,17 +2502,8 @@ def csv(self, client_id):
             logging.info('Decompressing')
             json_payload = utils.zlib_decompress_to_string(blob)
             obj = json.loads(json_payload)
-
-            prefetched_blob = results_backend.get(query.results_key + '_prefetch')
-            if prefetched_blob:
-                prefetched_payload = utils.zlib_decompress_to_string(prefetched_blob)
-                prefetched_blob_json = json.loads(prefetched_payload)
-
-            data = (
-                obj['data'] + prefetched_blob_json['data'] if prefetched_blob
-                else obj['data'])
             columns = [c['name'] for c in obj['columns']]
-            df = pd.DataFrame.from_records(data, columns=columns)
+            df = pd.DataFrame.from_records(obj['data'], columns=columns)
             logging.info('Using pandas to convert to CSV')
             csv = df.to_csv(index=False, **config.get('CSV_EXPORT'))
         else:
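
Taken together, these patches make the Presto cursor handler park the first PREFETCH_ROWS rows of an asynchronous query in the results backend and mark the query PREFETCHED so SQL Lab can render a preview while the query is still running; when the query finishes, the worker pulls those parked rows back out and prepends them to the remaining rows before writing the final payload. The sketch below is illustrative only and is not part of the patches: the dict-backed results_backend, the literal results key, and the list-shaped rows are stand-ins for the configured cache (e.g. S3Cache), the query's results_key, and the DataFrame payload used in the real code.

    import json
    import zlib

    # Stand-in for the configured results backend (S3Cache, Redis, ...) used by Superset.
    results_backend = {}


    def store_prefetched(results_key, columns, rows):
        """Roughly what PrestoEngineSpec.prefetch_results does: serialize the first
        rows with a 'prefetched' status and park them under the query's key."""
        payload = {'status': 'prefetched', 'columns': columns, 'data': rows}
        results_backend[results_key] = zlib.compress(json.dumps(payload).encode('utf-8'))


    def merge_with_prefetched(results_key, remaining_rows):
        """Roughly what execute_sql does when query.has_loaded_early is set: fetch the
        parked rows and prepend them to the rows read after the query completed."""
        blob = results_backend.get(results_key)
        if blob is None:
            return remaining_rows
        preloaded = json.loads(zlib.decompress(blob).decode('utf-8'))['data']
        return preloaded + remaining_rows


    # The worker prefetches a slice while Presto is still running, then merges at the end.
    store_prefetched('some-results-key', ['name'], [['alice'], ['bob']])
    print(merge_with_prefetched('some-results-key', [['carol']]))  # [['alice'], ['bob'], ['carol']]

In the patches themselves the preloaded rows come back as dicts and are converted with [list(row.values()) for row in preloaded_data] before being concatenated; the list-of-lists shape above skips that conversion for brevity.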