From d9480f461a3c689133dd70c10b3e53e5966db557 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Mon, 22 Apr 2019 15:23:24 -0700
Subject: [PATCH 1/3] Preserve order in `to_dataframe` with BQ Storage and queries containing ORDER BY

This fixes an issue where, due to reading from multiple streams in
parallel, the order of rows is not preserved. Normally this isn't an
issue, but it is when the rows are query results from an ORDER BY
query.
---
 bigquery/google/cloud/bigquery/job.py   |  15 +-
 bigquery/google/cloud/bigquery/table.py |  10 +-
 bigquery/tests/unit/test_job.py         | 177 ++++++++++++++++++++----
 3 files changed, 173 insertions(+), 29 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index d14119b1f1b1..4c25e80eb40c 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -15,6 +15,7 @@
 """Define API Jobs."""
 
 import copy
+import re
 import threading
 
 import six
@@ -92,6 +93,14 @@ def _error_result_to_exception(error_result):
     )
 
 
+def _contains_order_by(query):
+    """Do we need to preserve the order of the query results?"""
+    if query is None:
+        return False
+
+    return re.search(r"ORDER\s+BY", query, re.IGNORECASE) is not None
+
+
 class Compression(object):
     """The compression type to use for exported files. The default value is
     :attr:`NONE`.
@@ -2546,7 +2555,7 @@ def from_api_repr(cls, resource, client):
         :returns: Job parsed from ``resource``.
         """
         job_id, config = cls._get_resource_config(resource)
-        query = config["query"]["query"]
+        query = _helpers._get_sub_prop(config, ["query", "query"])
         job = cls(job_id, query, client=client)
         job._set_properties(resource)
         return job
@@ -2849,7 +2858,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
             dest_table_ref = self.destination
             dest_table = Table(dest_table_ref, schema=schema)
             dest_table._properties["numRows"] = self._query_results.total_rows
-            return self._client.list_rows(dest_table, retry=retry)
+            rows = self._client.list_rows(dest_table, retry=retry)
+            rows._preserve_order = _contains_order_by(self.query)
+            return rows
 
     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index d50fec487a31..46213d5fe8bf 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1348,6 +1348,7 @@ def __init__(
         )
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
+        self._preserve_order = False
         self._project = client.project
         self._schema = schema
         self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
             for field in self._selected_fields:
                 read_options.selected_fields.append(field.name)
 
+        requested_streams = 0
+        if self._preserve_order:
+            requested_streams = 1
+
         session = bqstorage_client.create_read_session(
             self._table.to_bqstorage(),
             "projects/{}".format(self._project),
             read_options=read_options,
+            requested_streams=requested_streams,
         )
 
         # We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
         if not session.streams:
             return pandas.DataFrame(columns=columns)
 
+        total_streams = len(session.streams)
+
         # Use _to_dataframe_finished to notify worker threads when to quit.
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):
 
             return frames
 
-        with concurrent.futures.ThreadPoolExecutor() as pool:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
             try:
                 frames = get_frames(pool)
             finally:
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index a30c026a82c0..0a512d288e28 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -17,6 +17,7 @@
 import unittest
 
 import mock
+import pytest
 from six.moves import http_client
 
 try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
     return mock_conn
 
 
+def _make_job_resource(
+    creation_time_ms=1437767599006,
+    started_time_ms=1437767600007,
+    ended_time_ms=1437767601008,
+    started=False,
+    ended=False,
+    etag="abc-def-hjk",
+    endpoint="https://www.googleapis.com",
+    job_type="load",
+    job_id="a-random-id",
+    project_id="some-project",
+    user_email="bq-user@example.com",
+):
+    resource = {
+        "configuration": {job_type: {}},
+        "statistics": {"creationTime": creation_time_ms, job_type: {}},
+        "etag": etag,
+        "id": "{}:{}".format(project_id, job_id),
+        "jobReference": {"projectId": project_id, "jobId": job_id},
+        "selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            endpoint, project_id, job_id
+        ),
+        "user_email": user_email,
+    }
+
+    if started or ended:
+        resource["statistics"]["startTime"] = started_time_ms
+
+    if ended:
+        resource["statistics"]["endTime"] = ended_time_ms
+
+    if job_type == "query":
+        resource["configuration"]["query"]["destinationTable"] = {
+            "projectId": project_id,
+            "datasetId": "_temp_dataset",
+            "tableId": "_temp_table",
+        }
+
+    return resource
+
+
 class Test__error_result_to_exception(unittest.TestCase):
     def _call_fut(self, *args, **kwargs):
         from google.cloud.bigquery import job
@@ -974,6 +1016,7 @@ class _Base(object):
     from google.cloud.bigquery.dataset import DatasetReference
     from google.cloud.bigquery.table import TableReference
 
+    ENDPOINT = "https://www.googleapis.com"
     PROJECT = "project"
     SOURCE1 = "http://example.com/source1.csv"
     DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
         self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
         self.ETAG = "ETAG"
         self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
-        self.RESOURCE_URL = "http://example.com/path/to/resource"
+        self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            self.ENDPOINT, self.PROJECT, self.JOB_ID
+        )
         self.USER_EMAIL = "phred@example.com"
 
     def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _table_ref(self, table_id):
 
     def _make_resource(self, started=False, ended=False):
         self._setUpConstants()
-        resource = {
-            "configuration": {self.JOB_TYPE: {}},
-            "statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
-            "etag": self.ETAG,
-            "id": self.FULL_JOB_ID,
-            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
-            "selfLink": self.RESOURCE_URL,
-            "user_email": self.USER_EMAIL,
-        }
-
-        if started or ended:
-            resource["statistics"]["startTime"] = self.WHEN_TS * 1000
-
-        if ended:
-            resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000
-
-        if self.JOB_TYPE == "query":
-            resource["configuration"]["query"]["destinationTable"] = {
-                "projectId": self.PROJECT,
-                "datasetId": "_temp_dataset",
-                "tableId": "_temp_table",
-            }
-
-        return resource
+        return _make_job_resource(
+            creation_time_ms=int(self.WHEN_TS * 1000),
+            started_time_ms=int(self.WHEN_TS * 1000),
+            ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
+            started=started,
+            ended=ended,
+            etag=self.ETAG,
+            endpoint=self.ENDPOINT,
+            job_type=self.JOB_TYPE,
+            job_id=self.JOB_ID,
+            project_id=self.PROJECT,
+            user_email=self.USER_EMAIL,
+        )
 
     def _verifyInitialReadonlyProperties(self, job):
         # root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
             job.to_dataframe(bqstorage_client=bqstorage_client)
 
         bqstorage_client.create_read_session.assert_called_once_with(
-            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+            mock.ANY,
+            "projects/{}".format(self.PROJECT),
+            read_options=mock.ANY,
+            # Use default number of streams for best performance.
+            requested_streams=0,
         )
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,90 @@ def test_from_api_repr_normal(self):
         self.assertEqual(entry.pending_units, self.PENDING_UNITS)
         self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
         self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)
+
+
+@pytest.mark.parametrize(
+    "query,expected",
+    (
+        (None, False),
+        ("", False),
+        ("select name, age from table", False),
+        ("select name, age from table LIMIT 10;", False),
+        ("select name, age from table order by other_column;", True),
+        ("Select name, age From table Order By other_column", True),
+        ("SELECT name, age FROM table ORDER BY other_column;", True),
+        ("select name, age from table order\nby other_column", True),
+        ("Select name, age From table Order\nBy other_column;", True),
+        ("SELECT name, age FROM table ORDER\nBY other_column", True),
+        ("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
+    ),
+)
+def test__contains_order_by(query, expected):
+    from google.cloud.bigquery import job as mut
+
+    assert mut._contains_order_by(query) == expected
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+@pytest.mark.parametrize(
+    "query",
+    (
+        "select name, age from table order by other_column;",
+        "Select name, age From table Order By other_column;",
+        "SELECT name, age FROM table ORDER BY other_column;",
+        "select name, age from table order\nby other_column;",
+        "Select name, age From table Order\nBy other_column;",
+        "SELECT name, age FROM table ORDER\nBY other_column;",
+        "SelecT name, age froM table OrdeR \n\t BY other_column;",
+    ),
+)
+def test_to_dataframe_bqstorage_preserve_order(query):
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    job_resource = _make_job_resource(
+        project_id="test-project", job_type="query", ended=True
+    )
+    job_resource["configuration"]["query"]["query"] = query
+    job_resource["status"] = {"state": "DONE"}
+    get_query_results_resource = {
+        "jobComplete": True,
+        "jobReference": {"projectId": "test-project", "jobId": "test-job"},
+        "schema": {
+            "fields": [
+                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+            ]
+        },
+        "totalRows": "4",
+    }
+    connection = _make_connection(get_query_results_resource, job_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(job_resource, client)
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage_v1beta1.BigQueryStorageClient
+    )
+    session = bigquery_storage_v1beta1.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "long"]},
"type": ["null", "long"]}, + ], + } + ) + bqstorage_client.create_read_session.return_value = session + + job.to_dataframe(bqstorage_client=bqstorage_client) + + bqstorage_client.create_read_session.assert_called_once_with( + mock.ANY, + "projects/test-project", + read_options=mock.ANY, + # Use a single stream to preserve row order. + requested_streams=1, + ) From babdd66bbc8dcc748d18ba5e50b58cafddde641c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 29 Apr 2019 14:18:00 -0700 Subject: [PATCH 2/3] Compile regex. --- bigquery/google/cloud/bigquery/job.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 4c25e80eb40c..18f22270feac 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -46,6 +46,7 @@ _DONE_STATE = "DONE" _STOPPED_REASON = "stopped" _TIMEOUT_BUFFER_SECS = 0.1 +_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE) _ERROR_REASON_TO_EXCEPTION = { "accessDenied": http_client.FORBIDDEN, @@ -94,11 +95,26 @@ def _error_result_to_exception(error_result): def _contains_order_by(query): - """Do we need to preserve the order of the query results?""" - if query is None: - return False + """Do we need to preserve the order of the query results? - return re.search(r"ORDER\s+BY", query, re.IGNORECASE) is not None + This function has known false positives, such as with ordered window + functions: + + .. code-block:: sql + + SELECT SUM(x) OVER ( + window_name + PARTITION BY... + ORDER BY... + window_frame_clause) + FROM ... + + This false positive failure case means the behavior will be correct, but + downloading results with the BigQuery Storage API may be slower than it + otherwise would. This is preferable to the false negative case, where + results are expected to be in order but are not (due to parallel reads). + """ + return query and _CONTAINS_ORDER_BY.search(query) class Compression(object): From 7ccba7c7c0c4b33766874290920310d0fcaa5be3 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 29 Apr 2019 16:02:02 -0700 Subject: [PATCH 3/3] Assert based on truthiness not equality. --- bigquery/tests/unit/test_job.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 0a512d288e28..bb6f03f3efb3 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -5098,7 +5098,10 @@ def test_from_api_repr_normal(self): def test__contains_order_by(query, expected): from google.cloud.bigquery import job as mut - assert mut._contains_order_by(query) == expected + if expected: + assert mut._contains_order_by(query) + else: + assert not mut._contains_order_by(query) @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")