Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task owner to dbt operator #1082

Merged
merged 14 commits into from
Jul 23, 2024
2 changes: 2 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def create_test_task_metadata(

return TaskMetadata(
id=test_task_name,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode,
dbt_class="DbtTest",
Expand Down Expand Up @@ -158,6 +159,7 @@ def create_task_metadata(

task_metadata = TaskMetadata(
id=task_id,
owner=node.owner,
operator_class=calculate_operator_class(
execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
),
Expand Down
1 change: 1 addition & 0 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
task_id=task.id,
dag=dag,
task_group=task_group,
owner=task.owner,
extra_context=task.extra_context,
**task.arguments,
)
Expand Down
1 change: 1 addition & 0 deletions cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Task(CosmosEntity):
:param arguments: The arguments to pass to the operator
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)
4 changes: 4 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ def name(self) -> str:
"""
return self.resource_name.replace(".", "_")

@property
def owner(self) -> str:
return str(self.config.get("meta", {}).get("owner", "airflow"))

@property
def context_dict(self) -> dict[str, Any]:
"""
Expand Down
8 changes: 7 additions & 1 deletion tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
depends_on=[parent_seed.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql",
tags=["has_child"],
config={"materialized": "view"},
config={"materialized": "view", "meta": {"owner": "parent_node"}},
has_test=True,
)
test_parent_node = DbtNode(
Expand Down Expand Up @@ -123,6 +123,12 @@ def test_build_airflow_graph_with_after_each():
assert dag.leaves[0].task_id == "child_run"
assert dag.leaves[1].task_id == "child2_v2_run"

task_seed_parent_seed = dag.tasks[0]
task_parent_run = dag.tasks[1]

assert task_seed_parent_seed.owner == "airflow"
assert task_parent_run.owner == "parent_node"


@pytest.mark.parametrize(
"node_type,task_suffix",
Expand Down
Loading