Skip to content

Commit

Permalink
Merge branch 'feat/dynamic-sub' of github.com:airtai/faststream into …
Browse files Browse the repository at this point in the history
…feat/dynamic-sub
  • Loading branch information
Lancetnik committed Jul 9, 2024
2 parents cdd358f + 850872d commit ffdc391
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion faststream/cli/supervisors/multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import signal
from typing import TYPE_CHECKING, Any, List, Tuple

from faststream.cli.supervisors.basereload import BaseReload
Expand All @@ -17,8 +18,9 @@ def __init__(
target: "DecoratedCallable",
args: Tuple[Any, ...],
workers: int,
reload_delay: float = 0.5,
) -> None:
super().__init__(target, args, None)
super().__init__(target, args, reload_delay)

self.workers = workers
self.processes: List[SpawnProcess] = []
Expand All @@ -38,3 +40,30 @@ def shutdown(self) -> None:
process.join()

logger.info(f"Stopping parent process [{self.pid}]")

def restart(self) -> None:
active_processes = []

for process in self.processes:
if process.is_alive():
active_processes.append(process)
continue

pid = process.pid
exitcode = process.exitcode

log_msg = "Worker (pid:%s) exited with code %s."
if exitcode and abs(exitcode) == signal.SIGKILL:
log_msg += " Perhaps out of memory?"
logger.error(log_msg, pid, exitcode)

process.kill()

new_process = self._start_process()
logger.info(f"Started child process [{new_process.pid}]")
active_processes.append(new_process)

self.processes = active_processes

def should_restart(self) -> bool:
return not all(p.is_alive() for p in self.processes)

0 comments on commit ffdc391

Please sign in to comment.