Skip to content

Commit

Permalink
Fix datached test tasks names so they do not exceed 250 chars
Browse files Browse the repository at this point in the history
Since we introduced detached test tasks in #1433 (released in 1.8.0) users started facing issues due to very long task names exceeding Airflow's limits.

Example of stacktrace reported by user:
```
 Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 968, in __init__
    validate_key(task_id)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/helpers.py", line 55, in validate_key
    raise AirflowException(f"The key has to be less than {max_length} characters")
airflow.exceptions.AirflowException: The key has to be less than 250 characters
```

This PR fixes this issue. In case the name exceeds Airflow's limit (250 ATM), it will name the detached test using:
- "detached_{incremental unique number}_test"

Closes: #1440
  • Loading branch information
tatiana committed Jan 15, 2025
1 parent 56a9d3f commit d04f711
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
19 changes: 14 additions & 5 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from collections import defaultdict
from collections import OrderedDict, defaultdict
from typing import Any, Callable, Union

from airflow.models import BaseOperator
from airflow.models.base import ID_LEN as AIRFLOW_MAX_ID_LENGTH
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

Expand Down Expand Up @@ -430,11 +431,19 @@ def identify_detached_nodes(
detached_from_parent[parent_id].append(node)


def calculate_detached_node_name(node: DbtNode) -> str:
def calculate_detached_node_name(node: DbtNode, counter: list[int] = [0]) -> str:
"""
Given a detached test node, calculate its name.
Given a detached test node, calculate its name. It will either be:
- the name of the test with a "_test" suffix, if this is smaller than 250
- or detached_{an incremental number}_test
"""
return f"{node.resource_name.split('.')[0]}_test"
node_name = f"{node.resource_name.split('.')[0]}_test"

if not len(node_name) < AIRFLOW_MAX_ID_LENGTH:
node_name = f"detached_{counter[0]}_test"
counter[0] += 1

return node_name


def build_airflow_graph(
Expand Down Expand Up @@ -481,7 +490,7 @@ def build_airflow_graph(

# Identify test nodes that should be run detached from the associated dbt resource nodes because they
# have multiple parents
detached_nodes: dict[str, DbtNode] = {}
detached_nodes: dict[str, DbtNode] = OrderedDict()
detached_from_parent: dict[str, list[DbtNode]] = defaultdict(list)
identify_detached_nodes(nodes, test_behavior, detached_nodes, detached_from_parent)

Expand Down
9 changes: 1 addition & 8 deletions scripts/test/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,4 @@ airflow db check


rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
-m 'integration' \
--ignore=tests/perf \
--ignore=tests/test_example_k8s_dags.py \
-k 'not ( example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes)'
pytest -vv tests/test_converter.py::test_converter_creates_dag_with_test_with_multiple_parents
27 changes: 27 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from cosmos.airflow.graph import (
_snake_case_to_camelcase,
build_airflow_graph,
calculate_detached_node_name,
calculate_leaves,
calculate_operator_class,
create_task_metadata,
Expand Down Expand Up @@ -78,6 +79,32 @@
sample_nodes = {node.unique_id: node for node in sample_nodes_list}


def test_calculate_datached_node_name_under_is_under_250():
node = DbtNode(
unique_id="model.my_dbt_project.a_very_short_name",
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path="",
)
assert calculate_detached_node_name(node) == "a_very_short_name_test"

node = DbtNode(
unique_id="model.my_dbt_project." + "this_is_a_very_long_name" * 20, # 24 x 20 = 480 characters
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path="",
)
assert calculate_detached_node_name(node) == "detached_0_test"

node = DbtNode(
unique_id="model.my_dbt_project." + "this_is_another_very_long_name" * 20,
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path="",
)
assert calculate_detached_node_name(node) == "detached_1_test"


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4"),
reason="Airflow DAG did not have task_group_dict until the 2.4 release",
Expand Down

0 comments on commit d04f711

Please sign in to comment.