diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c36db366e..ee03e17f9 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -26,7 +26,7 @@ from cosmos.core.graph.entities import Task as TaskMetadata from cosmos.dbt.graph import DbtNode from cosmos.log import get_logger -from cosmos.settings import enable_setup_async_task, enable_teardown_task +from cosmos.settings import enable_setup_async_task, enable_teardown_async_task logger = get_logger(__name__) @@ -627,7 +627,7 @@ def build_airflow_graph( create_airflow_task_dependencies(nodes, tasks_map) if enable_setup_async_task: _add_dbt_setup_async_task(dag, execution_mode, task_args, tasks_map, task_group, render_config=render_config) - if enable_teardown_task: + if enable_teardown_async_task: _add_teardown_task(dag, execution_mode, task_args, tasks_map, task_group, render_config=render_config) return tasks_map diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 5f1068675..45b546d6a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -37,7 +37,7 @@ from cosmos.exceptions import AirflowCompatibilityError, CosmosDbtRunError, CosmosValueError from cosmos.settings import ( enable_setup_async_task, - enable_teardown_task, + enable_teardown_async_task, remote_target_path, remote_target_path_conn_id, ) @@ -477,7 +477,7 @@ def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None self.callback(tmp_project_dir, **self.callback_args) def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_context: dict[str, Any]) -> None: - if async_context.get("teardown_task") and enable_teardown_task: + if async_context.get("teardown_task") and enable_teardown_async_task: self._delete_sql_files(Path(tmp_project_dir), "run") return diff --git a/cosmos/settings.py b/cosmos/settings.py index 60401805d..05c05f46f 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -39,7 +39,7 @@ # Related to async operators enable_setup_async_task = conf.getboolean("cosmos", "enable_setup_async_task", fallback=True) -enable_teardown_task = conf.getboolean("cosmos", "", fallback=True) +enable_teardown_async_task = conf.getboolean("cosmos", "enable_teardown_async_task", fallback=True) AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")