[Bug] Unable to do partial parsing after disabling Cosmos caching. #1041

Closed
1 task
josefbits opened this issue Jun 11, 2024 · 3 comments · Fixed by #1070
Labels: area:parsing · bug · dbt:parse · execution:virtualenv · parsing:dbt_manifest · profile:postgres · triage-needed

Comments

josefbits commented Jun 11, 2024

Astronomer Cosmos Version

1.4.3

dbt-core version

1.7.15

Versions of dbt adapters

dbt-postgres 1.7.15

LoadMode

DBT_LS_MANIFEST

ExecutionMode

VIRTUALENV

InvocationMode

DBT_RUNNER

airflow version

2.8.2

Operating System

Ubuntu 22.04.4 LTS

If you think it's a UI issue, what browsers are you seeing the problem on?

No response

Deployment

Docker-Compose

Deployment details

  • Docker version 25.0.3, build 4debf41
  • Docker Compose version v2.24.6

What happened?

After setting AIRFLOW__COSMOS__ENABLE_CACHE=False, dbt tasks are unable to run partial parsing due to the error "Unable to do partial parsing because saved manifest not found. Starting full parse."

The dbt artifacts are pre-generated in CI and the entire target/ directory synced to the dbt project path on the Airflow host.
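For context, dbt only attempts partial parsing when it finds a saved partial_parse.msgpack under the project's target path. A minimal sketch of that check (the helper name is illustrative, not dbt's internal API):

```python
from pathlib import Path

def has_saved_partial_parse(project_dir: Path, target_path: str = "target") -> bool:
    """Illustrative check: dbt falls back to a full parse when no saved
    partial_parse.msgpack exists under the project's target path."""
    return (project_dir / target_path / "partial_parse.msgpack").exists()
```

Because Cosmos clones the project to a temporary directory without target/, this check fails there even though the artifacts exist in the source project.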

Relevant log output

**** With AIRFLOW__COSMOS__ENABLE_CACHE=False ****
08a120f7b403
*** Found local files:
***   * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:37:41.235278+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [queued]>
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:37:41.235278+00:00
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:60} INFO - Started process 1265 to run task
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:37:41.235278+00:00', '--job-id', '862', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmppaayxmna']
[2024-06-11, 13:37:47 UTC] {standard_task_runner.py:88} INFO - Job 862: Subtask run_models.raw_payments_seed
[2024-06-11, 13:37:47 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:37:41.235278+00:00 [running]> on host 08a120f7b403
[2024-06-11, 13:37:47 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:37:41.235278+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:37:41.235278+00:00'
[2024-06-11, 13:37:47 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpn1tuhvtw from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:37:47 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:37:47 UTC] {subprocess.py:60} INFO - Tmp dir root location: 
 /tmp
[2024-06-11, 13:37:47 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpn1tuhvtw', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:37:47 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:37:57 UTC] {subprocess.py:94} INFO - 13:37:57  Running with dbt=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58  Registered adapter: postgres=1.7.15
[2024-06-11, 13:37:58 UTC] {subprocess.py:94} INFO - 13:37:58  Unable to do partial parsing because saved manifest not found. Starting full parse.
[2024-06-11, 13:38:05 UTC] {subprocess.py:94} INFO - 13:38:05  Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models


**** With AIRFLOW__COSMOS__ENABLE_CACHE=True ****
1cf95644acec
*** Found local files:
***   * /opt/airflow/logs/dag_id=cosmos_dag/run_id=manual__2024-06-11T13:55:34.501323+00:00/task_id=run_models.raw_payments_seed/attempt=1.log
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [queued]>
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2214} INFO - Executing <Task(DbtSeedVirtualenvOperator): run_models.raw_payments_seed> on 2024-06-11 13:55:34.501323+00:00
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:60} INFO - Started process 75 to run task
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'cosmos_dag', 'run_models.raw_payments_seed', 'manual__2024-06-11T13:55:34.501323+00:00', '--job-id', '874', '--raw', '--subdir', 'DAGS_FOLDER/cosmos_dag.py', '--cfg-path', '/tmp/tmpb4__880_']
[2024-06-11, 13:55:43 UTC] {standard_task_runner.py:88} INFO - Job 874: Subtask run_models.raw_payments_seed
[2024-06-11, 13:55:43 UTC] {task_command.py:423} INFO - Running <TaskInstance: cosmos_dag.run_models.raw_payments_seed manual__2024-06-11T13:55:34.501323+00:00 [running]> on host 1cf95644acec
[2024-06-11, 13:55:43 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='cosmos_dag' AIRFLOW_CTX_TASK_ID='run_models.raw_payments_seed' AIRFLOW_CTX_EXECUTION_DATE='2024-06-11T13:55:34.501323+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-06-11T13:55:34.501323+00:00'
[2024-06-11, 13:55:43 UTC] {local.py:292} INFO - Cloning project to writable temp directory /tmp/tmpeszetm6h from /opt/airflow/dags/dbt/jaffle_shop
[2024-06-11, 13:55:43 UTC] {local.py:303} INFO - Partial parse is enabled and the latest partial parse file is /opt/airflow/dags/dbt/jaffle_shop/target/partial_parse.msgpack
[2024-06-11, 13:55:44 UTC] {config.py:266} INFO - Using user-supplied profiles.yml at /opt/***/dags/dbt/jaffle_shop/profiles.yml
[2024-06-11, 13:55:44 UTC] {subprocess.py:60} INFO - Tmp dir root location: 
 /tmp
[2024-06-11, 13:55:44 UTC] {subprocess.py:73} INFO - Running command: ['/opt/***/dbt_venv/bin/dbt', 'seed', '--models', 'raw_payments', '--project-dir', '/tmp/tmpeszetm6h', '--profiles-dir', '/opt/***/dags/dbt/jaffle_shop', '--profile', 'jaffle_shop', '--target', 'dev']
[2024-06-11, 13:55:44 UTC] {subprocess.py:84} INFO - Command output:
[2024-06-11, 13:55:54 UTC] {subprocess.py:94} INFO - 13:55:54  Running with dbt=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55  Registered adapter: postgres=1.7.15
[2024-06-11, 13:55:55 UTC] {subprocess.py:94} INFO - 13:55:55  Found 5 models, 3 seeds, 20 tests, 0 sources, 0 exposures, 0 metrics, 401 macros, 0 groups, 0 semantic models

How to reproduce

Set AIRFLOW__COSMOS__ENABLE_CACHE=False environment variable in the Airflow configuration.
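For a Docker-Compose deployment like the one above, the flag can be set in the environment of the Airflow services (the compose layout shown in the comments is illustrative, not from the reporter's setup):

```shell
# Disable Cosmos caching for all Airflow components
export AIRFLOW__COSMOS__ENABLE_CACHE=False

# or, equivalently, in docker-compose.yaml:
#   environment:
#     AIRFLOW__COSMOS__ENABLE_CACHE: "False"
```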

Anything else :)?

I have been able to reproduce this in our MWAA deployment as well. It's also running astronomer-cosmos v1.4.3, but on a different Airflow version, dbt-core version, and the dbt-teradata adapter, so the problem doesn't appear to be environment-specific.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

@josefbits josefbits added bug Something isn't working triage-needed Items need to be reviewed / assigned to milestone labels Jun 11, 2024
@dosubot dosubot bot added area:parsing Related to parsing DAG/DBT improvement, issues, or fixes dbt:parse Primarily related to dbt parse command or functionality execution:virtualenv Related to Virtualenv execution environment parsing:dbt_manifest Issues, questions, or features related to dbt_manifest parsing profile:postgres Related to Postgres ProfileConfig labels Jun 11, 2024

dosubot bot commented Jun 12, 2024

Hello @josefbits! I'm here to help you with any bugs, questions, or contributions you might have. Let me know how I can assist you!

To resolve the issue with partial parsing in dbt after disabling Cosmos caching in Airflow, you need to ensure that the partial parse file and the manifest file are correctly copied and updated in the target dbt project directory. Here is a step-by-step approach:

  1. Identify the latest partial parse file:
    Use the _get_latest_partial_parse function to determine the most up-to-date partial parse file.

  2. Update the cache with the latest partial parse file:
    Use the _update_partial_parse_cache function to update the cache directory with the latest partial parse file contents.

  3. Copy the partial parse file to the project directory:
    Use the _copy_partial_parse_to_project function to copy the partial parse file and the manifest file to the target dbt project directory.

Here is the relevant code to perform these steps:

from pathlib import Path
from cosmos.cache import _get_latest_partial_parse, _update_partial_parse_cache, _copy_partial_parse_to_project

# Define the paths
dbt_project_path = Path("/path/to/your/dbt/project")
cache_dir = Path("/path/to/your/cache/directory")

# Step 1: Identify the latest partial parse file
latest_partial_parse_filepath = _get_latest_partial_parse(dbt_project_path, cache_dir)

if latest_partial_parse_filepath:
    # Step 2: Update the cache with the latest partial parse file
    _update_partial_parse_cache(latest_partial_parse_filepath, cache_dir)
    
    # Step 3: Copy the partial parse file to the project directory
    _copy_partial_parse_to_project(latest_partial_parse_filepath, dbt_project_path)
else:
    print("No partial parse file found.")

Ensure that the paths to your dbt project and cache directory are correctly specified. This keeps the partial parse file and manifest file in sync, resolving the issue with partial parsing in dbt.


@tatiana tatiana added this to the Cosmos 1.5.0 milestone Jun 12, 2024
@tatiana tatiana self-assigned this Jun 12, 2024

josefbits commented Jun 16, 2024


@dosu @tatiana If I understand correctly, during execution Cosmos symlinks the project files from the dbt project path into the temporary directory. However, per the create_symlinks function, it ignores logs, target, and profiles.yml, so the above suggestion of copying the manifests to the project directory doesn't solve the problem. It's possible to work around this in some cases by specifying a target path like:

operator_args={"dbt_cmd_flags": ["--target-path", f"{DBT_PROJECT_PATH}/target"]},

However, this will not work on most managed solutions using an object storage backend, as the directory is read-only. It can of course be synced to the Airflow worker's local disk, but that brings us full circle to caching and to whether concurrent context updates to the manifests will be a problem (#1042).

IMO the simplest solution would be to check for existing manifests, either in the dbt project target/ directory or, when using LoadMode.DBT_MANIFEST or AUTOMATIC, the manifest_path root provided in the ProjectConfig. These could then be copied instead to the temporary directory using a function similar to _copy_partial_parse_to_xxx.
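A minimal sketch of that idea (the function and parameter names here are hypothetical, not existing Cosmos API):

```python
import shutil
from pathlib import Path
from typing import Optional

def copy_existing_manifests(project_dir: Path, tmp_dir: Path,
                            manifest_path: Optional[Path] = None) -> None:
    """Hypothetical helper: seed the temp project's target/ with pre-built
    artifacts so dbt can partially parse even with Cosmos caching disabled."""
    candidates = [project_dir / "target"]
    if manifest_path is not None:
        # e.g. the directory holding the manifest_path from ProjectConfig
        candidates.append(manifest_path.parent)
    tmp_target = tmp_dir / "target"
    for artifact in ("partial_parse.msgpack", "manifest.json"):
        for candidate in candidates:
            source = candidate / artifact
            if source.exists():
                tmp_target.mkdir(exist_ok=True)
                shutil.copy(source, tmp_target / artifact)
                break
```

This prefers artifacts already present in the project's target/ directory and falls back to the configured manifest location, copying into the temp directory rather than the (possibly read-only) project path.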


dosubot bot commented Jun 16, 2024

You're correct that Cosmos symlinks project files to a temporary directory, ignoring logs, target, and profiles.yml. This can indeed cause issues with partial parsing when AIRFLOW__COSMOS__ENABLE_CACHE is set to False.

To address this, you can modify the process to check for existing manifests in the target/ directory or the manifest_path root provided in the ProjectConfig. If found, these manifests can be copied to the temporary directory. Here’s how you can implement this:

  1. Check for Existing Manifests: Before symlinking, check if the manifest files exist in the target/ directory or the manifest_path.

  2. Copy Manifests to Temporary Directory: If manifests are found, copy them to the temporary directory.

Here’s a code snippet to illustrate this approach:

import os
import shutil
from pathlib import Path

def copy_manifests_to_temp(project_path: Path, tmp_dir: Path) -> None:
    """Copy existing manifest files into the temp project's target/ directory,
    where dbt looks for them when attempting partial parsing."""
    target_dir = project_path / "target"
    tmp_target_dir = tmp_dir / "target"
    manifest_files = ["manifest.json", "partial_parse.msgpack"]

    for manifest_file in manifest_files:
        source_path = target_dir / manifest_file
        if source_path.exists():
            tmp_target_dir.mkdir(exist_ok=True)
            shutil.copy(source_path, tmp_target_dir / manifest_file)

def create_symlinks_with_manifests(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
    """Create symlinks to the dbt project files and copy manifests to the temp directory."""
    ignore_paths = ["logs", "target", "profiles.yml"]
    if ignore_dbt_packages:
        ignore_paths.append("dbt_packages")

    for child_name in os.listdir(project_path):
        if child_name not in ignore_paths:
            os.symlink(project_path / child_name, tmp_dir / child_name)

    # Copy manifests into the temporary directory after symlinking
    copy_manifests_to_temp(project_path, tmp_dir)

# Example usage
dbt_project_path = Path("/path/to/dbt/project")
tmp_dir = Path("/path/to/temp/directory")
create_symlinks_with_manifests(dbt_project_path, tmp_dir, ignore_dbt_packages=False)

This approach ensures that the necessary manifest files are available in the temporary directory, enabling partial parsing even when caching is disabled. It should also help mitigate issues with concurrent updates to the manifests in environments with multiple Airflow workers or managed solutions using object storage backends.


tatiana added a commit that referenced this issue Jun 27, 2024
tatiana added a commit that referenced this issue Jun 27, 2024
pankajkoti pushed a commit that referenced this issue Jun 27, 2024
Partial parsing support was introduced in #800 and improved in #904 (caching). However, when the caching layer was introduced, we removed support for partial parsing when the cache was disabled.

This PR solves the issue.

Fix: #1041
arojasb3 pushed a commit to arojasb3/astronomer-cosmos that referenced this issue Jul 14, 2024