-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix on_warning_callback
on DbtSourceGcpCloudRunJobOperator
and DbtTestGcpCloudRunJobOperator
#1532
Fix on_warning_callback
on DbtSourceGcpCloudRunJobOperator
and DbtTestGcpCloudRunJobOperator
#1532
Conversation
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
on_warning_callback
on DbtSourceGcpCloudRunJobOperator
and DbtTestGcpCloudRunJobOperator
@@ -265,7 +265,7 @@ def build_cmd( | |||
def build_and_run_cmd( | |||
self, | |||
context: Context, | |||
cmd_flags: list[str], | |||
cmd_flags: list[str] | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we introducing this change to the base operator?
# Pull Google Cloud Run job logs from Google Cloud Logging to Airflow logs | ||
execution_name = result["latest_created_execution"]["name"] | ||
execution_time = result["latest_created_execution"]["create_time"] | ||
filter_ = f'resource.type = "cloud_run_job" AND resource.labels.job_name = "{self.job_name}" AND timestamp>="{execution_time}"' | ||
|
||
self.log.info("Attempt to retrieve logs from Google Cloud Logging") | ||
time.sleep(5) # Add sleep time to make sure all the job logs are available when we do the request | ||
|
||
# List to store log messages | ||
log_messages = [] | ||
|
||
try: | ||
client = logging.Client(project=self.project_id) | ||
# Search for logs associated with the job name | ||
entries = client.list_entries(filter_=filter_) | ||
self.log.info(f"Listing logs of the execution {execution_name}:") | ||
if not entries: | ||
self.log.warning("No logs found for the Cloud Run job.") | ||
else: | ||
for entry in entries: | ||
# Search for logs associated with the job executed | ||
if entry.labels["run.googleapis.com/execution_name"] == execution_name: | ||
log_messages.append(entry.payload) | ||
self.log.info(f"Cloud Run Log: {entry.payload}") | ||
return log_messages | ||
|
||
except GoogleCloudError as e: | ||
# Catch Google Cloud-related errors (e.g., permission issues) | ||
self.log.warning(f"Warning: Error retrieving logs from Google Cloud Logging: {str(e)}") | ||
# Continue without raising an error, just log the warning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be worth having a method with this logic, something like fetch_remote_logs
entries = client.list_entries(filter_=filter_) | ||
self.log.info(f"Listing logs of the execution {execution_name}:") | ||
if not entries: | ||
self.log.warning("No logs found for the Cloud Run job.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How often does this happen? Should it be a warning or just an info?
isinstance(context["task_instance"].task, DbtTestGcpCloudRunJobOperator) | ||
or isinstance(context["task_instance"].task, DbtSourceGcpCloudRunJobOperator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels a bit weird that we check for subclasses here. It feels this logic should be handled directly in the subclasses, not in the parent class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @anai-s thank you very much for your contribution.
I left a few comments in-line, and it would be great if you could fix the tests that are not passing in the CI. It seems we're trying to run integration tests but defining them as unit tests right now. We may have to mock something.
Hi @tatiana , thank you for the review. I will close this PR as I didn't finish to make change on it (I was following the contribution guideline and was expecting a review only when I will ping someone), so I can add tests and make change based on your comments. |
HI @anai-s ! Please, feel free to keep the PR open while working on it. You can set it as a draft PR if you don't want early feedback. Sorry, I didn't mean to add any pressure. |
Description
This PR initially fix the DbtSourceGcpCloudRunJobOperator bug with missing argument for on_warning_callback.
Related Issue(s)
closes #1506
Breaking Change?
This PR also add logs from dbt commands to Airflow logs. Initially there were only available in Google Cloud Run console and Google Cloud Logging.
We can now use this logs in the _handle_warnings function as we did in the LocalOperator and KubernetesOperator.
Checklist