Skip to content

Commit

Permalink
Log elapsed_time for sql file upload/download (#1536)
Browse files Browse the repository at this point in the history
Add logs to record the upload and download time for the SQL files on
the remote storage.

**Upload logs**

```
[2025-02-12, 14:40:48 UTC] {logging_mixin.py:190} INFO - 14:40:48  Completed successfully
[2025-02-12, 14:40:48 UTC] {logging_mixin.py:190} INFO - 14:40:48
[2025-02-12, 14:40:48 UTC] {logging_mixin.py:190} INFO - 14:40:48  Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
[2025-02-12, 14:40:50 UTC] {local.py:196} WARNING - Artifact schema version: https://schemas.getdbt.com/dbt/manifest/v12.json is above dbt-ol supported version 7. This might cause errors.
[2025-02-12, 14:40:50 UTC] {connection.py:277} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2025-02-12, 14:40:50 UTC] {base.py:84} INFO - Retrieving connection 'gcp_gs_conn'
[2025-02-12, 14:41:03 UTC] {local.py:339} INFO - SQL file upload completed in 12.52 seconds.
```

**Download logs:**

```
[2025-02-12, 14:41:25 UTC] {connection.py:277} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2025-02-12, 14:41:25 UTC] {base.py:84} INFO - Retrieving connection 'gcp_gs_conn'
[2025-02-12, 14:41:27 UTC] {bigquery.py:126} INFO - SQL file download completed in 2.43 seconds.
[2025-02-12, 14:41:27 UTC] {baseoperator.py:416} WARNING - DbtRunAirflowAsyncOperator.execute cannot be called outside TaskInstance!
[2025-02-12, 14:41:27 UTC] {connection.py:277} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2025-02-12, 14:41:27 UTC] {base.py:84} INFO - Retrieving connection 'gcp_gs_conn'
```

closes: #1522
  • Loading branch information
pankajastro authored Feb 13, 2025
1 parent 577c08e commit a43a991
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
8 changes: 7 additions & 1 deletion cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence

Expand Down Expand Up @@ -100,6 +101,8 @@ def base_cmd(self) -> list[str]:
return ["run"]

def get_remote_sql(self) -> str:
start_time = time.time()

if not settings.AIRFLOW_IO_AVAILABLE: # pragma: no cover
raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.")
from airflow.io.path import ObjectStoragePath
Expand All @@ -118,7 +121,10 @@ def get_remote_sql(self) -> str:

object_storage_path = ObjectStoragePath(remote_model_path, conn_id=remote_target_path_conn_id)
with object_storage_path.open() as fp: # type: ignore
return fp.read() # type: ignore
sql = fp.read()
elapsed_time = time.time() - start_time
self.log.info("SQL file download completed in %.2f seconds.", elapsed_time)
return sql # type: ignore

def execute(self, context: Context, **kwargs: Any) -> None:
if enable_setup_async_task:
Expand Down
6 changes: 6 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import os
import tempfile
import time
import urllib.parse
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -317,6 +318,8 @@ def _construct_dest_file_path(
return f"{dest_target_dir_str}/{dag_task_group_identifier}/{resource_type}/{rel_path}"

def _upload_sql_files(self, tmp_project_dir: str, resource_type: str) -> None:
start_time = time.time()

dest_target_dir, dest_conn_id = self._configure_remote_target_path()

if not dest_target_dir:
Expand All @@ -332,6 +335,9 @@ def _upload_sql_files(self, tmp_project_dir: str, resource_type: str) -> None:
ObjectStoragePath(file_path).copy(dest_object_storage_path)
self.log.debug("Copied %s to %s", file_path, dest_object_storage_path)

elapsed_time = time.time() - start_time
self.log.info("SQL files upload completed in %.2f seconds.", elapsed_time)

@provide_session
def store_freshness_json(self, tmp_project_dir: str, context: Context, session: Session = NEW_SESSION) -> None:
"""
Expand Down

0 comments on commit a43a991

Please sign in to comment.