Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for client authentication; fix e2e client tests #62

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions airflow_supervisor/client/xmlrpc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from enum import Enum
from typing import Dict, List
from typing import Dict, List, Optional
from xmlrpc.client import Fault, ServerProxy

from pydantic import BaseModel
Expand Down Expand Up @@ -116,17 +116,16 @@ def __init__(self, cfg: SupervisorAirflowConfiguration):
self._port = int(cfg.airflow.port.split(":")[-1])
self._protocol = cfg.airflow.protocol
self._rpcpath = "/" + cfg.airflow.rpcpath if not cfg.airflow.rpcpath.startswith("/") else cfg.airflow.rpcpath

if cfg.airflow.port == 80:
# force http
self._rpcurl = f"http://{self._host}{self._rpcpath}"
elif cfg.airflow.port == 443:
# force https
self._rpcurl = f"https://{self._host}{self._rpcpath}"
else:
self._rpcurl = f"{self._protocol}://{self._host}:{self._port}{self._rpcpath}"
self._rpcurl = self._build_rpcurl(username=cfg.airflow.username, password=cfg.airflow.password)
self._client = ServerProxy(self._rpcurl)

def _build_rpcurl(self, username: Optional[str], password: Optional[str]) -> str:
# Forces http or https based on port, otherwise resolves to given protocol
protocol = {80: "http", 443: "https"}.get(self._port, self._protocol)
port = "" if self._port in {80, 443} else f":{self._port}"
authentication = f"{username}:{password.get_secret_value()}@" if username and password else ""
return f"{protocol}://{authentication}{self._host}{port}{self._rpcpath}"

#######################
# supervisord methods #
#######################
Expand Down
2 changes: 1 addition & 1 deletion airflow_supervisor/config/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AirflowConfiguration(_BaseCfgModel):
)
password: Optional[SecretStr] = Field(
default=None,
description="he password required for authentication to the HTTP/Unix server. This can be a cleartext password, or can be specified as a SHA-1 hash if prefixed by the string {SHA}. For example, {SHA}82ab876d1387bfafe46cc1c8a2ef074eae50cb1d is the SHA-stored version of the password “thepassword”. Note that hashed password must be in hex format.",
description="The password required for authentication to the HTTP/Unix server. This can be a cleartext password, or can be specified as a SHA-1 hash if prefixed by the string {SHA}. For example, {SHA}82ab876d1387bfafe46cc1c8a2ef074eae50cb1d is the SHA-stored version of the password “thepassword”. Note that hashed password must be in hex format.",
)

#################
Expand Down
51 changes: 30 additions & 21 deletions airflow_supervisor/tests/client/test_client_e2e.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
import xmlrpc
from time import sleep

import pytest

from airflow_supervisor import SupervisorAirflowConfiguration, SupervisorRemoteXMLRPCClient
from airflow_supervisor.client.xmlrpc import ProcessState


def test_supervisor_client(supervisor_instance: SupervisorAirflowConfiguration):
client = SupervisorRemoteXMLRPCClient(supervisor_instance)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.startAllProcesses())
sleep(0.5)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.getProcessInfo("test"))
sleep(0.5)
print(client.getProcessInfo("test"))
print(client.startProcess("test"))
sleep(0.5)
print(client.startProcess("test"))
sleep(0.5)
print(client.stopAllProcesses())
sleep(0.5)
print(client.startProcess("test"))
def _assert_client_actions(client: SupervisorRemoteXMLRPCClient):
assert client.getProcessInfo("test").state == ProcessState.STOPPED
sleep(0.5)
print(client.stopAllProcesses())
sleep(0.5)
print(client.stopProcess("test"))
assert client.startAllProcesses()["test"].state == ProcessState.RUNNING
sleep(0.5)
assert client.getProcessInfo("test").state == ProcessState.EXITED
assert client.startProcess("test").state == ProcessState.RUNNING
assert client.stopProcess("test").state == ProcessState.STOPPED
assert client.startProcess("test").state == ProcessState.RUNNING
assert client.stopAllProcesses()["test"].state == ProcessState.STOPPED


def test_supervisor_client(supervisor_instance: SupervisorAirflowConfiguration):
client = SupervisorRemoteXMLRPCClient(supervisor_instance)
_assert_client_actions(client=client)


def test_permissioned_supervisor_client_rejected(permissioned_supervisor_instance: SupervisorAirflowConfiguration):
permissioned_supervisor_instance.airflow.username = "bad-username"
client = SupervisorRemoteXMLRPCClient(permissioned_supervisor_instance)
with pytest.raises(xmlrpc.client.ProtocolError):
client.getProcessInfo("test")


def test_permissioned_supervisor_client(permissioned_supervisor_instance: SupervisorAirflowConfiguration):
permissioned_supervisor_instance.airflow.username = "user1"
client = SupervisorRemoteXMLRPCClient(permissioned_supervisor_instance)
_assert_client_actions(client=client)
45 changes: 44 additions & 1 deletion airflow_supervisor/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ def open_port() -> int:
return port


@fixture(scope="module")
def permissioned_open_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
s.close()
return port


@fixture(scope="module")
def supervisor_airflow_configuration(open_port: int) -> Iterator[SupervisorAirflowConfiguration]:
with NamedTemporaryFile("w", suffix=".cfg") as tf:
Expand All @@ -26,7 +36,26 @@ def supervisor_airflow_configuration(open_port: int) -> Iterator[SupervisorAirfl
path=tf.name,
program={
"test": ProgramConfiguration(
command="sleep 1 && exit 1",
command="bash -c 'sleep 1; exit 1'",
)
},
)
yield cfg


@fixture(scope="module")
def permissioned_supervisor_airflow_configuration(
permissioned_open_port: int,
) -> Iterator[SupervisorAirflowConfiguration]:
with NamedTemporaryFile("w", suffix=".cfg") as tf:
cfg = SupervisorAirflowConfiguration(
airflow=AirflowConfiguration(
port=f"*:{permissioned_open_port}", username="user1", password="testpassword1"
),
path=tf.name,
program={
"test": ProgramConfiguration(
command="bash -c 'sleep 1; exit 1'",
)
},
)
Expand All @@ -45,3 +74,17 @@ def supervisor_instance(
sleep(1)
yield cfg
cfg.kill()


@fixture(scope="module")
def permissioned_supervisor_instance(
permissioned_supervisor_airflow_configuration: SupervisorAirflowConfiguration,
) -> Iterator[SupervisorAirflowConfiguration]:
cfg = permissioned_supervisor_airflow_configuration
cfg.write()
cfg.start(daemon=False)
for _ in range(5):
if not cfg.running():
sleep(1)
yield cfg
cfg.kill()
Loading