Skip to content

Commit

Permalink
Merge branch 'main' into fix/escape-columns-as-reserver-words-mssql
Browse files Browse the repository at this point in the history
  • Loading branch information
dabla authored Jan 24, 2025
2 parents ce0cf51 + bb77ebf commit 534ec26
Show file tree
Hide file tree
Showing 156 changed files with 3,676 additions and 627 deletions.
1 change: 1 addition & 0 deletions .github/workflows/news-fragment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
'Behaviour changes'
'Plugin changes'
'Dependency changes'
'Code interface changes'
)
news_fragment_content=`git diff origin/${BASE_REF} newsfragments/*.significant.rst`
Expand Down
3 changes: 3 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class AssetEventResponse(BaseModel):

id: int
asset_id: int
uri: str | None = Field(alias="uri", default=None)
name: str | None = Field(alias="name", default=None)
group: str | None = Field(alias="group", default=None)
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6447,6 +6447,21 @@ components:
asset_id:
type: integer
title: Asset Id
uri:
anyOf:
- type: string
- type: 'null'
title: Uri
name:
anyOf:
- type: string
- type: 'null'
title: Name
group:
anyOf:
- type: string
- type: 'null'
title: Group
extra:
anyOf:
- type: object
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/execution_api/datamodels/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,18 @@ class AssetAliasResponse(BaseModel):

name: str
group: str


class AssetProfile(BaseModel):
"""
Profile of an Asset.
Asset will have name, uri and asset_type defined.
AssetNameRef will have name and asset_type defined.
AssetUriRef will have uri and asset_type defined.
"""

name: str | None = None
uri: str | None = None
asset_type: str
48 changes: 44 additions & 4 deletions airflow/api_fastapi/execution_api/datamodels/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@
from datetime import timedelta
from typing import Annotated, Any, Literal, Union

from pydantic import AwareDatetime, Discriminator, Field, Tag, TypeAdapter, WithJsonSchema, field_validator
from pydantic import (
AwareDatetime,
Discriminator,
Field,
Tag,
TypeAdapter,
WithJsonSchema,
field_validator,
)

from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.execution_api.datamodels.asset import AssetProfile
from airflow.api_fastapi.execution_api.datamodels.connection import ConnectionResponse
from airflow.api_fastapi.execution_api.datamodels.variable import VariableResponse
from airflow.utils.state import IntermediateTIState, TaskInstanceState as TIState, TerminalTIState
Expand Down Expand Up @@ -52,14 +61,41 @@ class TIEnterRunningPayload(BaseModel):


class TITerminalStatePayload(BaseModel):
"""Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
"""Schema for updating TaskInstance to a terminal state except SUCCESS state."""

state: TerminalTIState
state: Literal[
TerminalTIState.FAILED,
TerminalTIState.SKIPPED,
TerminalTIState.REMOVED,
TerminalTIState.FAIL_WITHOUT_RETRY,
]

end_date: UtcDateTime
"""When the task completed executing"""


class TISuccessStatePayload(BaseModel):
"""Schema for updating TaskInstance to success state."""

state: Annotated[
Literal[TerminalTIState.SUCCESS],
# Specify a default in the schema, but not in code, so Pydantic marks it as required.
WithJsonSchema(
{
"type": "string",
"enum": [TerminalTIState.SUCCESS],
"default": TerminalTIState.SUCCESS,
}
),
]

end_date: UtcDateTime
"""When the task completed executing"""

task_outlets: Annotated[list[AssetProfile], Field(default_factory=list)]
outlet_events: Annotated[list[Any], Field(default_factory=list)]


class TITargetStatePayload(BaseModel):
"""Schema for updating TaskInstance to a target state, excluding terminal and running states."""

Expand Down Expand Up @@ -123,7 +159,10 @@ def ti_state_discriminator(v: dict[str, str] | BaseModel) -> str:
state = v.get("state")
else:
state = getattr(v, "state", None)
if state in set(TerminalTIState):

if state == TIState.SUCCESS:
return "success"
elif state in set(TerminalTIState):
return "_terminal_"
elif state == TIState.DEFERRED:
return "deferred"
Expand All @@ -137,6 +176,7 @@ def ti_state_discriminator(v: dict[str, str] | BaseModel) -> str:
TIStateUpdate = Annotated[
Union[
Annotated[TITerminalStatePayload, Tag("_terminal_")],
Annotated[TISuccessStatePayload, Tag("success")],
Annotated[TITargetStatePayload, Tag("_other_")],
Annotated[TIDeferredStatePayload, Tag("deferred")],
Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")],
Expand Down
14 changes: 13 additions & 1 deletion airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
TIRescheduleStatePayload,
TIRunContext,
TIStateUpdate,
TISuccessStatePayload,
TITerminalStatePayload,
)
from airflow.models.dagrun import DagRun as DR
Expand Down Expand Up @@ -226,7 +227,7 @@ def ti_update_state(
)

# We exclude_unset to avoid updating fields that are not set in the payload
data = ti_patch_payload.model_dump(exclude_unset=True)
data = ti_patch_payload.model_dump(exclude={"task_outlets", "outlet_events"}, exclude_unset=True)

query = update(TI).where(TI.id == ti_id_str).values(data)

Expand All @@ -243,6 +244,17 @@ def ti_update_state(
else:
updated_state = State.FAILED
query = query.values(state=updated_state)
elif isinstance(ti_patch_payload, TISuccessStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
updated_state = ti_patch_payload.state
task_instance = session.get(TI, ti_id_str)
TI.register_asset_changes_in_db(
task_instance,
ti_patch_payload.task_outlets, # type: ignore
ti_patch_payload.outlet_events,
session,
)
query = query.values(state=updated_state)
elif isinstance(ti_patch_payload, TIDeferredStatePayload):
# Calculate timeout if it was passed
timeout = None
Expand Down
8 changes: 5 additions & 3 deletions airflow/auth/managers/simple/ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion airflow/auth/managers/simple/ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"@vitejs/plugin-react-swc": "^3.7.0",
"eslint": "^9.10.0",
"happy-dom": "^15.10.2",
"vite": "^5.4.6",
"vite": "^5.4.14",
"vite-plugin-css-injected-by-js": "^3.5.2",
"vitest": "^2.1.1"
}
Expand Down
Loading

0 comments on commit 534ec26

Please sign in to comment.