Skip to content

Commit

Permalink
Rework the TriggererJobRunner to run triggers in a process without DB…
Browse files Browse the repository at this point in the history
… access

This uses a similar approach to the DAG Parser -- the subprocess runs the
async Triggers (i.e. user code) in a process and sends messages back and forth
to the supervisor/parent to perform CRUD operations on the DB.

I have also massively re-worked how per-trigger logging works to greatly simplify it. I hope @dstandish will approve.
The main way it has been simplified is with the switch to TaskSDK then all
(100%! Really) of logs are set as JSON over a socket to the parent process;
everything in the subprocess logs to this output, there is no differentiation
needed in stdlib, no custom handlers etc. and by making use of structlog's
automatic context vars we can include a trigger_id field -- if we find that we
route the output to the right trigger specific log file.

This is all now so much simpler with structlog in the mix.

Logging from the async process works as follows:

- stdlib logging is configured to send messages via struct log as json
- As part of the stdlib->structlog processing change we include structlog
  bound contextvars
- When a triggerer coro starts it binds trigger_id as a paramter
- When the Supervisor receives a log message (which happens as LD JSON over a
  dedicated socket channel) it parses the JSON, and if it finds trigger_id key
  in there it redirects it to the trigger file log, else prints it.
  • Loading branch information
ashb committed Feb 18, 2025
1 parent 8fbdf75 commit d1ee9ca
Show file tree
Hide file tree
Showing 19 changed files with 1,085 additions and 1,541 deletions.
1 change: 0 additions & 1 deletion .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,6 @@ labelPRBasedOnFilePath:
- tests/cli/commands/local_commands/test_triggerer_command.py
- tests/jobs/test_triggerer_job.py
- tests/models/test_trigger.py
- tests/jobs/test_triggerer_job_logging.py
- providers/standard/tests/unit/standard/triggers/**/*

area:Serialization:
Expand Down
4 changes: 2 additions & 2 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import sys
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union
from typing import TYPE_CHECKING, Annotated, Callable, ClassVar, Literal, Union

import attrs
from pydantic import BaseModel, Field, TypeAdapter
Expand Down Expand Up @@ -207,7 +207,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
"""

parsing_result: DagFileParsingResult | None = None
decoder: TypeAdapter[ToParent] = TypeAdapter[ToParent](ToParent)
decoder: ClassVar[TypeAdapter[ToParent]] = TypeAdapter[ToParent](ToParent)

@classmethod
def start( # type: ignore[override]
Expand Down
6 changes: 4 additions & 2 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@
from setproctitle import setproctitle

from airflow import settings
from airflow.executors import workloads
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.executors import workloads

TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Optional[Exception]]


Expand Down Expand Up @@ -82,6 +81,9 @@ def _run_worker(
# Received poison pill, no more tasks to run
return

if not isinstance(workload, workloads.ExecuteTask):
raise ValueError(f"LocalExecutor does not now how to handle {type(workload)}")

# Decrement this as soon as we pick up a message off the queue
with unread_messages:
unread_messages.value -= 1
Expand Down
34 changes: 32 additions & 2 deletions airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Union
from typing import TYPE_CHECKING, Annotated, Literal, Union

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -106,4 +107,33 @@ def make(cls, ti: TIModel, dag_rel_path: Path | None = None) -> ExecuteTask:
return cls(ti=ser_ti, dag_rel_path=path, token="", log_path=fname, bundle_info=bundle_info)


All = Union[ExecuteTask]
class RunTrigger(BaseModel):
"""Execute an async "trigger" process that yields events."""

id: int

ti: TaskInstance | None
"""
The task instance associated with this trigger.
Could be none for asset-based triggers.
"""

classpath: str
"""
Dot-separated name of the module+fn to import and run this workload.
Consumers of this Workload must perform their own validation of this input.
"""

encrypted_kwargs: str

timeout_after: datetime | None = None

kind: Literal["RunTrigger"] = Field(init=False, default="RunTrigger")


All = Annotated[
Union[ExecuteTask, RunTrigger],
Field(discriminator="kind"),
]
Loading

0 comments on commit d1ee9ca

Please sign in to comment.