diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b22f0115a..a1925bdc5 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -55,6 +55,7 @@ class DbtBaseOperator(BaseOperator): :param dbt_executable_path: Path to dbt executable can be used with venv (i.e. /home/astro/.pyenv/versions/dbt_venv/bin/dbt) :param dbt_cmd_flags: List of flags to pass to dbt command + :param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command """ template_fields: Sequence[str] = ("env", "vars") @@ -100,6 +101,7 @@ def __init__( cancel_query_on_kill: bool = True, dbt_executable_path: str = "dbt", dbt_cmd_flags: list[str] | None = None, + dbt_cmd_global_flags: list[str] | None = None, **kwargs: Any, ) -> None: self.project_dir = project_dir @@ -132,6 +134,7 @@ def __init__( else: self.dbt_executable_path = dbt_executable_path self.dbt_cmd_flags = dbt_cmd_flags + self.dbt_cmd_global_flags = dbt_cmd_global_flags or [] super().__init__(**kwargs) def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: @@ -210,6 +213,8 @@ def build_cmd( ) -> Tuple[list[str | None], dict[str, str | bytes | os.PathLike[Any]]]: dbt_cmd = [self.dbt_executable_path] + dbt_cmd.extend(self.dbt_cmd_global_flags) + if self.base_cmd: dbt_cmd.extend(self.base_cmd) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 27cfc9f73..9ad2ac432 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -48,7 +48,24 @@ def test_dbt_base_operator_add_user_supplied_flags() -> None: cmd, _ = dbt_base_operator.build_cmd( Context(execution_date=datetime(2023, 2, 15, 12, 30)), ) - assert "--full-refresh" in cmd + assert cmd[-2] == "run" + assert cmd[-1] == "--full-refresh" + + +def test_dbt_base_operator_add_user_supplied_global_flags() -> None: + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + base_cmd=["run"], + dbt_cmd_global_flags=["--cache-selected-only"], + ) + + cmd, _ = dbt_base_operator.build_cmd( + Context(execution_date=datetime(2023, 2, 15, 12, 30)), + ) + assert cmd[-2] == "--cache-selected-only" + assert cmd[-1] == "run" @pytest.mark.parametrize(