Fix on_warning_callback on DbtSourceGcpCloudRunJobOperator and DbtTestGcpCloudRunJobOperator #1532

Closed
2 changes: 1 addition & 1 deletion cosmos/operators/base.py
@@ -265,7 +265,7 @@ def build_cmd(
    def build_and_run_cmd(
        self,
        context: Context,
        cmd_flags: list[str],
        cmd_flags: list[str] | None = None,
Collaborator:
Why are we introducing this change to the base operator?

        run_as_async: bool = False,
        async_context: dict[str, Any] | None = None,
    ) -> Any:
132 changes: 109 additions & 23 deletions cosmos/operators/gcp_cloud_run_job.py
@@ -1,12 +1,15 @@
from __future__ import annotations

import inspect
import time
from abc import ABC
from typing import Any, Callable, Sequence

from airflow.models import TaskInstance
from airflow.utils.context import Context

from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.dbt.parser.output import extract_log_issues
from cosmos.operators.base import (
    AbstractDbtBase,
    DbtBuildMixin,
@@ -20,12 +23,15 @@
    DbtTestMixin,
)

logger = get_logger(__name__)
DBT_NO_TESTS_MSG = "Nothing to do"
DBT_WARN_MSG = "WARN"

DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
    from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
    from google.cloud import logging
    from google.cloud.exceptions import GoogleCloudError

    # The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0
    init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
@@ -102,7 +108,37 @@ def build_and_run_cmd(
        self.build_command(context, cmd_flags)
        self.log.info(f"Running command: {self.command}")
        result = CloudRunExecuteJobOperator.execute(self, context)
        logger.info(result)

        # Pull Google Cloud Run job logs from Google Cloud Logging to Airflow logs
        execution_name = result["latest_created_execution"]["name"]
        execution_time = result["latest_created_execution"]["create_time"]
        filter_ = f'resource.type = "cloud_run_job" AND resource.labels.job_name = "{self.job_name}" AND timestamp>="{execution_time}"'

        self.log.info("Attempt to retrieve logs from Google Cloud Logging")
        time.sleep(5)  # Add sleep time to make sure all the job logs are available when we do the request

        # List to store log messages
        log_messages = []

        try:
            client = logging.Client(project=self.project_id)
            # Search for logs associated with the job name
            entries = client.list_entries(filter_=filter_)
            self.log.info(f"Listing logs of the execution {execution_name}:")
            if not entries:
                self.log.warning("No logs found for the Cloud Run job.")
Collaborator:
How often does this happen? Should it be a warning or just an info?

            else:
                for entry in entries:
                    # Search for logs associated with the job executed
                    if entry.labels["run.googleapis.com/execution_name"] == execution_name:
                        log_messages.append(entry.payload)
                        self.log.info(f"Cloud Run Log: {entry.payload}")
            return log_messages

        except GoogleCloudError as e:
            # Catch Google Cloud-related errors (e.g., permission issues)
            self.log.warning(f"Warning: Error retrieving logs from Google Cloud Logging: {str(e)}")
            # Continue without raising an error, just log the warning
Comment on lines +112 to +141
Collaborator:
It may be worth having a method with this logic, something like fetch_remote_logs
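A rough sketch of what that extraction could look like, reusing only calls already present in this diff (logging.Client.list_entries, the job-name filter, and self.log). The method name fetch_remote_logs comes from this comment; its exact signature and its placement on DbtGcpCloudRunJobBaseOperator are assumptions, not part of the PR:

    def fetch_remote_logs(self, execution_name: str, execution_time: str) -> list[str]:
        """Hypothetical helper: pull Cloud Run job logs from Google Cloud Logging into the Airflow log."""
        filter_ = (
            f'resource.type = "cloud_run_job" AND resource.labels.job_name = "{self.job_name}" '
            f'AND timestamp>="{execution_time}"'
        )
        log_messages: list[str] = []
        try:
            client = logging.Client(project=self.project_id)
            entries = client.list_entries(filter_=filter_)
            for entry in entries:
                # Keep only the entries emitted by the execution that was just triggered
                if entry.labels.get("run.googleapis.com/execution_name") == execution_name:
                    log_messages.append(entry.payload)
                    self.log.info(f"Cloud Run Log: {entry.payload}")
        except GoogleCloudError as e:
            # Log-access problems (e.g. missing permissions) should not fail the task itself
            self.log.warning(f"Error retrieving logs from Google Cloud Logging: {str(e)}")
        return log_messages

build_and_run_cmd would then shrink to parsing the execution result and making a single self.fetch_remote_logs(execution_name, execution_time) call.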


    def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
        # For the first round, we're going to assume that the command is dbt
@@ -165,15 +201,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core source freshness command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core run command.
@@ -185,17 +212,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core test command.
    """

    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
        super().__init__(**kwargs)
        # as of now, on_warning_callback in docker executor does nothing
        self.on_warning_callback = on_warning_callback


class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core run-operation command.
@@ -218,3 +234,73 @@ class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperato

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)


class DbtWarningGcpCloudRunJobOperator(DbtGcpCloudRunJobBaseOperator, ABC):
    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, *args: Any, **kwargs: Any) -> None:
        if not on_warning_callback:
            super().__init__(*args, **kwargs)
        else:
            super().__init__(*args, **kwargs)
            self.on_warning_callback = on_warning_callback

    def _handle_warnings(self, logs: list[str], context: Context) -> None:
        """
        Handles warnings by extracting log issues, creating additional context, and calling the
        on_warning_callback with the updated context.

        :param logs: The log list with the cleaned Cloud Run Job logs.
        :param context: The original airflow context in which the build and run command was executed.
        """
        test_names, test_results = extract_log_issues(logs)

        warning_context = dict(context)
        warning_context["test_names"] = test_names
        warning_context["test_results"] = test_results

        self.on_warning_callback(warning_context)

    def execute(self, context: Context, **kwargs: Any) -> None:
        result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
        log_list = [log for log in result if type(log) == str]  # clean log list with only string type values

        if not (
            isinstance(context["task_instance"], TaskInstance)
            and (
                isinstance(context["task_instance"].task, DbtTestGcpCloudRunJobOperator)
                or isinstance(context["task_instance"].task, DbtSourceGcpCloudRunJobOperator)
Comment on lines +270 to +271
Collaborator:
It feels a bit weird that we check for subclasses here. It feels this logic should be handled directly in the subclasses, not in the parent class

            )
        ):
            return

        should_trigger_callback = all(
            [
                log_list,
                self.on_warning_callback,
                DBT_NO_TESTS_MSG not in log_list[-2],
                DBT_WARN_MSG in log_list[-2],
            ]
        )

        if should_trigger_callback:
            warnings = int(log_list[-2].split(f"{DBT_WARN_MSG}=")[1].split()[0])
            if warnings > 0:
                self._handle_warnings(log_list, context)
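Picking up the review note above about the isinstance checks: since only the test and source operators inherit from DbtWarningGcpCloudRunJobOperator, one alternative would be to drop the subclass checks in the parent and gate only on the callback. A minimal sketch of that alternative (not what this PR merges):

    def execute(self, context: Context, **kwargs: Any) -> None:
        result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
        log_list = [log for log in result if isinstance(log, str)]

        # Only warning-aware subclasses inherit this execute(), so the callback is the only gate needed.
        callback = getattr(self, "on_warning_callback", None)  # may be unset when no callback was passed
        if not (log_list and callback):
            return

        summary = log_list[-2]  # dbt's run summary ("Done. PASS=... WARN=... ERROR=...") sits near the end
        if DBT_NO_TESTS_MSG in summary or DBT_WARN_MSG not in summary:
            return

        warnings = int(summary.split(f"{DBT_WARN_MSG}=")[1].split()[0])
        if warnings > 0:
            self._handle_warnings(log_list, context)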


class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtWarningGcpCloudRunJobOperator):
    """
    Executes a dbt core test command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtWarningGcpCloudRunJobOperator):
    """
    Executes a dbt core source freshness command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
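For reference, a hypothetical usage sketch of what this fix enables: passing on_warning_callback to the test operator so it fires when dbt reports WARN results. The callback receives a single context dict carrying test_names and test_results, as built by _handle_warnings above; every other constructor argument shown here (project_id, region, job_name, the task setup) is an illustrative placeholder, not taken from this PR:

    from cosmos.operators.gcp_cloud_run_job import DbtTestGcpCloudRunJobOperator


    def warn_about_tests(context: dict) -> None:
        # test_names and test_results are injected by _handle_warnings before the callback runs
        for name, result in zip(context["test_names"], context["test_results"]):
            print(f"dbt test warning: {name} -> {result}")


    # Inside a DAG definition (placeholder arguments):
    dbt_test = DbtTestGcpCloudRunJobOperator(
        task_id="dbt_test_with_warning_callback",
        project_id="my-gcp-project",
        region="us-central1",
        job_name="my-dbt-cloud-run-job",
        on_warning_callback=warn_about_tests,
    )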