Re-evaluate adding compile task when using ExecutionMode.AIRFLOW_ASYNC #1477

Closed
tatiana opened this issue Jan 21, 2025 · 2 comments
Labels
area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:compile Primarily related to dbt compile command or functionality execution:async Related to the Async execution mode execution:virtualenv Related to Virtualenv execution environment priority:high High priority issues are blocking or critical issues without a workaround and large impact

@tatiana
Collaborator

tatiana commented Jan 21, 2025

In Cosmos 1.7.0, we introduced experimental support for ExecutionMode.AIRFLOW_ASYNC, as discussed in this article and documentation page.

A fundamental characteristic of the approach implemented in 1.7.0 is that we'd pre-compute the SQL in a single first setup task, called dbt_compile, and the remaining tasks would only need to run the SQL statements, as illustrated in:

Image

This approach had some problems:

  1. The SQL run by Cosmos was incorrect, as discussed in [bug] Fix ExecutionMode.AIRFLOW_ASYNC query #1260
  2. Need to pre-compile and upload SQL statements to remote storage (additional configuration / possible latency)
  3. The pre-compiled SQL files were not cleaned up afterwards

As part of fixing #1260 using a monkey-patch, in #1474, we noticed that the dbt_compile step did not have to happen beforehand, since we could monkey-patch per run statement. This led to a refactor that removes dbt_compile and runs the monkey-patched dbt command with dbtRunner per task. While this is cleaner from a DAG topology perspective, it is unclear which approach is best moving forward, since running the patched dbt version in every run task requires:

a) dbt and Airflow being installed in the same Python environment on every worker node
b) possible memory/CPU overhead of running dbtRunner for every task

An alternative approach we could consider is:

  • Re-introduce the dbt_compile task, but identify the possibility of running it with Cosmos ExecutionMode.VIRTUALENV
  • Upload SQL to remote object store
  • No longer have the dependency of running dbtRunner per run task

The advantages of this approach would be:

  • dbt and Airflow would not have to be installed in the same Python environment, potentially (some changes to how we monkey-patch may be needed)
  • most worker nodes would not have to run dbt commands
  • the memory and CPU usage per worker node should go down while just executing the transformation
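
To make the alternative flow above concrete, here is a minimal sketch of "generate the SQL once, store it, and let run tasks only read it". It is not Cosmos code: `compile_project`, `run_model`, the model names, and the SQL strings are all hypothetical, and a temporary local directory stands in for the remote object store.

```python
import tempfile
from pathlib import Path


def compile_project(models: dict[str, str], store: Path) -> None:
    """Setup step (hypothetical): write one complete SQL file per model."""
    store.mkdir(parents=True, exist_ok=True)
    for name, sql in models.items():
        (store / f"{name}.sql").write_text(sql)


def run_model(name: str, store: Path, execute) -> None:
    """Run step (hypothetical): read the stored SQL and hand it to an executor."""
    sql = (store / f"{name}.sql").read_text()
    execute(sql)


# Hypothetical project: model name -> SQL that dbt would have generated.
models = {
    "stg_orders": "create or replace view stg_orders as select 1 as id",
    "orders": "create or replace table orders as select * from stg_orders",
}

executed = []
with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "compiled"  # stand-in for a remote object store
    compile_project(models, store)  # the only step that would need dbt
    for name in models:  # run steps only read files and submit SQL
        run_model(name, store, executed.append)
```

In this sketch only `compile_project` would need dbt available, which is the property the advantages listed above rely on.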

The downsides would be

Ideally, we'd compare these two approaches with real dbt projects and evaluate the numbers before making a decision.

@tatiana tatiana added this to the Cosmos 1.9.0 milestone Jan 21, 2025
@dosubot dosubot bot added area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc dbt:compile Primarily related to dbt compile command or functionality execution:virtualenv Related to Virtualenv execution environment labels Jan 21, 2025
@pankajkoti pankajkoti self-assigned this Jan 22, 2025
@tatiana tatiana assigned pankajastro and unassigned pankajkoti Jan 27, 2025
@tatiana
Copy link
Collaborator Author

tatiana commented Jan 27, 2025

We can split this ticket into three tasks:

  1. Make sure we can pre-calculate the run for the entire project in advance and use the compile strategy
  2. Check if we can monkey patch the dbt run when using ExecutionMode.VIRTUALENV
  3. Replace the use of dbtRunner with the virtualenv call

If we can deliver (1) as part of this ticket, it is already great, and we can create other tickets for the second and third parts.

@tatiana tatiana added the priority:high High priority issues are blocking or critical issues without a workaround and large impact label Jan 27, 2025
pankajkoti added a commit that referenced this issue Feb 5, 2025
…LOW_ASYNC` (#1474)

# Overview

This PR introduces a reliable way to extract SQL statements run by
`dbt-core` so Airflow asynchronous operators can use them. It fixes the
experimental BQ implementation of `ExecutionMode.AIRFLOW_ASYNC`
introduced in Cosmos 1.7 (#1230).

Previously, in #1230, we attempted to understand the implementation of
how `dbt-core` runs `--full-refresh` for BQ, and we hard-coded the SQL
header in Cosmos as an experimental feature. Since then, we realised
that this approach was prone to errors (e.g. #1260) and that it is
unrealistic for Cosmos to try to recreate the logic of how `dbt-core`
and its adapters generate all the SQL statements for different
operations, data warehouses, and types of materialisation.

With this PR, we use `dbt-core` to create the complete SQL statements
without `dbt-core` running those transformations. This enables better
compatibility with various `dbt-core` features while ensuring
correctness in running models.

The drawback of the current approach is that it relies on monkey
patching, a technique used to dynamically update the behaviour of a
piece of code at run-time. Cosmos monkey patches the `dbt-core` adapter
methods at the point where they would normally execute SQL statements:
Cosmos modifies this behaviour so that the SQL statements are written to
disk without performing any operations on the actual data warehouse.
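
As an illustration of the technique described above (not the actual Cosmos patch), the sketch below replaces an `execute` method at run-time so that SQL is written to disk instead of being run. `FakeConnectionManager`, `patch_execute`, and the `statement_N.sql` file naming are hypothetical stand-ins; the real adapter method has a different signature.

```python
import tempfile
from pathlib import Path


class FakeConnectionManager:
    """Hypothetical stand-in for an adapter connection manager."""

    def execute(self, sql: str) -> str:
        raise RuntimeError("would hit the data warehouse")


def patch_execute(target_dir: Path):
    """Monkey patch FakeConnectionManager.execute; return a restore function."""
    original = FakeConnectionManager.execute
    counter = {"n": 0}

    def write_instead_of_execute(self, sql: str) -> str:
        # Write the statement to disk and skip the warehouse entirely.
        counter["n"] += 1
        (target_dir / f"statement_{counter['n']}.sql").write_text(sql)
        return "SKIPPED"

    FakeConnectionManager.execute = write_instead_of_execute

    def restore() -> None:
        FakeConnectionManager.execute = original

    return restore


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp)
    restore = patch_execute(target)
    try:
        status = FakeConnectionManager().execute("create or replace table t as select 1")
        written = sorted(p.name for p in target.iterdir())
        captured_sql = (target / "statement_1.sql").read_text()
    finally:
        restore()  # always undo the patch so other callers see the original
```

Restoring the original method in a `finally` block keeps the patch scoped to the one call that needs it, which limits the blast radius if the patched interface changes.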

The main risk of this strategy is that dbt may change its interface.
For this reason, we logged the follow-up ticket #1489 to make sure
we test the latest version of dbt and its adapters and confirm the
monkey patching works as expected regardless of the version being used.
That said, since the method being monkey patched is part of the
`dbt-core` interface with its adapters, we believe the risk of breaking
changes is low.

The other challenge with the current approach is that every Cosmos task
relies on the following:
1. `dbt-core` being installed alongside the Airflow installation
2. the execution of a significant part of the `dbtRunner` logic

We have logged a follow-up ticket to evaluate the possibility of
overcoming these challenges: #1477

## Key Changes

1. Mocked BigQuery Adapter Execution:
- Introduced `_mock_bigquery_adapter()` to override
`BigQueryConnectionManager.execute`, ensuring SQL is only written to the
`target` directory and skipping execution in the warehouse.
- The generated SQL is then submitted using Airflow’s
BigQueryInsertJobOperator in deferrable mode.
2. Refactoring `AbstractDbtBaseOperator`:
- Previously, `AbstractDbtBaseOperator` inherited from `BaseOperator`,
causing conflicts when combined with `BigQueryInsertJobOperator` in
our `ExecutionMode.AIRFLOW_ASYNC` classes and the interface built in
#1483.
- Refactored to `AbstractDbtBase` (no longer inheriting `BaseOperator`),
requiring explicit `BaseOperator` initialization in all derived
operators.
- Updated the following existing operators, whose derived classes now
need to initialise `BaseOperator` explicitly:
        - `DbtAzureContainerInstanceBaseOperator`
        - `DbtDockerBaseOperator`
        - `DbtGcpCloudRunJobBaseOperator`
        - `DbtKubernetesBaseOperator`
3. Changes to dbt Compilation Workflow
- Removed `_add_dbt_compile_task`, which previously pre-generated SQL
and uploaded it to remote storage; subsequent tasks then downloaded
this compiled SQL for their execution.
- Instead, `dbt run` is now directly invoked in each task using the
mocked adapter to generate the full SQL.
- A future
[issue](#1477)
will assess whether we should reintroduce a compile task using the
mocked adapter for SQL generation and upload, reducing redundant dbt
calls in each task.
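
The `AbstractDbtBase` refactoring listed above can be pictured with the following minimal sketch. The classes here are simplified stand-ins, not the Airflow or Cosmos implementations: `BaseOperator` only stores a task id, and `DbtExampleOperator` and its arguments are hypothetical.

```python
class BaseOperator:
    """Stand-in for Airflow's BaseOperator; only stores the task id."""

    def __init__(self, task_id: str, **kwargs) -> None:
        self.task_id = task_id


class AbstractDbtBase:
    """dbt-specific state only; deliberately not a BaseOperator subclass."""

    def __init__(self, project_dir: str, **kwargs) -> None:
        self.project_dir = project_dir


class DbtExampleOperator(AbstractDbtBase, BaseOperator):
    """Hypothetical derived operator that initialises both parents explicitly."""

    def __init__(self, task_id: str, project_dir: str, **kwargs) -> None:
        AbstractDbtBase.__init__(self, project_dir=project_dir, **kwargs)
        BaseOperator.__init__(self, task_id=task_id, **kwargs)


op = DbtExampleOperator(task_id="run_orders", project_dir="/tmp/jaffle_shop")
```

Because the dbt base class no longer brings an operator parent with it, a derived class is free to pair it with a different operator base, at the cost of having to call each initialiser itself.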

## Issue updates
The PR fixes the following issues:
1. closes: #1260 
- Previously, we only supported --full-refresh dbt run with static SQL
headers (e.g., CREATE/DROP TABLE).
- Now, we support dynamic SQL headers based on materializations,
including CREATE OR REPLACE TABLE, CREATE OR REPLACE VIEW, etc.
2. closes: #1271 
- dbt macros are evaluated at runtime during the dbt run invocation using
the mocked adapter, and this PR lays the groundwork for supporting them in
async execution mode.
3. closes: #1265 
- Now, large datasets can avoid full drops and recreations, enabling
incremental model updates.
4. closes: #1261
- Previously, only tables (--full-refresh) were supported; this PR
implements logic for handling the different materializations that dbt
supports, such as table, view, incremental, ephemeral, and materialized
view.
5. closes: #1266
- Instead of relying on dbt compile (which only outputs SELECT
statements), we now let dbt generate complete SQL queries, including SQL
headers/DDL statements for the queries corresponding to the resource
nodes and the state of tables/views in the backend warehouse.
6. closes: #1264
- With this PR, we also support emitting datasets for
`ExecutionMode.AIRFLOW_ASYNC`.

## Example DAG showing `ExecutionMode.AIRFLOW_ASYNC` deferring tasks and the dynamic query submitted in the logs

<img width="1532" alt="Screenshot 2025-02-04 at 1 02 42 PM"
src="https://github.com/user-attachments/assets/baf15864-9bf8-4f35-95b7-954a1f547bfe"
/>


## Next Steps & Considerations:
- We acknowledge that monkey patching has downsides; however, it
currently seems the best approach to achieve our goals, and we
understand and accept the risks associated with this method. To
mitigate them, we are expanding our test coverage to include all
currently supported dbt adapter versions in our test matrix in #1489.
This will ensure compatibility across different dbt versions and help
us catch potential issues early.
- Further validate different dbt macros and materializations with
`ExecutionMode.AIRFLOW_ASYNC` by seeking feedback from users testing
the alpha release
https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5
created with changes from this PR.
- #1477, Compare
the efficiency of generating SQL dynamically vs. pre-compiling and
uploading SQL via a separate task.
- Add compatibility across all major cloud data warehouse backends (dbt
adapters).

---------

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
Co-authored-by: Pankaj Singh <[email protected]>
@pankajkoti pankajkoti added the execution:async Related to the Async execution mode label Feb 5, 2025
pankajastro added a commit that referenced this issue Feb 11, 2025
This PR reintroduces the setup task for ExecutionMode.AIRFLOW_ASYNC.
This prevents the dbt command from being executed in each Airflow task
node that uses the run operator; the mocked version of the dbt command
only runs in the start (setup) task node.

Additionally, a new configuration, `enable_setup_task`, has been
introduced to enable or disable this feature.
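
A rough sketch of how such a flag could change the DAG topology is shown below. This is an illustration only: `build_task_graph`, the `dbt_setup_async` task name, and the choice to attach the setup task upstream of the root run tasks are assumptions, not the Cosmos implementation.

```python
def build_task_graph(
    edges: dict[str, list[str]], enable_setup_task: bool
) -> dict[str, list[str]]:
    """Map each task to its upstream tasks, optionally adding one setup task."""
    graph = {
        f"{model}_run": [f"{up}_run" for up in upstream]
        for model, upstream in edges.items()
    }
    if enable_setup_task:
        # Assumption: the setup task sits upstream of every root run task,
        # so it transitively precedes all run tasks.
        for upstream in graph.values():
            if not upstream:
                upstream.append("dbt_setup_async")
        graph["dbt_setup_async"] = []
    return graph


# Hypothetical dbt project: model -> models it depends on.
edges = {
    "stg_orders": [],
    "stg_customers": [],
    "orders": ["stg_orders", "stg_customers"],
}
with_setup = build_task_graph(edges, enable_setup_task=True)
without_setup = build_task_graph(edges, enable_setup_task=False)
```

With the flag on, the graph gains a single extra node that every run task depends on directly or indirectly; with it off, the run tasks keep only their model-to-model dependencies.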

**With enable_setup_task**

<img width="1686" alt="Screenshot 2025-02-08 at 9 53 35 PM"
src="https://github.com/user-attachments/assets/cb9daf76-2bd6-4bcf-8219-b64562c4151a"
/>


**Without enable_setup_task**

<img width="1668" alt="Screenshot 2025-02-08 at 10 23 49 PM"
src="https://github.com/user-attachments/assets/4ac99c3b-15df-484e-b9e4-cc7b1022ea31"
/>



Related-to: #1477
@pankajastro
Contributor

PR #1518 decouples SQL file generation from the run operator into a new setup operator, and we have created a follow-up issue, #1533, to isolate the execution environment for the setup task.
