Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flower Integration V2 #3216

Merged
merged 15 commits into from
Feb 12, 2025
14 changes: 10 additions & 4 deletions nvflare/app_common/tie/cli_applet.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@


class CLIApplet(Applet, ABC):
def __init__(self):
def __init__(self, stop_method="kill"):
"""Constructor of CLIApplet, which runs the applet as a subprocess started with CLI command."""
Applet.__init__(self)
self.stop_method = stop_method
self._proc_mgr = None
self._start_error = False

Expand Down Expand Up @@ -55,7 +56,7 @@ def start(self, app_ctx: dict):

fl_ctx = app_ctx.get(Constant.APP_CTX_FL_CONTEXT)
try:
self._proc_mgr = start_process(cmd_desc, fl_ctx)
self._proc_mgr = start_process(cmd_desc, fl_ctx, stop_method=self.stop_method)
except Exception as ex:
self.logger.error(f"exception starting applet '{cmd_desc.cmd}': {secure_format_exception(ex)}")
self._start_error = True
Expand All @@ -74,8 +75,10 @@ def stop(self, timeout=0.0) -> int:
self._proc_mgr = None

if not mgr:
raise RuntimeError("no process manager to stop")
self.logger.debug("no process manager to stop")
return 0

self.logger.info(f"stopping applet: {timeout=}")
if timeout > 0:
# wait for the applet to stop by itself
start = time.time()
Expand All @@ -84,10 +87,13 @@ def stop(self, timeout=0.0) -> int:
if rc is not None:
# already stopped
self.logger.info(f"applet stopped ({rc=}) after {time.time() - start} seconds")
break
return rc
time.sleep(0.1)

self.logger.info(f"about to stop process manager: {type(mgr)}")
rc = mgr.stop()
self.logger.info(f"applet stopped: {rc=}")

if rc is None:
self.logger.warning(f"killed the applet process after waiting {timeout} seconds")
return -9
Expand Down
11 changes: 6 additions & 5 deletions nvflare/app_common/tie/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from nvflare.app_common.tie.applet import Applet
from nvflare.app_common.tie.defs import Constant
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.utils.validation_utils import check_object_type
from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number


class Connector(ABC, FLComponent):
Expand All @@ -35,9 +35,11 @@ class Connector(ABC, FLComponent):
The Connector class defines commonly required methods for all Connector implementations.
"""

def __init__(self):
def __init__(self, monitor_interval: float = 0.5):
"""Constructor of Connector"""
FLComponent.__init__(self)
check_positive_number("monitor_interval", monitor_interval)
self.monitor_interval = monitor_interval
self.abort_signal = None
self.applet = None
self.engine = None
Expand Down Expand Up @@ -138,16 +140,15 @@ def _monitor(self, fl_ctx: FLContext, connector_stopped_cb):
while True:
if self.abort_signal.triggered:
# asked to abort
self.stop(fl_ctx)
return
break

stopped, rc = self._is_stopped()
if stopped:
# connector already stopped - notify the caller
connector_stopped_cb(rc, fl_ctx)
return

time.sleep(0.1)
time.sleep(self.monitor_interval)

def monitor(self, fl_ctx: FLContext, connector_stopped_cb):
"""Called by Controller/Executor to monitor the health of the connector.
Expand Down
20 changes: 10 additions & 10 deletions nvflare/app_common/tie/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,6 @@ def _trigger_stop(self, fl_ctx: FLContext, error=None):
if error:
self.system_panic(reason=error, fl_ctx=fl_ctx)

def _is_stopped(self):
# check whether the abort signal is triggered
# NOTE: the result is a truthiness value, not necessarily a bool - when self.abort_signal is not set
# the (falsy) abort_signal itself is returned; otherwise the signal's "triggered" attribute is returned.
return self.abort_signal and self.abort_signal.triggered

def _update_client_status(self, fl_ctx: FLContext, op=None, client_done=False):
"""Update the status of the requesting client.

Expand Down Expand Up @@ -303,7 +299,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext)
"""
self.log_debug(fl_ctx, f"_handle_app_request {topic}")
op = request.get_header(Constant.MSG_KEY_OP)
if self._is_stopped():
if self.abort_signal and self.abort_signal.triggered:
self.log_warning(fl_ctx, f"dropped app request ({op=}) since server is already stopped")
return make_reply(ReturnCode.SERVICE_UNAVAILABLE)

Expand All @@ -317,7 +313,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext)
self._trigger_stop(fl_ctx, process_error)
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

self.log_info(fl_ctx, f"received reply for app request '{op=}'")
self.log_debug(fl_ctx, f"received reply for app request '{op=}'")
reply.set_header(Constant.MSG_KEY_OP, op)
return reply

Expand Down Expand Up @@ -434,6 +430,7 @@ def control_flow(self, abort_signal: Signal, fl_ctx: FLContext):
# configure all clients
if not self._configure_clients(abort_signal, fl_ctx):
self.system_panic("failed to configure all clients", fl_ctx)
abort_signal.trigger(True)
return

# configure and start the connector
Expand All @@ -446,30 +443,33 @@ def control_flow(self, abort_signal: Signal, fl_ctx: FLContext):
error = f"failed to start connector: {secure_format_exception(ex)}"
self.log_error(fl_ctx, error)
self.system_panic(error, fl_ctx)
abort_signal.trigger(True)
return

self.connector.monitor(fl_ctx, self._app_stopped)

# start all clients
if not self._start_clients(abort_signal, fl_ctx):
self.system_panic("failed to start all clients", fl_ctx)
abort_signal.trigger(True)
return

# monitor client health
# we periodically check job status until all clients are done or the system is stopped
self.log_info(fl_ctx, "Waiting for clients to finish ...")
while not self._is_stopped():
while not abort_signal.triggered:
done = self._check_job_status(fl_ctx)
if done:
break
self.connector.stop(fl_ctx)
return
time.sleep(self.job_status_check_interval)

def _app_stopped(self, rc, fl_ctx: FLContext):
# This CB is called when app server is stopped
error = None
if rc != 0:
self.log_error(fl_ctx, f"App Server stopped abnormally with code {rc}")
error = "App server abnormal stop"
error = f"App server abnormally stopped: {rc=}"
self.log_error(fl_ctx, error)

# the app server could stop at any moment, we trigger the abort_signal in case it is checked by any
# other components
Expand Down
1 change: 1 addition & 0 deletions nvflare/app_common/tie/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ class Constant:

EXIT_CODE_CANT_START = 101
EXIT_CODE_FATAL_ERROR = 102
EXIT_CODE_FAILED = 103

APP_CTX_FL_CONTEXT = "tie.fl_context"
3 changes: 3 additions & 0 deletions nvflare/app_common/tie/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
self._notify_client_done(Constant.EXIT_CODE_FATAL_ERROR, fl_ctx)
elif event_type == EventType.END_RUN:
self.abort_signal.trigger(True)
if self.connector:
self.logger.info(f"stopping connector {type(self.connector)}")
self.connector.stop(fl_ctx)

def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
if task_name == self.configure_task_name:
Expand Down
65 changes: 57 additions & 8 deletions nvflare/app_common/tie/process_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
from nvflare.fuel.utils.validation_utils import check_object_type, check_str


# Names of the supported ways to stop a subprocess.
# CommandDescriptor rejects any stop_method value that is not one of these.
class StopMethod:
# stop the subprocess by calling kill() on it
KILL = "kill"
# stop the subprocess by calling terminate() on it
TERMINATE = "terminate"


class CommandDescriptor:
def __init__(
self,
Expand All @@ -33,6 +38,7 @@ def __init__(
log_file_name: str = "",
log_stdout: bool = True,
stdout_msg_prefix: str = None,
stop_method=StopMethod.KILL,
):
"""Constructor of CommandDescriptor.
A CommandDescriptor describes the requirements of the new process to be started.
Expand All @@ -46,6 +52,7 @@ def __init__(
stdout_msg_prefix: prefix to be prepended to log message when writing to stdout.
Since multiple processes could be running within the same terminal window, the prefix can help
differentiate log messages from these processes.
stop_method: how to stop the command (kill or terminate)
"""
check_str("cmd", cmd)

Expand All @@ -61,16 +68,21 @@ def __init__(
if stdout_msg_prefix:
check_str("stdout_msg_prefix", stdout_msg_prefix)

valid_stop_methods = [StopMethod.KILL, StopMethod.TERMINATE]
if stop_method not in valid_stop_methods:
raise ValueError(f"invalid stop_method '{stop_method}': must be one of {valid_stop_methods}")

self.cmd = cmd
self.cwd = cwd
self.env = env
self.log_file_name = log_file_name
self.log_stdout = log_stdout
self.stdout_msg_prefix = stdout_msg_prefix
self.stop_method = stop_method


class ProcessManager:
def __init__(self, cmd_desc: CommandDescriptor):
def __init__(self, cmd_desc: CommandDescriptor, stop_method="kill"):
"""Constructor of ProcessManager.
ProcessManager provides methods for managing the lifecycle of a subprocess (start, stop, poll), as well
as the handling of log file to be used by the subprocess.
Expand All @@ -84,6 +96,7 @@ def __init__(self, cmd_desc: CommandDescriptor):
check_object_type("cmd_desc", cmd_desc, CommandDescriptor)
self.process = None
self.cmd_desc = cmd_desc
self.stop_method = stop_method
self.log_file = None
self.msg_prefix = None
self.file_lock = threading.Lock()
Expand Down Expand Up @@ -131,7 +144,6 @@ def start(
env=env,
stdout=subprocess.PIPE,
)

log_writer = threading.Thread(target=self._write_log, daemon=True)
log_writer.start()

Expand Down Expand Up @@ -175,35 +187,72 @@ def stop(self) -> int:
Returns: the exit code of the process. If the process was still running, returns -9 when it is killed or -15 when it is terminated.

"""
self.logger.info(f"stopping process: {self.cmd_desc.cmd}")
rc = self.poll()
if rc is None:
# process is still alive
stop_method = self.cmd_desc.stop_method
self.logger.info(f"process still running - {stop_method} process: {self.cmd_desc.cmd}")
try:
self.process.kill()
rc = -9
except:
if stop_method == StopMethod.KILL:
self.process.kill()
rc = -9
else:
self.process.terminate()
rc = -15
except Exception as ex:
# ignore kill error
self.logger.debug(f"ignored exception {ex} from {stop_method}")
pass
else:
self.logger.info(f"process already stopped: {rc=}")

# close the log file if any
with self.file_lock:
if self.log_file:
self.logger.debug("closed subprocess log file!")
self.logger.info("closed subprocess log file!")
self.log_file.close()
self.log_file = None
return rc


def start_process(cmd_desc: CommandDescriptor, fl_ctx: FLContext) -> ProcessManager:
def start_process(cmd_desc: CommandDescriptor, fl_ctx: FLContext, stop_method="kill") -> ProcessManager:
"""Convenience function for starting a subprocess.

Creates a ProcessManager for the command and calls its start method with the FL context.

Args:
cmd_desc: the CommandDescriptor that describes the command to be executed
fl_ctx: FLContext object
stop_method: how to stop the process; passed to the ProcessManager constructor

Returns: a ProcessManager object.

"""
mgr = ProcessManager(cmd_desc)
mgr = ProcessManager(cmd_desc, stop_method)
mgr.start(fl_ctx)
return mgr


def run_command(cmd_desc: "CommandDescriptor") -> str:
    """Run the command described by cmd_desc to completion and return its output.

    The command line is split with shlex and executed directly (no shell). The child's stderr is
    redirected into its stdout, so the returned text contains the output of both streams.

    Args:
        cmd_desc: the CommandDescriptor that describes the command to be executed. Only its
            cmd, cwd and env attributes are used here.

    Returns: the combined stdout/stderr output of the command, decoded as UTF-8.

    """
    # the child gets a copy of the current environment, overlaid with the command-specific settings
    env = os.environ.copy()
    if cmd_desc.env:
        env.update(cmd_desc.env)

    command_seq = shlex.split(cmd_desc.cmd)

    # Using Popen as a context manager closes the stdout pipe and waits for the child on exit,
    # so neither a zombie process nor an open file descriptor is left behind.
    with subprocess.Popen(
        command_seq,
        stderr=subprocess.STDOUT,
        cwd=cmd_desc.cwd,
        env=env,
        stdout=subprocess.PIPE,
    ) as p:
        # read until EOF, i.e. until the child has closed its end of the pipe
        output = p.stdout.read()

    return output.decode("utf-8")
Loading
Loading