Skip to content

Commit

Permalink
Add basic pydantic models
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed Aug 11, 2024
1 parent 1a66918 commit 3e0189f
Show file tree
Hide file tree
Showing 17 changed files with 937 additions and 1 deletion.
2 changes: 2 additions & 0 deletions airflow_supervisor/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .config import *

__version__ = "0.1.0"
496 changes: 496 additions & 0 deletions airflow_supervisor/config.py

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions airflow_supervisor/daggen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from typing import TYPE_CHECKING, List, Optional, Union

from airflow.models.dag import DAG
from airflow.models.operator import Operator

if TYPE_CHECKING:
from airflow.providers.ssh.hooks.ssh import SSHHook

from .config import SupervisorConfiguration


class SupervisorLocal(DAG):
def __init__(self, supervisor_cfg: SupervisorConfiguration, **kwargs):
super().__init__(**kwargs)
self._supervisor_cfg = supervisor_cfg

# tasks
# configure supervisor
# | start supervisor
# | start programs
# | start watching programs
# | check programs
# | restart programs
# | stop watching programs
# | stop programs
# | stop supervisor
# | remove configuration
self._configure_supervisor = None
self._start_supervisor = None
self._start_programs = None
self._start_watch_programs = None
self._check_programs = None
self._restart_programs = None
self._stop_watch_programs = None
self._stop_programs = None
self._stop_supervisor = None
self._unconfigure_supervisor = None

@property
def configure_supervisor(self) -> Operator:
return self._configure_supervisor

@property
def start_supervisor(self) -> Operator:
return self._start_supervisor

@property
def start_programs(self) -> Operator:
return self._start_programs

@property
def start_watch_programs(self) -> Operator:
return self._start_watch_programs

@property
def check_programs(self) -> Operator:
return self._check_programs

@property
def restart_programs(self) -> Operator:
return self._restart_programs

@property
def stop_watch_programs(self) -> Operator:
return self._stop_watch_programs

@property
def stop_programs(self) -> Operator:
return self._stop_programs

@property
def stop_supervisor(self) -> Operator:
return self._stop_supervisor

@property
def unconfigure_supervisor(self) -> Operator:
return self._unconfigure_supervisor


class SupervisorRemove(SupervisorLocal):
# Mimic SSH Operator: https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/_api/airflow/providers/ssh/operators/ssh/index.html
def __init__(
self,
supervisor_cfg: SupervisorConfiguration,
ssh_hook: Optional[SSHHook] = None,
ssh_conn_id: Optional[str] = None,
remote_host: Optional[str] = None,
command: Optional[str] = None,
conn_timeout: Optional[int] = None,
cmd_timeout: Optional[int] = None,
environment: Optional[dict] = None,
get_pty: Optional[bool] = None,
banner_timeout: Optional[float] = None,
skip_on_exit_code: Optional[Union[int, List[int]]] = None,
**kwargs,
):
super().__init__(supervisor_cfg=supervisor_cfg, **kwargs)
self._supervisor_cfg = supervisor_cfg
self._ssh_operator_kwargs = {}
if ssh_hook:
self._ssh_operator_kwargs["ssh_hook"] = ssh_hook
if ssh_conn_id:
self._ssh_operator_kwargs["ssh_conn_id"] = ssh_conn_id
if remote_host:
self._ssh_operator_kwargs["remote_host"] = remote_host
if command:
self._ssh_operator_kwargs["command"] = command
if conn_timeout:
self._ssh_operator_kwargs["conn_timeout"] = conn_timeout
if cmd_timeout:
self._ssh_operator_kwargs["cmd_timeout"] = cmd_timeout
if environment:
self._ssh_operator_kwargs["environment"] = environment
if get_pty:
self._ssh_operator_kwargs["get_pty"] = get_pty
if banner_timeout:
self._ssh_operator_kwargs["banner_timeout"] = banner_timeout
if skip_on_exit_code:
self._ssh_operator_kwargs["skip_on_exit_code"] = skip_on_exit_code
108 changes: 108 additions & 0 deletions airflow_supervisor/tests/config/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import patch

from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import (
EventListenerConfiguration,
FcgiProgramConfiguration,
GroupConfiguration,
IncludeConfiguration,
InetHttpServerConfiguration,
ProgramConfiguration,
RpcInterfaceConfiguration,
SupervisorConfiguration,
SupervisorctlConfiguration,
SupervisordConfiguration,
UnixHttpServerConfiguration,
)
from airflow_supervisor.config import _generate_supervisor_config_path


def test_generate_supervisor_config_path():
with patch("airflow_supervisor.config.gettempdir") as p1, patch("airflow_supervisor.config.datetime") as p2:
pth = Path(__file__).resolve().parent.parent.parent.parent / ".pytest_cache"
p1.return_value = str(pth)
p2.now.return_value = datetime(2000, 1, 1, 0, 0, 0, 1, tzinfo=UTC)
tmp = _generate_supervisor_config_path()
assert str(tmp) == str(pth / "supervisor-2000-01-01T00:00:00")


def test_inst():
with raises(ValidationError):
SupervisorConfiguration()
with patch("airflow_supervisor.config.gettempdir") as p1, patch("airflow_supervisor.config.datetime") as p2:
pth = Path(__file__).resolve().parent.parent.parent.parent / ".pytest_cache"
p1.return_value = str(pth)
p2.now.return_value = datetime(2000, 1, 1, 0, 0, 0, 1, tzinfo=UTC)
c = SupervisorConfiguration(program={"test": ProgramConfiguration(command="test")})
assert str(c.path) == str(pth / "supervisor-2000-01-01T00:00:00")


def test_cfg():
c = SupervisorConfiguration(program={"test": ProgramConfiguration(command="test")})
assert c.to_cfg().strip() == "[program:test]\ncommand=test"


def test_cfg_all():
c = SupervisorConfiguration(
unix_http_server=UnixHttpServerConfiguration(
file="/a/test/file",
chmod="0777",
chown="test",
username="test",
password="testpw",
),
inet_http_server=InetHttpServerConfiguration(port="127.0.0.1:8000", username="test", password="testpw"),
supervisord=SupervisordConfiguration(directory="/test"),
supervisorctl=SupervisorctlConfiguration(username="test", password="testpw"),
include=IncludeConfiguration(files=["a/test/file", "another/test/file"]),
program={"test": ProgramConfiguration(command="test")},
group={"testgroup": GroupConfiguration(programs=["test"])},
fcgiprogram={"testfcgi": FcgiProgramConfiguration(command="echo 'test'", socket="test")},
eventlistener={"testeventlistener": EventListenerConfiguration(command="echo 'test'")},
rpcinterface={"testrpcinterface": RpcInterfaceConfiguration(supervisor_rpcinterface_factory="a.test.module")},
)
print(c.to_cfg().strip())
assert (
c.to_cfg().strip()
== """[unix_http_server]
file=/a/test/file
chmod=0777
chown=test
username=test
password=testpw
[inet_http_server]
port=127.0.0.1:8000
username=test
password=testpw
[supervisord]
directory=/test
[supervisorctl]
username=test
password=testpw
[include]
files=a/test/file another/test/file
[program:test]
command=test
[group:testgroup]
programs=test
[fcgi-program:testfcgi]
command=echo 'test'
socket=test
[eventlistener:testeventlistener]
command=echo 'test'
[rpcinterface:testrpcinterface]
supervisor_rpcinterface_factory=a.test.module"""
)
17 changes: 17 additions & 0 deletions airflow_supervisor/tests/config/test_eventlistener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import EventListenerConfiguration


def test_inst():
with raises(ValidationError):
EventListenerConfiguration()
with raises(ValidationError):
EventListenerConfiguration(stdout_capture_maxbytes=10)
EventListenerConfiguration(command="echo 'test'")


def test_cfg():
c = EventListenerConfiguration(command="echo 'test'")
assert c.to_cfg("name").strip() == "[eventlistener:name]\ncommand=echo 'test'"
19 changes: 19 additions & 0 deletions airflow_supervisor/tests/config/test_fcgiprogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import FcgiProgramConfiguration


def test_inst():
with raises(ValidationError):
FcgiProgramConfiguration()
with raises(ValidationError):
FcgiProgramConfiguration(socket="test")
with raises(ValidationError):
FcgiProgramConfiguration(command="test")
FcgiProgramConfiguration(command="echo 'test'", socket="test")


def test_cfg():
c = FcgiProgramConfiguration(command="echo 'test'", socket="test")
assert c.to_cfg("name").strip() == "[fcgi-program:name]\ncommand=echo 'test'\nsocket=test"
15 changes: 15 additions & 0 deletions airflow_supervisor/tests/config/test_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import GroupConfiguration


def test_inst():
with raises(ValidationError):
GroupConfiguration()
GroupConfiguration(programs=["test"])


def test_cfg():
c = GroupConfiguration(programs=["test"])
assert c.to_cfg("name").strip() == "[group:name]\nprograms=test"
15 changes: 15 additions & 0 deletions airflow_supervisor/tests/config/test_include.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import IncludeConfiguration


def test_inst():
with raises(ValidationError):
IncludeConfiguration()
IncludeConfiguration(files=[""])


def test_cfg():
c = IncludeConfiguration(files=["a/test/file", "another/test/file"])
assert c.to_cfg().strip() == "[include]\nfiles=a/test/file another/test/file"
23 changes: 23 additions & 0 deletions airflow_supervisor/tests/config/test_inethttpserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from airflow_supervisor import InetHttpServerConfiguration


def test_inst():
InetHttpServerConfiguration()
InetHttpServerConfiguration(port="127.0.0.1:8000")
InetHttpServerConfiguration(port="127.0.0.1:8000", username="test")
InetHttpServerConfiguration(port="127.0.0.1:8000", username="test", password="testpw")


def test_cfg():
c = InetHttpServerConfiguration()
assert c.to_cfg().strip() == """[inet_http_server]"""

c = InetHttpServerConfiguration(port="127.0.0.1:8000", username="test", password="testpw")
assert (
c.to_cfg()
== """[inet_http_server]
port=127.0.0.1:8000
username=test
password=testpw
"""
)
15 changes: 15 additions & 0 deletions airflow_supervisor/tests/config/test_program.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import ProgramConfiguration


def test_inst():
with raises(ValidationError):
ProgramConfiguration()
ProgramConfiguration(command="echo 'test'")


def test_cfg():
c = ProgramConfiguration(command="echo 'test'")
assert c.to_cfg("name").strip() == "[program:name]\ncommand=echo 'test'"
15 changes: 15 additions & 0 deletions airflow_supervisor/tests/config/test_rpcinterface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import RpcInterfaceConfiguration


def test_inst():
with raises(ValidationError):
RpcInterfaceConfiguration()
RpcInterfaceConfiguration(supervisor_rpcinterface_factory="a.test.module")


def test_cfg():
c = RpcInterfaceConfiguration(supervisor_rpcinterface_factory="a.test.module")
assert c.to_cfg("name").strip() == "[rpcinterface:name]\nsupervisor_rpcinterface_factory=a.test.module"
10 changes: 10 additions & 0 deletions airflow_supervisor/tests/config/test_supervisorctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from airflow_supervisor import SupervisorctlConfiguration


def test_inst():
SupervisorctlConfiguration()


def test_cfg():
c = SupervisorctlConfiguration(username="test", password="testpw")
assert c.to_cfg().strip() == "[supervisorctl]\nusername=test\npassword=testpw"
10 changes: 10 additions & 0 deletions airflow_supervisor/tests/config/test_supervisord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from airflow_supervisor import SupervisordConfiguration


def test_inst():
SupervisordConfiguration()


def test_cfg():
c = SupervisordConfiguration(directory="/test")
assert c.to_cfg().strip() == "[supervisord]\ndirectory=/test"
11 changes: 11 additions & 0 deletions airflow_supervisor/tests/config/test_toplevel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pydantic import ValidationError
from pytest import raises

from airflow_supervisor import ProgramConfiguration, SupervisorConfiguration


def test_config_instantiation():
with raises(ValidationError):
c = SupervisorConfiguration()
c = SupervisorConfiguration(program={"test": ProgramConfiguration(command="echo 'hello'")})
assert c
Loading

0 comments on commit 3e0189f

Please sign in to comment.