Skip to content

Commit

Permalink
[Workflow Interface] FederatedRuntime: Refactor DirectorClient into D…
Browse files Browse the repository at this point in the history
…irectorClient and RuntimeDirectorClient (#1291)

* Director-client-refactor

Signed-off-by: refai06 <[email protected]>

* Update DirectorClient Refactor

Signed-off-by: refai06 <[email protected]>

* Improve readability

Signed-off-by: refai06 <[email protected]>

* Docstring Update

Signed-off-by: refai06 <[email protected]>

* Update Dir-client class name

Signed-off-by: refai06 <[email protected]>

* Update code

Signed-off-by: refai06 <[email protected]>

---------

Signed-off-by: refai06 <[email protected]>
  • Loading branch information
refai06 authored Feb 11, 2025
1 parent a306683 commit d723f9d
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 60 deletions.
26 changes: 14 additions & 12 deletions openfl/experimental/workflow/component/envoy/envoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import Optional, Union

from openfl.experimental.workflow.federated import Plan
from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient
from openfl.experimental.workflow.transport.grpc.director_client import EnvoyDirectorClient
from openfl.experimental.workflow.transport.grpc.exceptions import EnvoyNotFoundError
from openfl.utilities.workspace import ExperimentWorkspace

Expand All @@ -32,7 +32,7 @@ class Envoy:
for TLS.
private_key (Optional[Union[Path, str]]): The path to the private key for TLS.
certificate (Optional[Union[Path, str]]): The path to the certificate for TLS.
director_client (DirectorClient): The director client.
_envoy_dir_client (EnvoyDirectorClient): The envoy director client.
install_requirements (bool): A flag indicating if the requirements
should be installed.
is_experiment_running (bool): A flag indicating if an experiment is
Expand Down Expand Up @@ -80,25 +80,27 @@ def __init__(
self.tls = tls
self._fill_certs(root_certificate, private_key, certificate)
self.install_requirements = install_requirements
self.director_client = self._create_director_client(director_host, director_port)
self._envoy_dir_client = self._create_envoy_dir_client(director_host, director_port)
self.is_experiment_running = False
self.executor = ThreadPoolExecutor()
# This plan path ("plan/plan.yaml") originates from the
# experiment workspace provided by the director
self.plan = "plan/plan.yaml"
self._health_check_future = None

def _create_director_client(self, director_host: str, director_port: int) -> DirectorClient:
"""Create a DirectorClient instance.
def _create_envoy_dir_client(
self, director_host: str, director_port: int
) -> EnvoyDirectorClient:
"""Create a EnvoyDirectorClient instance.
Args:
director_host (str): The host of the director.
director_port (int): The port of the director.
Returns:
DirectorClient: Instance of the client
EnvoyDirectorClient: Instance of the client
"""
return DirectorClient(
return EnvoyDirectorClient(
director_host=director_host,
director_port=director_port,
envoy_name=self.name,
Expand Down Expand Up @@ -134,8 +136,8 @@ def _run(self) -> None:
while True:
try:
# Wait for experiment from Director server
experiment_name = self.director_client.wait_experiment()
data_stream = self.director_client.get_experiment_data(experiment_name)
experiment_name = self._envoy_dir_client.wait_experiment()
data_stream = self._envoy_dir_client.get_experiment_data(experiment_name)
except Exception as exc:
logger.exception("Failed to get experiment: %s", exc)
time.sleep(self.DEFAULT_RETRY_TIMEOUT_IN_SECONDS)
Expand Down Expand Up @@ -180,15 +182,15 @@ def _send_health_check(self) -> None:
timeout = self.DEFAULT_RETRY_TIMEOUT_IN_SECONDS
while True:
try:
timeout = self.director_client.send_health_check(
timeout = self._envoy_dir_client.send_health_check(
envoy_name=self.name,
is_experiment_running=self.is_experiment_running,
)
except EnvoyNotFoundError:
logger.info(
"The director has lost information about current envoy. Reconnecting..."
)
self.director_client.connect_envoy(envoy_name=self.name)
self._envoy_dir_client.connect_envoy(envoy_name=self.name)
time.sleep(timeout)

def _run_collaborator(self) -> None:
Expand All @@ -209,7 +211,7 @@ def _run_collaborator(self) -> None:
def start(self) -> None:
"""Start the envoy"""
try:
is_accepted = self.director_client.connect_envoy(envoy_name=self.name)
is_accepted = self._envoy_dir_client.connect_envoy(envoy_name=self.name)
except Exception as exc:
logger.exception("Failed to connect envoy: %s", exc)
sys.exit(1)
Expand Down
24 changes: 13 additions & 11 deletions openfl/experimental/workflow/runtime/federated_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from tabulate import tabulate

from openfl.experimental.workflow.runtime.runtime import Runtime
from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient
from openfl.experimental.workflow.transport.grpc.director_client import RuntimeDirectorClient
from openfl.experimental.workflow.workspace_export import WorkspaceExport

logger = logging.getLogger(__name__)
Expand All @@ -31,7 +31,7 @@ class FederatedRuntime(Runtime):
tls (bool): A flag indicating if TLS should be used for
connections. Defaults to False.
director (Optional[Dict[str, Any]]): Dictionary containing director info.
_dir_client (DirectorClient): The director client.
_runtime_dir_client (RuntimeDirectorClient): The Runtime director client.
notebook_path (Optional[str]) : Path to the Jupyter notebook
experiment_submitted (bool): Whether the experiment has been submitted.
generated_workspace_path (Path): Path to generated workspace
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
self.director.get("api_private_key", None),
self.director.get("api_cert", None),
)
self._dir_client = self._create_director_client()
self._runtime_dir_client = self._create_runtime_dir_client()

self.notebook_path = notebook_path
self.experiment_submitted = False
Expand Down Expand Up @@ -123,13 +123,13 @@ def _fill_certs(self, root_certificate, private_key, certificate) -> None:
else:
self.root_certificate = self.private_key = self.certificate = None

def _create_director_client(self) -> DirectorClient:
"""Create a DirectorClient instance.
def _create_runtime_dir_client(self) -> RuntimeDirectorClient:
"""Create a RuntimeDirectorClient instance.
Returns:
DirectorClient: Instance of the client
RuntimeDirectorClient: Instance of the client
"""
return DirectorClient(
return RuntimeDirectorClient(
director_host=self.director["director_node_fqdn"],
director_port=self.director["director_port"],
tls=self.tls,
Expand Down Expand Up @@ -163,7 +163,7 @@ def submit_experiment(self, archive_path, exp_name) -> None:
exp_name (str): The name of the experiment to be submitted.
"""
try:
response = self._dir_client.set_new_experiment(
response = self._runtime_dir_client.set_new_experiment(
archive_path=archive_path, experiment_name=exp_name, col_names=self.__collaborators
)
self.experiment_submitted = response.status
Expand All @@ -186,7 +186,7 @@ def get_flow_state(self) -> Tuple[bool, Any]:
status (bool): The flow status.
flow_object: The deserialized flow object.
"""
status, flspec_obj = self._dir_client.get_flow_state()
status, flspec_obj = self._runtime_dir_client.get_flow_state()

# Append generated workspace path to sys.path
# to allow unpickling of flspec_obj
Expand All @@ -202,7 +202,7 @@ def get_envoys(self) -> List[str]:
online_envoys (List[str]): List of online envoys.
"""
# Fetch envoy data
envoys = self._dir_client.get_envoys()
envoys = self._runtime_dir_client.get_envoys()
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
now = datetime.now().strftime(DATETIME_FORMAT)

Expand Down Expand Up @@ -241,7 +241,9 @@ def stream_experiment_stdout(self, experiment_name) -> None:
print("No experiment has been submitted yet.")
return
print(f"Getting standard output for experiment: {experiment_name}...")
for stdout_message_dict in self._dir_client.stream_experiment_stdout(experiment_name):
for stdout_message_dict in self._runtime_dir_client.stream_experiment_stdout(
experiment_name
):
print(
f"Origin: {stdout_message_dict['stdout_origin']}, "
f"Task: {stdout_message_dict['task_name']}"
Expand Down
3 changes: 2 additions & 1 deletion openfl/experimental/workflow/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from openfl.experimental.workflow.transport.grpc import (
AggregatorGRPCClient,
AggregatorGRPCServer,
DirectorClient,
DirectorGRPCServer,
EnvoyDirectorClient,
RuntimeDirectorClient,
)
5 changes: 4 additions & 1 deletion openfl/experimental/workflow/transport/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@

from openfl.experimental.workflow.transport.grpc.aggregator_client import AggregatorGRPCClient
from openfl.experimental.workflow.transport.grpc.aggregator_server import AggregatorGRPCServer
from openfl.experimental.workflow.transport.grpc.director_client import DirectorClient
from openfl.experimental.workflow.transport.grpc.director_client import (
EnvoyDirectorClient,
RuntimeDirectorClient,
)
from openfl.experimental.workflow.transport.grpc.director_server import DirectorGRPCServer
134 changes: 99 additions & 35 deletions openfl/experimental/workflow/transport/grpc/director_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
logger = logging.getLogger(__name__)


class DirectorClient:
"""Director client class for experiment managers/envoys.
class EnvoyDirectorClient:
"""Envoy director client class for envoys.
This class communicates with the director to manage the envoys
participation in the federation.
Expand Down Expand Up @@ -59,6 +59,7 @@ def __init__(
"""
director_addr = f"{director_host}:{director_port}"
self.envoy_name = envoy_name
logger.info("Director address: %s", director_addr)
if not tls:
channel = grpc.insecure_channel(director_addr, options=channel_options)
else:
Expand Down Expand Up @@ -139,6 +140,102 @@ def _get_experiment_data(self) -> director_pb2.WaitExperimentRequest:
"""
return director_pb2.WaitExperimentRequest(collaborator_name=self.envoy_name)

def send_health_check(
    self,
    *,
    envoy_name: str,
    is_experiment_running: bool,
) -> int:
    """Send envoy health check.

    Builds an ``UpdateEnvoyStatusRequest`` and sends it to the director
    through ``self.stub.UpdateEnvoyStatus``.

    Args:
        envoy_name (str): The name of the envoy.
        is_experiment_running (bool): Whether an experiment is currently
            running.

    Returns:
        health_check_period (int): The period for health checks, read from
            the ``seconds`` field of the response's ``health_check_period``.

    Raises:
        EnvoyNotFoundError: If the RPC fails with status ``NOT_FOUND``.
        grpc.RpcError: Any other RPC failure is logged and re-raised.
    """
    status = director_pb2.UpdateEnvoyStatusRequest(
        name=envoy_name,
        is_experiment_running=is_experiment_running,
    )

    logger.debug("Sending health check status: %s", status)
    try:
        response = self.stub.UpdateEnvoyStatus(status)
    except grpc.RpcError as rpc_error:
        logger.error(rpc_error)
        if rpc_error.code() == grpc.StatusCode.NOT_FOUND:
            raise EnvoyNotFoundError from rpc_error
        # Any other RPC failure used to fall through to the return statement
        # with no result bound, surfacing as an unrelated UnboundLocalError.
        # Propagate the real error instead.
        raise

    return response.health_check_period.seconds


class RuntimeDirectorClient:
"""
RuntimeDirectorClient class for experiment manager.
This class communicates with the director to manage the experiment manager’s
participation in the federation.
Attributes:
stub (director_pb2_grpc.DirectorStub): The gRPC stub for communication
with the director.
"""

def __init__(
    self,
    *,
    director_host: str,
    director_port: int,
    tls: bool = False,
    root_certificate: Optional[Union[Path, str]] = None,
    private_key: Optional[Union[Path, str]] = None,
    certificate: Optional[Union[Path, str]] = None,
) -> None:
    """
    Initialize RuntimeDirectorClient object.

    Opens a gRPC channel to ``<director_host>:<director_port>`` (insecure
    when ``tls`` is False, otherwise secured with the supplied certificate
    files) and stores a ``DirectorStub`` bound to it in ``self.stub``.

    Args:
        director_host (str): The host name for Director server.
        director_port (int): The port number for Director server.
        tls (bool): Whether to use TLS for the connection.
        root_certificate (Optional[Union[Path, str]]): The path to the root certificate for the
            TLS connection.
        private_key (Optional[Union[Path, str]]): The path to the private key for the TLS
            connection.
        certificate (Optional[Union[Path, str]]): The path to the certificate for the TLS
            connection.

    Raises:
        Exception: If ``tls`` is True and any of the three certificate paths
            is missing, or if one of the given files does not exist.
    """
    director_addr = f"{director_host}:{director_port}"
    logger.info("Director address: %s", director_addr)
    if not tls:
        channel = grpc.insecure_channel(director_addr, options=channel_options)
    else:
        # All three files are required to build the TLS credentials below.
        if not (root_certificate and private_key and certificate):
            raise Exception("No certificates provided for TLS connection")
        try:
            with open(root_certificate, "rb") as f:
                root_certificate_b = f.read()
            with open(private_key, "rb") as f:
                private_key_b = f.read()
            with open(certificate, "rb") as f:
                certificate_b = f.read()
        except FileNotFoundError as exc:
            # Chain explicitly so the original OS error stays attached as the cause.
            raise Exception(f"Provided certificate file is not exist: {exc.filename}") from exc

        credentials = grpc.ssl_channel_credentials(
            root_certificates=root_certificate_b,
            private_key=private_key_b,
            certificate_chain=certificate_b,
        )
        channel = grpc.secure_channel(director_addr, credentials, options=channel_options)
    self.stub = director_pb2_grpc.DirectorStub(channel)

def set_new_experiment(
self, experiment_name, col_names, archive_path
) -> director_pb2.SetNewExperimentResponse:
Expand Down Expand Up @@ -220,39 +317,6 @@ def get_flow_state(self) -> Tuple:

return response.completed, response.flspec_obj

def send_health_check(
    self,
    *,
    envoy_name: str,
    is_experiment_running: bool,
) -> int:
    """Send envoy health check.

    Builds an ``UpdateEnvoyStatusRequest`` and sends it to the director
    through ``self.stub.UpdateEnvoyStatus``.

    Args:
        envoy_name (str): The name of the envoy.
        is_experiment_running (bool): Whether an experiment is currently
            running.

    Returns:
        health_check_period (int): The period for health checks, read from
            the ``seconds`` field of the response's ``health_check_period``.

    Raises:
        EnvoyNotFoundError: If the RPC fails with status ``NOT_FOUND``.
        grpc.RpcError: Any other RPC failure is logged and re-raised.
    """
    status = director_pb2.UpdateEnvoyStatusRequest(
        name=envoy_name,
        is_experiment_running=is_experiment_running,
    )

    logger.debug("Sending health check status: %s", status)
    try:
        response = self.stub.UpdateEnvoyStatus(status)
    except grpc.RpcError as rpc_error:
        logger.error(rpc_error)
        if rpc_error.code() == grpc.StatusCode.NOT_FOUND:
            raise EnvoyNotFoundError from rpc_error
        # Any other RPC failure used to fall through to the return statement
        # with no result bound, surfacing as an unrelated UnboundLocalError.
        # Propagate the real error instead.
        raise

    return response.health_check_period.seconds

def stream_experiment_stdout(self, experiment_name) -> Iterator[Dict[str, Any]]:
"""Stream experiment stdout RPC.
Args:
Expand Down

0 comments on commit d723f9d

Please sign in to comment.