Create and run accurate SQL statements when using `ExecutionMode.AIRFLOW_ASYNC` (#1474)

# Overview

This PR introduces a reliable way to extract SQL statements run by
`dbt-core` so Airflow asynchronous operators can use them. It fixes the
experimental BQ implementation of `ExecutionMode.AIRFLOW_ASYNC`
introduced in Cosmos 1.7 (#1230).

Previously, in #1230, we tried to reproduce how `dbt-core` runs
`--full-refresh` for BQ and hard-coded the SQL header in Cosmos as an
experimental feature. Since then, we realised that this approach was
prone to errors (e.g. #1260) and that it is unrealistic for Cosmos to
try to recreate the logic of how `dbt-core` and its adapters generate
all the SQL statements for different operations, data warehouses, and
types of materialisation.

With this PR, we use `dbt-core` itself to create the complete SQL
statements, without `dbt-core` actually running those transformations.
This enables better compatibility with various `dbt-core` features
while ensuring the models run correctly.

The drawback of the current approach is that it relies on monkey
patching, a technique for dynamically changing the behaviour of a piece
of code at run time. Cosmos monkey patches the `dbt-core` adapter
methods at the point where they would normally execute SQL statements,
so that the SQL is written to disk without performing any operations
against the actual data warehouse.
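
In practice, the patch replaces the adapter connection manager's `execute` method with a stub that returns a mocked response, while dbt still renders the complete SQL into the `target` directory. A simplified sketch, mirroring the `_mock_bigquery_adapter()` helper added by this PR:

```python
def _mock_bigquery_adapter() -> None:
    """Simplified sketch of the monkey patch (see cosmos/dbt_adapters/bigquery.py in this PR)."""
    import agate
    from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
    from dbt_common.clients.agate_helper import empty_table

    def execute(self, sql, auto_begin=False, fetch=None, limit=None) -> tuple[BigQueryAdapterResponse, agate.Table]:
        # Return a mocked response instead of sending the query to BigQuery;
        # the SQL dbt generated ends up in the target directory rather than in the warehouse.
        return BigQueryAdapterResponse("mock_bigquery_adapter_response"), empty_table()

    BigQueryConnectionManager.execute = execute
```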

The main risk of this strategy is that dbt may change its interface.
For this reason, we logged the follow-up ticket
#1489 to make sure
we test the latest versions of dbt and its adapters and confirm the
monkey patching works as expected regardless of the version being used.
That said, since the method being monkey patched is part of the
`dbt-core` interface with its adapters, we believe the risk of breaking
changes is low.

The other challenge with the current approach is that every Cosmos task
relies on the following:
1. `dbt-core` being installed alongside the Airflow installation
2. the execution of a significant part of the `dbtRunner` logic

We have logged a follow-up ticket to evaluate the possibility of
overcoming these challenges: #1477

## Key Changes

1. Mocked BigQuery adapter execution:
   - Introduced `_mock_bigquery_adapter()` to override
`BigQueryConnectionManager.execute`, ensuring the SQL is only written to the
`target` directory and never executed in the warehouse.
   - The generated SQL is then submitted using Airflow's
`BigQueryInsertJobOperator` in deferrable mode.
2. Refactoring `AbstractDbtBaseOperator`:
   - Previously, `AbstractDbtBaseOperator` inherited from `BaseOperator`,
causing conflicts when used with `BigQueryInsertJobOperator` in our
`ExecutionMode.AIRFLOW_ASYNC` classes and the interface built in
#1483.
   - Refactored it to `AbstractDbtBase` (no longer inheriting from
`BaseOperator`), requiring explicit `BaseOperator` initialization in all
derived operators (see the sketch after this list).
   - Updated the following existing operators to account for this
refactoring, since derived classes must now initialise `BaseOperator`:
        - `DbtAzureContainerInstanceBaseOperator`
        - `DbtDockerBaseOperator`
        - `DbtGcpCloudRunJobBaseOperator`
        - `DbtKubernetesBaseOperator`
3. Changes to the dbt compilation workflow:
   - Removed `_add_dbt_compile_task`, which previously pre-generated the SQL,
uploaded it to remote storage, and had subsequent tasks download the
compiled SQL for their execution.
   - Instead, `dbt run` is now invoked directly in each task, using the
mocked adapter to generate the full SQL.
   - A future
[issue](#1477)
will assess whether we should reintroduce a compile task that uses the
mocked adapter for SQL generation and upload, reducing redundant dbt
invocations in each task.
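
As a rough illustration of the new initialisation pattern from item 2 (class and argument names below are simplified stand-ins, not the exact Cosmos signatures):

```python
from airflow.models import BaseOperator


class AbstractDbtBase:
    """Stand-in for the refactored Cosmos base class, which no longer inherits from BaseOperator."""

    def __init__(self, project_dir: str, **kwargs):
        self.project_dir = project_dir


class MyDbtOperator(AbstractDbtBase, BaseOperator):
    """Hypothetical derived operator showing how BaseOperator must now be initialised explicitly."""

    def __init__(self, task_id: str, project_dir: str, **kwargs):
        AbstractDbtBase.__init__(self, project_dir=project_dir)
        # AbstractDbtBase no longer calls BaseOperator.__init__, so every derived operator
        # is responsible for passing the Airflow-specific arguments to BaseOperator itself.
        BaseOperator.__init__(self, task_id=task_id, **kwargs)
```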

## Issue updates
The PR fixes the following issues:
1. closes: #1260
   - Previously, we only supported `--full-refresh` dbt runs with static SQL
headers (e.g. `CREATE`/`DROP TABLE`).
   - Now, we support dynamic SQL headers based on the materialization,
including `CREATE OR REPLACE TABLE`, `CREATE OR REPLACE VIEW`, etc.
2. closes: #1271
   - dbt macros are evaluated at runtime during the `dbt run` invocation with
the mocked adapter, and this PR lays the groundwork for supporting them in
async execution mode.
3. closes: #1265
   - Large datasets can now avoid full drops and recreations, enabling
incremental model updates.
4. closes: #1261
   - Previously, only tables (`--full-refresh`) were supported; this PR
implements logic for handling the different materializations that dbt
supports, such as table, view, incremental, ephemeral, and materialized
view.
5. closes: #1266
   - Instead of relying on `dbt compile` (which only outputs `SELECT`
statements), we now let dbt generate the complete SQL queries, including the
SQL headers/DDL statements corresponding to the resource nodes and the state
of the tables/views in the backend warehouse (see the example after this list).
6. closes: #1264
   - With this PR, we also support emitting datasets for
`ExecutionMode.AIRFLOW_ASYNC`.
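
To make the difference in item 5 concrete, here is an illustrative comparison (example model and project names, not taken from a real project) of what `dbt compile` produces versus the complete statement that is now generated and submitted for a table materialization:

```python
# What `dbt compile` writes (target/compiled/...): a bare SELECT, with no DDL header.
compiled_sql = "select id, sum(amount) as total from raw.orders group by id"

# What the mocked `dbt run` produces under the target directory: the complete statement,
# with the materialization-specific header, which Cosmos submits via BigQueryInsertJobOperator.
run_sql = (
    "create or replace table `my-project`.`my_dataset`.`orders_summary` as ("
    "  select id, sum(amount) as total from raw.orders group by id"
    ")"
)
```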

## Example DAG showing `ExecutionMode.AIRFLOW_ASYNC` deferring tasks and
the dynamic query submitted in the logs

<img width="1532" alt="Screenshot 2025-02-04 at 1 02 42 PM"
src="https://github.com/user-attachments/assets/baf15864-9bf8-4f35-95b7-954a1f547bfe"
/>
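
For reference, a minimal DAG along these lines might look as follows (a sketch only: project paths, profile details, and scheduling values are placeholders, and additional `operator_args` such as the BigQuery `location` may be required depending on your setup):

```python
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig

simple_dag_async = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="my_profile",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
    ),
    # Each task defers via BigQueryInsertJobOperator and submits the dbt-generated SQL.
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    dag_id="simple_dag_async",
)
```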


## Next Steps & Considerations
- We acknowledge that monkey patching has downsides; however, it currently
seems the best approach to achieve our goals, and we understand and accept
the associated risks. To mitigate them, we are expanding our test coverage to
include all currently supported dbt adapter versions in our test matrix in
#1489. This will ensure compatibility across different dbt versions and help
us catch potential issues early.
- Further validate different dbt macros and materializations with
`ExecutionMode.AIRFLOW_ASYNC` by seeking feedback from users testing the alpha
https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5
created with the changes from this PR.
- #1477: compare the efficiency of generating SQL dynamically vs.
pre-compiling and uploading SQL via a separate task.
- Add compatibility across all major cloud data warehouse backends (dbt
adapters); a sketch of what such an extension could look like follows this list.
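
To illustrate the last point, supporting another warehouse would roughly mean registering a mock-adapter callable and an argument-association callable for its profile type, mirroring the BigQuery entries added in `cosmos/dbt_adapters/__init__.py` (the Snowflake names below are hypothetical):

```python
from cosmos.dbt_adapters import (
    PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP,
    PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP,
)


def _mock_snowflake_adapter() -> None:
    # Patch the Snowflake connection manager's execute() so SQL is written to disk
    # but never run, analogous to _mock_bigquery_adapter().
    ...


def _associate_snowflake_async_op_args(async_op_obj, **kwargs):
    # Map the generated SQL onto the async operator's arguments,
    # analogous to _associate_bigquery_async_op_args().
    ...


PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP["snowflake"] = _mock_snowflake_adapter
PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP["snowflake"] = _associate_snowflake_async_op_args
```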

---------

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
Co-authored-by: Pankaj Singh <[email protected]>
3 people authored Feb 5, 2025
1 parent c344eb4 commit 24108f0
Showing 27 changed files with 825 additions and 432 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.rst
@@ -1,7 +1,7 @@
Changelog
=========

1.9.0a4 (2025-01-29)
1.9.0a5 (2025-02-03)
--------------------

Breaking changes
@@ -18,6 +18,7 @@ Features
* Allow users to opt-out of ``dbtRunner`` during DAG parsing with ``InvocationMode.SUBPROCESS`` by @tatiana in #1495. Check out the `documentation <https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html#how-to-run-dbt-ls-invocation-mode>`_.
* Add structure to support multiple db for async operator execution by @pankajastro in #1483
* Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here <https://astronomer.github.io/astronomer-cosmos/profiles/#profile-customise-per-node>`_.
* Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474

Bug Fixes

@@ -27,9 +28,12 @@ Enhancement

* Fix OpenLineage deprecation warning by @CorsettiS in #1449
* Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480
* Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501


Others

* Ignore dbt package tests when running Cosmos tests by @tatiana in #1502
* GitHub Actions Dependabot: #1487
* Pre-commit updates: #1473, #1493

2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -6,7 +6,7 @@
Contains dags, task groups, and operators.
"""

__version__ = "1.9.0a4"
__version__ = "1.9.0a5"


from cosmos.airflow.dag import DbtDag
28 changes: 0 additions & 28 deletions cosmos/airflow/graph.py
@@ -11,7 +11,6 @@

from cosmos.config import RenderConfig
from cosmos.constants import (
DBT_COMPILE_TASK_ID,
DEFAULT_DBT_RESOURCES,
SUPPORTED_BUILD_RESOURCES,
TESTABLE_DBT_RESOURCES,
@@ -392,32 +391,6 @@ def generate_task_or_group(
return task_or_group


def _add_dbt_compile_task(
nodes: dict[str, DbtNode],
dag: DAG,
execution_mode: ExecutionMode,
task_args: dict[str, Any],
tasks_map: dict[str, Any],
task_group: TaskGroup | None,
) -> None:
if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
return

compile_task_metadata = TaskMetadata(
id=DBT_COMPILE_TASK_ID,
operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
arguments=task_args,
extra_context={"dbt_dag_task_group_identifier": _get_dbt_dag_task_group_identifier(dag, task_group)},
)
compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=task_group)

for task_id, task in tasks_map.items():
if not task.upstream_list:
compile_airflow_task >> task

tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task


def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
dag_id = dag.dag_id
task_group_id = task_group.group_id if task_group else None
@@ -588,7 +561,6 @@ def build_airflow_graph(
tasks_map[node_id] = test_task

create_airflow_task_dependencies(nodes, tasks_map)
_add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group)
return tasks_map


1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -6,6 +6,7 @@
import aenum
from packaging.version import Version

BIGQUERY_PROFILE_TYPE = "bigquery"
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
18 changes: 18 additions & 0 deletions cosmos/dbt_adapters/__init__.py
@@ -0,0 +1,18 @@
from __future__ import annotations

from typing import Any

from cosmos.constants import BIGQUERY_PROFILE_TYPE
from cosmos.dbt_adapters.bigquery import _associate_bigquery_async_op_args, _mock_bigquery_adapter

PROFILE_TYPE_MOCK_ADAPTER_CALLABLE_MAP = {
BIGQUERY_PROFILE_TYPE: _mock_bigquery_adapter,
}

PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP = {
BIGQUERY_PROFILE_TYPE: _associate_bigquery_async_op_args,
}


def associate_async_operator_args(async_operator_obj: Any, profile_type: str, **kwargs: Any) -> Any:
return PROFILE_TYPE_ASSOCIATE_ARGS_CALLABLE_MAP[profile_type](async_operator_obj, **kwargs)
33 changes: 33 additions & 0 deletions cosmos/dbt_adapters/bigquery.py
@@ -0,0 +1,33 @@
from __future__ import annotations

from typing import Any

from cosmos.exceptions import CosmosValueError


def _mock_bigquery_adapter() -> None:
from typing import Optional, Tuple

import agate
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
from dbt_common.clients.agate_helper import empty_table

def execute( # type: ignore[no-untyped-def]
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None
) -> Tuple[BigQueryAdapterResponse, agate.Table]:
return BigQueryAdapterResponse("mock_bigquery_adapter_response"), empty_table()

BigQueryConnectionManager.execute = execute


def _associate_bigquery_async_op_args(async_op_obj: Any, **kwargs: Any) -> Any:
sql = kwargs.get("sql")
if not sql:
raise CosmosValueError("Keyword argument 'sql' is required for BigQuery Async operator")
async_op_obj.configuration = {
"query": {
"query": sql,
"useLegacySql": False,
}
}
return async_op_obj
31 changes: 19 additions & 12 deletions cosmos/operators/_asynchronous/base.py
@@ -1,9 +1,8 @@
from __future__ import annotations

import importlib
import logging
from abc import ABCMeta
from typing import Any, Sequence

from airflow.utils.context import Context
from typing import Any

from cosmos.airflow.graph import _snake_case_to_camelcase
from cosmos.config import ProfileConfig
@@ -36,11 +35,16 @@ def _create_async_operator_class(profile_type: str, dbt_class: str) -> Any:
return DbtRunLocalOperator


class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator, metaclass=ABCMeta): # type: ignore[misc]
class DbtRunAirflowAsyncFactoryOperator(DbtRunLocalOperator): # type: ignore[misc]

template_fields: Sequence[str] = DbtRunLocalOperator.template_fields + ("project_dir",) # type: ignore[operator]

def __init__(self, project_dir: str, profile_config: ProfileConfig, **kwargs: Any):
def __init__(
self,
project_dir: str,
profile_config: ProfileConfig,
extra_context: dict[str, object] | None = None,
dbt_kwargs: dict[str, object] | None = None,
**kwargs: Any,
) -> None:
self.project_dir = project_dir
self.profile_config = profile_config

@@ -51,7 +55,13 @@ def __init__(self, project_dir: str, profile_config: ProfileConfig, **kwargs: An
# When using composition instead of inheritance to initialize the async class and run its execute method,
# Airflow throws a `DuplicateTaskIdFound` error.
DbtRunAirflowAsyncFactoryOperator.__bases__ = (async_operator_class,)
super().__init__(project_dir=project_dir, profile_config=profile_config, **kwargs)
super().__init__(
project_dir=project_dir,
profile_config=profile_config,
extra_context=extra_context,
dbt_kwargs=dbt_kwargs,
**kwargs,
)

def create_async_operator(self) -> Any:

@@ -60,6 +70,3 @@ def create_async_operator(self) -> Any:
async_class_operator = _create_async_operator_class(profile_type, "DbtRun")

return async_class_operator

def execute(self, context: Context) -> None:
super().execute(context)
100 changes: 32 additions & 68 deletions cosmos/operators/_asynchronous/bigquery.py
@@ -1,22 +1,23 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence
from typing import Any, Sequence

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.context import Context
from packaging.version import Version

from cosmos import settings
from cosmos.config import ProfileConfig
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id
from cosmos.dataset import get_dataset_alias_name
from cosmos.operators.local import AbstractDbtLocalBase

AIRFLOW_VERSION = Version(airflow.__version__)

class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator): # type: ignore[misc]

class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase): # type: ignore[misc]

template_fields: Sequence[str] = (
"full_refresh",
"gcp_project",
"dataset",
"location",
@@ -27,6 +28,7 @@ def __init__(
project_dir: str,
profile_config: ProfileConfig,
extra_context: dict[str, Any] | None = None,
dbt_kwargs: dict[str, Any] | None = None,
**kwargs: Any,
):
self.project_dir = project_dir
@@ -36,73 +38,35 @@ def __init__(
self.gcp_project = profile["project"]
self.dataset = profile["dataset"]
self.extra_context = extra_context or {}
self.full_refresh = None
if "full_refresh" in kwargs:
self.full_refresh = kwargs.pop("full_refresh")
self.configuration: dict[str, Any] = {}
self.dbt_kwargs = dbt_kwargs or {}
task_id = self.dbt_kwargs.pop("task_id")
AbstractDbtLocalBase.__init__(
self, task_id=task_id, project_dir=project_dir, profile_config=profile_config, **self.dbt_kwargs
)
if kwargs.get("emit_datasets", True) and settings.enable_dataset_alias and AIRFLOW_VERSION >= Version("2.10"):
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the follow error in mypy
# error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str")
dag_id = kwargs.get("dag")
task_group_id = kwargs.get("task_group")
kwargs["outlets"] = [
DatasetAlias(name=get_dataset_alias_name(dag_id, task_group_id, self.task_id))
] # type: ignore
super().__init__(
gcp_conn_id=self.gcp_conn_id,
configuration=self.configuration,
deferrable=True,
**kwargs,
)
self.async_context = extra_context or {}
self.async_context["profile_type"] = self.profile_config.get_profile_type()
self.async_context["async_operator"] = BigQueryInsertJobOperator

def get_remote_sql(self) -> str:
if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(f"Cosmos async support is only available starting in Airflow 2.8 or later.")
from airflow.io.path import ObjectStoragePath

file_path = self.extra_context["dbt_node_config"]["file_path"] # type: ignore
dbt_dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"]

remote_target_path_str = str(remote_target_path).rstrip("/")

if TYPE_CHECKING: # pragma: no cover
assert self.project_dir is not None

project_dir_parent = str(Path(self.project_dir).parent)
relative_file_path = str(file_path).replace(project_dir_parent, "").lstrip("/")
remote_model_path = f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/compiled/{relative_file_path}"

object_storage_path = ObjectStoragePath(remote_model_path, conn_id=remote_target_path_conn_id)
with object_storage_path.open() as fp: # type: ignore
return fp.read() # type: ignore

def drop_table_sql(self) -> None:
model_name = self.extra_context["dbt_node_config"]["resource_name"] # type: ignore
sql = f"DROP TABLE IF EXISTS {self.gcp_project}.{self.dataset}.{model_name};"

hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
self.configuration = {
"query": {
"query": sql,
"useLegacySql": False,
}
}
hook.insert_job(configuration=self.configuration, location=self.location, project_id=self.gcp_project)

def execute(self, context: Context) -> Any | None:
@property
def base_cmd(self) -> list[str]:
return ["run"]

if not self.full_refresh:
raise CosmosValueError("The async execution only supported for full_refresh")
else:
# It may be surprising to some, but the dbt-core --full-refresh argument fully drops the table before populating it
# https://github.com/dbt-labs/dbt-core/blob/5e9f1b515f37dfe6cdae1ab1aa7d190b92490e24/core/dbt/context/base.py#L662-L666
# https://docs.getdbt.com/reference/resource-configs/full_refresh#recommendation
# We're emulating this behaviour here
# The compiled SQL has several limitations here, but these will be addressed in the PR: https://github.com/astronomer/astronomer-cosmos/pull/1474.
self.drop_table_sql()
sql = self.get_remote_sql()
model_name = self.extra_context["dbt_node_config"]["resource_name"] # type: ignore
# prefix explicit create command to create table
sql = f"CREATE TABLE {self.gcp_project}.{self.dataset}.{model_name} AS {sql}"
self.configuration = {
"query": {
"query": sql,
"useLegacySql": False,
}
}
return super().execute(context)
def execute(self, context: Context, **kwargs: Any) -> None:
self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context)
1 change: 1 addition & 0 deletions cosmos/operators/_asynchronous/databricks.py
@@ -1,4 +1,5 @@
# TODO: Implement it
from __future__ import annotations

from typing import Any

