From bdbfba3019078870f35784d503a82394e194c208 Mon Sep 17 00:00:00 2001 From: anai-s Date: Mon, 10 Feb 2025 21:35:35 +0100 Subject: [PATCH] fix-1506: fix on_warning_callback --- cosmos/operators/base.py | 2 +- cosmos/operators/gcp_cloud_run_job.py | 132 +++++++++++++++++++++----- 2 files changed, 110 insertions(+), 24 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 18019ab92..e3a544c1b 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -265,7 +265,7 @@ def build_cmd( def build_and_run_cmd( self, context: Context, - cmd_flags: list[str], + cmd_flags: list[str] | None = None, run_as_async: bool = False, async_context: dict[str, Any] | None = None, ) -> Any: diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index e24191d6a..8be86959e 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -1,12 +1,15 @@ from __future__ import annotations import inspect +import time +from abc import ABC from typing import Any, Callable, Sequence +from airflow.models import TaskInstance from airflow.utils.context import Context from cosmos.config import ProfileConfig -from cosmos.log import get_logger +from cosmos.dbt.parser.output import extract_log_issues from cosmos.operators.base import ( AbstractDbtBase, DbtBuildMixin, @@ -20,12 +23,15 @@ DbtTestMixin, ) -logger = get_logger(__name__) +DBT_NO_TESTS_MSG = "Nothing to do" +DBT_WARN_MSG = "WARN" DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} try: from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator + from google.cloud import logging + from google.cloud.exceptions import GoogleCloudError # The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0 init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__) @@ -102,7 +108,37 @@ def build_and_run_cmd( self.build_command(context, cmd_flags) self.log.info(f"Running command: {self.command}") result = CloudRunExecuteJobOperator.execute(self, context) - logger.info(result) + + # Pull Google Cloud Run job logs from Google Cloud Logging to Airflow logs + execution_name = result["latest_created_execution"]["name"] + execution_time = result["latest_created_execution"]["create_time"] + filter_ = f'resource.type = "cloud_run_job" AND resource.labels.job_name = "{self.job_name}" AND timestamp>="{execution_time}"' + + self.log.info("Attempt to retrieve logs from Google Cloud Logging") + time.sleep(5) # Add sleep time to make sure all the job logs are available when we do the request + + # List to store log messages + log_messages = [] + + try: + client = logging.Client(project=self.project_id) + # Search for logs associated with the job name + entries = client.list_entries(filter_=filter_) + self.log.info(f"Listing logs of the execution {execution_name}:") + if not entries: + self.log.warning("No logs found for the Cloud Run job.") + else: + for entry in entries: + # Search for logs associated with the job executed + if entry.labels["run.googleapis.com/execution_name"] == execution_name: + log_messages.append(entry.payload) + self.log.info(f"Cloud Run Log: {entry.payload}") + return log_messages + + except GoogleCloudError as e: + # Catch Google Cloud-related errors (e.g., permission issues) + self.log.warning(f"Warning: Error retrieving logs from Google Cloud Logging: {str(e)}") + # Continue without raising an error, just log the warning def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: # For the first round, we're going to assume that the command is dbt @@ -165,15 +201,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) -class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator): - """ - Executes a dbt core source freshness command. - """ - - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - - class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator): """ Executes a dbt core run command. @@ -185,17 +212,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) -class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator): - """ - Executes a dbt core test command. - """ - - def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None: - super().__init__(**kwargs) - # as of now, on_warning_callback in docker executor does nothing - self.on_warning_callback = on_warning_callback - - class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator): """ Executes a dbt core run-operation command. @@ -218,3 +234,73 @@ class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperato def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) + + +class DbtWarningGcpCloudRunJobOperator(DbtGcpCloudRunJobBaseOperator, ABC): + def __init__(self, on_warning_callback: Callable[..., Any] | None = None, *args: Any, **kwargs: Any) -> None: + if not on_warning_callback: + super().__init__(*args, **kwargs) + else: + super().__init__(*args, **kwargs) + self.on_warning_callback = on_warning_callback + + def _handle_warnings(self, logs: list[str], context: Context) -> None: + """ + Handles warnings by extracting log issues, creating additional context, and calling the + on_warning_callback with the updated context. + + :param logs: The log list with the cleaned Cloud Run Job logs. + :param context: The original airflow context in which the build and run command was executed. + """ + test_names, test_results = extract_log_issues(logs) + + warning_context = dict(context) + warning_context["test_names"] = test_names + warning_context["test_results"] = test_results + + self.on_warning_callback(warning_context) + + def execute(self, context: Context, **kwargs: Any) -> None: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + log_list = [log for log in result if type(log) == str] # clean log list with only string type values + + if not ( + isinstance(context["task_instance"], TaskInstance) + and ( + isinstance(context["task_instance"].task, DbtTestGcpCloudRunJobOperator) + or isinstance(context["task_instance"].task, DbtSourceGcpCloudRunJobOperator) + ) + ): + return + + should_trigger_callback = all( + [ + log_list, + self.on_warning_callback, + DBT_NO_TESTS_MSG not in log_list[-2], + DBT_WARN_MSG in log_list[-2], + ] + ) + + if should_trigger_callback: + warnings = int(log_list[-2].split(f"{DBT_WARN_MSG}=")[1].split()[0]) + if warnings > 0: + self._handle_warnings(log_list, context) + + +class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtWarningGcpCloudRunJobOperator): + """ + Executes a dbt core test command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtWarningGcpCloudRunJobOperator): + """ + Executes a dbt core source freshness command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs)