Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/async message producer #725

Merged
106 changes: 106 additions & 0 deletions examples/src/message_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Message producer for non-HTTP interactions.

This modules implements a very basic message producer which could
send to an eventing system, such as Kafka, or a message queue.
"""

from __future__ import annotations

import enum
import json
from typing import Literal, NamedTuple


class FileSystemAction(enum.Enum):
"""
Represents a file system action.
"""

READ = "READ"
WRITE = "WRITE"


class FileSystemEvent(NamedTuple):
"""
Represents a file system event.
"""

action: Literal[FileSystemAction.READ, FileSystemAction.WRITE]
path: str
contents: str | None


class MockMessageQueue:
"""
A mock message queue.
"""

def __init__(self) -> None:
"""
Initialize the message queue.
"""
self.messages: list[str] = []

def send(self, message: str) -> None:
"""
Send a message to the queue.

Args:
message: The message to send.
"""
self.messages.append(message)


class FileSystemMessageProducer:
"""
A message producer for file system events.
"""

def __init__(self) -> None:
"""
Initialize the message producer.
"""
self.queue = MockMessageQueue()

def send_to_queue(self, message: FileSystemEvent) -> None:
"""
Send a message to a message queue.

:param message: The message to send.
"""
self.queue.send(
json.dumps({
"action": message.action.value,
"path": message.path,
"contents": message.contents,
})
)

def send_write_event(self, filename: str, contents: str) -> None:
"""
Send a write event to a message queue.

Args:
filename: The name of the file.
contents: The contents of the file.
"""
message = FileSystemEvent(
action=FileSystemAction.WRITE,
path=filename,
contents=contents,
)
self.send_to_queue(message)

def send_read_event(self, filename: str) -> None:
"""
Send a read event to a message queue.

:param filename: The name of the file.
"""
message = FileSystemEvent(
action=FileSystemAction.READ,
path=filename,
contents=None,
)
self.send_to_queue(message)
253 changes: 253 additions & 0 deletions examples/tests/v3/provider_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
"""
HTTP Server to route message requests to message producer function.
"""

from __future__ import annotations

import logging
import re
import signal
import socket
import subprocess
import sys
import time
from contextlib import closing, contextmanager
from importlib import import_module
from pathlib import Path
from threading import Thread
from typing import Generator, NoReturn, Tuple

import requests

sys.path.append(str(Path(__file__).parent.parent.parent.parent))

import flask
from yarl import URL

logger = logging.getLogger(__name__)


class Provider:
"""
Provider class to route message requests to message producer function.

Sets up three endpoints:
- /_test/ping: A simple ping endpoint for testing.
- /produce_message: Route message requests to the handler function.
- /set_provider_state: Set the provider state.

The specific `produce_message` and `set_provider_state` URLs can be configured
with the `produce_message_url` and `set_provider_state_url` arguments.
"""

def __init__( # noqa: PLR0913
self,
handler_module: str,
handler_function: str,
produce_message_url: str,
state_provider_module: str,
state_provider_function: str,
set_provider_state_url: str,
) -> None:
"""
Initialize the provider.

Args:
handler_module:
The name of the module containing the handler function.
handler_function:
The name of the handler function.
produce_message_url:
The URL to route message requests to the handler function.
state_provider_module:
The name of the module containing the state provider setup function.
state_provider_function:
The name of the state provider setup function.
set_provider_state_url:
The URL to set the provider state.
"""
self.app = flask.Flask("Provider")
self.handler_function = getattr(import_module(handler_module), handler_function)
self.produce_message_url = produce_message_url
self.set_provider_state_url = set_provider_state_url
if state_provider_module:
self.state_provider_function = getattr(
import_module(state_provider_module), state_provider_function
)

@self.app.get("/_test/ping")
def ping() -> str:
"""Simple ping endpoint for testing."""
return "pong"

@self.app.route(self.produce_message_url, methods=["POST"])
def produce_message() -> flask.Response | Tuple[str, int]:
"""
Route a message request to the handler function.

Returns:
The response from the handler function.
"""
try:
body, content_type = self.handler_function()
return flask.Response(
response=body,
status=200,
content_type=content_type,
direct_passthrough=True,
)
except Exception as e: # noqa: BLE001
return str(e), 500

@self.app.route(self.set_provider_state_url, methods=["POST"])
def set_provider_state() -> Tuple[str, int]:
"""
Calls the state provider function with the state provided in the request.

Returns:
A response indicating that the state has been set.
"""
if self.state_provider_function:
self.state_provider_function(flask.request.args["state"])
return "Provider state set", 200

def _find_free_port(self) -> int:
"""
Find a free port.

This is used to find a free port to host the API on when running locally. It
is allocated, and then released immediately so that it can be used by the
API.

Returns:
The port number.
"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]

def run(self) -> None:
"""
Start the provider.
"""
url = URL(f"http://localhost:{self._find_free_port()}")
sys.stderr.write(f"Starting provider on {url}\n")

self.app.run(
host=url.host,
port=url.port,
debug=True,
)


@contextmanager
def start_provider(**kwargs: str) -> Generator[URL, None, None]: # noqa: C901
"""
Start the provider app.

Expects kwargs to to contain the following:
handler_module: Required. The name of the module containing
the handler function.
handler_function: Required. The name of the handler function.
produce_message_url: Optional. The URL to route message requests to
the handler function.
state_provider_module: Optional. The name of the module containing
the state provider setup function.
state_provider_function: Optional. The name of the state provider
setup function.
set_provider_state_url: Optional. The URL to set the provider state.
"""
process = subprocess.Popen( # noqa: S603
[
sys.executable,
Path(__file__),
kwargs.pop("handler_module"),
kwargs.pop("handler_function"),
kwargs.pop("produce_message_url", "/produce_message"),
kwargs.pop("state_provider_module", ""),
kwargs.pop("state_provider_function", ""),
kwargs.pop("set_provider_state_url", "/set_provider_state"),
],
cwd=Path.cwd(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)

pattern = re.compile(r" \* Running on (?P<url>[^ ]+)")
while True:
if process.poll() is not None:
logger.error("Provider process exited with code %d", process.returncode)
logger.error(
"Provider stdout: %s", process.stdout.read() if process.stdout else ""
)
logger.error(
"Provider stderr: %s", process.stderr.read() if process.stderr else ""
)
msg = f"Provider process exited with code {process.returncode}"
raise RuntimeError(msg)
if (
process.stderr
and (line := process.stderr.readline())
and (match := pattern.match(line))
):
break
time.sleep(0.1)

url = URL(match.group("url"))
logger.debug("Provider started on %s", url)
for _ in range(50):
try:
response = requests.get(str(url / "_test" / "ping"), timeout=1)
assert response.text == "pong"
break
except (requests.RequestException, AssertionError):
time.sleep(0.1)
continue
else:
msg = "Failed to ping provider"
raise RuntimeError(msg)

def redirect() -> NoReturn:
while True:
if process.stdout:
while line := process.stdout.readline():
logger.debug("Provider stdout: %s", line.strip())
if process.stderr:
while line := process.stderr.readline():
logger.debug("Provider stderr: %s", line.strip())

thread = Thread(target=redirect, daemon=True)
thread.start()

try:
yield url
finally:
process.send_signal(signal.SIGINT)


if __name__ == "__main__":
import sys

if len(sys.argv) < 5: # noqa: PLR2004
sys.stderr.write(
f"Usage: {sys.argv[0]} <state_provider_module> <state_provider_function> "
f"<handler_module> <handler_function>"
)
sys.exit(1)

handler_module = sys.argv[1]
handler_function = sys.argv[2]
produce_message_url = sys.argv[3]
state_provider_module = sys.argv[4]
state_provider_function = sys.argv[5]
set_provider_state_url = sys.argv[6]
Provider(
handler_module,
handler_function,
produce_message_url,
state_provider_module,
state_provider_function,
set_provider_state_url,
).run()
Loading
Loading