Skip to content

Commit

Permalink
Improve callback documentation (#1516)
Browse files Browse the repository at this point in the history
Add missing details about dbt artifact's, object store folder structure and example of custom callback.

Closes: #1468
  • Loading branch information
tatiana authored Feb 7, 2025
1 parent bdb77e4 commit f7d37fe
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
Binary file added docs/_static/cosmos_callback_object_storage.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
88 changes: 73 additions & 15 deletions docs/configuration/callbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,30 @@
Callbacks
=========

Cosmos supports callback functions that execute at the end of a task's execution when using ``ExecutionMode.LOCAL`` and
``ExecutionMode.VIRTUALENV``.
These callbacks can be used for various purposes, such as uploading files from the target directory to remote
storage. While this feature has been available for some time, users may not be fully aware of its capabilities.
.. note::
Feature available when using ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``.

Most dbt commands output `one or more artifacts <https://docs.getdbt.com/reference/artifacts/dbt-artifacts>`_
such as ``semantic_manifest.json``, ``manifest.json``, ``catalog.json``, ``run_results.json``, and ``sources.json`` in the target folder, which by default resides in the dbt project's root folder.
However, since Cosmos creates temporary folders to run each dbt command, this folder vanishes by the end of the Cosmos task execution,
alongside the artifacts created by dbt.

Many users care about those artifacts and want to perform additional actions after running the dbt command. Some examples of usage:

* Upload the artifacts to an object storage;
* Run a command after the dbt command runs, such as `montecarlo <https://docs.getmontecarlo.com/docs/dbt-core>`_; or
* Define other custom behaviours based on a specific artifact.

To support these use cases, Cosmos allows users to define functions called callbacks that can run as part of the task execution before deleting the target's folder.

Users can define their custom callback methods or, since Cosmos 1.8.0, they can leverage built-in callbacks, available in `cosmos/io.py <https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/io.py>`_ module.
These functions illustrate how to upload the generated dbt artifacts to remote cloud storage providers such as AWS S3, GCP GCS, and Azure WASB.

There are two ways users can leverage using Cosmos auxiliary callback functions:

* When instantiating a Cosmos operator;
* When using ``DbtDag`` or ``DbtTaskGroup`` (users can define a callback that will apply to all tasks).

With the Cosmos 1.8.0 release, several helper functions were added in the ``cosmos/io.py`` module. These functions
provide examples of callback functions that can be hooked into Cosmos DAGs to upload files from the project’s
target directory to remote cloud storage providers such as AWS S3, GCP GS, and Azure WASB.

Example: Using Callbacks with a Single Operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -23,8 +39,8 @@ using a single operator in an Airflow DAG:
:start-after: [START single_operator_callback]
:end-before: [END single_operator_callback]

Example: Using Callbacks with ``remote_target_path`` (Airflow 2.8+)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Example: Using DbtDag or DbtTaskGroup
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you're using Airflow 2.8 or later, you can leverage the :ref:`remote_target_path` configuration to upload files
from the target directory to a remote storage. Below is an example of how to define a callback helper function in your
Expand All @@ -35,18 +51,60 @@ from the target directory to a remote storage. Below is an example of how to def
:start-after: [START cosmos_callback_example]
:end-before: [END cosmos_callback_example]

An example of how the data uploaded to GCS looks like when using ``upload_to_gcp_gs`` in a ``DbtDag``:

.. image:: /_static/cosmos_callback_object_storage.png

The path naming convention is:

* Bucket configured by the user
* Name of the DAG
* DAG Run identifier
* Task ID
* Task retry identifier
* Target folder with its contents

If users are unhappy with this structure or format, they can implement similar methods, which can be based (or not) on the Cosmos standard ones.

Custom Callbacks
~~~~~~~~~~~~~~~~

The helper functions introduced in Cosmos 1.8.0 are just examples of how callback functions can be written and passed
to Cosmos DAGs. Users are not limited to using these predefined functions — they can also create their own custom
callback functions to meet specific needs. These custom functions can be provided to Cosmos DAGs, where they will
receive the path to the cloned project directory and the Airflow task context, which includes DAG and task instance
metadata.
The helper functions introduced in Cosmos 1.8.0 are examples of how callback functions. Users are not limited to using these predefined functions — they can also create their custom
callback functions to meet specific needs.

Cosmos passes a few arguments to these functions, including the path to the dbt project directory and the Airflow task context, which includes DAG and task instance metadata.

Below, find an example of a callback method that raises an exception if the query takes more than 10 seconds to run:

.. code-block:: python
def error_if_slow(project_dir: str, **kwargs: Any) -> None:
"""
An example of a custom callback that errors if a particular query is slow.
:param project_dir: Path of the project directory used by Cosmos to run the dbt command
"""
import json
from pathlib import Path
slow_query_threshold = 10
run_results_path = Path(project_dir, "run_results.json")
if run_results_path.exists():
with open(run_results_path) as fp:
run_results = json.load(fp)
node_name = run_results["unique_id"]
execution_time = run_results["execution_time"]
if execution_time > slow_query_threshold:
raise TimeoutError(
f"The query for the node {node_name} took too long: {execution_time}"
)
Users can use the same approach to call the data observability platform `montecarlo <https://docs.getmontecarlo.com/docs/dbt-core>`_ or other services.

Limitations and Contributions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Currently, callback support is available only when using ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``.
Callback support is available only when using ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``.
Contributions to extend this functionality to other execution modes are welcome and encouraged. You can reference the
implementation for ``ExecutionMode.LOCAL`` to add support for other modes.

0 comments on commit f7d37fe

Please sign in to comment.