Skip to content

Commit

Permalink
Replacing GunicornMonitor with uvicorn.run() (#45103)
Browse files Browse the repository at this point in the history
* replace gunicorn with uvicorn.run()

* fixing tests

* Daemonized fastapi server

* fixing setproctitle format

* updating setproctitle
  • Loading branch information
vatsrahul1001 authored Jan 24, 2025
1 parent ccbd536 commit 933b003
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 231 deletions.
149 changes: 22 additions & 127 deletions airflow/cli/commands/local_commands/fastapi_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,16 @@

import logging
import os
import signal
import subprocess
import sys
import textwrap
from contextlib import suppress
from pathlib import Path
from time import sleep
from typing import NoReturn

import psutil
from lockfile.pidlockfile import read_pid_from_pidfile
from uvicorn.workers import UvicornWorker
import uvicorn
from gunicorn.util import daemonize
from setproctitle import setproctitle

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.local_commands.webserver_command import GunicornMonitor
from airflow.exceptions import AirflowConfigException
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

log = logging.getLogger(__name__)
Expand All @@ -47,8 +38,6 @@
# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor
# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved,
# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
AirflowUvicornWorker = UvicornWorker
AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}


@cli_utils.action_cli
Expand All @@ -59,18 +48,13 @@ def fastapi_api(args):

apps = args.apps
access_logfile = args.access_logfile or "-"
error_logfile = args.error_logfile or "-"
access_logformat = args.access_logformat
num_workers = args.workers
worker_timeout = args.worker_timeout

worker_class = "airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"

from airflow.api_fastapi.app import create_app

if args.debug:
print(f"Starting the FastAPI API server on port {args.port} and host {args.hostname} debug.")
log.warning("Running in dev mode, ignoring gunicorn args")
log.warning("Running in dev mode, ignoring uvicorn args")

run_args = [
"fastapi",
Expand All @@ -93,124 +77,35 @@ def fastapi_api(args):
process.wait()
os.environ.pop("AIRFLOW_API_APPS")
else:
if args.daemon:
daemonize()
log.info("Daemonized the FastAPI API server process PID: %s", os.getpid())

log.info(
textwrap.dedent(
f"""\
Running the Gunicorn Server with:
Running the uvicorn with:
Apps: {apps}
Workers: {num_workers} {worker_class}
Workers: {num_workers}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
Logfiles: {access_logfile}
Access Logformat: {access_logformat}
================================================================="""
)
)

pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)

run_args = [
sys.executable,
"-m",
"gunicorn",
"--workers",
str(num_workers),
"--worker-class",
str(worker_class),
"--timeout",
str(worker_timeout),
"--bind",
args.hostname + ":" + str(args.port),
"--name",
"airflow-fastapi-api",
"--pid",
pid_file,
"--access-logfile",
str(access_logfile),
"--error-logfile",
str(error_logfile),
"--config",
"python:airflow.api_fastapi.gunicorn_config",
]

ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
if ssl_cert and ssl_key:
run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]

if args.access_logformat and args.access_logformat.strip():
run_args += ["--access-logformat", str(args.access_logformat)]

if args.daemon:
run_args += ["--daemon"]

run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]

# To prevent different workers creating the web app and
# all writing to the database at the same time, we use the --preload option.
# With the preload option, the app is loaded before the workers are forked, and each worker will
# then have a copy of the app
run_args += ["--preload"]

def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
log.info("Received signal: %s. Closing gunicorn.", signum)
gunicorn_master_proc.terminate()
with suppress(TimeoutError):
gunicorn_master_proc.wait(timeout=30)
if isinstance(gunicorn_master_proc, subprocess.Popen):
still_running = gunicorn_master_proc.poll() is not None
else:
still_running = gunicorn_master_proc.is_running()
if still_running:
gunicorn_master_proc.kill()
sys.exit(0)

def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
# Register signal handlers
signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
GunicornMonitor(
gunicorn_master_pid=gunicorn_master_proc.pid,
num_workers_expected=num_workers,
master_timeout=120,
worker_refresh_interval=30,
worker_refresh_batch_size=1,
reload_on_plugin_change=False,
).start()

def start_and_monitor_gunicorn(args):
if args.daemon:
subprocess.Popen(run_args, close_fds=True)

# Reading pid of gunicorn master as it will be different than
# the one of process spawned above.
gunicorn_master_proc_pid = None
while not gunicorn_master_proc_pid:
sleep(0.1)
gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)

# Run Gunicorn monitor
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)
else:
with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
monitor_gunicorn(gunicorn_master_proc)

if args.daemon:
# This makes possible errors get reported before daemonization
os.environ["SKIP_DAGS_PARSING"] = "True"
create_app(apps)
os.environ.pop("SKIP_DAGS_PARSING")

pid_file_path = Path(pid_file)
monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
run_command_with_daemon_option(
args=args,
process_name="fastapi-api",
callback=lambda: start_and_monitor_gunicorn(args),
should_setup_logging=True,
pid_file=monitor_pid_file,
setproctitle(f"airflow fastapi_api -- host:{args.hostname} port:{args.port}")
uvicorn.run(
"airflow.api_fastapi.main:app",
host=args.hostname,
port=args.port,
workers=num_workers,
timeout_keep_alive=worker_timeout,
timeout_graceful_shutdown=worker_timeout,
ssl_keyfile=ssl_key,
ssl_certfile=ssl_cert,
access_log=access_logfile,
)


Expand Down
115 changes: 11 additions & 104 deletions tests/cli/commands/local_commands/test_fastapi_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
# under the License.
from __future__ import annotations

import os
import subprocess
import sys
import time
from unittest import mock

import pytest
Expand All @@ -37,73 +33,9 @@
class TestCliFastAPI(_CommonCLIGunicornTestClass):
main_process_regexp = r"airflow fastapi-api"

@pytest.mark.execution_timeout(210)
def test_cli_fastapi_api_background(self, tmp_path):
    """Launch ``airflow fastapi-api --daemon`` and verify the spawned processes.

    All pid, stdout, stderr and log files are placed under
    ``tmp_path / "gunicorn"``. The test waits for the monitor and fastapi-api
    pid files, looks up the ``airflow fastapi-api --daemon`` and ``gunicorn``
    processes by name, terminates both recorded pids and then calls
    ``_check_processes(ignore_running=False)``. If anything raises, every file
    in that directory is printed before the exception is re-raised.
    """
    work_dir = tmp_path / "gunicorn"
    work_dir.mkdir()
    api_pidfile = work_dir / "pidflow-fastapi-api.pid"
    monitor_pidfile = work_dir / "pidflow-fastapi-api-monitor.pid"
    stdout_path = work_dir / "airflow-fastapi-api.out"
    stderr_path = work_dir / "airflow-fastapi-api.err"
    log_path = work_dir / "airflow-fastapi-api.log"
    try:
        # Start the daemon in the background; wait() is intentionally never called on it.
        console.print("[magenta]Starting airflow fastapi-api --daemon")
        launch_env = os.environ.copy()
        daemon_command = [
            "airflow",
            "fastapi-api",
            "--daemon",
            "--pid",
            os.fspath(api_pidfile),
            "--stdout",
            os.fspath(stdout_path),
            "--stderr",
            os.fspath(stderr_path),
            "--log-file",
            os.fspath(log_path),
        ]
        proc = subprocess.Popen(daemon_command, env=launch_env)
        assert proc.poll() is None

        pid_monitor = self._wait_pidfile(monitor_pidfile)
        console.print(f"[blue]Monitor started at {pid_monitor}")
        pid_fastapi_api = self._wait_pidfile(api_pidfile)
        console.print(f"[blue]FastAPI API started at {pid_fastapi_api}")
        console.print("[blue]Running airflow fastapi-api process:")
        # Processes are matched by name here, not by the pids read above.
        assert self._find_process(r"airflow fastapi-api --daemon", print_found_process=True)
        console.print("[blue]Waiting for gunicorn processes:")
        # Poll up to 30 times, one second apart, until a gunicorn process shows up.
        for _attempt in range(30):
            if self._find_process(r"^gunicorn"):
                break
            console.print("[blue]Waiting for gunicorn to start ...")
            time.sleep(1)
        console.print("[blue]Running gunicorn processes:")
        assert self._find_all_processes("^gunicorn", print_found_process=True)
        console.print("[magenta]fastapi-api process started successfully.")
        console.print(
            "[magenta]Terminating monitor process and expect "
            "fastapi-api and gunicorn processes to terminate as well"
        )
        self._terminate_multiple_process([pid_fastapi_api, pid_monitor])
        self._check_processes(ignore_running=False)
        console.print("[magenta]All fastapi-api and gunicorn processes are terminated.")
    except Exception:
        console.print("[red]Exception occurred. Dumping all logs.")
        # Print every file in the working directory to help diagnose the failure.
        for file in work_dir.glob("*"):
            console.print(f"Dumping {file} (size: {file.stat().st_size})")
            console.print(file.read_text())
        raise

def test_cli_fastapi_api_debug(self, app):
with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
):
port = "9092"
hostname = "somehost"
Expand All @@ -130,7 +62,6 @@ def test_cli_fastapi_api_env_var_set_unset(self, app):
"""
with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
mock.patch("os.environ", autospec=True) as mock_environ,
):
apps_value = "core,execution"
Expand Down Expand Up @@ -172,8 +103,7 @@ def test_cli_fastapi_api_args(self, ssl_cert_and_key):
cert_path, key_path = ssl_cert_and_key

with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
mock.patch("uvicorn.run") as mock_run,
):
args = self.parser.parse_args(
[
Expand All @@ -192,39 +122,16 @@ def test_cli_fastapi_api_args(self, ssl_cert_and_key):
)
fastapi_api_command.fastapi_api(args)

Popen.assert_called_with(
[
sys.executable,
"-m",
"gunicorn",
"--workers",
"4",
"--worker-class",
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker",
"--timeout",
"120",
"--bind",
"0.0.0.0:9091",
"--name",
"airflow-fastapi-api",
"--pid",
"/tmp/x.pid",
"--access-logfile",
"-",
"--error-logfile",
"-",
"--config",
"python:airflow.api_fastapi.gunicorn_config",
"--certfile",
str(cert_path),
"--keyfile",
str(key_path),
"--access-logformat",
"custom_log_format",
"airflow.api_fastapi.app:cached_app(apps='core')",
"--preload",
],
close_fds=True,
mock_run.assert_called_with(
"airflow.api_fastapi.main:app",
host="0.0.0.0",
port=9091,
workers=4,
timeout_keep_alive=120,
timeout_graceful_shutdown=120,
ssl_keyfile=str(key_path),
ssl_certfile=str(cert_path),
access_log="-",
)

@pytest.mark.parametrize(
Expand Down

0 comments on commit 933b003

Please sign in to comment.