BigQuery Storage: Add more in-depth system tests #8992

Merged: 14 commits, Aug 21, 2019
1 change: 1 addition & 0 deletions bigquery_storage/noxfile.py
@@ -118,6 +118,7 @@ def system(session):
    session.install("-e", local_dep)
    session.install("-e", "../test_utils/")
    session.install("-e", ".[fastavro,pandas,pyarrow]")
    session.install("-e", "../bigquery/")
    session.install("-e", ".")

    # Run py.test against the system tests.
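
The newly added `../bigquery/` install matters because the system-test fixtures below create datasets and tables and run load jobs through the BigQuery client library, so both packages need to come from the local checkout. Roughly, conftest.py relies on:

# Both imports are satisfied by the editable installs in the system session:
from google.cloud import bigquery                    # installed from ../bigquery/
from google.cloud import bigquery_storage_v1beta1    # installed from "."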
6 changes: 6 additions & 0 deletions bigquery_storage/tests/system/assets/people_data.csv
@@ -0,0 +1,6 @@
first_name,last_name,age
John,Doe,42
Jack,Black,53
Nick,Sleek,24
Kevin,Powell,50
Johnny,Young,2
191 changes: 187 additions & 4 deletions bigquery_storage/tests/system/conftest.py
@@ -16,20 +16,203 @@
"""System tests for reading rows from tables."""

import os
import uuid

import pytest

from google.cloud import bigquery_storage_v1beta1


-@pytest.fixture()
_ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets")


@pytest.fixture(scope="session")
def project_id():
    return os.environ["PROJECT_ID"]


-@pytest.fixture()
-def client():
-    return bigquery_storage_v1beta1.BigQueryStorageClient()
@pytest.fixture(scope="session")
def credentials():
    from google.oauth2 import service_account

    # NOTE: the test config in noxfile checks that the env variable is indeed set
    filename = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
    return service_account.Credentials.from_service_account_file(filename)


@pytest.fixture(scope="session")
def bq_client(credentials):
    from google.cloud import bigquery

    return bigquery.Client(credentials=credentials)


@pytest.fixture(scope="session")
def dataset(project_id, bq_client):
    from google.cloud import bigquery

    unique_suffix = str(uuid.uuid4()).replace("-", "_")
    dataset_name = "bq_storage_system_tests_" + unique_suffix

    dataset_id = "{}.{}".format(project_id, dataset_name)
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    created_dataset = bq_client.create_dataset(dataset)

    yield created_dataset

    bq_client.delete_dataset(dataset, delete_contents=True)


@pytest.fixture(scope="session")
def table(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("first_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]

    table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users")
    bq_table = bigquery.Table(table_id, schema=schema)
    created_table = bq_client.create_table(bq_table)

    yield created_table

    bq_client.delete_table(created_table)


@pytest.fixture
def table_with_data_ref(dataset, table, bq_client):
    from google.cloud import bigquery

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.schema = table.schema

    filename = os.path.join(_ASSETS_DIR, "people_data.csv")

    with open(filename, "rb") as source_file:
        job = bq_client.load_table_from_file(source_file, table, job_config=job_config)

    job.result()  # wait for the load to complete

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = table.project
    table_ref.dataset_id = table.dataset_id
    table_ref.table_id = table.table_id
    yield table_ref

    # truncate table data
    query = "DELETE FROM {}.{} WHERE 1 = 1".format(dataset.dataset_id, table.table_id)
    query_job = bq_client.query(query, location="US")
    query_job.result()
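
For reference, the kind of system test this fixture enables reads the loaded rows back through the storage API roughly as follows. This is an illustrative sketch, not a test from this PR; it assumes the `client` and `project_id` fixtures defined in this conftest and the five data rows in people_data.csv.

def test_read_rows_entire_table(client, project_id, table_with_data_ref):
    session = client.create_read_session(
        table_with_data_ref,
        "projects/{}".format(project_id),
        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
        requested_streams=1,
    )
    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )

    rows = list(client.read_rows(stream_pos).rows(session))

    assert len(rows) == 5  # people_data.csv contains five data rows
    assert set(rows[0]) == {"first_name", "last_name", "age"}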


@pytest.fixture
def col_partition_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("occurred", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("description", "STRING", mode="REQUIRED"),
    ]
    time_partitioning = bigquery.table.TimePartitioning(
        type_=bigquery.table.TimePartitioningType.DAY, field="occurred"
    )
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.notable_events".format(project_id, dataset.dataset_id),
        schema=schema,
    )
    bq_table.time_partitioning = time_partitioning

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)
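
A test built on this fixture would first insert rows spread over several `occurred` dates and could then verify that a read restricted to one partition only returns matching data. A rough sketch, again assuming the `client` and `project_id` fixtures above rather than code from this PR:

def test_column_partitioned_read(client, project_id, col_partition_table_ref):
    # ...load a few rows spread over several "occurred" dates first...

    read_options = bigquery_storage_v1beta1.types.TableReadOptions(
        selected_fields=["description"],
        row_restriction='occurred = "2019-08-01"',
    )
    session = client.create_read_session(
        col_partition_table_ref,
        "projects/{}".format(project_id),
        read_options=read_options,
    )
    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )

    rows = list(client.read_rows(stream_pos).rows(session))

    # Only the selected column from the matching partition should come back.
    assert all(set(row) == {"description"} for row in rows)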


@pytest.fixture
def ingest_partition_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("shape", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"),
    ]
    time_partitioning = bigquery.table.TimePartitioning(
        type_=bigquery.table.TimePartitioningType.DAY,
        field=None,  # use _PARTITIONTIME pseudo column
    )
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.ufo_sightings".format(project_id, dataset.dataset_id),
        schema=schema,
    )
    bq_table.time_partitioning = time_partitioning

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)
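
For the ingestion-time partitioned table, a test needs some way to land rows in a specific partition before reading them back. One option is a load job that targets a partition decorator; the sketch below is only an illustration and assumes `load_table_from_json` is available in the installed `google-cloud-bigquery` version.

import datetime

from google.cloud import bigquery


def test_load_single_ingestion_partition(ingest_partition_table_ref, dataset, bq_client):
    # Target today's partition with a "table$YYYYMMDD" decorator.
    destination = dataset.table(
        "{}${}".format(
            ingest_partition_table_ref.table_id,
            datetime.date.today().strftime("%Y%m%d"),
        )
    )

    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField("shape", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("altitude", "INT64", mode="REQUIRED"),
    ]

    rows = [
        {"shape": "cigar", "altitude": 1200},
        {"shape": "disc", "altitude": 5500},
    ]
    job = bq_client.load_table_from_json(rows, destination, job_config=job_config)
    job.result()  # wait for the load job to finish

    # ...then read the partition back through the storage API and assert on it...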


@pytest.fixture
def all_types_table_ref(project_id, dataset, bq_client):
    from google.cloud import bigquery

    schema = [
        bigquery.SchemaField("string_field", "STRING"),
        bigquery.SchemaField("bytes_field", "BYTES"),
        bigquery.SchemaField("int64_field", "INT64"),
        bigquery.SchemaField("float64_field", "FLOAT64"),
        bigquery.SchemaField("numeric_field", "NUMERIC"),
        bigquery.SchemaField("bool_field", "BOOL"),
        bigquery.SchemaField("geography_field", "GEOGRAPHY"),
        bigquery.SchemaField(
            "person_struct_field",
            "STRUCT",
            fields=(
                bigquery.SchemaField("name", "STRING"),
                bigquery.SchemaField("age", "INT64"),
            ),
        ),
        bigquery.SchemaField("timestamp_field", "TIMESTAMP"),
        bigquery.SchemaField("date_field", "DATE"),
        bigquery.SchemaField("time_field", "TIME"),
        bigquery.SchemaField("datetime_field", "DATETIME"),
        bigquery.SchemaField("string_array_field", "STRING", mode="REPEATED"),
    ]
    bq_table = bigquery.table.Table(
        table_ref="{}.{}.complex_records".format(project_id, dataset.dataset_id),
        schema=schema,
    )

    created_table = bq_client.create_table(bq_table)

    table_ref = bigquery_storage_v1beta1.types.TableReference()
    table_ref.project_id = created_table.project
    table_ref.dataset_id = created_table.dataset_id
    table_ref.table_id = created_table.table_id
    yield table_ref

    bq_client.delete_table(created_table)
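
Because the noxfile installs fastavro, pandas, and pyarrow for the system session, a test against this table can also cover the dataframe code path. A minimal sketch, assuming the `client` and `project_id` fixtures above and that some rows were loaded beforehand:

def test_complex_types_to_dataframe(client, project_id, all_types_table_ref):
    # ...load a handful of rows covering the struct, array, and geography fields first...

    session = client.create_read_session(
        all_types_table_ref,
        "projects/{}".format(project_id),
        requested_streams=1,
    )
    stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )

    dataframe = client.read_rows(stream_pos).to_dataframe(session)

    assert "person_struct_field" in dataframe.columns
    assert "string_array_field" in dataframe.columns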


@pytest.fixture(scope="session")
def client(credentials):
    return bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)


@pytest.fixture()