
Merge branch 'main' into fix/escape-columns-as-reserver-words-mssql
dabla authored Jan 24, 2025
2 parents 534ec26 + 96ca2a6 commit ffcf0d9
Showing 10 changed files with 262 additions and 235 deletions.
149 changes: 22 additions & 127 deletions airflow/cli/commands/local_commands/fastapi_api_command.py
@@ -20,25 +20,16 @@

import logging
import os
import signal
import subprocess
import sys
import textwrap
from contextlib import suppress
from pathlib import Path
from time import sleep
from typing import NoReturn

import psutil
from lockfile.pidlockfile import read_pid_from_pidfile
from uvicorn.workers import UvicornWorker
import uvicorn
from gunicorn.util import daemonize
from setproctitle import setproctitle

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.local_commands.webserver_command import GunicornMonitor
from airflow.exceptions import AirflowConfigException
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

log = logging.getLogger(__name__)
@@ -47,8 +38,6 @@
# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor
# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved,
# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
AirflowUvicornWorker = UvicornWorker
AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}


@cli_utils.action_cli
@@ -59,18 +48,13 @@ def fastapi_api(args):

apps = args.apps
access_logfile = args.access_logfile or "-"
error_logfile = args.error_logfile or "-"
access_logformat = args.access_logformat
num_workers = args.workers
worker_timeout = args.worker_timeout

worker_class = "airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"

from airflow.api_fastapi.app import create_app

if args.debug:
print(f"Starting the FastAPI API server on port {args.port} and host {args.hostname} debug.")
log.warning("Running in dev mode, ignoring gunicorn args")
log.warning("Running in dev mode, ignoring uvicorn args")

run_args = [
"fastapi",
@@ -93,124 +77,35 @@ def fastapi_api(args):
process.wait()
os.environ.pop("AIRFLOW_API_APPS")
else:
if args.daemon:
daemonize()
log.info("Daemonized the FastAPI API server process PID: %s", os.getpid())

log.info(
textwrap.dedent(
f"""\
Running the Gunicorn Server with:
Running the uvicorn with:
Apps: {apps}
Workers: {num_workers} {worker_class}
Workers: {num_workers}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
Logfiles: {access_logfile}
Access Logformat: {access_logformat}
================================================================="""
)
)

pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)

run_args = [
sys.executable,
"-m",
"gunicorn",
"--workers",
str(num_workers),
"--worker-class",
str(worker_class),
"--timeout",
str(worker_timeout),
"--bind",
args.hostname + ":" + str(args.port),
"--name",
"airflow-fastapi-api",
"--pid",
pid_file,
"--access-logfile",
str(access_logfile),
"--error-logfile",
str(error_logfile),
"--config",
"python:airflow.api_fastapi.gunicorn_config",
]

ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
if ssl_cert and ssl_key:
run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]

if args.access_logformat and args.access_logformat.strip():
run_args += ["--access-logformat", str(args.access_logformat)]

if args.daemon:
run_args += ["--daemon"]

run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]

# To prevent different workers creating the web app and
# all writing to the database at the same time, we use the --preload option.
# With the preload option, the app is loaded before the workers are forked, and each worker will
# then have a copy of the app
run_args += ["--preload"]

def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
log.info("Received signal: %s. Closing gunicorn.", signum)
gunicorn_master_proc.terminate()
with suppress(TimeoutError):
gunicorn_master_proc.wait(timeout=30)
if isinstance(gunicorn_master_proc, subprocess.Popen):
still_running = gunicorn_master_proc.poll() is not None
else:
still_running = gunicorn_master_proc.is_running()
if still_running:
gunicorn_master_proc.kill()
sys.exit(0)

def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
# Register signal handlers
signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
GunicornMonitor(
gunicorn_master_pid=gunicorn_master_proc.pid,
num_workers_expected=num_workers,
master_timeout=120,
worker_refresh_interval=30,
worker_refresh_batch_size=1,
reload_on_plugin_change=False,
).start()

def start_and_monitor_gunicorn(args):
if args.daemon:
subprocess.Popen(run_args, close_fds=True)

# Reading pid of gunicorn master as it will be different that
# the one of process spawned above.
gunicorn_master_proc_pid = None
while not gunicorn_master_proc_pid:
sleep(0.1)
gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)

# Run Gunicorn monitor
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)
else:
with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
monitor_gunicorn(gunicorn_master_proc)

if args.daemon:
# This makes possible errors get reported before daemonization
os.environ["SKIP_DAGS_PARSING"] = "True"
create_app(apps)
os.environ.pop("SKIP_DAGS_PARSING")

pid_file_path = Path(pid_file)
monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
run_command_with_daemon_option(
args=args,
process_name="fastapi-api",
callback=lambda: start_and_monitor_gunicorn(args),
should_setup_logging=True,
pid_file=monitor_pid_file,
setproctitle(f"airflow fastapi_api -- host:{args.hostname} port:{args.port}")
uvicorn.run(
"airflow.api_fastapi.main:app",
host=args.hostname,
port=args.port,
workers=num_workers,
timeout_keep_alive=worker_timeout,
timeout_graceful_shutdown=worker_timeout,
ssl_keyfile=ssl_key,
ssl_certfile=ssl_cert,
access_log=access_logfile,
)


55 changes: 55 additions & 0 deletions docs/apache-airflow-providers-common-sql/dialects.rst
@@ -0,0 +1,55 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
SQL Dialects
=============

The :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` offers an abstraction layer between the
:class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` implementation and the database. For some databases multiple
connection types are available, such as native, ODBC or JDBC. Since the :class:`~airflow.providers.odbc.hooks.odbc.OdbcHook`
and the :class:`~airflow.providers.jdbc.hooks.jdbc.JdbcHook` are generic hooks that allow you to interact with any
database that has a driver for it, an abstraction layer is needed to run specialized queries depending on the database
being connected to; that is why dialects were introduced.

The default :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` class has the following operations
available, which use SQLAlchemy under the hood but can be overridden with specialized implementations per
database (a short usage sketch follows the list):

- ``placeholder`` specifies the database-specific placeholder used in prepared statements (default: ``%s``);
- ``inspector`` returns the SQLAlchemy inspector, which allows us to retrieve database metadata;
- ``extract_schema_from_table`` allows us to extract the schema name from a string;
- ``get_column_names`` returns the column names for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_primary_keys`` returns the primary keys for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_target_fields`` returns the column names that aren't identity or auto-incremented columns; this is used by the ``insert_rows`` method of the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` if the ``target_fields`` parameter wasn't specified and the Airflow property ``core.dbapihook_resolve_target_fields`` is set to ``True`` (default: ``False``);
- ``reserved_words`` returns the reserved SQL words of the target database using the SQLAlchemy inspector;
- ``generate_insert_sql`` generates the insert SQL statement for the target database;
- ``generate_replace_sql`` generates the upsert SQL statement for the target database.
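
A minimal usage sketch of these operations is shown below. It assumes the hook exposes its resolved dialect as a
``dialect`` property; the connection id, table name and keyword arguments are illustrative, not a contract:

.. code-block:: python

    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

    hook = MsSqlHook(mssql_conn_id="mssql_default")  # illustrative connection id
    dialect = hook.dialect  # resolved from the connection type or the ``dialect_name`` extra

    dialect.placeholder  # database-specific placeholder, e.g. "%s"
    dialect.reserved_words  # reserved words of the target database
    dialect.get_column_names("users", schema="dbo")  # column names via the SQLAlchemy inspector
    dialect.get_primary_keys("users", schema="dbo")  # primary-key columns via the SQLAlchemy inspector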

At the moment there are only 3 dialects available:

- ``default`` :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` reuses the generic functionality that was already available in the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook`;
- ``mssql`` :class:`~airflow.providers.microsoft.mssql.dialects.mssql.MsSqlDialect` specialized for Microsoft SQL Server;
- ``postgresql`` :class:`~airflow.providers.postgres.dialects.postgres.PostgresDialect` specialized for PostgreSQL;

The dialect to be used is derived from the connection string, although this isn't always possible. In that case
the dialect name can be specified explicitly through the extra options of the connection:

.. code-block::

    dialect_name: 'mssql'

If a specific dialect isn't available for a database, the default one is used; the same happens when a
non-existing dialect name is specified.
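
For example, a connection defined in code could pin the dialect as follows (a sketch; apart from ``dialect_name``
the values are illustrative):

.. code-block:: python

    from airflow.models.connection import Connection

    conn = Connection(
        conn_id="mssql_odbc",
        conn_type="odbc",
        host="sqlserver.example.com",
        schema="sales",
        extra='{"dialect_name": "mssql"}',
    )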
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-common-sql/index.rst
@@ -43,6 +43,7 @@

Python API <_api/airflow/providers/common/sql/index>
Supported Database Types </supported-database-types>
Dialects <dialects>

.. toctree::
:hidden:
55 changes: 55 additions & 0 deletions docs/apache-airflow-providers-microsoft-mssql/dialects.rst
@@ -0,0 +1,55 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
SQL Dialects
=============

The :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` offers an abstraction layer between the
:class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` implementation and the database. For some databases multiple
connection types are available, such as native, ODBC or JDBC. Since the :class:`~airflow.providers.odbc.hooks.odbc.OdbcHook`
and the :class:`~airflow.providers.jdbc.hooks.jdbc.JdbcHook` are generic hooks that allow you to interact with any
database that has a driver for it, an abstraction layer is needed to run specialized queries depending on the database
being connected to; that is why dialects were introduced.

The default :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` class has the following operations
available, which use SQLAlchemy under the hood but can be overridden with specialized implementations per
database:

- ``placeholder`` specifies the database-specific placeholder used in prepared statements (default: ``%s``);
- ``inspector`` returns the SQLAlchemy inspector, which allows us to retrieve database metadata;
- ``extract_schema_from_table`` allows us to extract the schema name from a string;
- ``get_column_names`` returns the column names for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_primary_keys`` returns the primary keys for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_target_fields`` returns the column names that aren't identity or auto-incremented columns; this is used by the ``insert_rows`` method of the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` if the ``target_fields`` parameter wasn't specified and the Airflow property ``core.dbapihook_resolve_target_fields`` is set to ``True`` (default: ``False``);
- ``reserved_words`` returns the reserved SQL words of the target database using the SQLAlchemy inspector;
- ``generate_insert_sql`` generates the insert SQL statement for the target database;
- ``generate_replace_sql`` generates the upsert SQL statement for the target database.

At the moment there are only 3 dialects available:

- ``default`` :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` reuses the generic functionality that was already available in the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook`;
- ``mssql`` :class:`~airflow.providers.microsoft.mssql.dialects.mssql.MsSqlDialect` specialized for Microsoft SQL Server;
- ``postgresql`` :class:`~airflow.providers.postgres.dialects.postgres.PostgresDialect` specialized for PostgreSQL;

The dialect to be used is derived from the connection string, although this isn't always possible. In that case
the dialect name can be specified explicitly through the extra options of the connection:

.. code-block::

    dialect_name: 'mssql'

If a specific dialect isn't available for a database, the default one is used; the same happens when a
non-existing dialect name is specified.
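
As an illustration of the specialization, the sketch below exercises the MSSQL dialect's upsert generation. It
assumes the hook exposes its resolved dialect as a ``dialect`` property; argument names and the exact SQL emitted
are illustrative rather than a contract:

.. code-block:: python

    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

    hook = MsSqlHook(mssql_conn_id="mssql_default")  # illustrative connection id
    dialect = hook.dialect  # expected to resolve to MsSqlDialect

    # For SQL Server an upsert is typically expressed as a MERGE statement.
    sql = dialect.generate_replace_sql(
        table="dbo.users",
        values=["Alice", "alice@example.com"],  # sample row, used to size the placeholders
        target_fields=["name", "email"],
    )
    print(sql)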
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-microsoft-mssql/index.rst
@@ -43,6 +43,7 @@
:caption: References

Python API <_api/airflow/providers/microsoft/mssql/index>
Dialects <dialects>

.. toctree::
:hidden:
55 changes: 55 additions & 0 deletions docs/apache-airflow-providers-postgres/dialects.rst
@@ -0,0 +1,55 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
SQL Dialects
=============

The :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` offers an abstraction layer between the
:class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` implementation and the database. For some databases multiple
connection types are available, such as native, ODBC or JDBC. Since the :class:`~airflow.providers.odbc.hooks.odbc.OdbcHook`
and the :class:`~airflow.providers.jdbc.hooks.jdbc.JdbcHook` are generic hooks that allow you to interact with any
database that has a driver for it, an abstraction layer is needed to run specialized queries depending on the database
being connected to; that is why dialects were introduced.

The default :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` class has the following operations
available, which use SQLAlchemy under the hood but can be overridden with specialized implementations per
database:

- ``placeholder`` specifies the database-specific placeholder used in prepared statements (default: ``%s``);
- ``inspector`` returns the SQLAlchemy inspector, which allows us to retrieve database metadata;
- ``extract_schema_from_table`` allows us to extract the schema name from a string;
- ``get_column_names`` returns the column names for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_primary_keys`` returns the primary keys for the given table and (optional) schema using the SQLAlchemy inspector;
- ``get_target_fields`` returns the column names that aren't identity or auto-incremented columns; this is used by the ``insert_rows`` method of the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook` if the ``target_fields`` parameter wasn't specified and the Airflow property ``core.dbapihook_resolve_target_fields`` is set to ``True`` (default: ``False``);
- ``reserved_words`` returns the reserved SQL words of the target database using the SQLAlchemy inspector;
- ``generate_insert_sql`` generates the insert SQL statement for the target database;
- ``generate_replace_sql`` generates the upsert SQL statement for the target database.

At the moment there are only 3 dialects available:

- ``default`` :class:`~airflow.providers.common.sql.dialects.dialect.Dialect` reuses the generic functionality that was already available in the :class:`~airflow.providers.common.sql.hooks.sql.DbApiHook`;
- ``mssql`` :class:`~airflow.providers.microsoft.mssql.dialects.mssql.MsSqlDialect` specialized for Microsoft SQL Server;
- ``postgresql`` :class:`~airflow.providers.postgres.dialects.postgres.PostgresDialect` specialized for PostgreSQL;

The dialect to be used is derived from the connection string, although this isn't always possible. In that case
the dialect name can be specified explicitly through the extra options of the connection:

.. code-block::

    dialect_name: 'mssql'

If a specific dialect isn't available for a database, the default one is used; the same happens when a
non-existing dialect name is specified.
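
For PostgreSQL a comparable sketch is shown below (same assumptions: the hook exposes its resolved dialect as a
``dialect`` property; argument names and the exact SQL emitted are illustrative):

.. code-block:: python

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="postgres_default")  # illustrative connection id
    dialect = hook.dialect  # expected to resolve to PostgresDialect

    # For PostgreSQL an upsert is typically expressed as
    # INSERT ... ON CONFLICT (<primary key columns>) DO UPDATE.
    sql = dialect.generate_replace_sql(
        table="users",
        values=["Alice", "alice@example.com"],  # sample row, used to size the placeholders
        target_fields=["name", "email"],
    )
    print(sql)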
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-postgres/index.rst
@@ -43,6 +43,7 @@
:caption: References

Python API <_api/airflow/providers/postgres/index>
Dialects <dialects>

.. toctree::
:hidden: