From a539dbd8a94c814dd499e914511b32c2a836ba60 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Mon, 3 Dec 2018 15:58:29 -0800
Subject: [PATCH 1/4] Add to_bqstorage to convert from Table[Reference] to a
 google-cloud-bigquery-storage reference.

This makes it easier to use the new BigQuery Storage API (currently in
Alpha) in combination with the BigQuery API.
---
 bigquery/docs/snippets.py               |  19 ++--
 bigquery/google/cloud/bigquery/table.py | 111 ++++++++++++++++++++++++
 bigquery/noxfile.py                     |   6 +-
 bigquery/setup.py                       |   1 +
 bigquery/tests/unit/test_job.py         |   2 +-
 bigquery/tests/unit/test_table.py       | 105 ++++++++++++++++++++++
 6 files changed, 230 insertions(+), 14 deletions(-)

diff --git a/bigquery/docs/snippets.py b/bigquery/docs/snippets.py
index 90762f98b37e..538c5e41eb65 100644
--- a/bigquery/docs/snippets.py
+++ b/bigquery/docs/snippets.py
@@ -1314,7 +1314,7 @@ def test_load_table_from_file(client, to_delete):
 
 
 def test_load_table_from_uri_avro(client, to_delete, capsys):
-    dataset_id = 'load_table_from_uri_avro_{}'.format(_millis())
+    dataset_id = "load_table_from_uri_avro_{}".format(_millis())
     dataset = bigquery.Dataset(client.dataset(dataset_id))
     client.create_dataset(dataset)
     to_delete.append(dataset)
@@ -1327,23 +1327,22 @@ def test_load_table_from_uri_avro(client, to_delete, capsys):
     dataset_ref = client.dataset(dataset_id)
     job_config = bigquery.LoadJobConfig()
     job_config.source_format = bigquery.SourceFormat.AVRO
-    uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.avro'
+    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"
 
     load_job = client.load_table_from_uri(
-        uri,
-        dataset_ref.table('us_states'),
-        job_config=job_config)  # API request
-    print('Starting job {}'.format(load_job.job_id))
+        uri, dataset_ref.table("us_states"), job_config=job_config
+    )  # API request
+    print("Starting job {}".format(load_job.job_id))
 
     load_job.result()  # Waits for table load to complete.
-    print('Job finished.')
+    print("Job finished.")
 
-    destination_table = client.get_table(dataset_ref.table('us_states'))
-    print('Loaded {} rows.'.format(destination_table.num_rows))
+    destination_table = client.get_table(dataset_ref.table("us_states"))
+    print("Loaded {} rows.".format(destination_table.num_rows))
     # [END bigquery_load_table_gcs_avro]
 
     out, _ = capsys.readouterr()
-    assert 'Loaded 50 rows.' in out
+    assert "Loaded 50 rows." in out
 
 
 def test_load_table_from_uri_csv(client, to_delete, capsys):
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index cb72dde189b2..3f32a95306fe 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -279,6 +279,53 @@ def to_api_repr(self):
             "tableId": self._table_id,
         }
 
+    def to_bqstorage(self, selected_fields=None):
+        """Construct a BigQuery Storage API representation of this table.
+
+        Args:
+            selected_fields (Sequence[ \
+                google.cloud.bigquery.schema.SchemaField, \
+            ]):
+                Optional. A subset of columns to select from this table.
+
+        Returns:
+            Tuple[ \
+                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
+                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
+                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
+            ]:
+                A reference to this table in the BigQuery Storage API.
+        """
+        from google.cloud import bigquery_storage_v1beta1
+
+        table_ref = bigquery_storage_v1beta1.types.TableReference()
+        table_ref.project_id = self._project
+        table_ref.dataset_id = self._dataset_id
+
+        modifiers = bigquery_storage_v1beta1.types.TableModifiers()
+        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+        table_id = self._table_id
+        partition = None
+
+        if "@" in table_id:
+            table_id, snapshot_time = table_id.split("@")
+            snapshot_time = int(snapshot_time)
+            modifiers.snapshot_time.FromMilliseconds(snapshot_time)
+
+        if "$" in table_id:
+            table_id, partition = table_id.split("$")
+            read_options.row_restriction = "_PARTITIONTIME = TIMESTAMP('{}-{}-{}')".format(
+                partition[:4], partition[4:6], partition[6:]
+            )
+
+        table_ref.table_id = table_id
+
+        if selected_fields is not None:
+            for field in selected_fields:
+                read_options.selected_fields.append(field.name)
+
+        return (table_ref, modifiers, read_options)
+
     def _key(self):
         """A tuple key that uniquely describes this field.
 
@@ -820,6 +867,25 @@ def to_api_repr(self):
         """
        return copy.deepcopy(self._properties)
 
+    def to_bqstorage(self, selected_fields=None):
+        """Construct a BigQuery Storage API representation of this table.
+
+        Args:
+            selected_fields (Sequence[ \
+                google.cloud.bigquery.schema.SchemaField, \
+            ]):
+                Optional. A subset of columns to select from this table.
+
+        Returns:
+            Tuple[ \
+                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
+                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
+                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
+            ]:
+                A reference to this table in the BigQuery Storage API.
+        """
+        return self.reference.to_bqstorage(selected_fields=selected_fields)
+
     def _build_resource(self, filter_fields):
         """Generate a resource for ``update``."""
         partial = {}
@@ -971,6 +1037,51 @@ def friendly_name(self):
 
     view_use_legacy_sql = property(_view_use_legacy_sql_getter)
 
+    @classmethod
+    def from_string(cls, full_table_id):
+        """Construct a table from fully-qualified table ID.
+
+        Args:
+            full_table_id (str):
+                A fully-qualified table ID in standard SQL format. Must
+                include a project ID, dataset ID, and table ID, each
+                separated by ``.``.
+
+        Returns:
+            Table: Table parsed from ``full_table_id``.
+
+        Examples:
+            >>> Table.from_string('my-project.mydataset.mytable')
+            Table(TableRef...(D...('my-project', 'mydataset'), 'mytable'))
+
+        Raises:
+            ValueError:
+                If ``full_table_id`` is not a fully-qualified table ID in
+                standard SQL format.
+        """
+        return cls(
+            {"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
+        )
+
+    def to_bqstorage(self, selected_fields=None):
+        """Construct a BigQuery Storage API representation of this table.
+
+        Args:
+            selected_fields (Sequence[ \
+                google.cloud.bigquery.schema.SchemaField, \
+            ]):
+                Optional. A subset of columns to select from this table.
+
+        Returns:
+            Tuple[ \
+                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
+                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
+                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
+            ]:
+                A reference to this table in the BigQuery Storage API.
+        """
+        return self.reference.to_bqstorage(selected_fields=selected_fields)
+
 
 def _row_from_mapping(mapping, schema):
     """Convert a mapping to a row tuple using the schema.
diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py
index 0927d2d430f1..7bf58f18043b 100644
--- a/bigquery/noxfile.py
+++ b/bigquery/noxfile.py
@@ -20,7 +20,7 @@
 
 
 LOCAL_DEPS = (
-    os.path.join('..', 'api_core'),
+    os.path.join('..', 'api_core[grpc]'),
    os.path.join('..', 'core'),
 )
 
@@ -40,9 +40,9 @@ def default(session):
 
     # Pyarrow does not support Python 3.7
     if session.python == '3.7':
-        dev_install = '.[pandas]'
+        dev_install = '.[bqstorage, pandas]'
     else:
-        dev_install = '.[pandas, pyarrow]'
+        dev_install = '.[bqstorage, pandas, pyarrow]'
     session.install('-e', dev_install)
 
     # IPython does not support Python 2 after version 5.x
diff --git a/bigquery/setup.py b/bigquery/setup.py
index 0cd725cf1505..da2f2fbbb5f6 100644
--- a/bigquery/setup.py
+++ b/bigquery/setup.py
@@ -34,6 +34,7 @@
     'google-resumable-media >= 0.3.1',
 ]
 extras = {
+    'bqstorage': 'google-cloud-bigquery-storage<=2.0.0dev',
     'pandas': 'pandas>=0.17.1',
     # Exclude PyArrow dependency from Windows Python 2.7.
     'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index 699101745814..75a84d77e2c7 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -800,7 +800,7 @@ def test_result_default_wo_state(self, result):
         begin.assert_called_once_with(retry=DEFAULT_RETRY)
         result.assert_called_once_with(timeout=None)
 
-    @mock.patch('google.api_core.future.polling.PollingFuture.result')
+    @mock.patch("google.api_core.future.polling.PollingFuture.result")
     def test_result_w_retry_wo_state(self, result):
         client = _make_client(project=self.PROJECT)
         job = self._make_one(self.JOB_ID, client)
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index d84fa6e4b1c8..7083c64e5856 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -12,11 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import unittest
 
 import mock
+import pytest
 import six
 
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
 try:
     import pandas
 except (ImportError, AttributeError):  # pragma: NO COVER
@@ -1688,3 +1694,102 @@ def test_set_expiration_w_none(self):
         time_partitioning = self._make_one()
         time_partitioning.expiration_ms = None
         assert time_partitioning._properties["expirationMs"] is None
+
+
+@pytest.mark.skipif(
+    bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_table_reference_to_bqstorage():
+    from google.protobuf import timestamp_pb2
+    from google.cloud.bigquery import table as mut
+
+    # Can't use parametrized pytest because bigquery_storage_v1beta1 may not be
+    # available.
+    ts1234567890 = timestamp_pb2.Timestamp()
+    ts1234567890.FromMilliseconds(1234567890)
+    cases = (
+        (
+            "my-project.my_dataset.my_table",
+            None,
+            (
+                bigquery_storage_v1beta1.types.TableReference(
+                    project_id="my-project",
+                    dataset_id="my_dataset",
+                    table_id="my_table",
+                ),
+                bigquery_storage_v1beta1.types.TableModifiers(),
+                bigquery_storage_v1beta1.types.TableReadOptions(),
+            ),
+        ),
+        (
+            "my-project.my_dataset.my_table",
+            (mut.SchemaField("col_name", "IGNORED"),),
+            (
+                bigquery_storage_v1beta1.types.TableReference(
+                    project_id="my-project",
+                    dataset_id="my_dataset",
+                    table_id="my_table",
+                ),
+                bigquery_storage_v1beta1.types.TableModifiers(),
+                bigquery_storage_v1beta1.types.TableReadOptions(
+                    selected_fields=["col_name"]
+                ),
+            ),
+        ),
+        (
+            "my-project.my_dataset.my_table$20181225",
+            None,
+            (
+                bigquery_storage_v1beta1.types.TableReference(
+                    project_id="my-project",
+                    dataset_id="my_dataset",
+                    table_id="my_table",
+                ),
+                bigquery_storage_v1beta1.types.TableModifiers(),
+                bigquery_storage_v1beta1.types.TableReadOptions(
+                    row_restriction="_PARTITIONTIME = TIMESTAMP('2018-12-25')"
+                ),
+            ),
+        ),
+        (
+            "my-project.my_dataset.my_table@1234567890",
+            None,
+            (
+                bigquery_storage_v1beta1.types.TableReference(
+                    project_id="my-project",
+                    dataset_id="my_dataset",
+                    table_id="my_table",
+                ),
+                bigquery_storage_v1beta1.types.TableModifiers(
+                    snapshot_time=ts1234567890
+                ),
+                bigquery_storage_v1beta1.types.TableReadOptions(),
+            ),
+        ),
+        (
+            "my-project.my_dataset.my_table$20181225@1234567890",
+            None,
+            (
+                bigquery_storage_v1beta1.types.TableReference(
+                    project_id="my-project",
+                    dataset_id="my_dataset",
+                    table_id="my_table",
+                ),
+                bigquery_storage_v1beta1.types.TableModifiers(
+                    snapshot_time=ts1234567890
+                ),
+                bigquery_storage_v1beta1.types.TableReadOptions(
+                    row_restriction="_PARTITIONTIME = TIMESTAMP('2018-12-25')"
+                ),
+            ),
+        ),
+    )
+
+    classes = (mut.TableReference, mut.Table, mut.TableListItem)
+
+    for case, cls in itertools.product(cases, classes):
+        table_string, selected_fields, expected = case
+        got = cls.from_string(table_string).to_bqstorage(
+            selected_fields=selected_fields
+        )
+        assert got == expected

From ef53b39f1a37e22a12df185c977d7ff5b683702f Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 4 Dec 2018 13:24:34 -0800
Subject: [PATCH 2/4] Remove logic for partition filter and snapshot selector.

---
 bigquery/google/cloud/bigquery/table.py | 37 +++-------
 bigquery/tests/unit/test_table.py       | 90 +++----------------------
 2 files changed, 19 insertions(+), 108 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 3f32a95306fe..0450c768a483 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -282,18 +282,16 @@ def to_api_repr(self):
     def to_bqstorage(self, selected_fields=None):
         """Construct a BigQuery Storage API representation of this table.
 
-        Args:
-            selected_fields (Sequence[ \
-                google.cloud.bigquery.schema.SchemaField, \
-            ]):
-                Optional. A subset of columns to select from this table.
+        If the ``table_id`` contains a partition identifier (e.g.
+        ``my_table$201812``) or a snapshot identifier (e.g.
+        ``mytable@1234567890``), it is ignored. Use
+        :class:`google.cloud.bigquery_storage_v1beta1.types.TableReadOptions`
+        to filter rows by partition. Use
+        :class:`google.cloud.bigquery_storage_v1beta1.types.TableModifiers`
+        to select a specific snapshot to read from.
 
         Returns:
-            Tuple[ \
-                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
-                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
-                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
-            ]:
+            google.cloud.bigquery_storage_v1beta1.types.TableReference:
                 A reference to this table in the BigQuery Storage API.
         """
         from google.cloud import bigquery_storage_v1beta1
@@ -301,30 +299,17 @@ def to_bqstorage(self, selected_fields=None):
         table_ref = bigquery_storage_v1beta1.types.TableReference()
         table_ref.project_id = self._project
         table_ref.dataset_id = self._dataset_id
-
-        modifiers = bigquery_storage_v1beta1.types.TableModifiers()
-        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
         table_id = self._table_id
-        partition = None
 
         if "@" in table_id:
-            table_id, snapshot_time = table_id.split("@")
-            snapshot_time = int(snapshot_time)
-            modifiers.snapshot_time.FromMilliseconds(snapshot_time)
+            table_id = table_id.split("@")[0]
 
         if "$" in table_id:
-            table_id, partition = table_id.split("$")
-            read_options.row_restriction = "_PARTITIONTIME = TIMESTAMP('{}-{}-{}')".format(
-                partition[:4], partition[4:6], partition[6:]
-            )
+            table_id = table_id.split("$")[0]
 
         table_ref.table_id = table_id
 
-        if selected_fields is not None:
-            for field in selected_fields:
-                read_options.selected_fields.append(field.name)
-
-        return (table_ref, modifiers, read_options)
+        return table_ref
 
     def _key(self):
         """A tuple key that uniquely describes this field.
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 7083c64e5856..d51a5dfec52e 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -1700,96 +1700,22 @@ def test_set_expiration_w_none(self):
     bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
 )
 def test_table_reference_to_bqstorage():
-    from google.protobuf import timestamp_pb2
     from google.cloud.bigquery import table as mut
 
     # Can't use parametrized pytest because bigquery_storage_v1beta1 may not be
     # available.
-    ts1234567890 = timestamp_pb2.Timestamp()
-    ts1234567890.FromMilliseconds(1234567890)
+    expected = bigquery_storage_v1beta1.types.TableReference(
+        project_id="my-project", dataset_id="my_dataset", table_id="my_table"
+    )
     cases = (
-        (
-            "my-project.my_dataset.my_table",
-            None,
-            (
-                bigquery_storage_v1beta1.types.TableReference(
-                    project_id="my-project",
-                    dataset_id="my_dataset",
-                    table_id="my_table",
-                ),
-                bigquery_storage_v1beta1.types.TableModifiers(),
-                bigquery_storage_v1beta1.types.TableReadOptions(),
-            ),
-        ),
-        (
-            "my-project.my_dataset.my_table",
-            (mut.SchemaField("col_name", "IGNORED"),),
-            (
-                bigquery_storage_v1beta1.types.TableReference(
-                    project_id="my-project",
-                    dataset_id="my_dataset",
-                    table_id="my_table",
-                ),
-                bigquery_storage_v1beta1.types.TableModifiers(),
-                bigquery_storage_v1beta1.types.TableReadOptions(
-                    selected_fields=["col_name"]
-                ),
-            ),
-        ),
-        (
-            "my-project.my_dataset.my_table$20181225",
-            None,
-            (
-                bigquery_storage_v1beta1.types.TableReference(
-                    project_id="my-project",
-                    dataset_id="my_dataset",
-                    table_id="my_table",
-                ),
-                bigquery_storage_v1beta1.types.TableModifiers(),
-                bigquery_storage_v1beta1.types.TableReadOptions(
-                    row_restriction="_PARTITIONTIME = TIMESTAMP('2018-12-25')"
-                ),
-            ),
-        ),
-        (
-            "my-project.my_dataset.my_table@1234567890",
-            None,
-            (
-                bigquery_storage_v1beta1.types.TableReference(
-                    project_id="my-project",
-                    dataset_id="my_dataset",
-                    table_id="my_table",
-                ),
-                bigquery_storage_v1beta1.types.TableModifiers(
-                    snapshot_time=ts1234567890
-                ),
-                bigquery_storage_v1beta1.types.TableReadOptions(),
-            ),
-        ),
-        (
-            "my-project.my_dataset.my_table$20181225@1234567890",
-            None,
-            (
-                bigquery_storage_v1beta1.types.TableReference(
-                    project_id="my-project",
-                    dataset_id="my_dataset",
-                    table_id="my_table",
-                ),
-                bigquery_storage_v1beta1.types.TableModifiers(
-                    snapshot_time=ts1234567890
-                ),
-                bigquery_storage_v1beta1.types.TableReadOptions(
-                    row_restriction="_PARTITIONTIME = TIMESTAMP('2018-12-25')"
-                ),
-            ),
-        ),
+        "my-project.my_dataset.my_table",
+        "my-project.my_dataset.my_table$20181225",
+        "my-project.my_dataset.my_table@1234567890",
+        "my-project.my_dataset.my_table$20181225@1234567890",
     )
 
     classes = (mut.TableReference, mut.Table, mut.TableListItem)
 
     for case, cls in itertools.product(cases, classes):
-        table_string, selected_fields, expected = case
-        got = cls.from_string(table_string).to_bqstorage(
-            selected_fields=selected_fields
-        )
+        got = cls.from_string(case).to_bqstorage()
         assert got == expected

From f783fe3f90d6fee13053dbd498458edd37d5e78d Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 4 Dec 2018 13:38:54 -0800
Subject: [PATCH 3/4] Remove unused selected_fields argument.

---
 bigquery/google/cloud/bigquery/table.py | 30 +++++--------------------
 1 file changed, 5 insertions(+), 25 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 0450c768a483..474484229077 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -279,7 +279,7 @@ def to_api_repr(self):
             "tableId": self._table_id,
         }
 
-    def to_bqstorage(self, selected_fields=None):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
         If the ``table_id`` contains a partition identifier (e.g.
@@ -852,21 +852,11 @@ def to_api_repr(self):
         """
         return copy.deepcopy(self._properties)
 
-    def to_bqstorage(self, selected_fields=None):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
-        Args:
-            selected_fields (Sequence[ \
-                google.cloud.bigquery.schema.SchemaField, \
-            ]):
-                Optional. A subset of columns to select from this table.
-
         Returns:
-            Tuple[ \
-                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
-                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
-                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
-            ]:
+            google.cloud.bigquery_storage_v1beta1.types.TableReference:
                 A reference to this table in the BigQuery Storage API.
         """
         return self.reference.to_bqstorage(selected_fields=selected_fields)
@@ -1048,21 +1038,11 @@ def from_string(cls, full_table_id):
         {"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
         )
 
-    def to_bqstorage(self, selected_fields=None):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
-        Args:
-            selected_fields (Sequence[ \
-                google.cloud.bigquery.schema.SchemaField, \
-            ]):
-                Optional. A subset of columns to select from this table.
-
         Returns:
-            Tuple[ \
-                google.cloud.bigquery_storage_v1beta1.types.TableReference, \
-                google.cloud.bigquery_storage_v1beta1.types.TableModifiers, \
-                google.cloud.bigquery_storage_v1beta1.types.TableReadOptions, \
-            ]:
+            google.cloud.bigquery_storage_v1beta1.types.TableReference:
                 A reference to this table in the BigQuery Storage API.
         """
         return self.reference.to_bqstorage(selected_fields=selected_fields)

From 3604b9bf7be89080eeee7eabc8e1a50c21592986 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 4 Dec 2018 14:04:17 -0800
Subject: [PATCH 4/4] Really remove selected_fields

---
 bigquery/google/cloud/bigquery/table.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 474484229077..868921fe5a4c 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -859,7 +859,7 @@ def to_bqstorage(self):
             google.cloud.bigquery_storage_v1beta1.types.TableReference:
                 A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage(selected_fields=selected_fields)
+        return self.reference.to_bqstorage()
 
     def _build_resource(self, filter_fields):
         """Generate a resource for ``update``."""
@@ -1045,7 +1045,7 @@ def to_bqstorage(self):
             google.cloud.bigquery_storage_v1beta1.types.TableReference:
                 A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage(selected_fields=selected_fields)
+        return self.reference.to_bqstorage()
 
 
 def _row_from_mapping(mapping, schema):
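
A minimal usage sketch of the final API follows. It is illustrative only and
not part of the patches above: it assumes the Alpha
google-cloud-bigquery-storage client (installable through the new
``bqstorage`` extra), default application credentials, ``fastavro`` for row
parsing, and hypothetical names (``my-project.my_dataset.my_table``,
``col_name``).

    from google.cloud import bigquery_storage_v1beta1
    from google.cloud.bigquery import table as bq_table

    # TableReference.from_string accepts a standard SQL table ID;
    # to_bqstorage() converts it to the BigQuery Storage API type, dropping
    # any $partition or @snapshot decorator from the table ID.
    table_ref = bq_table.TableReference.from_string(
        "my-project.my_dataset.my_table"
    ).to_bqstorage()

    client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # Column selection and partition filtering now go through TableReadOptions
    # instead of the removed selected_fields argument. (The row_restriction
    # here assumes an ingestion-time partitioned table.)
    read_options = bigquery_storage_v1beta1.types.TableReadOptions(
        selected_fields=["col_name"],
        row_restriction="_PARTITIONTIME = TIMESTAMP('2018-12-25')",
    )

    session = client.create_read_session(
        table_ref,
        "projects/my-project",  # parent: the project billed for the read
        read_options=read_options,
    )

    # Empty tables produce no streams; otherwise read rows from the first one.
    if session.streams:
        position = bigquery_storage_v1beta1.types.StreamPosition(
            stream=session.streams[0]
        )
        for row in client.read_rows(position).rows(session):
            print(row)

Because patches 2-4 drop the partition/snapshot parsing and the
``selected_fields`` argument, to_bqstorage() returns only a TableReference;
any filtering must be passed explicitly via TableReadOptions as sketched
above.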