From 9cc45818d6bb5352ca3c6205043699727b9ae00f Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 21 Jan 2025 15:12:59 -0500 Subject: [PATCH 1/8] Write how-to doc on dataflow cost benchmarking --- .../apache_beam/testing/benchmarks/README.md | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 sdks/python/apache_beam/testing/benchmarks/README.md diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md new file mode 100644 index 000000000000..ceb72d9eb5ff --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -0,0 +1,103 @@ + + +# Writing a Dataflow Cost Benchmark + +Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a pipeline on Google Cloud Platform Dataflow requires 4 components in the repository: + +1. A pipeline to execute (ideally one located in the examples directory) +1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` directory +1. A test class inheriting from the `DataflowCostBenchmark` class +1. An entry to execute the pipeline as part of the cost benchmarks workflow action + +### Choosing a Pipeline +Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. + +1. The transforms used in the pipeline should be native to Beam *or* be lightweight and readily available in the given pipeline +1. The pipeline itself should run on a consistent data set and have consistent internals (such as model versions for `RunInference` workloads.) +1. The pipeline should perform some sort of behavior that would be common enough for a user to create themselves + * Effectively, we want to read data from a source, do some sort of transformation, then write that data elsewhere. No need to overcomplicate things. + +Additionally, the `run()` call for the pipeline should return a `PipelineResult` object, which the benchmark framework uses to query metrics from Dataflow after the run completes. + +### Pipeline Options +Once you have a functioning pipeline to configure as a benchmark, the options needs to be saved as a `.txt` file in the `.github/workflows/cost-benchmarks-pipeline-options` directory. The file needs the Apache 2.0 license header at the top of the file, then each flag will need to be provided on a separate line. These arguments include: + +* GCP Region (usually us-central1) +* Machine Type +* Number of Workers +* Disk Size +* Autoscaling Algorithm (typically `NONE` for benchmarking purposes) +* Staging and Temp locations +* A requirements file path in the repository (if additional dependencies are needed) +* Benchmark-specific values + * `publish_to_big_query` - always true for benchmarks + * Metrics Dataset for Output + * Metrics Table for Output + +### Configuring the Test Class +With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to `sdks/python/apache_beam/testing/benchmarks` and select an appropriate sub-directory (or create one if necessary.) 
The class for `wordcount` is shown below: + +```py +import logging + +from apache_beam.examples import wordcount +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class WordcountCostBenchmark(DataflowCostBenchmark): + def __init__(self): + super().__init__() + + def test(self): + extra_opts = {} + extra_opts['output'] = self.pipeline.get_option('output_file') + self.result = wordcount.run( + self.pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + WordcountCostBenchmark().run() +``` + +The important notes here: if there are any arguments with common arg names (like `input` and `output`) you can use the `extra_opts` dictionary to map to them from alternatives in the options file. + +You should also make sure that you save the output of the `run()` call from the pipeline in the `self.result` field, as the framework will try to re-run the pipeline without the extra opts if that value is missing. Beyond those two key notes, the benchmarking framework does all of the other work in terms of setup, teardown, and metrics querying. + +### Updating the Benchmark Workflow +Navigate to `.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml` and make the following changes: + +1. Add the pipeline options `.txt` file written above to the `argument-file-paths` list. This will load those pipeline options as an entry in the workflow environment, with the entry getting the value `env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_X`. X is an integer value corresponding to the position of the text file in the list of files, starting from `1`. +2. Create an entry for the benchmark. The entry for wordcount is as follows: + +```yaml + - name: Run wordcount on Dataflow + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 30 + with: + @@ -88,4 +92,14 @@ jobs: + -PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \ +``` + +The main class is the `DataflowCostBenchmark` subclass defined earlier, then the runner and python version are specified. The majority of pipeline arguments are loaded from the `.txt` file, with the job name and output being specified here. Be sure to set a reasonable timeout here as well. \ No newline at end of file From 3eaeb9105b14213708b5e8d68308a65b63d09332 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 28 Jan 2025 11:04:15 -0500 Subject: [PATCH 2/8] trailing whitespace --- .../apache_beam/testing/benchmarks/README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index ceb72d9eb5ff..b777ae44bbb7 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -23,18 +23,18 @@ Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a 1. A pipeline to execute (ideally one located in the examples directory) 1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` directory -1. 
A test class inheriting from the `DataflowCostBenchmark` class -1. An entry to execute the pipeline as part of the cost benchmarks workflow action +1. A test class inheriting from the `DataflowCostBenchmark` class +1. An entry to execute the pipeline as part of the cost benchmarks workflow action ### Choosing a Pipeline -Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. +Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. 1. The transforms used in the pipeline should be native to Beam *or* be lightweight and readily available in the given pipeline 1. The pipeline itself should run on a consistent data set and have consistent internals (such as model versions for `RunInference` workloads.) 1. The pipeline should perform some sort of behavior that would be common enough for a user to create themselves - * Effectively, we want to read data from a source, do some sort of transformation, then write that data elsewhere. No need to overcomplicate things. + * Effectively, we want to read data from a source, do some sort of transformation, then write that data elsewhere. No need to overcomplicate things. -Additionally, the `run()` call for the pipeline should return a `PipelineResult` object, which the benchmark framework uses to query metrics from Dataflow after the run completes. +Additionally, the `run()` call for the pipeline should return a `PipelineResult` object, which the benchmark framework uses to query metrics from Dataflow after the run completes. ### Pipeline Options Once you have a functioning pipeline to configure as a benchmark, the options needs to be saved as a `.txt` file in the `.github/workflows/cost-benchmarks-pipeline-options` directory. The file needs the Apache 2.0 license header at the top of the file, then each flag will need to be provided on a separate line. These arguments include: @@ -78,7 +78,7 @@ if __name__ == '__main__': WordcountCostBenchmark().run() ``` -The important notes here: if there are any arguments with common arg names (like `input` and `output`) you can use the `extra_opts` dictionary to map to them from alternatives in the options file. +The important notes here: if there are any arguments with common arg names (like `input` and `output`) you can use the `extra_opts` dictionary to map to them from alternatives in the options file. You should also make sure that you save the output of the `run()` call from the pipeline in the `self.result` field, as the framework will try to re-run the pipeline without the extra opts if that value is missing. Beyond those two key notes, the benchmarking framework does all of the other work in terms of setup, teardown, and metrics querying. @@ -100,4 +100,4 @@ Navigate to `.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml` and make '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \ ``` -The main class is the `DataflowCostBenchmark` subclass defined earlier, then the runner and python version are specified. The majority of pipeline arguments are loaded from the `.txt` file, with the job name and output being specified here. Be sure to set a reasonable timeout here as well. 
\ No newline at end of file

From 4af236d20983ba7cd53d1a5c55ca1cc546133edd Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 28 Jan 2025 11:36:18 -0500
Subject: [PATCH 3/8] add streaming information, links

---
 .../apache_beam/testing/benchmarks/README.md  | 21 ++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md
index b777ae44bbb7..7319caaab789 100644
--- a/sdks/python/apache_beam/testing/benchmarks/README.md
+++ b/sdks/python/apache_beam/testing/benchmarks/README.md
@@ -22,8 +22,8 @@ Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a pipeline on Google Cloud Platform Dataflow requires 4 components in the repository:
 
 1. A pipeline to execute (ideally one located in the examples directory)
-1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` directory
-1. A test class inheriting from the `DataflowCostBenchmark` class
+1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` [directory](../../../../../.github/workflows/cost-benchmarks-pipeline-options)
+1. A test class inheriting from the `DataflowCostBenchmark` [class](../load_tests/dataflow_cost_benchmark.py)
 1. An entry to execute the pipeline as part of the cost benchmarks workflow action
 
 ### Choosing a Pipeline
@@ -37,7 +37,8 @@ Pipelines that are worth benchmarking in terms of performance and cost have a fe
 Additionally, the `run()` call for the pipeline should return a `PipelineResult` object, which the benchmark framework uses to query metrics from Dataflow after the run completes.
 
 ### Pipeline Options
-Once you have a functioning pipeline to configure as a benchmark, the options needs to be saved as a `.txt` file in the `.github/workflows/cost-benchmarks-pipeline-options` directory. The file needs the Apache 2.0 license header at the top of the file, then each flag will need to be provided on a separate line. These arguments include:
+Once you have a functioning pipeline to configure as a benchmark, the options need to be saved as a `.txt` file in the `.github/workflows/cost-benchmarks-pipeline-options` [directory](../../../../../.github/workflows/cost-benchmarks-pipeline-options).
+The file needs the Apache 2.0 license header at the top of the file, then each flag will need to be provided on a separate line. These arguments include:
 
 * GCP Region (usually us-central1)
 * Machine Type
@@ -52,7 +53,8 @@ Once you have a functioning pipeline to configure as a benchmark, the options ne
 ### Configuring the Test Class
-With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to `sdks/python/apache_beam/testing/benchmarks` and select an appropriate sub-directory (or create one if necessary.) The class for `wordcount` is shown below:
+With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline.
Navigate to [`sdks/python/apache_beam/testing/benchmarks`](../../testing/benchmarks/) and select an appropriate sub-directory (or create one if necessary.)
+The class for `wordcount` is shown below:
 
 ```py
 import logging
@@ -82,8 +84,17 @@ The important notes here: if there are any arguments with common arg names (like
 
 You should also make sure that you save the output of the `run()` call from the pipeline in the `self.result` field, as the framework will try to re-run the pipeline without the extra opts if that value is missing. Beyond those two key notes, the benchmarking framework does all of the other work in terms of setup, teardown, and metrics querying.
 
+#### Streaming Workloads
+
+If the pipeline is a streaming use case, two versions need to be created: one operating on a backlog of work items (e.g. the entire test corpus is placed into the streaming source
+before the pipeline begins) and one operating in steady state (e.g. elements are added to the streaming source at a regular rate). The former is relatively simple: add an extra
+step to the `test()` function to stage the input data into the streaming source being read from. For the latter, a separate Python thread should be spun up to stage one element at a time
+repeatedly over a given time interval (the interval between elements and the duration of the staging should be defined as part of the benchmark configuration). Once the streaming pipeline
+is out of data and does not receive more for an extended period of time, it will exit and the benchmarking framework will process the results in the same manner as the batch case.
+In the steady-state case, remember to call `join()` to close the staging thread after execution.
+
 ### Updating the Benchmark Workflow
-Navigate to `.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml` and make the following changes:
+Navigate to [`.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml`](../../../../../.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml) and make the following changes:
 
 1. Add the pipeline options `.txt` file written above to the `argument-file-paths` list. This will load those pipeline options as an entry in the workflow environment, with the entry getting the value `env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_X`. X is an integer value corresponding to the position of the text file in the list of files, starting from `1`.
 2. Create an entry for the benchmark.
The entry for wordcount is as follows: From f5a4a8093f50e17ce18e7928ff1ca88ce11687f3 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 28 Jan 2025 11:43:51 -0500 Subject: [PATCH 4/8] add context for BQ stuff --- sdks/python/apache_beam/testing/benchmarks/README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index 7319caaab789..7d42066d1bda 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -48,9 +48,12 @@ The file needs the Apache 2.0 license header at the top of the file, then each f * Staging and Temp locations * A requirements file path in the repository (if additional dependencies are needed) * Benchmark-specific values - * `publish_to_big_query` - always true for benchmarks + * `publish_to_big_query` + * This is always `true` for cost benchmarks * Metrics Dataset for Output + * For `RunInference` workloads this will be `beam_run_inference` * Metrics Table for Output + * This should be named for the benchmark being run ### Configuring the Test Class With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to [`sdks/python/apache_beam/testing/benchmarks`](../../testing/benchmarks/) and select an appropriate sub-directory (or create one if necessary.) From c2f40e9da5acdb3772a8d004c28284e0a4403ce6 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 28 Jan 2025 12:26:41 -0500 Subject: [PATCH 5/8] remove trailing whitespace --- sdks/python/apache_beam/testing/benchmarks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index 7d42066d1bda..fc83f16a43a8 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -48,7 +48,7 @@ The file needs the Apache 2.0 license header at the top of the file, then each f * Staging and Temp locations * A requirements file path in the repository (if additional dependencies are needed) * Benchmark-specific values - * `publish_to_big_query` + * `publish_to_big_query` * This is always `true` for cost benchmarks * Metrics Dataset for Output * For `RunInference` workloads this will be `beam_run_inference` From de90d540c6bab8212ce6202bf2fd546b08b67da9 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:14:31 -0500 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: tvalentyn --- sdks/python/apache_beam/testing/benchmarks/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index fc83f16a43a8..ada2069b4c1f 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -23,7 +23,7 @@ Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a 1. A pipeline to execute (ideally one located in the examples directory) 1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` [directory](../../../../../.github/workflows/cost-benchmarks-pipeline-options) -1. 
A test class inheriting from the `DataflowCostBenchmark` [class](../load_tests/dataflow_cost_benchmark.py) +1. A python file with a class inheriting from the `DataflowCostBenchmark` [class](../load_tests/dataflow_cost_benchmark.py) 1. An entry to execute the pipeline as part of the cost benchmarks workflow action ### Choosing a Pipeline @@ -44,7 +44,7 @@ The file needs the Apache 2.0 license header at the top of the file, then each f * Machine Type * Number of Workers * Disk Size -* Autoscaling Algorithm (typically `NONE` for benchmarking purposes) +* Autoscaling Algorithm (set this to `NONE` for a more consistent benchmark signal) * Staging and Temp locations * A requirements file path in the repository (if additional dependencies are needed) * Benchmark-specific values @@ -53,7 +53,7 @@ The file needs the Apache 2.0 license header at the top of the file, then each f * Metrics Dataset for Output * For `RunInference` workloads this will be `beam_run_inference` * Metrics Table for Output - * This should be named for the benchmark being run + * This should be named after the benchmark being run ### Configuring the Test Class With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to [`sdks/python/apache_beam/testing/benchmarks`](../../testing/benchmarks/) and select an appropriate sub-directory (or create one if necessary.) From e78d1cc9123eba4ea84dee94fc71ade8889dafc6 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 4 Feb 2025 10:34:19 -0500 Subject: [PATCH 7/8] Elaborate on requirements --- sdks/python/apache_beam/testing/benchmarks/README.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index ada2069b4c1f..c4a25dfa79f3 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -29,8 +29,13 @@ Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a ### Choosing a Pipeline Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. -1. The transforms used in the pipeline should be native to Beam *or* be lightweight and readily available in the given pipeline -1. The pipeline itself should run on a consistent data set and have consistent internals (such as model versions for `RunInference` workloads.) +1. The transforms used in the pipeline should be native to Beam *or* be lightweight and contain their source code in the pipeline code. \ + * The performance impact of non-Beam transforms should be minimized since the aim is to benchmark Beam transforms on Dataflow, not custom user code. +1. The pipeline itself should run on a consistent data set and have a consistent configuration. + * For example, a `RunInference` benchmark should use the same model and version for each run, never pulling the latest release of a model for use. + * The same focus on consistency extends to both the hardware and software configurations for the pipeline, from input data and model version all the way + to which Google Cloud Platform region the Dataflow pipeline runs in. All of this configuration should be explicit and available in the repository as part + of the benchmark's definition. 1. 
The pipeline should perform some sort of behavior that would be common enough for a user to create themselves * Effectively, we want to read data from a source, do some sort of transformation, then write that data elsewhere. No need to overcomplicate things. From 9fd1a689855bbaf08613765e6a00ac307091a17f Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Tue, 4 Feb 2025 10:52:33 -0500 Subject: [PATCH 8/8] remove errant char --- sdks/python/apache_beam/testing/benchmarks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/README.md b/sdks/python/apache_beam/testing/benchmarks/README.md index c4a25dfa79f3..eacbd570e4c5 100644 --- a/sdks/python/apache_beam/testing/benchmarks/README.md +++ b/sdks/python/apache_beam/testing/benchmarks/README.md @@ -29,7 +29,7 @@ Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a ### Choosing a Pipeline Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. -1. The transforms used in the pipeline should be native to Beam *or* be lightweight and contain their source code in the pipeline code. \ +1. The transforms used in the pipeline should be native to Beam *or* be lightweight and contain their source code in the pipeline code. * The performance impact of non-Beam transforms should be minimized since the aim is to benchmark Beam transforms on Dataflow, not custom user code. 1. The pipeline itself should run on a consistent data set and have a consistent configuration. * For example, a `RunInference` benchmark should use the same model and version for each run, never pulling the latest release of a model for use.
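
For reference, a minimal sketch of the steady-state staging thread described in the `Streaming Workloads` section above could look like the following. The `stage_element` callable, `publish_fn`, `corpus`, and the interval/duration values are illustrative placeholders rather than part of the `DataflowCostBenchmark` framework; adapt them to whatever streaming source the benchmark reads from:

```py
import threading
import time


def stage_elements_steadily(stage_element, elements, interval_seconds, duration_seconds):
  """Stages one element at a time into the streaming source at a fixed interval."""

  def _stage():
    deadline = time.monotonic() + duration_seconds
    for element in elements:
      if time.monotonic() >= deadline:
        break
      # Publish a single element to the streaming source the pipeline reads from.
      stage_element(element)
      time.sleep(interval_seconds)

  thread = threading.Thread(target=_stage, daemon=True)
  thread.start()
  return thread


# Sketch of usage inside the benchmark's test() method:
#   staging_thread = stage_elements_steadily(
#       publish_fn, corpus, interval_seconds=1.0, duration_seconds=600)
#   self.result = my_pipeline.run(...)
#   staging_thread.join()  # close the staging thread after execution
```

Keeping the staging loop in its own thread and joining it after `run()` returns mirrors the guidance above: the pipeline drains once elements stop arriving, and the framework then collects metrics in the same manner as the batch case.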