diff --git a/faststream/cli/supervisors/multiprocess.py b/faststream/cli/supervisors/multiprocess.py index e7ab9dd413..a08fc5f273 100644 --- a/faststream/cli/supervisors/multiprocess.py +++ b/faststream/cli/supervisors/multiprocess.py @@ -1,3 +1,4 @@ +import signal from typing import TYPE_CHECKING, Any, List, Tuple from faststream.cli.supervisors.basereload import BaseReload @@ -17,8 +18,9 @@ def __init__( target: "DecoratedCallable", args: Tuple[Any, ...], workers: int, + reload_delay: float = 0.5, ) -> None: - super().__init__(target, args, None) + super().__init__(target, args, reload_delay) self.workers = workers self.processes: List[SpawnProcess] = [] @@ -38,3 +40,30 @@ def shutdown(self) -> None: process.join() logger.info(f"Stopping parent process [{self.pid}]") + + def restart(self) -> None: + active_processes = [] + + for process in self.processes: + if process.is_alive(): + active_processes.append(process) + continue + + pid = process.pid + exitcode = process.exitcode + + log_msg = "Worker (pid:%s) exited with code %s." + if exitcode and abs(exitcode) == signal.SIGKILL: + log_msg += " Perhaps out of memory?" + logger.error(log_msg, pid, exitcode) + + process.kill() + + new_process = self._start_process() + logger.info(f"Started child process [{new_process.pid}]") + active_processes.append(new_process) + + self.processes = active_processes + + def should_restart(self) -> bool: + return not all(p.is_alive() for p in self.processes)