Update BigQuery document; Python SDK (apache#26693)
Add usage of the BigQuery Storage Write API in the Python SDK.
RyuSA committed May 25, 2023 (commit b979eb0, 1 parent 9270ee3)
Showing 2 changed files with 36 additions and 3 deletions.
sdks/python/apache_beam/examples/snippets/snippets.py (33 additions, 0 deletions)
@@ -1181,6 +1181,39 @@ def model_bigqueryio(
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
# [END model_bigqueryio_write]

# [START model_bigqueryio_write_with_storage_write_api]
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)
# [END model_bigqueryio_write_with_storage_write_api]
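
# --- Editor's sketch (not part of this commit): a minimal, self-contained
# pipeline built around the snippet above. The project/dataset/table name, the
# string-form schema, and the in-memory 'quotes' data are assumptions made for
# illustration only.
import apache_beam as beam

table_spec = 'my-project:my_dataset.quotes'  # assumed destination table
table_schema = 'source:STRING,quote:STRING'  # assumed schema (string form)

with beam.Pipeline() as pipeline:
    quotes = pipeline | beam.Create([
        {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
        {'source': 'Yoda', 'quote': "Do, or do not. There is no try."},
    ])
    quotes | beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)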

# [START model_bigqueryio_write_schema]
table_schema = {
    'fields': [
        {
            "name": "request_ts",
            "type": "TIMESTAMP",
            "mode": "REQUIRED"
        },
        {
            "name": "user_name",
            "type": "STRING",
            "mode": "REQUIRED"
        }
    ]
}
# [END model_bigqueryio_write_schema]
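
# --- Editor's sketch (not part of this commit): WriteToBigQuery also accepts
# the same schema as a comma-separated string. The dict form above is useful
# when you need per-field settings beyond name and type, such as 'mode'.
table_schema_str = 'request_ts:TIMESTAMP,user_name:STRING'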

# [START model_bigqueryio_storage_write_api_with_frequency_and_multiple_streams]
# The SDK for Python does not support `withNumStorageWriteApiStreams`
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    triggering_frequency=5)
# [END model_bigqueryio_storage_write_api_with_frequency_and_multiple_streams]
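
# --- Editor's sketch (not part of this commit): triggering_frequency only
# matters for streaming (unbounded) pipelines, so such a pipeline is typically
# run with the streaming option enabled, for example:
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
# pipeline = beam.Pipeline(options=options)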

# [START model_bigqueryio_write_dynamic_destinations]
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
Website BigQuery I/O connector documentation page (3 additions, 3 deletions)
@@ -795,7 +795,7 @@ BigQueryIO.writeTableRows()
);
{{< /highlight >}}
{{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_with_storage_write_api >}}
{{< /highlight >}}

If you want to change the behavior of BigQueryIO so that all the BigQuery sinks
@@ -820,7 +820,7 @@ TableSchema schema = new TableSchema().setFields(
.setMode("REQUIRED")));
{{< /highlight >}}
{{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_write_schema >}}
{{< /highlight >}}

For streaming pipelines, you need to set two additional parameters: the number
@@ -834,7 +834,7 @@ BigQueryIO.writeTableRows()
);
{{< /highlight >}}
{{< highlight py >}}
-# The SDK for Python does not support the BigQuery Storage API.
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_storage_write_api_with_frequency_and_multiple_streams >}}
{{< /highlight >}}

The number of streams defines the parallelism of the BigQueryIO Write transform
