feat: Add OpenLineage support for BigQueryDataTransferServiceStartTransferRunsOperator (#45801)

Signed-off-by: Kacper Muda <[email protected]>
kacpermuda authored Jan 24, 2025
1 parent 96ca2a6 commit 0639343
Showing 2 changed files with 374 additions and 3 deletions.
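
For context, a minimal sketch of how this operator is typically wired into a DAG so that the new lineage collection runs when the task finishes; the project, transfer-config ID, and connection values below are placeholders, not taken from this commit:

from airflow.providers.google.cloud.operators.bigquery_dts import (
    BigQueryDataTransferServiceStartTransferRunsOperator,
)

# Placeholder IDs; with the OpenLineage provider enabled, lineage is collected
# automatically from get_openlineage_facets_on_complete() after the run completes.
start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="start_bq_dts_run",
    project_id="my-gcp-project",
    transfer_config_id="my-transfer-config-id",
    gcp_conn_id="google_cloud_default",
)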
121 changes: 118 additions & 3 deletions providers/src/airflow/providers/google/cloud/operators/bigquery_dts.py
@@ -299,6 +299,7 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.deferrable = deferrable
+        self._transfer_run: dict = {}

     @cached_property
     def hook(self) -> BiqQueryDataTransferServiceHook:
@@ -339,12 +340,13 @@ def execute(self, context: Context):
         self.xcom_push(context, key="run_id", value=run_id)

         if not self.deferrable:
-            result = self._wait_for_transfer_to_be_done(
+            # Save as attribute for further use by OpenLineage
+            self._transfer_run = self._wait_for_transfer_to_be_done(
                 run_id=run_id,
                 transfer_config_id=transfer_config["config_id"],
             )
             self.log.info("Transfer run %s submitted successfully.", run_id)
-            return result
+            return self._transfer_run

         self.defer(
             trigger=BigQueryDataTransferRunTrigger(
@@ -412,4 +414,117 @@ def execute_completed(self, context: Context, event: dict):
             event["message"],
         )

-        return TransferRun.to_dict(transfer_run)
+        # Save as attribute for further use by OpenLineage
+        self._transfer_run = TransferRun.to_dict(transfer_run)
+        return self._transfer_run
+
+    def get_openlineage_facets_on_complete(self, _):
+        """Implement _on_complete as we need a run config to extract information."""
+        from urllib.parse import urlsplit
+
+        from airflow.providers.common.compat.openlineage.facet import Dataset, ErrorMessageRunFacet
+        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            extract_ds_name_from_gcs_path,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo, SQLParser
+
+        if not self._transfer_run:
+            self.log.debug("No BigQuery Data Transfer configuration was found by OpenLineage.")
+            return OperatorLineage()
+
+        data_source_id = self._transfer_run["data_source_id"]
+        dest_dataset_id = self._transfer_run["destination_dataset_id"]
+        params = self._transfer_run["params"]
+
+        input_datasets, output_datasets = [], []
+        run_facets, job_facets = {}, {}
+        if data_source_id in ("google_cloud_storage", "amazon_s3", "azure_blob_storage"):
+            if data_source_id == "google_cloud_storage":
+                bucket, path = _parse_gcs_url(params["data_path_template"])  # gs://bucket...
+                namespace = f"gs://{bucket}"
+                name = extract_ds_name_from_gcs_path(path)
+            elif data_source_id == "amazon_s3":
+                parsed_url = urlsplit(params["data_path"])  # s3://bucket...
+                namespace = f"s3://{parsed_url.netloc}"
+                name = extract_ds_name_from_gcs_path(parsed_url.path)
+            else:  # azure_blob_storage
+                storage_account = params["storage_account"]
+                container = params["container"]
+                namespace = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
+                name = extract_ds_name_from_gcs_path(params["data_path"])
+
+            input_datasets.append(Dataset(namespace=namespace, name=name))
+            dest_table_name = params["destination_table_name_template"]
+            output_datasets.append(
+                Dataset(
+                    namespace=BIGQUERY_NAMESPACE,
+                    name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
+                )
+            )
+        elif data_source_id in ("postgresql", "oracle", "mysql"):
+            scheme = data_source_id if data_source_id != "postgresql" else "postgres"
+            host = params["connector.endpoint.host"]
+            port = params["connector.endpoint.port"]
+
+            for asset in params["assets"]:
+                # MySQL: db/table; Other: db/schema/table;
+                table_name = asset.split("/")[-1]
+
+                input_datasets.append(
+                    Dataset(namespace=f"{scheme}://{host}:{int(port)}", name=asset.replace("/", "."))
+                )
+                output_datasets.append(
+                    Dataset(
+                        namespace=BIGQUERY_NAMESPACE, name=f"{self.project_id}.{dest_dataset_id}.{table_name}"
+                    )
+                )
+        elif data_source_id == "scheduled_query":
+            bq_db_info = DatabaseInfo(
+                scheme="bigquery",
+                authority=None,
+                database=self.project_id,
+            )
+            parser_result = SQLParser("bigquery").generate_openlineage_metadata_from_sql(
+                sql=params["query"],
+                database_info=bq_db_info,
+                database=self.project_id,
+                use_connection=False,
+                hook=None,  # Hook is not used when use_connection=False
+                sqlalchemy_engine=None,
+            )
+            if parser_result.inputs:
+                input_datasets.extend(parser_result.inputs)
+            if parser_result.outputs:
+                output_datasets.extend(parser_result.outputs)
+            if parser_result.job_facets:
+                job_facets = {**job_facets, **parser_result.job_facets}
+            if parser_result.run_facets:
+                run_facets = {**run_facets, **parser_result.run_facets}
+            dest_table_name = params.get("destination_table_name_template")
+            if dest_table_name:
+                output_datasets.append(
+                    Dataset(
+                        namespace=BIGQUERY_NAMESPACE,
+                        name=f"{self.project_id}.{dest_dataset_id}.{dest_table_name}",
+                    )
+                )
+        else:
+            self.log.debug(
+                "BigQuery Data Transfer data_source_id `%s` is not supported by OpenLineage.", data_source_id
+            )
+            return OperatorLineage()
+
+        error_status = self._transfer_run.get("error_status")
+        if error_status and str(error_status["code"]) != "0":
+            run_facets["errorMessage"] = ErrorMessageRunFacet(
+                message=error_status["message"],
+                programmingLanguage="python",
+                stackTrace=str(error_status["details"]),
+            )
+
+        return OperatorLineage(
+            inputs=input_datasets, outputs=output_datasets, job_facets=job_facets, run_facets=run_facets
+        )
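
For orientation, here is a self-contained sketch of the dataset naming the new method produces for a Cloud Storage transfer run; the payload and project values are hypothetical, and the real implementation derives the GCS dataset name with _parse_gcs_url and extract_ds_name_from_gcs_path rather than the simplified parsing shown here:

from urllib.parse import urlsplit

# Hypothetical payload, shaped like the TransferRun.to_dict() result stored in self._transfer_run.
transfer_run = {
    "data_source_id": "google_cloud_storage",
    "destination_dataset_id": "analytics",
    "params": {
        "data_path_template": "gs://my-bucket/landing/orders",
        "destination_table_name_template": "orders",
    },
}
project_id = "my-gcp-project"  # placeholder

params = transfer_run["params"]
parsed = urlsplit(params["data_path_template"])
input_namespace = f"gs://{parsed.netloc}"  # -> gs://my-bucket
input_name = parsed.path.strip("/")        # -> landing/orders (simplified normalization)

output_namespace = "bigquery"              # BIGQUERY_NAMESPACE
output_name = (
    f"{project_id}.{transfer_run['destination_dataset_id']}"
    f".{params['destination_table_name_template']}"
)                                          # -> my-gcp-project.analytics.orders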