diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index ebae7f32f..5639d3bcb 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -95,6 +95,7 @@ def create_test_task_metadata( task_args["on_warning_callback"] = on_warning_callback extra_context = {} + task_owner = "" if test_indirect_selection != TestIndirectSelection.EAGER: task_args["indirect_selection"] = test_indirect_selection.value if node is not None: @@ -106,6 +107,7 @@ def create_test_task_metadata( task_args["select"] = node.resource_name extra_context = {"dbt_node_config": node.context_dict} + task_owner = node.owner elif render_config is not None: # TestBehavior.AFTER_ALL task_args["select"] = render_config.select @@ -114,6 +116,7 @@ def create_test_task_metadata( return TaskMetadata( id=test_task_name, + owner=task_owner, operator_class=calculate_operator_class( execution_mode=execution_mode, dbt_class="DbtTest", @@ -158,6 +161,7 @@ def create_task_metadata( task_metadata = TaskMetadata( id=task_id, + owner=node.owner, operator_class=calculate_operator_class( execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] ), diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index d4a962483..38d5113ec 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -25,10 +25,16 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None module = importlib.import_module(module_name) Operator = getattr(module, class_name) + if task.owner != "": + task_owner = task.owner + else: + task_owner = dag.owner + airflow_task = Operator( task_id=task.id, dag=dag, task_group=task_group, + owner=task_owner, extra_context=task.extra_context, **task.arguments, ) diff --git a/cosmos/core/graph/entities.py b/cosmos/core/graph/entities.py index 3c3ee58d0..6bf9ff046 100644 --- a/cosmos/core/graph/entities.py +++ b/cosmos/core/graph/entities.py @@ -57,6 +57,7 @@ class Task(CosmosEntity): :param arguments: The arguments to pass to the operator """ + owner: str = "" operator_class: str = "airflow.operators.empty.EmptyOperator" arguments: Dict[str, Any] = field(default_factory=dict) extra_context: Dict[str, Any] = field(default_factory=dict) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index a26467d6d..6171d8e4b 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -76,6 +76,10 @@ def name(self) -> str: """ return self.resource_name.replace(".", "_") + @property + def owner(self) -> str: + return str(self.config.get("meta", {}).get("owner", "")) + @property def context_dict(self) -> dict[str, Any]: """ diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index a238475c2..f9a62106d 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -42,7 +42,7 @@ depends_on=[parent_seed.unique_id], file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", tags=["has_child"], - config={"materialized": "view"}, + config={"materialized": "view", "meta": {"owner": "parent_node"}}, has_test=True, ) test_parent_node = DbtNode( @@ -123,6 +123,12 @@ def test_build_airflow_graph_with_after_each(): assert dag.leaves[0].task_id == "child_run" assert dag.leaves[1].task_id == "child2_v2_run" + task_seed_parent_seed = dag.tasks[0] + task_parent_run = dag.tasks[1] + + assert task_seed_parent_seed.owner == "" + assert task_parent_run.owner == "parent_node" + @pytest.mark.parametrize( "node_type,task_suffix",