-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix on_warning_callback
on DbtSourceGcpCloudRunJobOperator
and DbtTestGcpCloudRunJobOperator
#1532
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
from __future__ import annotations | ||
|
||
import inspect | ||
import time | ||
from abc import ABC | ||
from typing import Any, Callable, Sequence | ||
|
||
from airflow.models import TaskInstance | ||
from airflow.utils.context import Context | ||
|
||
from cosmos.config import ProfileConfig | ||
from cosmos.log import get_logger | ||
from cosmos.dbt.parser.output import extract_log_issues | ||
from cosmos.operators.base import ( | ||
AbstractDbtBase, | ||
DbtBuildMixin, | ||
|
@@ -20,12 +23,15 @@ | |
DbtTestMixin, | ||
) | ||
|
||
logger = get_logger(__name__) | ||
DBT_NO_TESTS_MSG = "Nothing to do" | ||
DBT_WARN_MSG = "WARN" | ||
|
||
DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} | ||
|
||
try: | ||
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator | ||
from google.cloud import logging | ||
from google.cloud.exceptions import GoogleCloudError | ||
|
||
# The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0 | ||
init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__) | ||
|
@@ -102,7 +108,37 @@ def build_and_run_cmd( | |
self.build_command(context, cmd_flags) | ||
self.log.info(f"Running command: {self.command}") | ||
result = CloudRunExecuteJobOperator.execute(self, context) | ||
logger.info(result) | ||
|
||
# Pull Google Cloud Run job logs from Google Cloud Logging to Airflow logs | ||
execution_name = result["latest_created_execution"]["name"] | ||
execution_time = result["latest_created_execution"]["create_time"] | ||
filter_ = f'resource.type = "cloud_run_job" AND resource.labels.job_name = "{self.job_name}" AND timestamp>="{execution_time}"' | ||
|
||
self.log.info("Attempt to retrieve logs from Google Cloud Logging") | ||
time.sleep(5) # Add sleep time to make sure all the job logs are available when we do the request | ||
|
||
# List to store log messages | ||
log_messages = [] | ||
|
||
try: | ||
client = logging.Client(project=self.project_id) | ||
# Search for logs associated with the job name | ||
entries = client.list_entries(filter_=filter_) | ||
self.log.info(f"Listing logs of the execution {execution_name}:") | ||
if not entries: | ||
self.log.warning("No logs found for the Cloud Run job.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How often does this happen? Should it be a warning or just an info? |
||
else: | ||
for entry in entries: | ||
# Search for logs associated with the job executed | ||
if entry.labels["run.googleapis.com/execution_name"] == execution_name: | ||
log_messages.append(entry.payload) | ||
self.log.info(f"Cloud Run Log: {entry.payload}") | ||
return log_messages | ||
|
||
except GoogleCloudError as e: | ||
# Catch Google Cloud-related errors (e.g., permission issues) | ||
self.log.warning(f"Warning: Error retrieving logs from Google Cloud Logging: {str(e)}") | ||
# Continue without raising an error, just log the warning | ||
Comment on lines
+112
to
+141
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be worth having a method with this logic, something like |
||
|
||
def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: | ||
# For the first round, we're going to assume that the command is dbt | ||
|
@@ -165,15 +201,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: | |
super().__init__(*args, **kwargs) | ||
|
||
|
||
class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator): | ||
""" | ||
Executes a dbt core source freshness command. | ||
""" | ||
|
||
def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
super().__init__(*args, **kwargs) | ||
|
||
|
||
class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator): | ||
""" | ||
Executes a dbt core run command. | ||
|
@@ -185,17 +212,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: | |
super().__init__(*args, **kwargs) | ||
|
||
|
||
class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator): | ||
""" | ||
Executes a dbt core test command. | ||
""" | ||
|
||
def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None: | ||
super().__init__(**kwargs) | ||
# as of now, on_warning_callback in docker executor does nothing | ||
self.on_warning_callback = on_warning_callback | ||
|
||
|
||
class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator): | ||
""" | ||
Executes a dbt core run-operation command. | ||
|
@@ -218,3 +234,73 @@ class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperato | |
|
||
def __init__(self, *args: Any, **kwargs: Any): | ||
super().__init__(*args, **kwargs) | ||
|
||
|
||
class DbtWarningGcpCloudRunJobOperator(DbtGcpCloudRunJobBaseOperator, ABC): | ||
def __init__(self, on_warning_callback: Callable[..., Any] | None = None, *args: Any, **kwargs: Any) -> None: | ||
if not on_warning_callback: | ||
super().__init__(*args, **kwargs) | ||
else: | ||
super().__init__(*args, **kwargs) | ||
self.on_warning_callback = on_warning_callback | ||
|
||
def _handle_warnings(self, logs: list[str], context: Context) -> None: | ||
""" | ||
Handles warnings by extracting log issues, creating additional context, and calling the | ||
on_warning_callback with the updated context. | ||
|
||
:param logs: The log list with the cleaned Cloud Run Job logs. | ||
:param context: The original airflow context in which the build and run command was executed. | ||
""" | ||
test_names, test_results = extract_log_issues(logs) | ||
|
||
warning_context = dict(context) | ||
warning_context["test_names"] = test_names | ||
warning_context["test_results"] = test_results | ||
|
||
self.on_warning_callback(warning_context) | ||
|
||
def execute(self, context: Context, **kwargs: Any) -> None: | ||
result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) | ||
log_list = [log for log in result if type(log) == str] # clean log list with only string type values | ||
|
||
if not ( | ||
isinstance(context["task_instance"], TaskInstance) | ||
and ( | ||
isinstance(context["task_instance"].task, DbtTestGcpCloudRunJobOperator) | ||
or isinstance(context["task_instance"].task, DbtSourceGcpCloudRunJobOperator) | ||
Comment on lines
+270
to
+271
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels a bit weird that we check for subclasses here. It feels this logic should be handled directly in the subclasses, not in the parent class |
||
) | ||
): | ||
return | ||
|
||
should_trigger_callback = all( | ||
[ | ||
log_list, | ||
self.on_warning_callback, | ||
DBT_NO_TESTS_MSG not in log_list[-2], | ||
DBT_WARN_MSG in log_list[-2], | ||
] | ||
) | ||
|
||
if should_trigger_callback: | ||
warnings = int(log_list[-2].split(f"{DBT_WARN_MSG}=")[1].split()[0]) | ||
if warnings > 0: | ||
self._handle_warnings(log_list, context) | ||
|
||
|
||
class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtWarningGcpCloudRunJobOperator): | ||
""" | ||
Executes a dbt core test command. | ||
""" | ||
|
||
def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
super().__init__(*args, **kwargs) | ||
|
||
|
||
class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtWarningGcpCloudRunJobOperator): | ||
""" | ||
Executes a dbt core source freshness command. | ||
""" | ||
|
||
def __init__(self, *args: Any, **kwargs: Any) -> None: | ||
super().__init__(*args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we introducing this change to the base operator?