Skip to content

Commit

Permalink
Gracefully error when users set imcompatible RenderConfig.dbt_deps
Browse files Browse the repository at this point in the history
…and `operator_args` `install_deps` (#1505)

Customer is facing this error when `RenderConfig(dbt_deps=True)` and
`operator_args={"install_deps": False}`:

![Image](https://github.com/user-attachments/assets/495e46c8-419d-48d3-9bb4-e8955ce4c43e)

This issue does not happen if both of them are `False`.

Closes: #1458
Closes: #1457
  • Loading branch information
tatiana authored Feb 6, 2025
1 parent 24108f0 commit a94f901
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 22 deletions.
39 changes: 27 additions & 12 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
from cosmos.constants import ExecutionMode, LoadMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.project import has_non_empty_dependencies_file
from cosmos.dbt.selector import retrieve_by_label
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -67,11 +68,11 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:


def validate_arguments(
select: list[str],
exclude: list[str],
render_config: RenderConfig,
profile_config: ProfileConfig,
task_args: dict[str, Any],
execution_mode: ExecutionMode,
execution_config: ExecutionConfig,
project_config: ProjectConfig,
) -> None:
"""
Validate that mutually exclusive selectors filters have not been given.
Expand All @@ -84,8 +85,8 @@ def validate_arguments(
:param execution_mode: the current execution mode
"""
for field in ("tags", "paths"):
select_items = retrieve_by_label(select, field)
exclude_items = retrieve_by_label(exclude, field)
select_items = retrieve_by_label(render_config.select, field)
exclude_items = retrieve_by_label(render_config.exclude, field)
intersection = {str(item) for item in set(select_items).intersection(exclude_items)}
if intersection:
raise CosmosValueError(f"Can't specify the same {field[:-1]} in `select` and `exclude`: " f"{intersection}")
Expand All @@ -96,8 +97,21 @@ def validate_arguments(
if profile_config.profile_mapping:
profile_config.profile_mapping.profile_args["schema"] = task_args["schema"]

if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
if execution_config.execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
profile_config.validate_profiles_yml()
has_non_empty_dependencies = execution_config.project_path and has_non_empty_dependencies_file(
execution_config.project_path
)
if (
has_non_empty_dependencies
and (
render_config.load_method == LoadMode.DBT_LS
or (render_config.load_method == LoadMode.AUTOMATIC and not project_config.is_manifest_available())
)
and (render_config.dbt_deps != task_args.get("install_deps", True))
):
err_msg = f"When using `LoadMode.DBT_LS` and `{execution_config.execution_mode}`, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value."
raise CosmosValueError(err_msg)


def validate_initial_user_config(
Expand Down Expand Up @@ -283,12 +297,13 @@ def __init__(
task_args["invocation_mode"] = execution_config.invocation_mode

validate_arguments(
render_config.select,
render_config.exclude,
profile_config,
task_args,
execution_mode=execution_config.execution_mode,
execution_config=execution_config,
profile_config=profile_config,
render_config=render_config,
task_args=task_args,
project_config=project_config,
)

if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

Expand Down
9 changes: 3 additions & 6 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool:
:returns: True or False
"""
project_dir = Path(project_path)
has_deps = False
for filename in DBT_DEPENDENCIES_FILE_NAMES:
filepath = project_dir / filename
if filepath.exists() and filepath.stat().st_size > 0:
has_deps = True
break
return True

if not has_deps:
logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
return has_deps
logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
return False


def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
Expand Down
1 change: 1 addition & 0 deletions tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def test_is_cosmos_dag_is_true():
profile_config=profile_config,
start_date=datetime(2023, 1, 1),
dag_id="basic_cosmos_dag",
operator_args={"install_dep": True},
)
assert total_cosmos_tasks(dag) == 13

Expand Down
50 changes: 46 additions & 4 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,60 @@
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json"
MULTIPLE_PARENTS_TEST_DBT_PROJECT = Path(__file__).parent.parent / "dev/dags/dbt/multiple_parents_test/"
DBT_PROJECTS_PROJ_WITH_DEPS_DIR = Path(__file__).parent.parent / "dev/dags/dbt" / "jaffle_shop"


@pytest.mark.parametrize("argument_key", ["tags", "paths"])
def test_validate_arguments_tags(argument_key):
selector_name = argument_key[:-1]
select = [f"{selector_name}:a,{selector_name}:b"]
exclude = [f"{selector_name}:b,{selector_name}:c"]
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="xubiru")
render_config = RenderConfig(
select=[f"{selector_name}:a,{selector_name}:b"], exclude=[f"{selector_name}:b,{selector_name}:c"]
)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
task_args = {}
with pytest.raises(CosmosValueError) as err:
validate_arguments(select, exclude, profile_config, task_args, execution_mode=ExecutionMode.LOCAL)
validate_arguments(
execution_config=execution_config,
profile_config=profile_config,
project_config=project_config,
render_config=render_config,
task_args=task_args,
)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected


def test_validate_arguments_exception():
render_config = RenderConfig(load_method=LoadMode.DBT_LS, dbt_deps=False)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL, dbt_project_path=DBT_PROJECTS_PROJ_WITH_DEPS_DIR
)
project_config = ProjectConfig()

task_args = {"install_deps": True} # this has to be the opposite of RenderConfig.dbt_deps
with pytest.raises(CosmosValueError) as err:
validate_arguments(
execution_config=execution_config,
profile_config=profile_config,
project_config=project_config,
render_config=render_config,
task_args=task_args,
)
expected = "When using `LoadMode.DBT_LS` and `ExecutionMode.LOCAL`, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value."
assert err.value.args[0] == expected


@pytest.mark.parametrize(
"execution_mode",
(ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV),
Expand Down Expand Up @@ -110,14 +145,21 @@ def test_validate_user_config_fails_project_config_render_config_env_vars():


def test_validate_arguments_schema_in_task_args():
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL, dbt_project_path="/tmp/project-dir")
render_config = RenderConfig()
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
task_args = {"schema": "abcd"}
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="something")
validate_arguments(
select=[], exclude=[], profile_config=profile_config, task_args=task_args, execution_mode=ExecutionMode.LOCAL
execution_config=execution_config,
profile_config=profile_config,
render_config=render_config,
task_args=task_args,
project_config=project_config,
)
assert profile_config.profile_mapping.profile_args["schema"] == "abcd"

Expand Down

0 comments on commit a94f901

Please sign in to comment.