From 52805d358236f5a6c8fdcaffec1102d540f49d9f Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 7 Aug 2019 17:35:13 +0200 Subject: [PATCH 01/14] Add additional BQ storage system test fixtures --- bigquery_storage/noxfile.py | 1 + .../tests/system/assets/people_data.csv | 6 ++ bigquery_storage/tests/system/conftest.py | 76 ++++++++++++++++++- 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 bigquery_storage/tests/system/assets/people_data.csv diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index ec450b9aedc7..79b9d3689512 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -118,6 +118,7 @@ def system(session): session.install("-e", local_dep) session.install("-e", "../test_utils/") session.install("-e", ".[fastavro,pandas,pyarrow]") + session.install("-e", "../bigquery/") session.install("-e", ".") # Run py.test against the system tests. diff --git a/bigquery_storage/tests/system/assets/people_data.csv b/bigquery_storage/tests/system/assets/people_data.csv new file mode 100644 index 000000000000..819adfc4bdf5 --- /dev/null +++ b/bigquery_storage/tests/system/assets/people_data.csv @@ -0,0 +1,6 @@ +first_name,last_name,age +John,Doe,42 +Jack,Black,53 +Nick,Sleek,24 +Kevin,Powell,50 +Johnny,Young,2 diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py index f88a38e43e9a..5f2912df7357 100644 --- a/bigquery_storage/tests/system/conftest.py +++ b/bigquery_storage/tests/system/conftest.py @@ -16,17 +16,91 @@ """System tests for reading rows from tables.""" import os +import uuid import pytest from google.cloud import bigquery_storage_v1beta1 -@pytest.fixture() +_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets") + + +@pytest.fixture(scope="session") def project_id(): return os.environ["PROJECT_ID"] +@pytest.fixture(scope="session") +def dataset(project_id): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + unique_suffix = str(uuid.uuid4()).replace("-", "_") + dataset_name = "bq_storage_system_tests_" + unique_suffix + + dataset_id = "{}.{}".format(project_id, dataset_name) + dataset = bigquery.Dataset(dataset_id) + dataset.location = "US" + created_dataset = bq_client.create_dataset(dataset) + + yield created_dataset + + bq_client.delete_dataset(dataset, delete_contents=True) + + +@pytest.fixture(scope="session") +def table(project_id, dataset): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + schema = [ + bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"), + bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + + table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users") + bq_table = bigquery.Table(table_id, schema=schema) + created_table = bq_client.create_table(bq_table) + + yield created_table + + bq_client.delete_table(created_table) + + +@pytest.fixture +def table_with_data_ref(dataset, table): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + job_config = bigquery.LoadJobConfig() + job_config.source_format = bigquery.SourceFormat.CSV + job_config.skip_leading_rows = 1 + job_config.schema = table.schema + + filename = os.path.join(_ASSETS_DIR, "people_data.csv") + + with open(filename, "rb") as source_file: + job = bq_client.load_table_from_file(source_file, table, job_config=job_config) + + job.result() # wait for the load to complete + + table_ref = 
bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = table.project + table_ref.dataset_id = table.dataset_id + table_ref.table_id = table.table_id + yield table_ref + + # truncate table data + query = "DELETE FROM {}.{} WHERE 1 = 1".format(dataset.dataset_id, table.table_id) + query_job = bq_client.query(query, location="US") + query_job.result() + + @pytest.fixture() def client(): return bigquery_storage_v1beta1.BigQueryStorageClient() From 703bc8e8668a4be0f5cb89e037fcfb18fe076975 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 8 Aug 2019 14:48:35 +0200 Subject: [PATCH 02/14] Add basic reader system test --- bigquery_storage/tests/system/test_reader.py | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 13e72fafeceb..abefa98be16a 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -73,3 +73,27 @@ def test_read_rows_as_rows_full_table( rows = list(client.read_rows(stream_pos).rows(session)) assert len(rows) > 0 + + +@pytest.mark.parametrize( + "data_format", + ( + (bigquery_storage_v1beta1.enums.DataFormat.AVRO), + (bigquery_storage_v1beta1.enums.DataFormat.ARROW), + ), +) +def test_basic_nonfiltered_read(client, project_id, table_with_data_ref, data_format): + session = client.create_read_session( + table_with_data_ref, + "projects/{}".format(project_id), + format_=data_format, + requested_streams=1, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + rows = list(client.read_rows(stream_pos).rows(session)) + + assert len(rows) == 5 # all table rows + From d25aaba23e71e83d36dd4736bc3b5e25d16dee1c Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 8 Aug 2019 14:49:01 +0200 Subject: [PATCH 03/14] Add reader with row filter system test --- bigquery_storage/tests/system/test_reader.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index abefa98be16a..7e2dcd8d1391 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -97,3 +97,22 @@ def test_basic_nonfiltered_read(client, project_id, table_with_data_ref, data_fo assert len(rows) == 5 # all table rows + +def test_filtered_rows_read(client, project_id, table_with_data_ref): + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.row_restriction = "age >= 50" + + session = client.create_read_session( + table_with_data_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO, + requested_streams=1, + read_options=read_options, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + rows = list(client.read_rows(stream_pos).rows(session)) + + assert len(rows) == 2 From 94fdd7d16f4399afb5af13a3f3734dccb9bf1f71 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 8 Aug 2019 14:41:33 +0200 Subject: [PATCH 04/14] Add reader column selection system test --- bigquery_storage/tests/system/test_reader.py | 29 ++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 7e2dcd8d1391..1bc66205f374 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -116,3 +116,32 
@@ def test_filtered_rows_read(client, project_id, table_with_data_ref): rows = list(client.read_rows(stream_pos).rows(session)) assert len(rows) == 2 + + +@pytest.mark.parametrize( + "data_format", + ( + (bigquery_storage_v1beta1.enums.DataFormat.AVRO), + (bigquery_storage_v1beta1.enums.DataFormat.ARROW), + ), +) +def test_column_selection_read(client, project_id, table_with_data_ref, data_format): + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.selected_fields.append("first_name") + read_options.selected_fields.append("age") + + session = client.create_read_session( + table_with_data_ref, + "projects/{}".format(project_id), + format_=data_format, + requested_streams=1, + read_options=read_options, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + rows = list(client.read_rows(stream_pos).rows(session)) + + for row in rows: + assert sorted(row.keys()) == ["age", "first_name"] From 5b08445287debc69c806f16f29a56484f3eaafab Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 8 Aug 2019 23:08:48 +0200 Subject: [PATCH 05/14] Add reading data with snapshot system test --- bigquery_storage/noxfile.py | 2 + bigquery_storage/tests/system/test_reader.py | 70 ++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index 79b9d3689512..7f7d867cb205 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -111,6 +111,8 @@ def system(session): # Use pre-release gRPC for system tests. session.install("--pre", "grpcio") + session.install("protobuf") + # Install all test dependencies, then install this package into the # virtualenv's dist-packages. session.install("mock", "pytest") diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 1bc66205f374..6dc8f89d5067 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -15,9 +15,47 @@ # limitations under the License. """System tests for reading rows from tables.""" +import json +import io + import pytest +from google.cloud import bigquery from google.cloud import bigquery_storage_v1beta1 +from google.protobuf import timestamp_pb2 + + +def _add_rows(table_ref, new_data): + """Insert additional rows into an existing table. + + Args: + table_ref (bigquery_storage_v1beta1.types.TableReference): + A reference to the target table. + new_data (Iterable[Dict[str, Any]]): + New data to insert with each row represented as a dictionary. + The keys must match the table column names, and the values + must be JSON serializable. 
+ """ + bq_client = bigquery.Client() + + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON + ) + + new_data_str = u"\n".join(json.dumps(item) for item in new_data) + new_data_file = io.BytesIO(new_data_str.encode()) + + destination_ref = bigquery.table.TableReference.from_api_repr( + { + "projectId": table_ref.project_id, + "datasetId": table_ref.dataset_id, + "tableId": table_ref.table_id, + } + ) + job = bq_client.load_table_from_file( + new_data_file, destination=destination_ref, job_config=job_config + ) + job.result() # wait for the load to complete @pytest.mark.parametrize( @@ -145,3 +183,35 @@ def test_column_selection_read(client, project_id, table_with_data_ref, data_for for row in rows: assert sorted(row.keys()) == ["age", "first_name"] + + +def test_snapshot(client, project_id, table_with_data_ref): + before_new_data = timestamp_pb2.Timestamp() + before_new_data.GetCurrentTime() + + # load additional data into the table + new_data = [ + {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46}, + {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30}, + ] + _add_rows(table_with_data_ref, new_data) + + # read data using the timestamp before the additional data load + session = client.create_read_session( + table_with_data_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO, + requested_streams=1, + table_modifiers={"snapshot_time": before_new_data}, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + rows = list(client.read_rows(stream_pos).rows(session)) + + # verify that only the data before the timestamp was returned + assert len(rows) == 5 # all initial records + + for row in rows: + assert "NewGuy" not in row["first_name"] # no new records From c0ca3788c1167fa5f888e184723da251440dfc3c Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 9 Aug 2019 11:40:07 +0200 Subject: [PATCH 06/14] Add reading column partitioned table system test --- bigquery_storage/tests/system/conftest.py | 30 +++++++++++++++ bigquery_storage/tests/system/test_reader.py | 40 ++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py index 5f2912df7357..5a96104f670c 100644 --- a/bigquery_storage/tests/system/conftest.py +++ b/bigquery_storage/tests/system/conftest.py @@ -101,6 +101,36 @@ def table_with_data_ref(dataset, table): query_job.result() +@pytest.fixture +def col_partition_table_ref(project_id, dataset): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + schema = [ + bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"), + bigquery.SchemaField("description", "STRING", mode="REQUIRED"), + ] + time_partitioning = bigquery.table.TimePartitioning( + type_=bigquery.table.TimePartitioningType.DAY, field="occurred" + ) + bq_table = bigquery.table.Table( + table_ref="{}.{}.notable_events".format(project_id, dataset.dataset_id), + schema=schema, + ) + bq_table.time_partitioning = time_partitioning + + created_table = bq_client.create_table(bq_table) + + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = created_table.project + table_ref.dataset_id = created_table.dataset_id + table_ref.table_id = created_table.table_id + yield table_ref + + bq_client.delete_table(created_table) + + @pytest.fixture() def client(): return bigquery_storage_v1beta1.BigQueryStorageClient() diff --git 
a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 6dc8f89d5067..9f4ca2b44007 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -15,6 +15,7 @@ # limitations under the License. """System tests for reading rows from tables.""" +import datetime as dt import json import io @@ -215,3 +216,42 @@ def test_snapshot(client, project_id, table_with_data_ref): for row in rows: assert "NewGuy" not in row["first_name"] # no new records + + +def test_column_partitioned_table(client, project_id, col_partition_table_ref): + data = [ + {"description": "Tracking established.", "occurred": "2017-02-15"}, + {"description": "Look, a solar eclipse!", "occurred": "2018-02-15"}, + {"description": "Fake solar eclipse reported.", "occurred": "2018-02-15"}, + {"description": "1 day after false eclipse report.", "occurred": "2018-02-16"}, + {"description": "1 year after false eclipse report.", "occurred": "2019-02-15"}, + ] + + _add_rows(col_partition_table_ref, data) + + # Read from the table with a partition filter specified, and verify that + # only the expected data is returned. + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.row_restriction = "occurred = '2018-02-15'" + + session = client.create_read_session( + col_partition_table_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO, + requested_streams=1, + read_options=read_options, + ) + + assert session.streams # there should be some data to fetch + + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + rows = list(client.read_rows(stream_pos).rows(session)) + + assert len(rows) == 2 + + expected_descriptions = ("Look, a solar eclipse!", "Fake solar eclipse reported.") + for row in rows: + assert row["occurred"] == dt.date(2018, 2, 15) + assert row["description"] in expected_descriptions From ef68906c69877209e7ccd53033fa4c5aa767c7eb Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 9 Aug 2019 17:25:05 +0200 Subject: [PATCH 07/14] Add system test for column types data conversions --- bigquery_storage/tests/system/conftest.py | 44 ++++++++++++ bigquery_storage/tests/system/test_reader.py | 73 ++++++++++++++++++++ 2 files changed, 117 insertions(+) diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py index 5a96104f670c..9ac643072ed7 100644 --- a/bigquery_storage/tests/system/conftest.py +++ b/bigquery_storage/tests/system/conftest.py @@ -131,6 +131,50 @@ def col_partition_table_ref(project_id, dataset): bq_client.delete_table(created_table) +@pytest.fixture +def all_types_table_ref(project_id, dataset): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + schema = [ + bigquery.SchemaField("string_field", "STRING"), + bigquery.SchemaField("bytes_field", "BYTES"), + bigquery.SchemaField("int64_field", "INT64"), + bigquery.SchemaField("float64_field", "FLOAT64"), + bigquery.SchemaField("numeric_field", "NUMERIC"), + bigquery.SchemaField("bool_field", "BOOL"), + bigquery.SchemaField("geography_field", "GEOGRAPHY"), + bigquery.SchemaField( + "person_struct_field", + "STRUCT", + fields=( + bigquery.SchemaField("name", "STRING"), + bigquery.SchemaField("age", "INT64"), + ), + ), + bigquery.SchemaField("timestamp_field", "TIMESTAMP"), + bigquery.SchemaField("date_field", "DATE"), + bigquery.SchemaField("time_field", "TIME"), + bigquery.SchemaField("datetime_field", 
"DATETIME"), + bigquery.SchemaField("string_array_field", "STRING", mode="REPEATED"), + ] + bq_table = bigquery.table.Table( + table_ref="{}.{}.complex_records".format(project_id, dataset.dataset_id), + schema=schema, + ) + + created_table = bq_client.create_table(bq_table) + + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = created_table.project + table_ref.dataset_id = created_table.dataset_id + table_ref.table_id = created_table.table_id + yield table_ref + + bq_client.delete_table(created_table) + + @pytest.fixture() def client(): return bigquery_storage_v1beta1.BigQueryStorageClient() diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 9f4ca2b44007..209476f3f410 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -15,11 +15,15 @@ # limitations under the License. """System tests for reading rows from tables.""" +import copy import datetime as dt +import decimal import json import io +import re import pytest +import pytz from google.cloud import bigquery from google.cloud import bigquery_storage_v1beta1 @@ -255,3 +259,72 @@ def test_column_partitioned_table(client, project_id, col_partition_table_ref): for row in rows: assert row["occurred"] == dt.date(2018, 2, 15) assert row["description"] in expected_descriptions + + +@pytest.mark.parametrize( + "data_format", + ( + (bigquery_storage_v1beta1.enums.DataFormat.AVRO), + (bigquery_storage_v1beta1.enums.DataFormat.ARROW), + ), +) +def test_decoding_data_types(client, project_id, all_types_table_ref, data_format): + data = [ + { + u"string_field": u"Price: € 9.95.", + u"bytes_field": bigquery._helpers._bytes_to_json(b"byteees"), + u"int64_field": -1085, + u"float64_field": -42.195, + u"numeric_field": "1.4142", + u"bool_field": True, + u"geography_field": '{"type": "Point", "coordinates": [-49.3028, 69.0622]}', + u"person_struct_field": {u"name": u"John", u"age": 42}, + u"timestamp_field": 1565357902.017896, # 2019-08-09T13:38:22.017896 + u"date_field": u"1995-03-17", + u"time_field": u"16:24:51", + u"datetime_field": u"2005-10-26T19:49:41", + u"string_array_field": [u"foo", u"bar", u"baz"], + } + ] + + _add_rows(all_types_table_ref, data) + + session = client.create_read_session( + all_types_table_ref, + "projects/{}".format(project_id), + format_=data_format, + requested_streams=1, + ) + + assert session.streams # there should be data available + + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + rows = list(client.read_rows(stream_pos).rows(session)) + + expected_result = { + u"string_field": u"Price: € 9.95.", + u"bytes_field": b"byteees", + u"int64_field": -1085, + u"float64_field": -42.195, + u"numeric_field": decimal.Decimal("1.4142"), + u"bool_field": True, + u"geography_field": "POINT(-49.3028 69.0622)", + u"person_struct_field": {u"name": u"John", u"age": 42}, + u"timestamp_field": dt.datetime(2019, 8, 9, 13, 38, 22, 17896, tzinfo=pytz.UTC), + u"date_field": dt.date(1995, 3, 17), + u"time_field": dt.time(16, 24, 51), + u"string_array_field": [u"foo", u"bar", u"baz"], + } + + result_copy = copy.copy(rows[0]) + del result_copy["datetime_field"] + assert result_copy == expected_result + + # Compare datetime separately, AVRO and PYARROW return different object types, + # although they should both represent the same value. + # TODO: when fixed, change assertion to assert a datetime instance! 
+ expected_pattern = re.compile(r"2005-10-26( |T)19:49:41") + assert expected_pattern.match(str(rows[0]["datetime_field"])) From 52e94706c6c074a710a0f3e551974d9e42e0e8db Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 12 Aug 2019 14:58:00 +0200 Subject: [PATCH 08/14] Add ingestion time partitioned table system test --- bigquery_storage/tests/system/conftest.py | 31 ++++++++++++ bigquery_storage/tests/system/test_reader.py | 53 +++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py index 9ac643072ed7..b436712fa5e2 100644 --- a/bigquery_storage/tests/system/conftest.py +++ b/bigquery_storage/tests/system/conftest.py @@ -131,6 +131,37 @@ def col_partition_table_ref(project_id, dataset): bq_client.delete_table(created_table) +@pytest.fixture +def ingest_partition_table_ref(project_id, dataset): + from google.cloud import bigquery + + bq_client = bigquery.Client() + + schema = [ + bigquery.SchemaField("shape", "STRING", mode="REQUIRED"), + bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"), + ] + time_partitioning = bigquery.table.TimePartitioning( + type_=bigquery.table.TimePartitioningType.DAY, + field=None, # use _PARTITIONTIME pseudo column + ) + bq_table = bigquery.table.Table( + table_ref="{}.{}.ufo_sightings".format(project_id, dataset.dataset_id), + schema=schema, + ) + bq_table.time_partitioning = time_partitioning + + created_table = bq_client.create_table(bq_table) + + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = created_table.project + table_ref.dataset_id = created_table.dataset_id + table_ref.table_id = created_table.table_id + yield table_ref + + bq_client.delete_table(created_table) + + @pytest.fixture def all_types_table_ref(project_id, dataset): from google.cloud import bigquery diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 209476f3f410..7da2849aad41 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -30,7 +30,7 @@ from google.protobuf import timestamp_pb2 -def _add_rows(table_ref, new_data): +def _add_rows(table_ref, new_data, partition_suffix=""): """Insert additional rows into an existing table. 
Args: @@ -54,7 +54,7 @@ def _add_rows(table_ref, new_data): { "projectId": table_ref.project_id, "datasetId": table_ref.dataset_id, - "tableId": table_ref.table_id, + "tableId": table_ref.table_id + partition_suffix, } ) job = bq_client.load_table_from_file( @@ -261,6 +261,55 @@ def test_column_partitioned_table(client, project_id, col_partition_table_ref): assert row["description"] in expected_descriptions +@pytest.mark.parametrize( + "data_format", + ( + (bigquery_storage_v1beta1.enums.DataFormat.AVRO), + (bigquery_storage_v1beta1.enums.DataFormat.ARROW), + ), +) +def test_ingestion_time_partitioned_table( + client, project_id, ingest_partition_table_ref, data_format +): + data = [{"shape": "cigar", "altitude": 1200}, {"shape": "disc", "altitude": 750}] + _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190809") + + data = [ + {"shape": "sphere", "altitude": 3500}, + {"shape": "doughnut", "altitude": 100}, + ] + _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190810") + + data = [ + {"shape": "elephant", "altitude": 1}, + {"shape": "rocket", "altitude": 12700}, + ] + _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190811") + + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.row_restriction = "DATE(_PARTITIONTIME) = '2019-08-10'" + + session = client.create_read_session( + ingest_partition_table_ref, + "projects/{}".format(project_id), + format_=data_format, + requested_streams=1, + read_options=read_options, + ) + + assert session.streams # there should be some data to fetch + + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + rows = list(client.read_rows(stream_pos).rows(session)) + assert len(rows) == 2 + + actual_items = {(row["shape"], row["altitude"]) for row in rows} + expected_items = {("sphere", 3500), ("doughnut", 100)} + assert actual_items == expected_items + + @pytest.mark.parametrize( "data_format", ( From f5cc93f9b5471269046771ee9c52858b02ce1a6e Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 12 Aug 2019 17:29:35 +0200 Subject: [PATCH 09/14] Add system test for resuming a read at an offset --- bigquery_storage/tests/system/test_reader.py | 46 ++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 7da2849aad41..e7b4353b74a2 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -377,3 +377,49 @@ def test_decoding_data_types(client, project_id, all_types_table_ref, data_forma # TODO: when fixed, change assertion to assert a datetime instance! 
     expected_pattern = re.compile(r"2005-10-26( |T)19:49:41")
     assert expected_pattern.match(str(rows[0]["datetime_field"]))
+
+
+@pytest.mark.parametrize(
+    "data_format",
+    (
+        (bigquery_storage_v1beta1.enums.DataFormat.AVRO),
+        (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
+    ),
+)
+def test_resuming_read_from_offset(client, project_id, data_format):
+    shakespeare_ref = bigquery_storage_v1beta1.types.TableReference()
+    shakespeare_ref.project_id = project_id
+    shakespeare_ref.dataset_id = "public_samples_copy"
+    shakespeare_ref.table_id = "shakespeare"
+
+    read_session = client.create_read_session(
+        shakespeare_ref,
+        "projects/{}".format(project_id),
+        format_=data_format,
+        requested_streams=1,
+    )
+
+    assert read_session.streams  # there should be data available
+
+    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=read_session.streams[0], offset=0
+    )
+    read_rows_stream = client.read_rows(stream_pos)
+
+    # fetch the first two batches of rows
+    rows_iter = iter(read_rows_stream)
+    some_rows = next(rows_iter)
+    more_rows = next(rows_iter)
+
+    # fetch the rest of the rows using the stream offset
+    new_stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
+        stream=read_session.streams[0], offset=some_rows.row_count + more_rows.row_count
+    )
+    remaining_rows_count = sum(
+        1 for _ in client.read_rows(new_stream_pos).rows(read_session)
+    )
+
+    # verify that the counts match
+    expected_len = 164656  # total rows in shakespeare table
+    actual_len = remaining_rows_count + some_rows.row_count + more_rows.row_count
+    assert actual_len == expected_len

From b4d94383f23ad302147aabf2c8c83991f21ec6ec Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 15 Aug 2019 11:06:34 +0200
Subject: [PATCH 10/14] Remove unnecessary protobuf install in noxfile

---
 bigquery_storage/noxfile.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py
index 7f7d867cb205..79b9d3689512 100644
--- a/bigquery_storage/noxfile.py
+++ b/bigquery_storage/noxfile.py
@@ -111,8 +111,6 @@ def system(session):
     # Use pre-release gRPC for system tests.
     session.install("--pre", "grpcio")
 
-    session.install("protobuf")
-
     # Install all test dependencies, then install this package into the
     # virtualenv's dist-packages.
     session.install("mock", "pytest")

From 6da135ee2e412a1dd5eccbf6f697620821b64319 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 15 Aug 2019 11:26:28 +0200
Subject: [PATCH 11/14] Add TODO comment to replace a test helper method

A similar method is planned to be added to the library itself, and when done, the _add_rows() will not be needed anymore.
---
 bigquery_storage/tests/system/test_reader.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py
index e7b4353b74a2..67d22b3ab2b3 100644
--- a/bigquery_storage/tests/system/test_reader.py
+++ b/bigquery_storage/tests/system/test_reader.py
@@ -30,6 +30,8 @@ from google.protobuf import timestamp_pb2
 
 
+# TODO: remove once a similar method is implemented in the library itself
+# https://github.com/googleapis/google-cloud-python/issues/4553
 def _add_rows(table_ref, new_data, partition_suffix=""):
     """Insert additional rows into an existing table.
 
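The TODO added in the patch above anticipates replacing the tests' _add_rows() helper, which currently hand-builds a NEWLINE_DELIMITED_JSON payload in a BytesIO buffer, with a load helper provided by the BigQuery client library itself. A minimal sketch of what that replacement could look like, assuming the referenced issue eventually ships a helper along the lines of load_table_from_json (the method name and signature are assumptions here, not the library's current API):

    from google.cloud import bigquery


    def _add_rows(table_ref, new_data, partition_suffix=""):
        # Hypothetical replacement: let the library serialize ``new_data`` and
        # run the load job instead of building the JSON payload by hand.
        bq_client = bigquery.Client()
        destination = "{}.{}.{}{}".format(
            table_ref.project_id,
            table_ref.dataset_id,
            table_ref.table_id,
            partition_suffix,
        )
        load_job = bq_client.load_table_from_json(new_data, destination)
        load_job.result()  # wait for the load to complete
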
From 222ab1ae1457e9ea069f55879c83fa6a316015c3 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 15 Aug 2019 12:20:32 +0200 Subject: [PATCH 12/14] Extract BQ client to session fixture in tests Creating a client once per system tests session avoids the overhead of authenticating before each test case. --- bigquery_storage/tests/system/conftest.py | 29 +++++++---------- bigquery_storage/tests/system/test_reader.py | 33 ++++++++++++-------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py index b436712fa5e2..d19f5d28cae6 100644 --- a/bigquery_storage/tests/system/conftest.py +++ b/bigquery_storage/tests/system/conftest.py @@ -32,10 +32,15 @@ def project_id(): @pytest.fixture(scope="session") -def dataset(project_id): +def bq_client(): from google.cloud import bigquery - bq_client = bigquery.Client() + return bigquery.Client() + + +@pytest.fixture(scope="session") +def dataset(project_id, bq_client): + from google.cloud import bigquery unique_suffix = str(uuid.uuid4()).replace("-", "_") dataset_name = "bq_storage_system_tests_" + unique_suffix @@ -51,11 +56,9 @@ def dataset(project_id): @pytest.fixture(scope="session") -def table(project_id, dataset): +def table(project_id, dataset, bq_client): from google.cloud import bigquery - bq_client = bigquery.Client() - schema = [ bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"), @@ -72,11 +75,9 @@ def table(project_id, dataset): @pytest.fixture -def table_with_data_ref(dataset, table): +def table_with_data_ref(dataset, table, bq_client): from google.cloud import bigquery - bq_client = bigquery.Client() - job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.skip_leading_rows = 1 @@ -102,11 +103,9 @@ def table_with_data_ref(dataset, table): @pytest.fixture -def col_partition_table_ref(project_id, dataset): +def col_partition_table_ref(project_id, dataset, bq_client): from google.cloud import bigquery - bq_client = bigquery.Client() - schema = [ bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"), bigquery.SchemaField("description", "STRING", mode="REQUIRED"), @@ -132,11 +131,9 @@ def col_partition_table_ref(project_id, dataset): @pytest.fixture -def ingest_partition_table_ref(project_id, dataset): +def ingest_partition_table_ref(project_id, dataset, bq_client): from google.cloud import bigquery - bq_client = bigquery.Client() - schema = [ bigquery.SchemaField("shape", "STRING", mode="REQUIRED"), bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"), @@ -163,11 +160,9 @@ def ingest_partition_table_ref(project_id, dataset): @pytest.fixture -def all_types_table_ref(project_id, dataset): +def all_types_table_ref(project_id, dataset, bq_client): from google.cloud import bigquery - bq_client = bigquery.Client() - schema = [ bigquery.SchemaField("string_field", "STRING"), bigquery.SchemaField("bytes_field", "BYTES"), diff --git a/bigquery_storage/tests/system/test_reader.py b/bigquery_storage/tests/system/test_reader.py index 67d22b3ab2b3..eb02eeea48cd 100644 --- a/bigquery_storage/tests/system/test_reader.py +++ b/bigquery_storage/tests/system/test_reader.py @@ -32,7 +32,7 @@ # TODO: remove once a similar method is implemented in the library itself # https://github.com/googleapis/google-cloud-python/issues/4553 -def _add_rows(table_ref, new_data, partition_suffix=""): +def _add_rows(table_ref, new_data, bq_client, 
partition_suffix=""): """Insert additional rows into an existing table. Args: @@ -42,9 +42,12 @@ def _add_rows(table_ref, new_data, partition_suffix=""): New data to insert with each row represented as a dictionary. The keys must match the table column names, and the values must be JSON serializable. + bq_client (bigquery.Client): + A BigQuery client instance to use for API calls. + partition_suffix (str): + An option suffix to append to the table_id, useful for selecting + partitions of ingestion-time partitioned tables. """ - bq_client = bigquery.Client() - job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON ) @@ -192,7 +195,7 @@ def test_column_selection_read(client, project_id, table_with_data_ref, data_for assert sorted(row.keys()) == ["age", "first_name"] -def test_snapshot(client, project_id, table_with_data_ref): +def test_snapshot(client, project_id, table_with_data_ref, bq_client): before_new_data = timestamp_pb2.Timestamp() before_new_data.GetCurrentTime() @@ -201,7 +204,7 @@ def test_snapshot(client, project_id, table_with_data_ref): {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46}, {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30}, ] - _add_rows(table_with_data_ref, new_data) + _add_rows(table_with_data_ref, new_data, bq_client) # read data using the timestamp before the additional data load session = client.create_read_session( @@ -224,7 +227,9 @@ def test_snapshot(client, project_id, table_with_data_ref): assert "NewGuy" not in row["first_name"] # no new records -def test_column_partitioned_table(client, project_id, col_partition_table_ref): +def test_column_partitioned_table( + client, project_id, col_partition_table_ref, bq_client +): data = [ {"description": "Tracking established.", "occurred": "2017-02-15"}, {"description": "Look, a solar eclipse!", "occurred": "2018-02-15"}, @@ -233,7 +238,7 @@ def test_column_partitioned_table(client, project_id, col_partition_table_ref): {"description": "1 year after false eclipse report.", "occurred": "2019-02-15"}, ] - _add_rows(col_partition_table_ref, data) + _add_rows(col_partition_table_ref, data, bq_client) # Read from the table with a partition filter specified, and verify that # only the expected data is returned. 
@@ -271,22 +276,22 @@ def test_column_partitioned_table(client, project_id, col_partition_table_ref):
     ),
 )
 def test_ingestion_time_partitioned_table(
-    client, project_id, ingest_partition_table_ref, data_format
+    client, project_id, ingest_partition_table_ref, bq_client, data_format
 ):
     data = [{"shape": "cigar", "altitude": 1200}, {"shape": "disc", "altitude": 750}]
-    _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190809")
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190809")
 
     data = [
         {"shape": "sphere", "altitude": 3500},
         {"shape": "doughnut", "altitude": 100},
     ]
-    _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190810")
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190810")
 
     data = [
         {"shape": "elephant", "altitude": 1},
         {"shape": "rocket", "altitude": 12700},
     ]
-    _add_rows(ingest_partition_table_ref, data, partition_suffix="$20190811")
+    _add_rows(ingest_partition_table_ref, data, bq_client, partition_suffix="$20190811")
 
     read_options = bigquery_storage_v1beta1.types.TableReadOptions()
     read_options.row_restriction = "DATE(_PARTITIONTIME) = '2019-08-10'"
@@ -319,7 +324,9 @@ def test_ingestion_time_partitioned_table(
         (bigquery_storage_v1beta1.enums.DataFormat.ARROW),
     ),
 )
-def test_decoding_data_types(client, project_id, all_types_table_ref, data_format):
+def test_decoding_data_types(
+    client, project_id, all_types_table_ref, bq_client, data_format
+):
     data = [
         {
             u"string_field": u"Price: € 9.95.",
@@ -338,7 +345,7 @@ def test_decoding_data_types(client, project_id, all_types_table_ref, data_format):
         }
     ]
 
-    _add_rows(all_types_table_ref, data)
+    _add_rows(all_types_table_ref, data, bq_client)
 
     session = client.create_read_session(
         all_types_table_ref,

From b373e3fb89e9c4ed58e80a780936242d4b51f50d Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 15 Aug 2019 12:27:15 +0200
Subject: [PATCH 13/14] Only create BQ storage client once per test run

Creating a client just once avoids the auth overhead on every system test case.
---
 bigquery_storage/tests/system/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py
index d19f5d28cae6..04968c6fd97b 100644
--- a/bigquery_storage/tests/system/conftest.py
+++ b/bigquery_storage/tests/system/conftest.py
@@ -201,7 +201,7 @@ def all_types_table_ref(project_id, dataset, bq_client):
     bq_client.delete_table(created_table)
 
 
-@pytest.fixture()
+@pytest.fixture(scope="session")
 def client():
     return bigquery_storage_v1beta1.BigQueryStorageClient()
 

From 5524ca9d31a3d9aba418a04e576952ae8d940f72 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 15 Aug 2019 12:59:47 +0200
Subject: [PATCH 14/14] Add common credentials fixture for system tests

---
 bigquery_storage/tests/system/conftest.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/bigquery_storage/tests/system/conftest.py b/bigquery_storage/tests/system/conftest.py
index 04968c6fd97b..1db1521e5510 100644
--- a/bigquery_storage/tests/system/conftest.py
+++ b/bigquery_storage/tests/system/conftest.py
@@ -32,10 +32,19 @@ def project_id():
 
 
 @pytest.fixture(scope="session")
-def bq_client():
+def credentials():
+    from google.oauth2 import service_account
+
+    # NOTE: the test config in noxfile checks that the env variable is indeed set
+    filename = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
+    return service_account.Credentials.from_service_account_file(filename)
+
+
+@pytest.fixture(scope="session")
+def bq_client(credentials):
     from google.cloud import bigquery
 
-    return bigquery.Client()
+    return bigquery.Client(credentials=credentials)
 
 
 @pytest.fixture(scope="session")
-def client():
-    return bigquery_storage_v1beta1.BigQueryStorageClient()
+def client(credentials):
+    return bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
 
 
 @pytest.fixture()
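
With the final two patches, both the BigQuery client and the BigQuery Storage client are built once per test session from the shared credentials fixture, and individual tests only declare the fixtures they need. A minimal sketch of a test written against these fixtures, following the same pattern as the reader tests added earlier in this series (the test below is illustrative and not part of the patches):

    from google.cloud import bigquery_storage_v1beta1


    def test_read_everything_avro(client, project_id, table_with_data_ref):
        # "client", "project_id", and "table_with_data_ref" come from conftest.py;
        # the clients are created once per session using the shared credentials.
        session = client.create_read_session(
            table_with_data_ref,
            "projects/{}".format(project_id),
            format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
            requested_streams=1,
        )
        stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
            stream=session.streams[0]
        )
        rows = list(client.read_rows(stream_pos).rows(session))
        assert len(rows) == 5  # the people_data.csv fixture loads five rows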