Skip to content

Commit

Permalink
Add few removed Task properties in AirflowRunFacet (apache#40371)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski authored and romsharon98 committed Jul 26, 2024
1 parent c74a301 commit d030897
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
13 changes: 12 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def _cast_basic_types(value):
return value.isoformat()
if isinstance(value, datetime.timedelta):
return f"{value.total_seconds()} seconds"
if isinstance(value, (set, list, tuple)):
if isinstance(value, (set, tuple)):
return str(list(value))
return value

Expand Down Expand Up @@ -214,6 +214,12 @@ class TaskInstanceInfo(InfoJsonEncodable):
}


class DatasetInfo(InfoJsonEncodable):
    """
    Defines encoding Airflow Dataset object to JSON.

    Only the attributes named in ``includes`` are listed for encoding.
    ``TaskInfo`` wraps each entry of a task's ``inlets`` and ``outlets``
    in this class.
    """

    # Attribute names to encode from the wrapped Dataset object.
    includes = ["uri", "extra"]


class TaskInfo(InfoJsonEncodable):
"""Defines encoding BaseOperator/AbstractOperator object to JSON."""

Expand Down Expand Up @@ -242,6 +248,9 @@ class TaskInfo(InfoJsonEncodable):
"run_as_user",
"sla",
"task_id",
"trigger_dag_id",
"external_dag_id",
"external_task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
Expand All @@ -255,6 +264,8 @@ class TaskInfo(InfoJsonEncodable):
if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
else None
),
"inlets": lambda task: [DatasetInfo(inlet) for inlet in task.inlets],
"outlets": lambda task: [DatasetInfo(outlet) for outlet in task.outlets],
}


Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ class Test:
}


def test_info_json_encodable_list_does_not_flatten():
    """Check that a list attribute is encoded as a JSON array, not a string."""

    class TestInfo(InfoJsonEncodable):
        includes = ["alist"]

    # NOTE(review): slots=False presumably keeps a per-instance __dict__ for
    # the encoder to read attributes from - confirm against InfoJsonEncodable.
    @define(slots=False)
    class Test:
        alist: list[str]

    source = Test(["a", "b", "c"])

    # Round-trip through JSON so the comparison is made on plain built-in types.
    round_tripped = json.loads(json.dumps(TestInfo(source)))

    assert round_tripped == {"alist": ["a", "b", "c"]}


def test_info_json_encodable_list_does_include_nonexisting():
    """
    Check that an ``includes`` entry missing on the object is left out.

    Despite the function name, the assertion expects the attribute that does
    not exist on the wrapped object to be absent from the encoded output.
    """

    class TestInfo(InfoJsonEncodable):
        includes = ["exists", "doesnotexist"]

    # NOTE(review): slots=False presumably keeps a per-instance __dict__ for
    # the encoder to read attributes from - confirm against InfoJsonEncodable.
    @define(slots=False)
    class Test:
        exists: str

    source = Test("something")

    # Round-trip through JSON so the comparison is made on plain built-in types.
    round_tripped = json.loads(json.dumps(TestInfo(source)))

    assert round_tripped == {"exists": "something"}


def test_is_name_redactable():
class NotMixin:
def __init__(self):
Expand Down

0 comments on commit d030897

Please sign in to comment.