From 4f8be01317846bb9a937c8c751c845ba0a8f9ab6 Mon Sep 17 00:00:00 2001
From: Maksim Yermakou
Date: Thu, 7 Dec 2023 13:49:47 +0000
Subject: [PATCH] Update system test for Dataflow

---
 .../example_dataflow_streaming_python.py      | 15 +--
 .../dataflow/resources/streaming_wordcount.py | 99 ------------------
 2 files changed, 2 insertions(+), 112 deletions(-)
 delete mode 100644 tests/system/providers/google/cloud/dataflow/resources/streaming_wordcount.py

diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
index f63e730eeb669..0489117e5d8ea 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -23,7 +23,6 @@
 import os
 from datetime import datetime
-from pathlib import Path
 
 from airflow.models.dag import DAG
 from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
@@ -34,20 +33,18 @@
     PubSubCreateTopicOperator,
     PubSubDeleteTopicOperator,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 DAG_ID = "dataflow_native_python_streaming"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-PYTHON_FILE_NAME = "streaming_wordcount.py"
 GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
 GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/streaming_wordcount.py"
 
 LOCATION = "europe-west3"
 
 TOPIC_ID = f"topic-{DAG_ID}"
@@ -68,13 +65,6 @@
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=PYTHON_FILE_LOCAL_PATH,
-        dst=PYTHON_FILE_NAME,
-        bucket=BUCKET_NAME,
-    )
-
     create_pub_sub_topic = PubSubCreateTopicOperator(
         task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
     )
@@ -114,7 +104,6 @@
     (
         # TEST SETUP
         create_bucket
-        >> upload_file
         >> create_pub_sub_topic
         # TEST BODY
         >> start_streaming_python_job
diff --git a/tests/system/providers/google/cloud/dataflow/resources/streaming_wordcount.py b/tests/system/providers/google/cloud/dataflow/resources/streaming_wordcount.py
deleted file mode 100644
index 8bafab39f8bd5..0000000000000
--- a/tests/system/providers/google/cloud/dataflow/resources/streaming_wordcount.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""A streaming word-counting workflow.
-"""
-
-# type: ignore
-from __future__ import annotations
-
-import argparse
-import logging
-
-import apache_beam as beam
-from apache_beam.examples.wordcount_with_metrics import WordExtractingDoFn
-from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
-from apache_beam.transforms import window
-
-
-def run(argv=None, save_main_session=True):
-    """Build and run the pipeline."""
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--output_topic",
-        required=True,
-        help=("Output PubSub topic of the form " '"projects/<PROJECT>/topics/<TOPIC>".'),
-    )
-    group = parser.add_mutually_exclusive_group(required=True)
-    group.add_argument(
-        "--input_topic", help=("Input PubSub topic of the form " '"projects/<PROJECT>/topics/<TOPIC>".')
-    )
-    group.add_argument(
-        "--input_subscription",
-        help=("Input PubSub subscription of the form " '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'),
-    )
-    known_args, pipeline_args = parser.parse_known_args(argv)
-
-    # We use the save_main_session option because one or more DoFn's in this
-    # workflow rely on global context (e.g., a module imported at module level).
-    pipeline_options = PipelineOptions(pipeline_args)
-    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
-    pipeline_options.view_as(StandardOptions).streaming = True
-    with beam.Pipeline(options=pipeline_options) as p:
-        # Read from PubSub into a PCollection.
-        if known_args.input_subscription:
-            messages = p | beam.io.ReadFromPubSub(
-                subscription=known_args.input_subscription
-            ).with_output_types(bytes)
-        else:
-            messages = p | beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
-
-        lines = messages | "decode" >> beam.Map(lambda x: x.decode("utf-8"))
-
-        # Count the occurrences of each word.
-        def count_ones(word_ones):
-            (word, ones) = word_ones
-            return (word, sum(ones))
-
-        counts = (
-            lines
-            | "split" >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
-            | "pair_with_one" >> beam.Map(lambda x: (x, 1))
-            | beam.WindowInto(window.FixedWindows(15, 0))
-            | "group" >> beam.GroupByKey()
-            | "count" >> beam.Map(count_ones)
-        )
-
-        # Format the counts into a PCollection of strings.
-        def format_result(word_count):
-            (word, count) = word_count
-            return "%s: %d" % (word, count)
-
-        output = (
-            counts
-            | "format" >> beam.Map(format_result)
-            | "encode" >> beam.Map(lambda x: x.encode("utf-8")).with_output_types(bytes)
-        )
-
-        # Write to PubSub.
-        # pylint: disable=expression-not-assigned
-        output | beam.io.WriteToPubSub(known_args.output_topic)
-
-
-if __name__ == "__main__":
-    logging.getLogger().setLevel(logging.INFO)
-    run()