
Increase Task Throughput #1120

Closed · Tracked by #1192
pankajkoti opened this issue Jul 24, 2024 · 2 comments
Labels: area:performance, Q3

Comments


pankajkoti commented Jul 24, 2024

Description co-authored by @tatiana @pankajastro

As of Cosmos 1.5, we are using dbt-core to run transformations in the database. In other words, the Airflow worker node is blocked during the execution of the SQL transformation. If the transformation takes 6h, an Airflow worker node spends those 6h just waiting.

One way to overcome this is to use Airflow deferrable operators instead of dbt-core commands to execute the transformation. This approach is not new; it was presented by Monzo at Airflow Summit 2023, and we (@tatiana) met with Monzo in London afterward to discuss the details.

The overall idea is:

  • Pre-compile the dbt project, so we have the pre-compiled SQL
  • Execute the pre-compiled SQL using native Airflow operators, enabling deferrable mode (see the sketch below)
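
As a rough illustration of the two steps above (a minimal sketch, not the eventual Cosmos implementation: BigQueryInsertJobOperator is used only as a familiar example of a native deferrable operator, and all paths are placeholders):

```
import subprocess

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Step 1: pre-compile the dbt project so the rendered SQL exists on disk.
subprocess.run(
    ["dbt", "compile", "--project-dir", "/usr/local/dbt/my_project"],  # placeholder path
    check=True,
)

# Step 2: hand the compiled SQL to a native deferrable operator. While the query
# runs in the warehouse, the task defers and the worker slot is released.
with open(
    "/usr/local/dbt/my_project/target/compiled/my_project/models/my_model.sql"
) as f:
    compiled_sql = f.read()

run_my_model = BigQueryInsertJobOperator(
    task_id="run_my_model",
    configuration={"query": {"query": compiled_sql, "useLegacySql": False}},
    deferrable=True,
)
```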

For a first end-to-end implementation, we'll only support:

  • SQL models
  • ExecutionMode.LOCAL
  • Databricks

This will allow us to measure the impact of using deferrable operators, and then we can extend the approach further.

Tasks initially planned:

  1. PoC to try to have this feature end-to-end:
    • Introduce ExecutionMode.LOCAL_AIRFLOW_ASYNC (think about whether we can find a better name)
    • Check how we are populating the compiled_sql template field in Cosmos. If this is being populated after running the task, then we can simplify the first E2E version to use LoadMode.MANIFEST and assume the compiled SQL is available to Cosmos.
    • If the dbt profile is not related to Databricks, raise an error
    • If it is related to Databricks, then attempt to use the DatabricksSubmitRunOperator to run the previously compiled SQL (a sketch follows this list)
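
A hedged sketch of the PoC dispatch described above; `build_async_run_task` is a hypothetical helper, and the Databricks `json` payload shape is illustrative only (it depends on the Databricks Jobs API version and on deferrable support in the installed provider):

```
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


def build_async_run_task(profile_config, compiled_sql: str, task_id: str):
    # Only Databricks profiles are supported in this first end-to-end version.
    if profile_config.get_profile_type() != "databricks":
        raise ValueError(
            "ExecutionMode.LOCAL_AIRFLOW_ASYNC currently supports only Databricks profiles"
        )
    # Submit the pre-compiled SQL as a one-time Databricks job run.
    return DatabricksSubmitRunOperator(
        task_id=task_id,
        json={"sql_task": {"query": compiled_sql}},  # illustrative payload only
        deferrable=True,  # requires a provider version with deferrable support
    )
```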

Once we've done a PoC, we can break the work down into other tasks so we can split it up.

References for when we support Snowflake: currently (provider 5.6.0), the official Airflow Snowflake SQLExecuteQueryOperator does not support running SQL in deferrable mode. The two alternatives would be either using this provider's SnowflakeSqlApiOperator, which requires end-users to enable/have access to the Snowflake SQL API, or using the SnowflakeOperatorAsync from Astronomer Providers. Some future references for when we decide to follow this path:
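
For illustration, the SQL API route could look like this (a hedged sketch; the connection ID and SQL are placeholders, and this operator requires the Snowflake SQL API to be accessible to the account):

```
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

run_compiled_sql = SnowflakeSqlApiOperator(
    task_id="run_compiled_sql",
    snowflake_conn_id="snowflake_default",  # placeholder connection
    sql="SELECT 1",  # the pre-compiled dbt SQL would go here
    statement_count=1,  # number of statements in `sql`
    deferrable=True,  # release the worker while Snowflake executes the query
)
```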

Tasks originally planned as part of this epic (more to come):

@dosubot dosubot bot added the area:performance Related to performance, like memory usage, CPU usage, speed, etc label Jul 24, 2024
@pankajkoti pankajkoti added this to the Cosmos 1.7.0 milestone Jul 24, 2024
@tatiana tatiana changed the title Increase Task Throughput (Async support) Increase Task Throughput Jul 31, 2024

tatiana commented Aug 14, 2024

One of the advantages of this feature would be to surface to end-users additional information that Airflow currently exposes but dbt does not.

For example, when using dbt, the BigQuery job ID is not printed in the logs; if we changed Cosmos to use the Airflow BigQuery operator to run the BQ SQL transformations, the logs would include the full job ID. We could also expose this information in the Airflow UI.

This was brought up in a discussion with a customer:
https://astronomer.slack.com/archives/C074NP4A8G1/p1722774197657379

@phanikumv phanikumv mentioned this issue Sep 6, 2024
@phanikumv phanikumv removed the epic label Sep 6, 2024
@tatiana tatiana modified the milestones: Cosmos 1.7.0, Triage Sep 20, 2024
tatiana pushed a commit that referenced this issue Sep 30, 2024
This PR is the groundwork for the implementation of
`ExecutionMode.AIRFLOW_ASYNC`
(#1120), which -
once all other epic tasks are completed - will enable asynchronous
execution of dbt resources using Apache Airflow’s deferrable operators.
As part of this work, this PR introduces a new option to the `ExecutionMode` enum: `AIRFLOW_ASYNC`. When this execution mode is used, Cosmos creates a setup task that pre-compiles the dbt project SQL and makes it available to the remaining dbt tasks. This PR, however, does not yet leverage Airflow's deferrable operators: users who select `ExecutionMode.AIRFLOW_ASYNC` will actually be running `ExecutionMode.LOCAL` operators with this change alone. PR #1230 has a first experimental version of using deferrable operators for task execution.

## Setup task as the groundwork for a new execution mode: `ExecutionMode.AIRFLOW_ASYNC`
- Adds a new operator, `DbtCompileAirflowAsyncOperator`, as a root task (analogous to a setup task) in the DAG. It runs the dbt compile command and uploads the compiled SQL files to a remote storage location, so that subsequent tasks can fetch these compiled SQL files from the remote storage and run them asynchronously using Airflow's deferrable operators.

## Airflow Configurations:
- `remote_target_path`: Introduces a configurable path to store
dbt-generated files remotely, supporting any storage scheme that works
with Airflow’s Object Store (e.g., S3, GCS, Azure Blob).
- `remote_target_path_conn_id`: Allows specifying a custom connection ID for the remote target path, defaulting to the scheme's associated Airflow connection if not set (see the example below).
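
For example, both settings could be set in `airflow.cfg` (the bucket and connection ID below are placeholders):

```
[cosmos]
remote_target_path = s3://my-bucket/dbt/target
remote_target_path_conn_id = my_aws_conn
```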

## Example DAG for CI Testing:
Introduces an example DAG (`simple_dag_async.py`) demonstrating how to use the new execution mode (as mentioned earlier, with this PR alone the tasks still run as `ExecutionMode.LOCAL` operators). This DAG is integrated into the CI pipeline to run integration tests, and it aims to verify the functionality of `ExecutionMode.AIRFLOW_ASYNC` as the implementation gets added, starting with the experimental implementation in #1230. A minimal sketch of such a DAG follows.
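
A minimal sketch along the lines of `simple_dag_async.py` (the project path, connection ID, and profile details are placeholders, not the exact example DAG):

```
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

simple_dag_async = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dbt/my_project"),  # placeholder
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
            conn_id="gcp_conn",  # placeholder Airflow connection
            profile_args={"dataset": "my_dataset"},  # placeholder dataset
        ),
    ),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="simple_dag_async",
)
```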

## Unit & Integration Tests:
- Adds comprehensive unit and integration tests to ensure correct
behavior.
- Tests include validation for successful uploads, error handling for misconfigured remote paths, and scenarios where `remote_target_path` is not set.

## Documentation:
- Adds detailed documentation explaining how to configure and use `ExecutionMode.AIRFLOW_ASYNC`.

## Scope & Limitations of the feature being introduced:
1. This feature is meant to be released as experimental and is marked as such in the documentation.
2. Currently, it has been scoped so that only dbt models are executed asynchronously (being worked on in PR #1230), while other resource types run synchronously.
3. `BigQuery` will be the only supported target database for this execution mode (being worked on in PR #1230).

Thus, this PR enhances Cosmos by providing the groundwork for more efficient execution of long-running dbt resources.

## Additional Notes:
- This feature is planned to be introduced in Cosmos v1.7.0.

related: #1134
tatiana added a commit that referenced this issue Oct 3, 2024
…_ASYNC` (#1230)

Enable BQ users to run dbt models (`full_refresh`) asynchronously. This releases the Airflow worker node from waiting while the transformation (I/O) happens in the data warehouse, increasing the overall Airflow task throughput (more information:
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html).
As part of this change, we introduce the capability of not using the dbt command to run the actual SQL transformations. This also avoids creating subprocesses in the worker node (`ExecutionMode.LOCAL` with `InvocationMode.SUBPROCESS` and `ExecutionMode.VIRTUALENV`) or the overhead of creating a Kubernetes pod to execute the actual dbt command (`ExecutionMode.KUBERNETES`). This can avoid issues related to memory and CPU usage.

This PR takes advantage of an already implemented async operator in the
Airflow repo by extending it in the Cosmos async operator. It also
utilizes the pre-compiled SQL generated as part of the PR
#1224. It downloads
the generated SQL from a remote location (S3/GCS), which allows us to
decouple from dbt during task execution.

## Details

- Expose `get_profile_type` on ProfileConfig: This aids in database
selection
- ~Add `async_op_args`: A high-level parameter to forward arguments to the upstream operator (Airflow operator). (This may change in this PR itself)~ The async operator params are processed as kwargs via the `operator_args` parameter
- Implement `DbtRunAirflowAsyncOperator`: This initializes the Airflow operator, retrieves the SQL query at task runtime from a remote location, modifies the query as needed, and triggers the upstream execute method (a simplified sketch follows this list).
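
A simplified sketch of this approach (not the exact Cosmos implementation; the remote-path handling and query rewriting are condensed into a few lines):

```
from airflow.io.path import ObjectStoragePath  # requires Airflow 2.8+
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator):
    """Runs a pre-compiled dbt model's SQL via BigQuery's deferrable operator."""

    def __init__(self, remote_sql_path: str, **kwargs):
        # Defer to the BigQuery trigger so the worker slot is freed while the job runs.
        super().__init__(configuration={}, deferrable=True, **kwargs)
        self.remote_sql_path = remote_sql_path

    def execute(self, context):
        # Fetch the compiled SQL uploaded earlier by the dbt compile setup task.
        sql = ObjectStoragePath(self.remote_sql_path).read_text()
        # Any query modification (e.g. materializing the model's table to mirror
        # full_refresh) would happen here before submission.
        self.configuration = {"query": {"query": sql, "useLegacySql": False}}
        return super().execute(context)
```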

## Limitations

- This feature only works when using Airflow 2.8 and above
- The async execution only works for BigQuery
- The async execution only supports running dbt models (other dbt resources, such as seeds, sources, snapshots, and tests, are run using `ExecutionMode.LOCAL`)
- This will work only if the user sets `full_refresh=True` in `operator_args` (which means tables will be dropped before being populated, as implemented in `dbt-core`)
- Users need to use `ProfileMapping` in `ProfileConfig`, since Cosmos
relies on having the connection (credentials) to be able to run the
transformation in BQ without `dbt-core`
- Users must provide the BQ `location` in `operator_args` (this is a limitation of the `BigQueryInsertJobOperator` being used to implement the native Airflow asynchronous support; see the example `operator_args` after this list)
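
For instance, a minimal `operator_args` satisfying these constraints might look like this (values are illustrative):

```
operator_args = {
    "full_refresh": True,       # required: tables are dropped and re-created, as in dbt-core
    "location": "us-central1",  # required by the underlying BigQueryInsertJobOperator
}
```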

## Testing 

We have added a new dbt project to the repository to facilitate asynchronous task execution. The goal is to accelerate development without disrupting or requiring fixes for the existing tests. We have also added a DAG for end-to-end testing:
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Configuration

Users need to configure the parameters below to execute deferrable tasks in Cosmos:

- [ExecutionMode:
AIRFLOW_ASYNC](https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html)
-
[remote_target_path](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path)
-
[remote_target_path_conn_id](https://astronomer.github.io/astronomer-cosmos/configuration/cosmos-conf.html#remote-target-path-conn-id)

Example DAG:
https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

## Installation

You can leverage async operator support by installing additional dependencies:
```
pip install "astronomer-cosmos[dbt-bigquery, google]"
```


## Documentation 

The PR also documents the limitations and usage of Airflow async execution in Cosmos.

## Related Issue(s)

Related to: #1120
Closes: #1134

## Breaking Change?

No

## Notes

This is an experimental feature, and as such, it may undergo breaking
changes. We encourage users to share their experiences and feedback to
improve it further.

We'd love support and feedback so we can define the next steps.

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

## Credits

This was a result of teamwork and effort:

Co-authored-by: Pankaj Koti <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>

## Future Work

- Design an interface to facilitate the easy addition of new asynchronous database operators (#1238)
- Improve the test coverage (#1239)
- Address the limitations (we need to log these issues)

---------

Co-authored-by: Pankaj Koti <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>
@tatiana tatiana added Q3 and removed Q3 labels Oct 7, 2024

tatiana commented Oct 7, 2024

We completed the PoC and released it as part of Cosmos 1.7. We should still log follow-up tickets to address the current limitations and continue the long-term Async Support epic.

@tatiana tatiana closed this as completed Oct 7, 2024