From 11c866748860a4b0e6def43d2b559e8565207ecd Mon Sep 17 00:00:00 2001
From: Kacper Muda <mudakacper@gmail.com>
Date: Wed, 22 Jan 2025 13:04:34 +0100
Subject: [PATCH] tests: Add OpenLineage check in gcs_upload_download system
 test (#45681)

Signed-off-by: Kacper Muda <mudakacper@gmail.com>
---
 .../cloud/gcs/example_gcs_upload_download.py  |   8 +-
 .../openlineage_gcs_upload_download.json      | 123 ++++++++++++++++++
 providers/tests/system/google/conftest.py     |   2 +
 .../tests/system/openlineage/operator.py      |  13 +-
 4 files changed, 144 insertions(+), 2 deletions(-)
 create mode 100644 providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json

diff --git a/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py b/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py
index abde62d35bfaa..06958e5c57c0f 100644
--- a/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py
+++ b/providers/tests/system/google/cloud/gcs/example_gcs_upload_download.py
@@ -32,6 +32,7 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+from providers.tests.system.openlineage.operator import OpenLineageTestOperator
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
@@ -82,6 +83,11 @@
     # [END howto_operator_gcs_delete_bucket]
     delete_bucket.trigger_rule = TriggerRule.ALL_DONE
 
+    check_events = OpenLineageTestOperator(
+        task_id="check_openlineage_events",
+        file_path=str(Path(__file__).parent / "resources" / "openlineage_gcs_upload_download.json"),
+    )
+
     (
         # TEST SETUP
         create_bucket
@@ -89,7 +95,7 @@
         # TEST BODY
         >> download_file
         # TEST TEARDOWN
-        >> delete_bucket
+        >> [delete_bucket, check_events]
     )
 
     from tests_common.test_utils.watcher import watcher
diff --git a/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json b/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json
new file mode 100644
index 0000000000000..b9aac84e5237d
--- /dev/null
+++ b/providers/tests/system/google/cloud/gcs/resources/openlineage_gcs_upload_download.json
@@ -0,0 +1,123 @@
+[
+    {
+        "eventType": "START",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "gcs_upload_download.upload_file",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        },
+        "inputs": [
+            {
+                "namespace": "file",
+                "name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
+            }
+        ],
+        "outputs": [
+            {
+                "namespace": "gs://bucket_gcs_upload_download_default",
+                "name": "example_upload.txt"
+            }
+        ]
+    },
+    {
+        "eventType": "COMPLETE",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "gcs_upload_download.upload_file",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        },
+        "inputs": [
+            {
+                "namespace": "file",
+                "name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
+            }
+        ],
+        "outputs": [
+            {
+                "namespace": "gs://bucket_gcs_upload_download_default",
+                "name": "example_upload.txt"
+            }
+        ]
+    },
+    {
+        "eventType": "START",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "gcs_upload_download.download_file",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        },
+        "inputs": [
+            {
+                "namespace": "gs://bucket_gcs_upload_download_default",
+                "name": "example_upload.txt"
+            }
+        ],
+        "outputs": [
+            {
+                "namespace": "file",
+                "name": "example_upload_download.txt"
+            }
+        ]
+    },
+
+    {
+        "eventType": "COMPLETE",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "gcs_upload_download.download_file",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        },
+        "inputs": [
+            {
+                "namespace": "gs://bucket_gcs_upload_download_default",
+                "name": "example_upload.txt"
+            }
+        ],
+        "outputs": [
+            {
+                "namespace": "file",
+                "name": "example_upload_download.txt"
+            }
+        ]
+    }
+]
diff --git a/providers/tests/system/google/conftest.py b/providers/tests/system/google/conftest.py
index cfd74f1387bae..a43620d669f2d 100644
--- a/providers/tests/system/google/conftest.py
+++ b/providers/tests/system/google/conftest.py
@@ -18,6 +18,8 @@
 
 import pytest
 
+from providers.tests.system.openlineage.conftest import set_transport_variable  # noqa: F401
+
 REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",)
 
 
diff --git a/providers/tests/system/openlineage/operator.py b/providers/tests/system/openlineage/operator.py
index 28995b1f44eaf..f371cf1de25c9 100644
--- a/providers/tests/system/openlineage/operator.py
+++ b/providers/tests/system/openlineage/operator.py
@@ -20,6 +20,7 @@
 import json
 import logging
 import os
+import re
 import uuid
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
@@ -63,6 +64,15 @@ def is_uuid(result: Any) -> str:
     return "false"
 
 
+def regex_match(result: Any, pattern: str) -> str:
+    """Return "true" if ``pattern`` matches at the start of ``result``, else "false"."""
+    try:
+        matched = re.match(pattern, result) is not None
+    except Exception:  # best-effort: bad pattern or non-string result means no match
+        matched = False
+    return "true" if matched else "false"
+
+
 def env_var(var: str, default: str | None = None) -> str:
     """
     Use this jinja method to access the environment variable named 'var'.
@@ -97,6 +107,7 @@ def setup_jinja() -> Environment:
     env.globals["any"] = any
     env.globals["is_datetime"] = is_datetime
     env.globals["is_uuid"] = is_uuid
+    env.globals["regex_match"] = regex_match
     env.globals["env_var"] = env_var
     env.globals["not_match"] = not_match
     env.filters["url_scheme_authority"] = url_scheme_authority
@@ -151,7 +162,7 @@ def match(expected, result, env: Environment) -> bool:
             except ValueError as e:
                 log.error("Error rendering jinja template %s: %s", expected, e)
                 return False
-            if rendered == "true" or rendered == result:
+            if str(rendered).lower() == "true" or rendered == result:
                 return True
             log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
             return False