diff --git a/openfl/experimental/workflow/component/envoy/envoy.py b/openfl/experimental/workflow/component/envoy/envoy.py index c532ee1a6b..fd9a87379c 100644 --- a/openfl/experimental/workflow/component/envoy/envoy.py +++ b/openfl/experimental/workflow/component/envoy/envoy.py @@ -12,7 +12,7 @@ from typing import Optional, Union from openfl.experimental.workflow.federated import Plan -from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient +from openfl.experimental.workflow.transport.grpc.director_client import EnvoyDirectorClient from openfl.experimental.workflow.transport.grpc.exceptions import EnvoyNotFoundError from openfl.utilities.workspace import ExperimentWorkspace @@ -32,7 +32,7 @@ class Envoy: for TLS. private_key (Optional[Union[Path, str]]): The path to the private key for TLS. certificate (Optional[Union[Path, str]]): The path to the certificate for TLS. - director_client (DirectorClient): The director client. + _envoy_dir_client (EnvoyDirectorClient): The envoy director client. install_requirements (bool): A flag indicating if the requirements should be installed. is_experiment_running (bool): A flag indicating if an experiment is @@ -80,7 +80,7 @@ def __init__( self.tls = tls self._fill_certs(root_certificate, private_key, certificate) self.install_requirements = install_requirements - self.director_client = self._create_director_client(director_host, director_port) + self._envoy_dir_client = self._create_envoy_dir_client(director_host, director_port) self.is_experiment_running = False self.executor = ThreadPoolExecutor() # This plan path ("plan/plan.yaml") originates from the @@ -88,17 +88,19 @@ def __init__( self.plan = "plan/plan.yaml" self._health_check_future = None - def _create_director_client(self, director_host: str, director_port: int) -> DirectorClient: - """Create a DirectorClient instance. + def _create_envoy_dir_client( + self, director_host: str, director_port: int + ) -> EnvoyDirectorClient: + """Create a EnvoyDirectorClient instance. Args: director_host (str): The host of the director. director_port (int): The port of the director. Returns: - DirectorClient: Instance of the client + EnvoyDirectorClient: Instance of the client """ - return DirectorClient( + return EnvoyDirectorClient( director_host=director_host, director_port=director_port, envoy_name=self.name, @@ -134,8 +136,8 @@ def _run(self) -> None: while True: try: # Wait for experiment from Director server - experiment_name = self.director_client.wait_experiment() - data_stream = self.director_client.get_experiment_data(experiment_name) + experiment_name = self._envoy_dir_client.wait_experiment() + data_stream = self._envoy_dir_client.get_experiment_data(experiment_name) except Exception as exc: logger.exception("Failed to get experiment: %s", exc) time.sleep(self.DEFAULT_RETRY_TIMEOUT_IN_SECONDS) @@ -180,7 +182,7 @@ def _send_health_check(self) -> None: timeout = self.DEFAULT_RETRY_TIMEOUT_IN_SECONDS while True: try: - timeout = self.director_client.send_health_check( + timeout = self._envoy_dir_client.send_health_check( envoy_name=self.name, is_experiment_running=self.is_experiment_running, ) @@ -188,7 +190,7 @@ def _send_health_check(self) -> None: logger.info( "The director has lost information about current envoy. Reconnecting..." ) - self.director_client.connect_envoy(envoy_name=self.name) + self._envoy_dir_client.connect_envoy(envoy_name=self.name) time.sleep(timeout) def _run_collaborator(self) -> None: @@ -209,7 +211,7 @@ def _run_collaborator(self) -> None: def start(self) -> None: """Start the envoy""" try: - is_accepted = self.director_client.connect_envoy(envoy_name=self.name) + is_accepted = self._envoy_dir_client.connect_envoy(envoy_name=self.name) except Exception as exc: logger.exception("Failed to connect envoy: %s", exc) sys.exit(1) diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index 861c27e059..6420b6d8dc 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -17,7 +17,7 @@ from tabulate import tabulate from openfl.experimental.workflow.runtime.runtime import Runtime -from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient +from openfl.experimental.workflow.transport.grpc.director_client import RuntimeDirectorClient from openfl.experimental.workflow.workspace_export import WorkspaceExport logger = logging.getLogger(__name__) @@ -31,7 +31,7 @@ class FederatedRuntime(Runtime): tls (bool): A flag indicating if TLS should be used for connections. Defaults to False. director (Optional[Dict[str, Any]]): Dictionary containing director info. - _dir_client (DirectorClient): The director client. + _runtime_dir_client (RuntimeDirectorClient): The Runtime director client. notebook_path (Optional[str]) : Path to the Jupyter notebook experiment_submitted (bool): Whether the experiment has been submitted. generated_workspace_path (Path): Path to generated workspace @@ -64,7 +64,7 @@ def __init__( self.director.get("api_private_key", None), self.director.get("api_cert", None), ) - self._dir_client = self._create_director_client() + self._runtime_dir_client = self._create_runtime_dir_client() self.notebook_path = notebook_path self.experiment_submitted = False @@ -123,13 +123,13 @@ def _fill_certs(self, root_certificate, private_key, certificate) -> None: else: self.root_certificate = self.private_key = self.certificate = None - def _create_director_client(self) -> DirectorClient: - """Create a DirectorClient instance. + def _create_runtime_dir_client(self) -> RuntimeDirectorClient: + """Create a RuntimeDirectorClient instance. Returns: - DirectorClient: Instance of the client + RuntimeDirectorClient: Instance of the client """ - return DirectorClient( + return RuntimeDirectorClient( director_host=self.director["director_node_fqdn"], director_port=self.director["director_port"], tls=self.tls, @@ -163,7 +163,7 @@ def submit_experiment(self, archive_path, exp_name) -> None: exp_name (str): The name of the experiment to be submitted. """ try: - response = self._dir_client.set_new_experiment( + response = self._runtime_dir_client.set_new_experiment( archive_path=archive_path, experiment_name=exp_name, col_names=self.__collaborators ) self.experiment_submitted = response.status @@ -186,7 +186,7 @@ def get_flow_state(self) -> Tuple[bool, Any]: status (bool): The flow status. flow_object: The deserialized flow object. """ - status, flspec_obj = self._dir_client.get_flow_state() + status, flspec_obj = self._runtime_dir_client.get_flow_state() # Append generated workspace path to sys.path # to allow unpickling of flspec_obj @@ -202,7 +202,7 @@ def get_envoys(self) -> List[str]: online_envoys (List[str]): List of online envoys. """ # Fetch envoy data - envoys = self._dir_client.get_envoys() + envoys = self._runtime_dir_client.get_envoys() DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" now = datetime.now().strftime(DATETIME_FORMAT) @@ -241,7 +241,9 @@ def stream_experiment_stdout(self, experiment_name) -> None: print("No experiment has been submitted yet.") return print(f"Getting standard output for experiment: {experiment_name}...") - for stdout_message_dict in self._dir_client.stream_experiment_stdout(experiment_name): + for stdout_message_dict in self._runtime_dir_client.stream_experiment_stdout( + experiment_name + ): print( f"Origin: {stdout_message_dict['stdout_origin']}, " f"Task: {stdout_message_dict['task_name']}" diff --git a/openfl/experimental/workflow/transport/__init__.py b/openfl/experimental/workflow/transport/__init__.py index 3397c94b7e..dd8a859e3b 100644 --- a/openfl/experimental/workflow/transport/__init__.py +++ b/openfl/experimental/workflow/transport/__init__.py @@ -7,6 +7,7 @@ from openfl.experimental.workflow.transport.grpc import ( AggregatorGRPCClient, AggregatorGRPCServer, - DirectorClient, DirectorGRPCServer, + EnvoyDirectorClient, + RuntimeDirectorClient, ) diff --git a/openfl/experimental/workflow/transport/grpc/__init__.py b/openfl/experimental/workflow/transport/grpc/__init__.py index 7406f1dda5..464df70c54 100644 --- a/openfl/experimental/workflow/transport/grpc/__init__.py +++ b/openfl/experimental/workflow/transport/grpc/__init__.py @@ -6,5 +6,8 @@ from openfl.experimental.workflow.transport.grpc.aggregator_client import AggregatorGRPCClient from openfl.experimental.workflow.transport.grpc.aggregator_server import AggregatorGRPCServer -from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient +from openfl.experimental.workflow.transport.grpc.director_client import ( + EnvoyDirectorClient, + RuntimeDirectorClient, +) from openfl.experimental.workflow.transport.grpc.director_server import DirectorGRPCServer diff --git a/openfl/experimental/workflow/transport/grpc/director_client.py b/openfl/experimental/workflow/transport/grpc/director_client.py index 91eea331e8..189ab00937 100644 --- a/openfl/experimental/workflow/transport/grpc/director_client.py +++ b/openfl/experimental/workflow/transport/grpc/director_client.py @@ -18,8 +18,8 @@ logger = logging.getLogger(__name__) -class DirectorClient: - """Director client class for experiment managers/envoys. +class EnvoyDirectorClient: + """Envoy director client class for envoys. This class communicates with the director to manage the envoys participation in the federation. @@ -59,6 +59,7 @@ def __init__( """ director_addr = f"{director_host}:{director_port}" self.envoy_name = envoy_name + logger.info("Director address: %s", director_addr) if not tls: channel = grpc.insecure_channel(director_addr, options=channel_options) else: @@ -139,6 +140,102 @@ def _get_experiment_data(self) -> director_pb2.WaitExperimentRequest: """ return director_pb2.WaitExperimentRequest(collaborator_name=self.envoy_name) + def send_health_check( + self, + *, + envoy_name: str, + is_experiment_running: bool, + ) -> int: + """Send envoy health check. + + Args: + envoy_name (str): The name of the envoy. + is_experiment_running (bool): Whether an experiment is currently + running. + + Returns: + health_check_period (int): The period for health checks. + """ + status = director_pb2.UpdateEnvoyStatusRequest( + name=envoy_name, + is_experiment_running=is_experiment_running, + ) + + logger.debug("Sending health check status: %s", status) + try: + response = self.stub.UpdateEnvoyStatus(status) + except grpc.RpcError as rpc_error: + logger.error(rpc_error) + if rpc_error.code() == grpc.StatusCode.NOT_FOUND: + raise EnvoyNotFoundError + else: + health_check_period = response.health_check_period.seconds + + return health_check_period + + +class RuntimeDirectorClient: + """ + RuntimeDirectorClient class for experiment manager. + + This class communicates with the director to manage the experiment manager’s + participation in the federation. + + Attributes: + stub (director_pb2_grpc.DirectorStub): The gRPC stub for communication + with the director. + """ + + def __init__( + self, + *, + director_host: str, + director_port: int, + tls: bool = False, + root_certificate: Optional[Union[Path, str]] = None, + private_key: Optional[Union[Path, str]] = None, + certificate: Optional[Union[Path, str]] = None, + ) -> None: + """ + Initialize RuntimeDirectorClient object. + + Args: + director_host (str): The host name for Director server. + director_port (int): The port number for Director server. + tls (bool): Whether to use TLS for the connection. + root_certificate (Optional[Union[Path, str]]): The path to the root certificate for the + TLS connection. + private_key (Optional[Union[Path, str]]): The path to the private key for the TLS + connection. + certificate (Optional[Union[Path, str]]): The path to the certificate for the TLS + connection. + + """ + director_addr = f"{director_host}:{director_port}" + logger.info("Director address: %s", director_addr) + if not tls: + channel = grpc.insecure_channel(director_addr, options=channel_options) + else: + if not (root_certificate and private_key and certificate): + raise Exception("No certificates provided for TLS connection") + try: + with open(root_certificate, "rb") as f: + root_certificate_b = f.read() + with open(private_key, "rb") as f: + private_key_b = f.read() + with open(certificate, "rb") as f: + certificate_b = f.read() + except FileNotFoundError as exc: + raise Exception(f"Provided certificate file is not exist: {exc.filename}") + + credentials = grpc.ssl_channel_credentials( + root_certificates=root_certificate_b, + private_key=private_key_b, + certificate_chain=certificate_b, + ) + channel = grpc.secure_channel(director_addr, credentials, options=channel_options) + self.stub = director_pb2_grpc.DirectorStub(channel) + def set_new_experiment( self, experiment_name, col_names, archive_path ) -> director_pb2.SetNewExperimentResponse: @@ -220,39 +317,6 @@ def get_flow_state(self) -> Tuple: return response.completed, response.flspec_obj - def send_health_check( - self, - *, - envoy_name: str, - is_experiment_running: bool, - ) -> int: - """Send envoy health check. - - Args: - envoy_name (str): The name of the envoy. - is_experiment_running (bool): Whether an experiment is currently - running. - - Returns: - health_check_period (int): The period for health checks. - """ - status = director_pb2.UpdateEnvoyStatusRequest( - name=envoy_name, - is_experiment_running=is_experiment_running, - ) - - logger.debug("Sending health check status: %s", status) - try: - response = self.stub.UpdateEnvoyStatus(status) - except grpc.RpcError as rpc_error: - logger.error(rpc_error) - if rpc_error.code() == grpc.StatusCode.NOT_FOUND: - raise EnvoyNotFoundError - else: - health_check_period = response.health_check_period.seconds - - return health_check_period - def stream_experiment_stdout(self, experiment_name) -> Iterator[Dict[str, Any]]: """Stream experiment stdout RPC. Args: