Skip to content

Commit

Permalink
tests: Add OpenLineage check in gcs_upload_download system test (apac…
Browse files Browse the repository at this point in the history
…he#45681)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jan 22, 2025
1 parent 96d1c81 commit 11c8667
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
from providers.tests.system.openlineage.operator import OpenLineageTestOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
Expand Down Expand Up @@ -82,14 +83,19 @@
# [END howto_operator_gcs_delete_bucket]
delete_bucket.trigger_rule = TriggerRule.ALL_DONE

# Task that checks OpenLineage events against the expected-event templates stored in
# the JSON file under this module's "resources" directory.
# NOTE(review): presumably these are the events emitted by the upload/download tasks
# of this DAG -- confirm against OpenLineageTestOperator's implementation.
check_events = OpenLineageTestOperator(
task_id="check_openlineage_events",
file_path=str(Path(__file__).parent / "resources" / "openlineage_gcs_upload_download.json"),
)

(
# TEST SETUP
create_bucket
>> upload_file
# TEST BODY
>> download_file
# TEST TEARDOWN
>> delete_bucket
>> [delete_bucket, check_events]
)

from tests_common.test_utils.watcher import watcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.upload_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "file",
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
}
],
"outputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
]
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.upload_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "file",
"name": "{{ result.endswith('airflow/providers/tests/system/google/cloud/gcs/resources/example_upload.txt') }}"
}
],
"outputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
]
},
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.download_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
],
"outputs": [
{
"namespace": "file",
"name": "example_upload_download.txt"
}
]
},

{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "gcs_upload_download.download_file",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
},
"inputs": [
{
"namespace": "gs://bucket_gcs_upload_download_default",
"name": "example_upload.txt"
}
],
"outputs": [
{
"namespace": "file",
"name": "example_upload_download.txt"
}
]
}
]
2 changes: 2 additions & 0 deletions providers/tests/system/google/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import pytest

from providers.tests.system.openlineage.conftest import set_transport_variable # noqa: F401

REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",)


Expand Down
13 changes: 12 additions & 1 deletion providers/tests/system/openlineage/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import logging
import os
import re
import uuid
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
Expand Down Expand Up @@ -63,6 +64,15 @@ def is_uuid(result: Any) -> str:
return "false"


def regex_match(result: Any, pattern: str) -> str:
    """
    Use this jinja method to test 'result' against the regular expression 'pattern'.

    The pattern is applied with ``re.match``, so it must match starting at the
    beginning of 'result' (it does not need to consume the whole value).

    :param result: value to test; expected to be a string.
    :param pattern: regular expression to apply.
    :return: the string "true" when the pattern matches, otherwise "false".
        Any exception raised while matching (e.g. a non-string 'result' or an
        invalid pattern) also yields "false" instead of propagating.
    """
    try:
        matched = bool(re.match(pattern=pattern, string=result))
    except Exception:
        # Best effort by design: a value that cannot be matched is simply "not a match".
        matched = False
    return "true" if matched else "false"


def env_var(var: str, default: str | None = None) -> str:
"""
Use this jinja method to access the environment variable named 'var'.
Expand Down Expand Up @@ -97,6 +107,7 @@ def setup_jinja() -> Environment:
env.globals["any"] = any
env.globals["is_datetime"] = is_datetime
env.globals["is_uuid"] = is_uuid
env.globals["regex_match"] = regex_match
env.globals["env_var"] = env_var
env.globals["not_match"] = not_match
env.filters["url_scheme_authority"] = url_scheme_authority
Expand Down Expand Up @@ -151,7 +162,7 @@ def match(expected, result, env: Environment) -> bool:
except ValueError as e:
log.error("Error rendering jinja template %s: %s", expected, e)
return False
if rendered == "true" or rendered == result:
if str(rendered).lower() == "true" or rendered == result:
return True
log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
return False
Expand Down

0 comments on commit 11c8667

Please sign in to comment.