refactor(bigquery): add extract table and table cmek operations #101

Draft · wants to merge 3 commits into base: master
177 changes: 0 additions & 177 deletions bigquery/docs/snippets.py
@@ -161,35 +161,6 @@ def test_create_table_nested_repeated_schema(client, to_delete):
# [END bigquery_nested_repeated_schema]


def test_create_table_cmek(client, to_delete):
dataset_id = "create_table_cmek_{}".format(_millis())
dataset = bigquery.Dataset(client.dataset(dataset_id))
client.create_dataset(dataset)
to_delete.append(dataset)

# [START bigquery_create_table_cmek]
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'

table_ref = client.dataset(dataset_id).table("my_table")
table = bigquery.Table(table_ref)

# Set the encryption key to use for the table.
# TODO: Replace this key with a key you have created in Cloud KMS.
kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
"cloud-samples-tests", "us", "test", "test"
)
table.encryption_configuration = bigquery.EncryptionConfiguration(
kms_key_name=kms_key_name
)

table = client.create_table(table) # API request

assert table.encryption_configuration.kms_key_name == kms_key_name
# [END bigquery_create_table_cmek]


def test_create_partitioned_table(client, to_delete):
dataset_id = "create_table_partitioned_{}".format(_millis())
dataset_ref = bigquery.Dataset(client.dataset(dataset_id))
@@ -411,51 +382,6 @@ def test_relax_column(client, to_delete):
# [END bigquery_relax_column]


@pytest.mark.skip(
reason=(
"update_table() is flaky "
"https://github.com/GoogleCloudPlatform/google-cloud-python/issues/5589"
)
)
def test_update_table_cmek(client, to_delete):
"""Patch a table's metadata."""
dataset_id = "update_table_cmek_{}".format(_millis())
table_id = "update_table_cmek_{}".format(_millis())
dataset = bigquery.Dataset(client.dataset(dataset_id))
client.create_dataset(dataset)
to_delete.append(dataset)

table = bigquery.Table(dataset.table(table_id))
original_kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
"cloud-samples-tests", "us", "test", "test"
)
table.encryption_configuration = bigquery.EncryptionConfiguration(
kms_key_name=original_kms_key_name
)
table = client.create_table(table)

# [START bigquery_update_table_cmek]
# from google.cloud import bigquery
# client = bigquery.Client()

assert table.encryption_configuration.kms_key_name == original_kms_key_name

# Set a new encryption key to use for the destination.
# TODO: Replace this key with a key you have created in KMS.
updated_kms_key_name = (
"projects/cloud-samples-tests/locations/us/keyRings/test/cryptoKeys/otherkey"
)
table.encryption_configuration = bigquery.EncryptionConfiguration(
kms_key_name=updated_kms_key_name
)

table = client.update_table(table, ["encryption_configuration"]) # API request

assert table.encryption_configuration.kms_key_name == updated_kms_key_name
assert original_kms_key_name != updated_kms_key_name
# [END bigquery_update_table_cmek]


@pytest.mark.skip(
reason=(
"update_table() is flaky "
@@ -1152,109 +1078,6 @@ def test_load_table_relax_column(client, to_delete):
assert table.num_rows > 0


def test_extract_table(client, to_delete):
bucket_name = "extract_shakespeare_{}".format(_millis())
storage_client = storage.Client()
bucket = retry_storage_errors(storage_client.create_bucket)(bucket_name)
to_delete.append(bucket)

# [START bigquery_extract_table]
# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'
project = "bigquery-public-data"
dataset_id = "samples"
table_id = "shakespeare"

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv")
dataset_ref = client.dataset(dataset_id, project=project)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
table_ref,
destination_uri,
# Location must match that of the source table.
location="US",
) # API request
extract_job.result() # Waits for job to complete.

print(
"Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
)
# [END bigquery_extract_table]

blob = retry_storage_errors(bucket.get_blob)("shakespeare.csv")
assert blob.exists
assert blob.size > 0
to_delete.insert(0, blob)


def test_extract_table_json(client, to_delete):
bucket_name = "extract_shakespeare_json_{}".format(_millis())
storage_client = storage.Client()
bucket = retry_storage_errors(storage_client.create_bucket)(bucket_name)
to_delete.append(bucket)

# [START bigquery_extract_table_json]
# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.json")
dataset_ref = client.dataset("samples", project="bigquery-public-data")
table_ref = dataset_ref.table("shakespeare")
job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON

extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
# Location must match that of the source table.
location="US",
) # API request
extract_job.result() # Waits for job to complete.
# [END bigquery_extract_table_json]

blob = retry_storage_errors(bucket.get_blob)("shakespeare.json")
assert blob.exists
assert blob.size > 0
to_delete.insert(0, blob)


def test_extract_table_compressed(client, to_delete):
bucket_name = "extract_shakespeare_compress_{}".format(_millis())
storage_client = storage.Client()
bucket = retry_storage_errors(storage_client.create_bucket)(bucket_name)
to_delete.append(bucket)

# [START bigquery_extract_table_compressed]
# from google.cloud import bigquery
# client = bigquery.Client()
# bucket_name = 'my-bucket'

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv.gz")
dataset_ref = client.dataset("samples", project="bigquery-public-data")
table_ref = dataset_ref.table("shakespeare")
job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP

extract_job = client.extract_table(
table_ref,
destination_uri,
# Location must match that of the source table.
location="US",
job_config=job_config,
) # API request
extract_job.result() # Waits for job to complete.
# [END bigquery_extract_table_compressed]

blob = retry_storage_errors(bucket.get_blob)("shakespeare.csv.gz")
assert blob.exists
assert blob.size > 0
to_delete.insert(0, blob)


def test_client_query_total_rows(client, capsys):
"""Run a query and just check for how many rows."""
# [START bigquery_query_total_rows]
4 changes: 2 additions & 2 deletions bigquery/docs/usage/encryption.rst
@@ -10,15 +10,15 @@ in the BigQuery documentation for more details.
Create a new table, using a customer-managed encryption key from
Cloud KMS to encrypt it.

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/create_table_cmek.py
:language: python
:dedent: 4
:start-after: [START bigquery_create_table_cmek]
:end-before: [END bigquery_create_table_cmek]

Change the key used to encrypt a table.

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/update_table_cmek.py
:language: python
:dedent: 4
:start-after: [START bigquery_update_table_cmek]
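The encryption guide now points at samples/update_table_cmek.py, which does not appear in the visible portion of this diff. A minimal sketch of what that sample might contain, adapted from the test_update_table_cmek snippet deleted above (the function name, signature, and printed message are assumptions inferred from the literalinclude directive):

def update_table_cmek(table_id, kms_key_name):

    # [START bigquery_update_table_cmek]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to update.
    # table_id = "your-project.your_dataset.your_table_name"

    # Set a new encryption key to use for the table.
    # TODO(developer): Replace this key with a key you have created in Cloud KMS.
    # kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
    #     "cloud-samples-tests", "us", "test", "otherkey"
    # )

    table = client.get_table(table_id)  # Make an API request.
    table.encryption_configuration = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name
    )

    table = client.update_table(table, ["encryption_configuration"])  # Make an API request.

    if table.encryption_configuration.kms_key_name == kms_key_name:
        print("Updated table {} with new Cloud KMS encryption key.".format(table_id))
    # [END bigquery_update_table_cmek]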
18 changes: 17 additions & 1 deletion bigquery/docs/usage/tables.rst
@@ -169,12 +169,28 @@ Copy a table with the
Copy table data to Google Cloud Storage with the
:func:`~google.cloud.bigquery.client.Client.extract_table` method:

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/extract_table.py
:language: python
:dedent: 4
:start-after: [START bigquery_extract_table]
:end-before: [END bigquery_extract_table]

Copy table data to a newline-delimited JSON file in Google Cloud Storage:

.. literalinclude:: ../samples/extract_table_json.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_extract_table_json]
   :end-before: [END bigquery_extract_table_json]

Copy table data to a compressed file in Google Cloud Storage:

.. literalinclude:: ../samples/extract_table_compressed.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_extract_table_compressed]
   :end-before: [END bigquery_extract_table_compressed]

Deleting a Table
^^^^^^^^^^^^^^^^

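The tables guide also references samples/extract_table_json.py and samples/extract_table_compressed.py, neither of which is visible in this diff. A minimal sketch of the JSON variant, adapted from the test_extract_table_json snippet deleted above (the function name and signature are assumptions); the compressed variant would differ only in setting job_config.compression = bigquery.Compression.GZIP and writing to a .csv.gz destination:

def extract_table_json(table_id, destination_uri):

    # [START bigquery_extract_table_json]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to export.
    # table_id = "bigquery-public-data.samples.shakespeare"

    # TODO(developer): Set destination_uri to the Cloud Storage URI to write to.
    # destination_uri = "gs://your-bucket/shakespeare.json"

    job_config = bigquery.job.ExtractJobConfig()
    job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON

    extract_job = client.extract_table(
        table_id,
        destination_uri,
        job_config=job_config,
        # Must match the source table location.
        location="US",
    )  # Make an API request.
    extract_job.result()  # Waits for the job to complete.

    print("Exported {} to {}".format(table_id, destination_uri))
    # [END bigquery_extract_table_json]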
43 changes: 43 additions & 0 deletions bigquery/samples/create_table_cmek.py
@@ -0,0 +1,43 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def create_table_cmek(table_id, kms_key_name):

# [START bigquery_create_table_cmek]
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

# Set the encryption key to use for the table.
# TODO(developer): Replace this key with a key you have created in Cloud KMS.
# kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
# "cloud-samples-tests", "us", "test", "test"
# )

table = bigquery.Table(table_id)
table.encryption_configuration = bigquery.EncryptionConfiguration(
kms_key_name=kms_key_name
)

table = client.create_table(table) # Make an API request.

if table.encryption_configuration.kms_key_name == kms_key_name:
    print("Created table {} with Cloud KMS encryption key.".format(table_id))

# [END bigquery_create_table_cmek]
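No test for the new sample is visible in this diff. Samples in this repository are usually exercised from samples/tests/; a hedged sketch of such a test, assuming hypothetical random_table_id and kms_key_name fixtures that supply a fully-qualified table ID and an existing Cloud KMS key:

# Hypothetical samples/tests/test_create_table_cmek.py -- a sketch only.
from .. import create_table_cmek


def test_create_table_cmek(capsys, random_table_id, kms_key_name):
    # random_table_id and kms_key_name are assumed fixtures, not part of this PR.
    create_table_cmek.create_table_cmek(random_table_id, kms_key_name)
    out, _ = capsys.readouterr()
    assert "encryption" in out.lower()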
63 changes: 63 additions & 0 deletions bigquery/samples/extract_table.py
@@ -0,0 +1,63 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def extract_table(table_id):

# [START bigquery_extract_table]
import time

from google.cloud import bigquery
from google.cloud import storage

# Construct a BigQuery client object.
client = bigquery.Client()

# Construct a Storage client object.
storage_client = storage.Client()

# TODO(developer): Set table_id to the ID of the table to create and extract.
# table_id = 'your-project.your_dataset.your_table'

bucket_name = "extract_shakespeare_{}".format(int(time.time() * 1000))
bucket = storage_client.create_bucket(bucket_name)

destination_uri = "gs://{}/{}".format(bucket_name, "shakespeare.csv")

table = bigquery.Table(
table_id,
schema=[
bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
],
)
table = client.create_table(table)

extract_job = client.extract_table(
table,
destination_uri,
# Must match the source table location.
location="US",
) # Make an API request.
extract_job.result() # Waits for job to complete.

print(
"Exported {}.{}.{} to {}".format(
table.project, table.dataset_id, table.table_id, destination_uri
)
)
# [END bigquery_extract_table]

blob = bucket.get_blob("shakespeare.csv")
return blob, bucket
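Likewise, a hedged sketch of a test for extract_table, assuming a hypothetical random_table_id fixture and cleaning up the bucket and blob that the sample creates:

# Hypothetical samples/tests/test_extract_table.py -- a sketch only.
from .. import extract_table


def test_extract_table(capsys, random_table_id):
    # random_table_id is an assumed fixture pointing at an existing dataset.
    blob, bucket = extract_table.extract_table(random_table_id)
    out, _ = capsys.readouterr()

    assert "Exported" in out
    assert blob is not None and blob.size > 0

    # Delete the blob before the bucket; a bucket must be empty to be deleted.
    blob.delete()
    bucket.delete()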