Skip to content

Commit

Permalink
Merge pull request #12 from airflow-laminar/tkp/init
Browse files Browse the repository at this point in the history
Hydra integration, tweaks for required blocks, fixes for rpcinterface
  • Loading branch information
timkpaine authored Aug 15, 2024
2 parents 4d70cde + 826df08 commit 6513100
Show file tree
Hide file tree
Showing 55 changed files with 1,025 additions and 626 deletions.
1 change: 1 addition & 0 deletions airflow_supervisor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .airflow import *
from .config import *

__version__ = "0.1.0"
3 changes: 3 additions & 0 deletions airflow_supervisor/airflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .dag import *
from .operators import *
from .sensors import *
6 changes: 6 additions & 0 deletions airflow_supervisor/airflow/dag/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .local import SupervisorLocal

try:
from .ssh import SupervisorRemote
except ImportError:
...
72 changes: 72 additions & 0 deletions airflow_supervisor/airflow/dag/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from airflow.models.dag import DAG
from airflow.models.operator import Operator

from airflow_supervisor.config import SupervisorConfiguration


class SupervisorCommon(DAG):
def __init__(self, supervisor_cfg: SupervisorConfiguration, **kwargs):
super().__init__(**kwargs)
self._supervisor_cfg = supervisor_cfg

# tasks
# configure supervisor
# | start supervisor
# | start programs
# | start watching programs
# | check programs
# | restart programs
# | stop watching programs
# | stop programs
# | stop supervisor
# | remove configuration
self._configure_supervisor = None
self._start_supervisor = None
self._start_programs = None
self._start_watch_programs = None
self._check_programs = None
self._restart_programs = None
self._stop_watch_programs = None
self._stop_programs = None
self._stop_supervisor = None
self._unconfigure_supervisor = None

@property
def configure_supervisor(self) -> Operator:
return self._configure_supervisor

@property
def start_supervisor(self) -> Operator:
return self._start_supervisor

@property
def start_programs(self) -> Operator:
return self._start_programs

@property
def start_watch_programs(self) -> Operator:
return self._start_watch_programs

@property
def check_programs(self) -> Operator:
return self._check_programs

@property
def restart_programs(self) -> Operator:
return self._restart_programs

@property
def stop_watch_programs(self) -> Operator:
return self._stop_watch_programs

@property
def stop_programs(self) -> Operator:
return self._stop_programs

@property
def stop_supervisor(self) -> Operator:
return self._stop_supervisor

@property
def unconfigure_supervisor(self) -> Operator:
return self._unconfigure_supervisor
6 changes: 6 additions & 0 deletions airflow_supervisor/airflow/dag/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .common import SupervisorCommon

# from airflow_supervisor.config import SupervisorConfiguration


class SupervisorLocal(SupervisorCommon): ...
49 changes: 49 additions & 0 deletions airflow_supervisor/airflow/dag/ssh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import List, Optional, Union

from airflow.providers.ssh.hooks.ssh import SSHHook

from airflow_supervisor.config import SupervisorConfiguration

from .common import SupervisorCommon


class SupervisorRemote(SupervisorCommon):
# Mimic SSH Operator: https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/_api/airflow/providers/ssh/operators/ssh/index.html
def __init__(
self,
supervisor_cfg: SupervisorConfiguration,
ssh_hook: Optional[SSHHook] = None,
ssh_conn_id: Optional[str] = None,
remote_host: Optional[str] = None,
command: Optional[str] = None,
conn_timeout: Optional[int] = None,
cmd_timeout: Optional[int] = None,
environment: Optional[dict] = None,
get_pty: Optional[bool] = None,
banner_timeout: Optional[float] = None,
skip_on_exit_code: Optional[Union[int, List[int]]] = None,
**kwargs,
):
super().__init__(supervisor_cfg=supervisor_cfg, **kwargs)
self._supervisor_cfg = supervisor_cfg
self._ssh_operator_kwargs = {}
if ssh_hook:
self._ssh_operator_kwargs["ssh_hook"] = ssh_hook
if ssh_conn_id:
self._ssh_operator_kwargs["ssh_conn_id"] = ssh_conn_id
if remote_host:
self._ssh_operator_kwargs["remote_host"] = remote_host
if command:
self._ssh_operator_kwargs["command"] = command
if conn_timeout:
self._ssh_operator_kwargs["conn_timeout"] = conn_timeout
if cmd_timeout:
self._ssh_operator_kwargs["cmd_timeout"] = cmd_timeout
if environment:
self._ssh_operator_kwargs["environment"] = environment
if get_pty:
self._ssh_operator_kwargs["get_pty"] = get_pty
if banner_timeout:
self._ssh_operator_kwargs["banner_timeout"] = banner_timeout
if skip_on_exit_code:
self._ssh_operator_kwargs["skip_on_exit_code"] = skip_on_exit_code
Empty file.
4 changes: 4 additions & 0 deletions airflow_supervisor/airflow/operators/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from airflow.models.operator import BaseOperator


class SupervisorOperator(BaseOperator): ...
Empty file.
4 changes: 4 additions & 0 deletions airflow_supervisor/airflow/sensors/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class SupervisorSensor(BaseSensorOperator): ...
2 changes: 2 additions & 0 deletions airflow_supervisor/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .local import SupervisorLocalClient
from .xmlrpc import SupervisorRemoteXMLRPCClient
6 changes: 6 additions & 0 deletions airflow_supervisor/client/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ..config import SupervisorConfiguration


class SupervisorClientBase(object):
def __init__(self, cfg: SupervisorConfiguration):
self._cfg = cfg
4 changes: 4 additions & 0 deletions airflow_supervisor/client/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .common import SupervisorClientBase


class SupervisorLocalClient(SupervisorClientBase): ...
4 changes: 4 additions & 0 deletions airflow_supervisor/client/xmlrpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .common import SupervisorClientBase


class SupervisorRemoteXMLRPCClient(SupervisorClientBase): ...
Loading

0 comments on commit 6513100

Please sign in to comment.