Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenLineage support for some BQ operators #45422

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 131 additions & 34 deletions providers/src/airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from google.api_core.exceptions import Conflict
from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob, Row
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import RowIterator, Table, TableReference

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
Expand Down Expand Up @@ -1339,6 +1339,7 @@ def __init__(
self.cluster_fields = cluster_fields
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
self._table: Table | None = None
if exists_ok is not None:
warnings.warn(
"`exists_ok` parameter is deprecated, please use `if_exists`",
Expand Down Expand Up @@ -1369,6 +1370,7 @@ def execute(self, context: Context) -> None:

try:
self.log.info("Creating table")
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(
project_id=self.project_id,
dataset_id=self.dataset_id,
Expand Down Expand Up @@ -1414,19 +1416,22 @@ def execute(self, context: Context) -> None:

BigQueryTableLink.persist(**persist_kwargs)

def get_openlineage_facets_on_complete(self, task_instance):
def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by create method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table_info = self._table.to_api_repr()["tableReference"]
table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
if not self._table:
self.log.debug("OpenLineage did not find `self._table` attribute.")
return OperatorLineage()

output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=table_id,
name=f"{self._table.project}.{self._table.dataset_id}.{self._table.table_id}",
facets=get_facets_from_bq_table(self._table),
)

Expand Down Expand Up @@ -1649,6 +1654,7 @@ def __init__(
self.encryption_configuration = encryption_configuration
self.location = location
self.impersonation_chain = impersonation_chain
self._table: Table | None = None

def execute(self, context: Context) -> None:
bq_hook = BigQueryHook(
Expand All @@ -1657,15 +1663,16 @@ def execute(self, context: Context) -> None:
impersonation_chain=self.impersonation_chain,
)
if self.table_resource:
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(
table_resource=self.table_resource,
)
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
project_id=self._table.to_api_repr()["tableReference"]["projectId"],
table_id=self._table.to_api_repr()["tableReference"]["tableId"],
dataset_id=self._table.dataset_id,
project_id=self._table.project,
table_id=self._table.table_id,
)
return

Expand Down Expand Up @@ -1716,31 +1723,29 @@ def execute(self, context: Context) -> None:
"encryptionConfiguration": self.encryption_configuration,
}

self._table = bq_hook.create_empty_table(
table_resource=table_resource,
)
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(table_resource=table_resource)

BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
project_id=self._table.to_api_repr()["tableReference"]["projectId"],
table_id=self._table.to_api_repr()["tableReference"]["tableId"],
dataset_id=self._table.dataset_id,
project_id=self._table.project,
table_id=self._table.table_id,
)

def get_openlineage_facets_on_complete(self, task_instance):
def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by create method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table_info = self._table.to_api_repr()["tableReference"]
table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=table_id,
name=f"{self._table.project}.{self._table.dataset_id}.{self._table.table_id}",
facets=get_facets_from_bq_table(self._table),
)

Expand Down Expand Up @@ -2133,6 +2138,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
self._table: dict | None = None
super().__init__(**kwargs)

def execute(self, context: Context):
Expand All @@ -2141,7 +2147,8 @@ def execute(self, context: Context):
impersonation_chain=self.impersonation_chain,
)

table = bq_hook.update_table(
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.update_table(
table_resource=self.table_resource,
fields=self.fields,
dataset_id=self.dataset_id,
Expand All @@ -2152,12 +2159,30 @@ def execute(self, context: Context):
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)

return self._table

def get_openlineage_facets_on_complete(self, _):
    """Implement _on_complete as we will use table resource returned by update method.

    Returns an OperatorLineage whose single output dataset describes the
    updated BigQuery table, named as ``project.dataset.table``.
    """
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.providers.google.cloud.openlineage.utils import (
        BIGQUERY_NAMESPACE,
        get_facets_from_bq_table,
    )
    from airflow.providers.openlineage.extractors import OperatorLineage

    # `self._table` is populated in execute(); if execute failed before the
    # update call there is nothing to report (mirrors the guard used by
    # BigQueryCreateEmptyTableOperator's lineage method).
    if not self._table:
        self.log.debug("OpenLineage did not find `self._table` attribute.")
        return OperatorLineage()

    # update_table returns a plain API repr dict; convert it to a Table
    # object so shared facet helpers can be reused.
    table = Table.from_api_repr(self._table)
    output_dataset = Dataset(
        namespace=BIGQUERY_NAMESPACE,
        name=f"{table.project}.{table.dataset_id}.{table.table_id}",
        facets=get_facets_from_bq_table(table),
    )

    return OperatorLineage(outputs=[output_dataset])


class BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
Expand Down Expand Up @@ -2291,15 +2316,47 @@ def __init__(
self.ignore_if_missing = ignore_if_missing
self.location = location
self.impersonation_chain = impersonation_chain
self.hook: BigQueryHook | None = None

def execute(self, context: Context) -> None:
    """Delete the target BigQuery table, tolerating absence when configured.

    :param context: Airflow task context (unused beyond logging).
    """
    self.log.info("Deleting: %s", self.deletion_dataset_table)
    # Save hook as attribute for further use by OpenLineage, which needs
    # the hook's default project_id to fully qualify the table name.
    self.hook = BigQueryHook(
        gcp_conn_id=self.gcp_conn_id,
        location=self.location,
        impersonation_chain=self.impersonation_chain,
    )
    # not_found_ok maps ignore_if_missing: no error if the table is absent.
    self.hook.delete_table(table_id=self.deletion_dataset_table, not_found_ok=self.ignore_if_missing)

def get_openlineage_facets_on_complete(self, _):
    """Implement _on_complete as we need default project_id from hook."""
    from airflow.providers.common.compat.openlineage.facet import (
        Dataset,
        LifecycleStateChange,
        LifecycleStateChangeDatasetFacet,
        PreviousIdentifier,
    )
    from airflow.providers.google.cloud.openlineage.utils import BIGQUERY_NAMESPACE
    from airflow.providers.openlineage.extractors import OperatorLineage

    # Fill in a missing project from the hook's default so the dataset name
    # is always fully qualified as project.dataset.table.
    table_ref = TableReference.from_string(
        self.deletion_dataset_table, default_project=self.hook.project_id
    )
    fq_table_name = str(table_ref)

    drop_facet = LifecycleStateChangeDatasetFacet(
        lifecycleStateChange=LifecycleStateChange.DROP.value,
        previousIdentifier=PreviousIdentifier(
            namespace=BIGQUERY_NAMESPACE,
            name=fq_table_name,
        ),
    )
    dataset = Dataset(
        namespace=BIGQUERY_NAMESPACE,
        name=fq_table_name,
        facets={"lifecycleStateChange": drop_facet},
    )

    return OperatorLineage(inputs=[dataset])


class BigQueryUpsertTableOperator(GoogleCloudBaseOperator):
Expand Down Expand Up @@ -2358,6 +2415,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.location = location
self.impersonation_chain = impersonation_chain
self._table: dict | None = None

def execute(self, context: Context) -> None:
self.log.info("Upserting Dataset: %s with table_resource: %s", self.dataset_id, self.table_resource)
Expand All @@ -2366,19 +2424,38 @@ def execute(self, context: Context) -> None:
location=self.location,
impersonation_chain=self.impersonation_chain,
)
table = hook.run_table_upsert(
# Save table as attribute for further use by OpenLineage
self._table = hook.run_table_upsert(
dataset_id=self.dataset_id,
table_resource=self.table_resource,
project_id=self.project_id,
)
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)

def get_openlineage_facets_on_complete(self, _):
    """Implement _on_complete as we will use table resource returned by upsert method."""
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.providers.google.cloud.openlineage.utils import (
        BIGQUERY_NAMESPACE,
        get_facets_from_bq_table,
    )
    from airflow.providers.openlineage.extractors import OperatorLineage

    # `self._table` holds the API representation saved by execute().
    bq_table = Table.from_api_repr(self._table)
    fq_name = f"{bq_table.project}.{bq_table.dataset_id}.{bq_table.table_id}"

    return OperatorLineage(
        outputs=[
            Dataset(
                namespace=BIGQUERY_NAMESPACE,
                name=fq_name,
                facets=get_facets_from_bq_table(bq_table),
            )
        ]
    )


class BigQueryUpdateTableSchemaOperator(GoogleCloudBaseOperator):
"""
Expand Down Expand Up @@ -2466,14 +2543,16 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.location = location
self._table: dict | None = None
super().__init__(**kwargs)

def execute(self, context: Context):
bq_hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, location=self.location
)

table = bq_hook.update_table_schema(
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.update_table_schema(
schema_fields_updates=self.schema_fields_updates,
include_policy_tags=self.include_policy_tags,
dataset_id=self.dataset_id,
Expand All @@ -2484,11 +2563,29 @@ def execute(self, context: Context):
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)
return table
return self._table

def get_openlineage_facets_on_complete(self, _):
    """Implement _on_complete as we will use table resource returned by update method."""
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.providers.google.cloud.openlineage.utils import (
        BIGQUERY_NAMESPACE,
        get_facets_from_bq_table,
    )
    from airflow.providers.openlineage.extractors import OperatorLineage

    # Rehydrate the Table object from the API repr saved during execute().
    bq_table = Table.from_api_repr(self._table)
    dataset = Dataset(
        namespace=BIGQUERY_NAMESPACE,
        name=f"{bq_table.project}.{bq_table.dataset_id}.{bq_table.table_id}",
        facets=get_facets_from_bq_table(bq_table),
    )

    return OperatorLineage(outputs=[dataset])


class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOperatorOpenLineageMixin):
Expand Down
Loading