diff --git a/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py b/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py index abde62d35bfaa..06958e5c57c0f 100644 --- a/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py +++ b/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py @@ -32,6 +32,7 @@ from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID +from providers.tests.system.openlineage.operator import OpenLineageTestOperator ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID @@ -82,6 +83,11 @@ # [END howto_operator_gcs_delete_bucket] delete_bucket.trigger_rule = TriggerRule.ALL_DONE + check_events = OpenLineageTestOperator( + task_id="check_openlineage_events", + file_path=str(Path(__file__).parent / "resources" / "openlineage_gcs_upload_download.json"), + ) + ( # TEST SETUP create_bucket @@ -89,7 +95,7 @@ # TEST BODY >> download_file # TEST TEARDOWN - >> delete_bucket + >> [delete_bucket, check_events] ) from tests_common.test_utils.watcher import watcher diff --git a/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json b/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json new file mode 100644 index 0000000000000..b9aac84e5237d --- /dev/null +++ b/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json @@ -0,0 +1,123 @@ +[ + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "gcs_upload_download.upload_file", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + }, + "inputs": [ + { + "namespace": "file", + "name": "{{ 
result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}" + } + ], + "outputs": [ + { + "namespace": "gs://bucket_gcs_upload_download_default", + "name": "example_upload.txt" + } + ] + }, + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "gcs_upload_download.upload_file", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + }, + "inputs": [ + { + "namespace": "file", + "name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}" + } + ], + "outputs": [ + { + "namespace": "gs://bucket_gcs_upload_download_default", + "name": "example_upload.txt" + } + ] + }, + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "gcs_upload_download.download_file", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + }, + "inputs": [ + { + "namespace": "gs://bucket_gcs_upload_download_default", + "name": "example_upload.txt" + } + ], + "outputs": [ + { + "namespace": "file", + "name": "example_upload_download.txt" + } + ] + }, + + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "gcs_upload_download.download_file", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "TASK", + "processingType": "BATCH" + } + } + }, + "inputs": [ + { + "namespace": "gs://bucket_gcs_upload_download_default", + "name": "example_upload.txt" + } + ], + "outputs": [ + { + "namespace": "file", + "name": "example_upload_download.txt" + } + ] + } +] diff --git a/providers/tests/system/google/conftest.py 
def regex_match(result: Any, pattern: str) -> str:
    """
    Use this jinja method to check that 'result' matches the regular expression 'pattern'.

    The pattern is applied with ``re.match``, so it is anchored at the beginning of
    'result' only; add ``$`` to the pattern if the whole value has to match.

    :param result: Value taken from the received event; anything that is not a
        string (or bytes-like object) is treated as a mismatch.
    :param pattern: Regular expression the value is expected to match.
    :return: The string "true" when the value matches, "false" otherwise. Strings
        are returned (not booleans) because the rendered template is compared as text.
    """
    try:
        matched = re.match(pattern=pattern, string=result)
    except (TypeError, ValueError, OverflowError, RecursionError, re.error) as err:
        # Best effort: a broken pattern or a non-string value counts as a mismatch,
        # but the reason is logged so a typo in the expected-events file is not
        # indistinguishable from a genuinely different value.
        logging.getLogger(__name__).warning(
            "regex_match could not apply pattern %r to value %r: %s", pattern, result, err
        )
        return "false"
    return "true" if matched else "false"