From 6eb1b7e4dec00e783ff523904c33085180210c9b Mon Sep 17 00:00:00 2001
From: RyuSA <12961775+RyuSA@users.noreply.github.com>
Date: Wed, 12 Jul 2023 02:12:11 +0900
Subject: [PATCH] Update document Storage API Support in `Google BigQuery I/O
 connector` for Python SDK (#26889)

* Update BigQuery document; Python SDK (#26693)

Add usage of the BigQuery Storage Write API in Python.

---------

Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
---
 .../apache_beam/examples/snippets/snippets.py | 57 +++++++++++++++++++
 .../examples/snippets/snippets_test.py        | 10 ++++
 .../io/built-in/google-bigquery.md            | 19 +++++--
 3 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 74e1d0a64c88..e4184f37889e 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -952,6 +952,14 @@ def model_bigqueryio(
       | beam.Map(lambda elem: elem['max_temperature']))
   # [END model_bigqueryio_read_query_std_sql]
 
+  # [START model_bigqueryio_read_table_with_storage_api]
+  max_temperatures = (
+      pipeline
+      | 'ReadTableWithStorageAPI' >> beam.io.ReadFromBigQuery(
+          table=table_spec, method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
+      | beam.Map(lambda elem: elem['max_temperature']))
+  # [END model_bigqueryio_read_table_with_storage_api]
+
   # [START model_bigqueryio_schema]
   # column_name:BIGQUERY_TYPE, ...
   table_schema = 'source:STRING, quote:STRING'
@@ -1020,6 +1028,55 @@ def table_fn(element, fictional_characters):
   # [END model_bigqueryio_time_partitioning]
 
 
+def model_bigqueryio_xlang(
+    pipeline, write_project='', write_dataset='', write_table=''):
+  """Examples for cross-language BigQuery sources and sinks."""
+
+  # To avoid a validation error (mismatch between the input data schema and
+  # the table schema), use a table that does not exist.
+  import uuid
+  never_exists_table = str(uuid.uuid4())
+  table_spec = 'clouddataflow-readonly:samples.{}'.format(never_exists_table)
+
+  if write_project and write_dataset and write_table:
+    table_spec = '{}:{}.{}'.format(write_project, write_dataset, write_table)
+
+  # [START model_bigqueryio_write_schema]
+  table_schema = {
+      'fields': [{
+          'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
+      }, {
+          'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
+      }]
+  }
+  # [END model_bigqueryio_write_schema]
+
+  quotes = pipeline | beam.Create([
+      {
+          'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
+      },
+      {
+          'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
+      },
+  ])
+
+  # [START model_bigqueryio_storage_write_api_with_frequency]
+  # The Python SDK doesn't currently support setting the number of write streams.
+  quotes | "StorageWriteAPIWithFrequency" >> beam.io.WriteToBigQuery(
+      table_spec,
+      schema=table_schema,
+      method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+      triggering_frequency=5)
+  # [END model_bigqueryio_storage_write_api_with_frequency]
+
+  # [START model_bigqueryio_write_with_storage_write_api]
+  quotes | "WriteTableWithStorageAPI" >> beam.io.WriteToBigQuery(
+      table_spec,
+      schema=table_schema,
+      method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)
+  # [END model_bigqueryio_write_with_storage_write_api]
+
+
 def model_composite_transform_example(contents, output_path):
   """Example of a composite transform.
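As a quick illustration of the snippets added above, the following is a minimal, self-contained sketch of a pipeline that writes with the Storage Write API. The project, dataset, and table names are hypothetical placeholders (they are not part of this patch), and running it requires GCP credentials plus the Java expansion service, which is bundled with released Beam SDKs.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Hypothetical destination; replace with a real project:dataset.table.
    table_spec = 'my-project:my_dataset.quotes'
    table_schema = {
        'fields': [{
            'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
        }, {
            'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
        }]
    }

    with beam.Pipeline(options=PipelineOptions()) as pipeline:
      quotes = pipeline | beam.Create([
          {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
          {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
      ])
      # STORAGE_WRITE_API dispatches to the cross-language Java sink, so an
      # expansion service must be reachable (bundled in released SDKs).
      quotes | 'WriteWithStorageAPI' >> beam.io.WriteToBigQuery(
          table_spec,
          schema=table_schema,
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)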
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 0cae02f859ab..ec52ca37af39 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -33,6 +33,7 @@
 
 import mock
 import parameterized
+import pytest
 
 import apache_beam as beam
 from apache_beam import WindowInto
@@ -757,6 +758,15 @@ def test_model_bigqueryio(self):
     p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'
     snippets.model_bigqueryio(p)
 
+  @pytest.mark.uses_gcp_java_expansion_service
+  @unittest.skipUnless(
+      os.environ.get('EXPANSION_PORT'),
+      "EXPANSION_PORT environment variable is not set.")
+  def test_model_bigqueryio_xlang(self):
+    p = TestPipeline()
+    p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'
+    snippets.model_bigqueryio_xlang(p)
+
   def _run_test_pipeline_for_options(self, fn):
     temp_path = self.create_temp_file('aa\nbb\ncc')
     result_path = temp_path + '.result'
diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index c0530370a476..24314dc11800 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -363,7 +363,7 @@ GitHub](https://github.com/apache/beam/blob/master/examples/java/src/main/java/o
 {{< /highlight >}}
 
 {{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_read_table_with_storage_api >}}
 {{< /highlight >}}
 
 The following code snippet reads with a query string.
@@ -779,6 +779,17 @@ Starting with version 2.36.0 of the Beam SDK for Java, you can use the
 [BigQuery Storage Write API](https://cloud.google.com/bigquery/docs/write-api)
 from the BigQueryIO connector.
 
+Starting with version 2.47.0 of the Beam SDK for Python, you can also use the
+Storage Write API from the BigQueryIO connector.
+
+{{< paragraph class="language-py" >}}
+The BigQuery Storage Write API for the Python SDK currently has some limitations on supported data types. Because this method uses cross-language transforms, it is limited to the types supported at the cross-language boundary. For example, `apache_beam.utils.timestamp.Timestamp` is required to write a `TIMESTAMP` BigQuery type, and some types (such as `DATETIME`) are not supported yet. For more details, see the [full type mapping](https://github.com/apache/beam/blob/0b430748cdd2e25edc553747ce018195e9cce888/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L112-L123).
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+**Note:** To run `WriteToBigQuery` with the Storage Write API from source, build the expansion service jar first with `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build`. If you are running from a released Beam SDK, the jar is already included.
+{{< /paragraph >}}
+
 #### Exactly-once semantics
 
 To write to BigQuery using the Storage Write API, set `withMethod` to
@@ -795,7 +806,7 @@ BigQueryIO.writeTableRows()
     );
 {{< /highlight >}}
 {{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_with_storage_write_api >}}
 {{< /highlight >}}
 
 If you want to change the behavior of BigQueryIO so that all the BigQuery sinks
@@ -820,7 +831,7 @@ TableSchema schema = new TableSchema().setFields(
             .setMode("REQUIRED")));
 {{< /highlight >}}
 {{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_schema >}}
 {{< /highlight >}}
 
 For streaming pipelines, you need to set two additional parameters: the number
@@ -834,7 +845,7 @@ BigQueryIO.writeTableRows()
     );
 {{< /highlight >}}
 {{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_storage_write_api_with_frequency >}}
 {{< /highlight >}}
 
 The number of streams defines the parallelism of the BigQueryIO Write transform
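To complement the type-mapping note in the documentation change above, here is a hedged sketch of writing a `TIMESTAMP` column through the Storage Write API sink. The table name is a hypothetical placeholder; the key point is that, per the note, values for `TIMESTAMP` columns should be `apache_beam.utils.timestamp.Timestamp` instances so they can cross the language boundary.

    import apache_beam as beam
    from apache_beam.utils.timestamp import Timestamp

    # Hypothetical destination; replace with a real project:dataset.table.
    table_spec = 'my-project:my_dataset.events'
    table_schema = {
        'fields': [{
            'name': 'event', 'type': 'STRING', 'mode': 'REQUIRED'
        }, {
            'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'
        }]
    }

    with beam.Pipeline() as pipeline:
      _ = (
          pipeline
          | beam.Create([{
              'event': 'signup',
              # Per the type-mapping note, use a Beam Timestamp (not a
              # datetime or string) for BigQuery TIMESTAMP columns here.
              'created_at': Timestamp.now(),
          }])
          | beam.io.WriteToBigQuery(
              table_spec,
              schema=table_schema,
              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API))

In a streaming pipeline, the same write would also set `triggering_frequency`, as shown in the `model_bigqueryio_storage_write_api_with_frequency` snippet added by this patch.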