From 5ae74bfd2954b7cd9b5befb4898f81ee1530d57b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 25 Jan 2024 10:08:09 +0000 Subject: [PATCH 1/3] Fix forwarding selectors to AFTERALL test Closes: #643 --- cosmos/airflow/graph.py | 15 +++++++++++---- cosmos/converter.py | 3 +-- cosmos/operators/base.py | 25 +++++++++++++++++++++++++ cosmos/operators/local.py | 2 +- tests/airflow/test_graph.py | 13 ++++++++++--- tests/operators/test_local.py | 11 ++++++----- 6 files changed, 54 insertions(+), 15 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 615c2a124..06080260a 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -14,6 +14,7 @@ TESTABLE_DBT_RESOURCES, DEFAULT_DBT_RESOURCES, ) +from cosmos.config import RenderConfig from cosmos.core.airflow import get_airflow_task as create_airflow_task from cosmos.core.graph.entities import Task as TaskMetadata from cosmos.dbt.graph import DbtNode @@ -66,6 +67,7 @@ def create_test_task_metadata( task_args: dict[str, Any], on_warning_callback: Callable[..., Any] | None = None, node: DbtNode | None = None, + render_config: RenderConfig | None = None, ) -> TaskMetadata: """ Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node. @@ -89,6 +91,11 @@ def create_test_task_metadata( task_args["select"] = f"source:{node.resource_name}" else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT task_args["select"] = node.resource_name + elif render_config is not None: # TestBehavior.AFTER_ALL + task_args["select"] = render_config.select + task_args["selector"] = render_config.selector + task_args["exclude"] = render_config.exclude + return TaskMetadata( id=test_task_name, operator_class=calculate_operator_class( @@ -199,12 +206,11 @@ def build_airflow_graph( dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups execution_mode: ExecutionMode, # Cosmos-specific - decide what which class to use task_args: dict[str, Any], # Cosmos/DBT - used to instantiate tasks - test_behavior: TestBehavior, # Cosmos-specific: how to inject tests to Airflow DAG test_indirect_selection: TestIndirectSelection, # Cosmos/DBT - used to set test indirect selection mode dbt_project_name: str, # DBT / Cosmos - used to name test task if mode is after_all, + render_config: RenderConfig, task_group: TaskGroup | None = None, on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command - node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None, ) -> None: """ Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory). @@ -224,13 +230,13 @@ def build_airflow_graph( :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES). Default is ExecutionMode.LOCAL. :param task_args: Arguments to be used to instantiate an Airflow Task - :param test_behavior: When to run `dbt` tests. Default is TestBehavior.AFTER_EACH, that runs tests after each model. :param dbt_project_name: Name of the dbt pipeline of interest :param task_group: Airflow Task Group instance :param on_warning_callback: A callback function called on warnings with additional Context variables “test_names” and “test_results” of type List. """ - node_converters = node_converters or {} + node_converters = render_config.node_converters or {} + test_behavior = render_config.test_behavior tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -266,6 +272,7 @@ def build_airflow_graph( test_indirect_selection, task_args=task_args, on_warning_callback=on_warning_callback, + render_config=render_config, ) test_task = create_airflow_task(test_meta, dag, task_group=task_group) leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) diff --git a/cosmos/converter.py b/cosmos/converter.py index c2b31700b..97e8190dd 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -268,9 +268,8 @@ def __init__( task_group=task_group, execution_mode=execution_config.execution_mode, task_args=task_args, - test_behavior=render_config.test_behavior, test_indirect_selection=execution_config.test_indirect_selection, dbt_project_name=project_config.project_name, on_warning_callback=on_warning_callback, - node_converters=render_config.node_converters, + render_config=render_config, ) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b0f0d335a..b98ad679f 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -324,6 +324,31 @@ class DbtTestMixin: base_cmd = ["test"] ui_color = "#8194E0" + def __init__( + self, + select: str | None = None, + exclude: str | None = None, + selector: str | None = None, + *args: Any, + **kwargs: Any, + ) -> None: + self.select = select + self.exclude = exclude + self.selector = selector + super().__init__(*args, **kwargs) + + def add_cmd_flags(self) -> list[str]: + flags = [] + if self.exclude: + flags.extend(["--exclude", *self.exclude]) + + if self.select: + flags.extend(["--select", *self.select]) + + if self.selector: + flags.extend(["--selector", self.selector]) + return flags + class DbtRunOperationMixin: """ diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 93cf9577e..83618be77 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -447,7 +447,7 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) self.on_warning_callback and self.on_warning_callback(warning_context) def execute(self, context: Context) -> None: - result = self.build_and_run_cmd(context=context) + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) should_trigger_callback = all( [ self.on_warning_callback, diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 35e313fca..255f8afc3 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -16,7 +16,7 @@ create_test_task_metadata, generate_task_or_group, ) -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( DbtResourceType, ExecutionMode, @@ -96,7 +96,9 @@ def test_build_airflow_graph_with_after_each(): execution_mode=ExecutionMode.LOCAL, test_indirect_selection=TestIndirectSelection.EAGER, task_args=task_args, - test_behavior=TestBehavior.AFTER_EACH, + render_config=RenderConfig( + test_behavior=TestBehavior.AFTER_EACH, + ), dbt_project_name="astro_shop", ) topological_sort = [task.task_id for task in dag.topological_sort()] @@ -183,14 +185,18 @@ def test_build_airflow_graph_with_after_all(): ), ), } + render_config = RenderConfig( + select=["tag:some"], + test_behavior=TestBehavior.AFTER_ALL, + ) build_airflow_graph( nodes=sample_nodes, dag=dag, execution_mode=ExecutionMode.LOCAL, test_indirect_selection=TestIndirectSelection.EAGER, task_args=task_args, - test_behavior=TestBehavior.AFTER_ALL, dbt_project_name="astro_shop", + render_config=render_config, ) topological_sort = [task.task_id for task in dag.topological_sort()] expected_sort = ["seed_parent_seed", "parent_run", "child_run", "child2_v2_run", "astro_shop_test"] @@ -201,6 +207,7 @@ def test_build_airflow_graph_with_after_all(): assert len(dag.leaves) == 1 assert dag.leaves[0].task_id == "astro_shop_test" + assert dag.leaves[0].select == ["tag:some"] def test_calculate_operator_class(): diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 7e758b885..5854b86ba 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -360,7 +360,11 @@ def test_store_compiled_sql() -> None: [ (DbtSeedLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), (DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), - (DbtTestLocalOperator, {"full_refresh": True}, {"context": {}}), + ( + DbtTestLocalOperator, + {"full_refresh": True, "select": ["tag:daily"]}, + {"context": {}, "cmd_flags": ["--select", "tag:daily"]}, + ), ( DbtRunOperationLocalOperator, {"args": {"days": 7, "dry_run": True}, "macro_name": "bla"}, @@ -402,10 +406,7 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): **operator_class_kwargs.get(operator_class, {}), ) task.execute(context={}) - if operator_class == DbtTestLocalOperator: - mock_build_and_run_cmd.assert_called_once_with(context={}) - else: - mock_build_and_run_cmd.assert_called_once_with(context={}, cmd_flags=[]) + mock_build_and_run_cmd.assert_called_once_with(context={}, cmd_flags=[]) @patch("cosmos.operators.local.DbtLocalArtifactProcessor") From af3a08c4cfbc544bc781cb9482c6666481edc9b9 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 25 Jan 2024 12:05:17 +0000 Subject: [PATCH 2/3] Fix broken test --- cosmos/operators/base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b98ad679f..25aef7764 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -326,16 +326,15 @@ class DbtTestMixin: def __init__( self, - select: str | None = None, exclude: str | None = None, + select: str | None = None, selector: str | None = None, - *args: Any, **kwargs: Any, ) -> None: self.select = select self.exclude = exclude self.selector = selector - super().__init__(*args, **kwargs) + super().__init__(exclude=exclude, select=select, selector=selector, **kwargs) # type: ignore def add_cmd_flags(self) -> list[str]: flags = [] From 617338d198a551d99ab88669e424d73fc65493bf Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 25 Jan 2024 13:24:54 +0000 Subject: [PATCH 3/3] Improve code coverage --- tests/operators/test_local.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 5854b86ba..babb425ef 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -362,8 +362,13 @@ def test_store_compiled_sql() -> None: (DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), ( DbtTestLocalOperator, - {"full_refresh": True, "select": ["tag:daily"]}, - {"context": {}, "cmd_flags": ["--select", "tag:daily"]}, + {"full_refresh": True, "select": ["tag:daily"], "exclude": ["tag:disabled"]}, + {"context": {}, "cmd_flags": ["--exclude", "tag:disabled", "--select", "tag:daily"]}, + ), + ( + DbtTestLocalOperator, + {"full_refresh": True, "selector": "nightly_snowplow"}, + {"context": {}, "cmd_flags": ["--selector", "nightly_snowplow"]}, ), ( DbtRunOperationLocalOperator,