
WIP: Add support for DAG & TaskGroup level caching (performance improvement) #992

Closed
wants to merge 7 commits into main from dag-level-cache

Conversation

tatiana
Collaborator

@tatiana tatiana commented May 21, 2024

Introduce a DAG-level cache. Based on initial tests, this can improve the performance of running a Cosmos-powered DAG by 59% (validated with the repo's basic_cosmos_dag).

To enable this feature, users should set the environment variable AIRFLOW__COSMOS__EXPERIMENTAL_CACHE=1.

What is missing (why this is a draft PR):

  • Allow users to customise their own calculate_current_version
  • Refactor so there isn't so much code duplication between DbtDag and DbtTaskGroup
  • Allow users to purge the cache (via env var or a param passed in the Airflow UI - the second option is probably preferable)
  • Unify _create_cache_identifier and create_cache_identifier_v2
  • Write tests
  • Invalidate the cache if there is an error unpickling

This will be done in separate PRs:

  • Have a way of storing the cache remotely (to be addressed in a separate ticket: Support caching remotely #927). There is a concern about how secure pickle is, so we may need to cache a different representation.
  • Button in the UI to purge the cache for an individual DAG/TaskGroup
  • Button in the UI to purge all Cosmos cache
  • Enable the feature by default before the 1.5 release (TODO: log ticket)

Closes: #926
Closes: #990

How to reproduce

Run using the new cache feature:

time AIRFLOW__COSMOS__ENABLE_CACHE=1 AIRFLOW__COSMOS__EXPERIMENTAL_CACHE=1 airflow dags test basic_cosmos_dag  `date -Iseconds`

Ran in 7.03s user 2.59s system 81% cpu 11.808 total
Screenshot 2024-05-21 at 17 28 27

Run without the new cache feature:

time  AIRFLOW__COSMOS__ENABLE_CACHE=1 AIRFLOW__COSMOS__EXPERIMENTAL_CACHE=0 airflow dags test basic_cosmos_dag  `date -Iseconds`

Ran in 18.75s user 2.62s system 73% cpu 28.898 total
Screenshot 2024-05-21 at 17 28 39


netlify bot commented May 21, 2024

Deploy Preview for sunny-pastelito-5ecb04 canceled.

🔨 Latest commit: 83480fd
🔍 Latest deploy log: https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/664f6d2df440b30008e0fc3a


codecov bot commented May 21, 2024

Codecov Report

Attention: Patch coverage is 60.16260%, with 49 lines in your changes missing coverage. Please review.

Project coverage is 94.28%. Comparing base (cb2a27a) to head (83480fd).

Files Patch % Lines
cosmos/airflow/task_group.py 50.00% 19 Missing ⚠️
cosmos/airflow/dag.py 59.45% 15 Missing ⚠️
cosmos/cache.py 66.66% 15 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #992      +/-   ##
==========================================
- Coverage   95.72%   94.28%   -1.44%     
==========================================
  Files          60       60              
  Lines        2926     3047     +121     
==========================================
+ Hits         2801     2873      +72     
- Misses        125      174      +49     

☔ View full report in Codecov by Sentry.

@tatiana tatiana changed the title Introduce DAG-level caching Add support for DAG-level caching to further improve DAG parsing performance May 21, 2024
@tatiana tatiana force-pushed the dag-level-cache branch from 9c616ca to daad0b3 Compare May 23, 2024 01:05
@tatiana tatiana changed the title Add support for DAG-level caching to further improve DAG parsing performance Add support for DAG & TaskGroup level caching (performance improvement) May 23, 2024
@tatiana tatiana added this to the Cosmos 1.5.0 milestone May 23, 2024
@tatiana
Collaborator Author

tatiana commented Jun 3, 2024

When using the current approach in a distributed environment, there are two challenges:

  1. Concurrent tasks (on the same node) try to create the cache at the same time
  2. Tasks running on different nodes (that don't have the cache yet) have to generate it themselves

We'll look into improving this.
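
As an illustration only (not something implemented in this PR), one common mitigation for the first challenge is to write the pickle to a temporary file and atomically rename it into place, so concurrent tasks on the same node never observe a partially written cache file. The helper below is hypothetical:

```
import os
import pickle
import tempfile

def atomic_cache_write(cache_path: str, obj) -> None:
    # Write the pickle to a temporary file in the same directory, then
    # atomically replace the final path. Readers see either the previous
    # complete file or the new complete file, never a partial write.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(cache_path) or ".")
    try:
        with os.fdopen(fd, "wb") as tmp_file:
            pickle.dump(obj, tmp_file)
        os.replace(tmp_path, cache_path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```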

Examples of the behavior in a distributed Airflow environment:


@tatiana
Collaborator Author

tatiana commented Jun 5, 2024

For this approach to work in a distributed Airflow environment, the cache would have to be stored remotely.

Storing a pickle in a remote location (e.g., an object store) has security implications: someone could provide malicious code as input, causing remote code execution when the pickle is loaded. For this reason, I'm closing this PR in favor of the more promising dbt ls caching strategy in #1014.

@tatiana tatiana closed this Jun 5, 2024
@tatiana tatiana changed the title Add support for DAG & TaskGroup level caching (performance improvement) WIP: Add support for DAG & TaskGroup level caching (performance improvement) Jun 10, 2024
tatiana added a commit that referenced this pull request Jun 25, 2024
…le (#1014)

Significantly improve `LoadMode.DBT_LS` performance. In the example
DAGs tested, task queueing time dropped significantly (from ~30s to
~0.5s) and the total DAG run time for Jaffle Shop fell from 1 min 25s to 40s
(by more than 50%). Some users [reported improvements of
84%](#1014 (comment))
in the DAG run time when trying out these changes. The difference can
be even more significant on larger dbt projects.

The improvement was accomplished by caching the dbt ls output as an
Airflow Variable. This is an alternative to #992, where we cached the
pickled DAG/TaskGroup in a local file on the Airflow node. Unlike
#992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal
performance on every run: whenever the cache is regenerated, the
scheduler or DAG processor will experience a delay. It was also observed
that the key value can change across platforms (e.g., `Darwin` and
`Linux`), so in deployments with heterogeneous operating systems the
key may be regenerated often.
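
For illustration only, a minimal sketch of the underlying idea, storing and reading a cache entry through Airflow's `Variable` API (the key and the field values below are placeholders, not code taken from this change):

```
from airflow.models import Variable

cache_key = "cosmos_cache__basic_cosmos_dag"
cache_entry = {
    "last_modified": "2024-06-25T10:00:00+00:00",  # ISO 8601 timestamp
    "version": "a1b2c3",                           # hash of the dbt project + dbt ls arguments
    "dbt_ls_compressed": "...",                    # zlib-compressed, base64-encoded dbt ls output
}

# Persist the cache entry in the Airflow metadata database as a Variable.
Variable.set(cache_key, cache_entry, serialize_json=True)

# Read it back later; None means the cache has not been populated yet.
cached = Variable.get(cache_key, default_var=None, deserialize_json=True)
```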

Closes: #990
Closes: #1061

**Enabling/disabling this feature**

This feature is enabled by default.
Users can disable it by setting the environment variable
`AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`.

**How the cache is refreshed**

Users can purge or delete the cache via the Airflow UI by identifying and
deleting the cache key.

The cache will be automatically refreshed if any files of the dbt
project change. Changes are detected using the SHA256 of all the files
in the directory. Initially, this feature was implemented using the
files' modified timestamps, but this did not work well for some Airflow
deployments (e.g., `astro --dags`, since the timestamps were changed during
deployment).
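
As an illustration, a minimal sketch (not the exact implementation in this change) of deriving a single SHA256 digest from every file in a dbt project directory, so that any file change produces a different value:

```
import hashlib
from pathlib import Path

def dbt_project_hash(project_dir: str) -> str:
    # Walk the project in a stable (sorted) order and hash the contents
    # of every file into a single digest.
    digest = hashlib.sha256()
    for path in sorted(Path(project_dir).rglob("*")):
        if path.is_file():
            digest.update(path.read_bytes())
    return digest.hexdigest()
```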

Additionally, if any of the following DAG configurations are changed,
we'll automatically purge the cache of the DAGs that use that specific
configuration:
* `ProjectConfig.dbt_vars`
* `ProjectConfig.env_vars`
* `ProjectConfig.partial_parse`
* `RenderConfig.env_vars`
* `RenderConfig.exclude`
* `RenderConfig.select`
* `RenderConfig.selector`

The following argument was introduced in case users would like to define
Airflow variables that can be used to refresh the cache (it expects a
list of Airflow variable names):
* `RenderConfig.airflow_vars_to_purge_cache`

Example:
```
RenderConfig(
    airflow_vars_to_purge_cache=["refresh_cache"]
)
```

**Cache key**

The Airflow variables that represent the dbt ls cache are prefixed by
`cosmos_cache`. When using `DbtDag`, the keys use the DAG name. When
using `DbtTaskGroup`, they combine the DAG name with the TaskGroup id and
any parent task group ids.

Examples:
1. The `DbtDag` "basic_cosmos_dag" will have the cache represented by
`"cosmos_cache__basic_cosmos_dag"`.
2. The `DbtTaskGroup` "customers" declared inside the DAG
"basic_cosmos_task_group" will have the cache key
`"cosmos_cache__basic_cosmos_task_group__customers"`.

**Cache value**

The cache values contain a few properties:
- `last_modified`: timestamp, represented using the ISO 8601 format.
- `version`: a hash that represents the version of the dbt project and the
arguments used to run dbt ls at the time the cache was created.
- `dbt_ls_compressed`: the dbt ls output compressed using zlib and encoded
in base64, so it can be stored as a string in the Airflow metadata database.

Steps used to compress:
```
import base64
import zlib

# dbt_ls_output holds the raw dbt ls output as a string.
compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
encoded_data = base64.b64encode(compressed_data)
dbt_ls_compressed = encoded_data.decode("utf-8")
```
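
For completeness, the inverse operation (the standard zlib/base64 round trip, not code quoted from this change), continuing from the variables above:

```
# Recover the original dbt ls output from the base64-encoded,
# zlib-compressed string stored in the Airflow Variable.
decoded_data = base64.b64decode(dbt_ls_compressed.encode("utf-8"))
original_output = zlib.decompress(decoded_data).decode("utf-8")
assert original_output == dbt_ls_output
```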

We compress this value because it can be large for bigger dbt projects,
depending on the selectors used, and we wanted this approach to be safe
and not clutter the Airflow metadata database.

Some numbers on the compression:
* A dbt project with 100 models can lead to a dbt ls output of 257k
characters when using JSON. Zlib can compress it by 20x.
* Another [real-life dbt
project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads)
with 9,285 models led to a dbt ls output of 8.4 MB, uncompressed. It
reduces to 489 KB after being compressed using `zlib` and encoded using
`base64` - to 6% of the original size.
* Maximum cell size in Postgres: 20MB

The latency added by compression is on the order of milliseconds, so it
does not interfere with the performance of this solution.

**Future work**

* How will this affect the Airflow database in the long term?
* How does this performance compare to `ObjectStorage`?

**Example of results before and after this change**

Task queue times in Astro before the change:
<img width="1488" alt="Screenshot 2024-06-03 at 11 15 26"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/20f6ae8f-02e0-4974-b445-740925ab1b3c">

Task queue times in Astro after the change on the second run of the DAG:
<img width="1624" alt="Screenshot 2024-06-03 at 11 15 44"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/c7b8a821-8751-4d2c-8feb-1d0c9bbba97e">

This feature will be available in `astronomer-cosmos==1.5.0a8`.
arojasb3 pushed a commit to arojasb3/astronomer-cosmos that referenced this pull request Jul 14, 2024