Skip to content

Commit

Permalink
Add task owner to dbt operator (#1082)
Browse files Browse the repository at this point in the history
Set dbt meta owner as task owner in all Cosmos `DbtxxxOperator`s.

Closes: #1065

---------
Co-authored-by: Tatiana Al-Chueyr <[email protected]>
  • Loading branch information
wornjs and tatiana authored Jul 23, 2024
1 parent 0cdf2f3 commit d383792
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 1 deletion.
4 changes: 4 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def create_test_task_metadata(
task_args["on_warning_callback"] = on_warning_callback
extra_context = {}

task_owner = ""
if test_indirect_selection != TestIndirectSelection.EAGER:
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
Expand All @@ -106,6 +107,7 @@ def create_test_task_metadata(
task_args["select"] = node.resource_name

extra_context = {"dbt_node_config": node.context_dict}
task_owner = node.owner

elif render_config is not None: # TestBehavior.AFTER_ALL
task_args["select"] = render_config.select
Expand All @@ -114,6 +116,7 @@ def create_test_task_metadata(

return TaskMetadata(
id=test_task_name,
owner=task_owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode,
dbt_class="DbtTest",
Expand Down Expand Up @@ -158,6 +161,7 @@ def create_task_metadata(

task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
Expand Down
6 changes: 6 additions & 0 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
module = importlib.import_module(module_name)
Operator = getattr(module, class_name)

if task.owner != "":
task_owner = task.owner
else:
task_owner = dag.owner

airflow_task = Operator(
task_id=task.id,
dag=dag,
task_group=task_group,
owner=task_owner,
extra_context=task.extra_context,
**task.arguments,
)
Expand Down
1 change: 1 addition & 0 deletions cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Task(CosmosEntity):
:param arguments: The arguments to pass to the operator
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
4 changes: 4 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def name(self) -> str:
"""
return self.resource_name.replace(".", "_")

@property
def owner(self) -> str:
return str(self.config.get("meta", {}).get("owner", ""))

@property
def context_dict(self) -> dict[str, Any]:
"""
Expand Down
8 changes: 7 additions & 1 deletion tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
depends_on=[parent_seed.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql",
tags=["has_child"],
config={"materialized": "view"},
config={"materialized": "view", "meta": {"owner": "parent_node"}},
has_test=True,
)
test_parent_node = DbtNode(
Expand Down Expand Up @@ -123,6 +123,12 @@ def test_build_airflow_graph_with_after_each():
assert dag.leaves[0].task_id == "child_run"
assert dag.leaves[1].task_id == "child2_v2_run"

task_seed_parent_seed = dag.tasks[0]
task_parent_run = dag.tasks[1]

assert task_seed_parent_seed.owner == ""
assert task_parent_run.owner == "parent_node"


@pytest.mark.parametrize(
"node_type,task_suffix",
Expand Down

0 comments on commit d383792

Please sign in to comment.