diff --git a/doc/changelog.md b/doc/changelog.md index 0ada4e4ec..809ad5e8e 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -13,6 +13,7 @@ Jump to: Description +- Enable dynamic feature store selection - Fix dragon package installation bug - Adjust schemas for better performance - Add TorchWorker first implementation and mock inference app example diff --git a/ex/high_throughput_inference/mli_driver.py b/ex/high_throughput_inference/mli_driver.py index 6da559aa6..0cf87ef2e 100644 --- a/ex/high_throughput_inference/mli_driver.py +++ b/ex/high_throughput_inference/mli_driver.py @@ -1,5 +1,4 @@ - - +import argparse import os import base64 import cloudpickle @@ -26,11 +25,23 @@ torch_worker_str = base64.b64encode(cloudpickle.dumps(TorchWorker)).decode("ascii") -worker_manager_rs = exp.create_run_settings(sys.executable, [worker_manager_script_name, "--device", device, "--worker_class", torch_worker_str]) +worker_manager_rs = exp.create_run_settings( + sys.executable, + [ + worker_manager_script_name, + "--device", + device, + "--worker_class", + torch_worker_str, + ], +) worker_manager = exp.create_model("worker_manager", run_settings=worker_manager_rs) worker_manager.attach_generator_files(to_copy=[worker_manager_script_name]) -app_rs = exp.create_run_settings(sys.executable, exe_args = [app_script_name, "--device", device]) +app_rs = exp.create_run_settings( + sys.executable, + exe_args=[app_script_name, "--device", device], +) app = exp.create_model("app", run_settings=app_rs) app.attach_generator_files(to_copy=[app_script_name], to_symlink=[model_name]) @@ -47,4 +58,4 @@ break time.sleep(5) -print("Exiting.") \ No newline at end of file +print("Exiting.") diff --git a/ex/high_throughput_inference/mock_app.py b/ex/high_throughput_inference/mock_app.py index e244c93e0..3a5169a66 100644 --- a/ex/high_throughput_inference/mock_app.py +++ b/ex/high_throughput_inference/mock_app.py @@ -44,16 +44,21 @@ import numbers from collections import OrderedDict +from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import ( + DragonFeatureStore, +) from smartsim._core.mli.message_handler import MessageHandler from smartsim.log import get_logger logger = get_logger("App") + class ProtoClient: def __init__(self, timing_on: bool): connect_to_infrastructure() - ddict_str = os.environ["SS_DRG_DDICT"] + ddict_str = os.environ["SS_INFRA_BACKBONE"] self._ddict = DDict.attach(ddict_str) + self._backbone_descriptor = DragonFeatureStore(self._ddict).descriptor to_worker_fli_str = None while to_worker_fli_str is None: try: @@ -88,17 +93,23 @@ def start_timings(self, batch_size: int): def end_timings(self): if self._timing_on: self._add_label_to_timings("total_time") - self._timings["total_time"].append(self._format_number(time.perf_counter()-self._start)) + self._timings["total_time"].append( + self._format_number(time.perf_counter() - self._start) + ) def measure_time(self, label: str): if self._timing_on: self._add_label_to_timings(label) - self._timings[label].append(self._format_number(time.perf_counter()-self._interm)) + self._timings[label].append( + self._format_number(time.perf_counter() - self._interm) + ) self._interm = time.perf_counter() def print_timings(self, to_file: bool = False): print(" ".join(self._timings.keys())) - value_array = numpy.array([value for value in self._timings.values()], dtype=float) + value_array = numpy.array( + [value for value in self._timings.values()], dtype=float + ) value_array = numpy.transpose(value_array) for i in range(value_array.shape[0]): print(" 
".join(self._format_number(value) for value in value_array[i])) @@ -106,21 +117,21 @@ def print_timings(self, to_file: bool = False): numpy.save("timings.npy", value_array) numpy.savetxt("timings.txt", value_array) - def run_model(self, model: bytes | str, batch: torch.Tensor): tensors = [batch.numpy()] self.start_timings(batch.shape[0]) built_tensor_desc = MessageHandler.build_tensor_descriptor( - "c", "float32", list(batch.shape)) + "c", "float32", list(batch.shape) + ) self.measure_time("build_tensor_descriptor") built_model = None if isinstance(model, str): - model_arg = MessageHandler.build_model_key(model) + model_arg = MessageHandler.build_model_key(model, self._backbone_descriptor) else: model_arg = MessageHandler.build_model(model, "resnet-50", "1.0") request = MessageHandler.build_request( reply_channel=self._from_worker_ch_serialized, - model= model_arg, + model=model_arg, inputs=[built_tensor_desc], outputs=[], output_descriptors=[], @@ -129,10 +140,12 @@ def run_model(self, model: bytes | str, batch: torch.Tensor): self.measure_time("build_request") request_bytes = MessageHandler.serialize_request(request) self.measure_time("serialize_request") - with self._to_worker_fli.sendh(timeout=None, stream_channel=self._to_worker_ch) as to_sendh: + with self._to_worker_fli.sendh( + timeout=None, stream_channel=self._to_worker_ch + ) as to_sendh: to_sendh.send_bytes(request_bytes) for t in tensors: - to_sendh.send_bytes(t.tobytes()) #TODO NOT FAST ENOUGH!!! + to_sendh.send_bytes(t.tobytes()) # TODO NOT FAST ENOUGH!!! # to_sendh.send_bytes(bytes(t.data)) logger.info(f"Message size: {len(request_bytes)} bytes") @@ -159,7 +172,7 @@ def set_model(self, key: str, model: bytes): self._ddict[key] = model -class ResNetWrapper(): +class ResNetWrapper: def __init__(self, name: str, model: str): self._model = torch.jit.load(model) self._name = name @@ -168,7 +181,7 @@ def __init__(self, name: str, model: str): torch.jit.save(scripted, buffer) self._serialized_model = buffer.getvalue() - def get_batch(self, batch_size: int=32): + def get_batch(self, batch_size: int = 32): return torch.randn((batch_size, 3, 224, 224), dtype=torch.float32) @property @@ -179,6 +192,7 @@ def model(self): def name(self): return self._name + if __name__ == "__main__": parser = argparse.ArgumentParser("Mock application") @@ -194,8 +208,8 @@ def name(self): for batch_size in [1, 2, 4, 8, 16, 32, 64, 128]: logger.info(f"Batch size: {batch_size}") - for iteration_number in range(total_iterations + int(batch_size==1)): + for iteration_number in range(total_iterations + int(batch_size == 1)): logger.info(f"Iteration: {iteration_number}") client.run_model(resnet.name, resnet.get_batch(batch_size)) - client.print_timings(to_file=True) \ No newline at end of file + client.print_timings(to_file=True) diff --git a/ex/high_throughput_inference/standalone_workermanager.py b/ex/high_throughput_inference/standalone_workermanager.py index c56e11a7c..2b5ba7df4 100644 --- a/ex/high_throughput_inference/standalone_workermanager.py +++ b/ex/high_throughput_inference/standalone_workermanager.py @@ -31,17 +31,19 @@ from dragon.data.ddict.ddict import DDict from dragon.utils import b64decode, b64encode from dragon.globalservices.api_setup import connect_to_infrastructure + # isort: on import argparse import base64 import cloudpickle -import pickle +import optparse import os from smartsim._core.mli.comm.channel.dragonchannel import DragonCommChannel -from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import DragonFeatureStore 
from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel -from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker +from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import ( + DragonFeatureStore, +) from smartsim._core.mli.infrastructure.control.workermanager import WorkerManager from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader @@ -67,7 +69,7 @@ args = parser.parse_args() connect_to_infrastructure() - ddict_str = os.environ["SS_DRG_DDICT"] + ddict_str = os.environ["SS_INFRA_BACKBONE"] ddict = DDict.attach(ddict_str) to_worker_channel = Channel.make_process_local() @@ -75,22 +77,23 @@ to_worker_fli_serialized = to_worker_fli.serialize() ddict["to_worker_fli"] = to_worker_fli_serialized - torch_worker = cloudpickle.loads(base64.b64decode(args.worker_class.encode('ascii')))() - - dfs = DragonFeatureStore(ddict) - comm_channel = DragonFLIChannel(to_worker_fli_serialized) + worker_type_name = base64.b64decode(args.worker_class.encode("ascii")) + torch_worker = cloudpickle.loads(worker_type_name)() - os.environ["SSFeatureStore"] = base64.b64encode(pickle.dumps(dfs)).decode("utf-8") - os.environ["SSQueue"] = base64.b64encode(to_worker_fli_serialized).decode("utf-8") + descriptor = base64.b64encode(to_worker_fli_serialized).decode("utf-8") + os.environ["SS_REQUEST_QUEUE"] = descriptor - config_loader = EnvironmentConfigLoader() + config_loader = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel, + queue_factory=DragonFLIChannel.from_descriptor, + ) worker_manager = WorkerManager( config_loader=config_loader, worker=torch_worker, as_service=True, cooldown=10, - comm_channel_type=DragonCommChannel, - device = args.device, + device=args.device, ) worker_manager.execute() diff --git a/smartsim/_core/_cli/scripts/dragon_install.py b/smartsim/_core/_cli/scripts/dragon_install.py index 03a128ab8..f88af4eb4 100644 --- a/smartsim/_core/_cli/scripts/dragon_install.py +++ b/smartsim/_core/_cli/scripts/dragon_install.py @@ -1,7 +1,9 @@ import os import pathlib +import shutil import sys import typing as t +from urllib.request import urlretrieve from github import Github from github.GitReleaseAsset import GitReleaseAsset @@ -160,13 +162,26 @@ def retrieve_asset(working_dir: pathlib.Path, asset: GitReleaseAsset) -> pathlib # if we've previously downloaded the release and still have # wheels laying around, use that cached version instead - if download_dir.exists() and list(download_dir.rglob("*.whl")): + if download_dir.exists() or list(download_dir.rglob("*.whl")): return download_dir - archive = WebTGZ(asset.browser_download_url) + download_dir.mkdir(parents=True, exist_ok=True) + + # grab a copy of the complete asset + asset_path = download_dir / str(asset.name) + download_url = asset.browser_download_url + + try: + urlretrieve(download_url, str(asset_path)) + logger.debug(f"Retrieved asset {asset.name} from {download_url}") + except Exception: + logger.exception(f"Unable to download asset from: {download_url}") + + # extract the asset + archive = WebTGZ(download_url) archive.extract(download_dir) - logger.debug(f"Retrieved {asset.browser_download_url} to {download_dir}") + logger.debug(f"Extracted {download_url} to {download_dir}") return download_dir diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index 445538f20..4fe6d55ad 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ 
b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -457,7 +457,6 @@ def create_run_policy( if isinstance(request, DragonRunRequest): run_request: DragonRunRequest = request - affinity = dragon_policy.Policy.Affinity.DEFAULT cpu_affinity: t.List[int] = [] gpu_affinity: t.List[int] = [] @@ -465,25 +464,20 @@ def create_run_policy( if run_request.policy is not None: # Affinities are not mutually exclusive. If specified, both are used if run_request.policy.cpu_affinity: - affinity = dragon_policy.Policy.Affinity.SPECIFIC cpu_affinity = run_request.policy.cpu_affinity if run_request.policy.gpu_affinity: - affinity = dragon_policy.Policy.Affinity.SPECIFIC gpu_affinity = run_request.policy.gpu_affinity logger.debug( - f"Affinity strategy: {affinity}, " f"CPU affinity mask: {cpu_affinity}, " f"GPU affinity mask: {gpu_affinity}" ) - if affinity != dragon_policy.Policy.Affinity.DEFAULT: - return dragon_policy.Policy( - placement=dragon_policy.Policy.Placement.HOST_NAME, - host_name=node_name, - affinity=affinity, - cpu_affinity=cpu_affinity, - gpu_affinity=gpu_affinity, - ) + return dragon_policy.Policy( + placement=dragon_policy.Policy.Placement.HOST_NAME, + host_name=node_name, + cpu_affinity=cpu_affinity, + gpu_affinity=gpu_affinity, + ) return dragon_policy.Policy( placement=dragon_policy.Policy.Placement.HOST_NAME, @@ -521,7 +515,7 @@ def _start_steps(self) -> None: env={ **request.current_env, **request.env, - "SS_DRG_DDICT": self.infra_ddict, + "SS_INFRA_BACKBONE": self.infra_ddict, }, stdout=dragon_process.Popen.PIPE, stderr=dragon_process.Popen.PIPE, diff --git a/smartsim/_core/mli/comm/channel/channel.py b/smartsim/_core/mli/comm/channel/channel.py index a3cce2181..d91859126 100644 --- a/smartsim/_core/mli/comm/channel/channel.py +++ b/smartsim/_core/mli/comm/channel/channel.py @@ -42,11 +42,13 @@ def __init__(self, descriptor: t.Union[str, bytes]) -> None: @abstractmethod def send(self, value: bytes) -> None: """Send a message through the underlying communication channel + :param value: The value to send""" @abstractmethod def recv(self) -> t.List[bytes]: """Receieve a message through the underlying communication channel + :returns: the received message""" @property diff --git a/smartsim/_core/mli/comm/channel/dragonchannel.py b/smartsim/_core/mli/comm/channel/dragonchannel.py index 672fce75b..80fdd9cdc 100644 --- a/smartsim/_core/mli/comm/channel/dragonchannel.py +++ b/smartsim/_core/mli/comm/channel/dragonchannel.py @@ -24,6 +24,7 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
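One behavioral note on the `create_run_policy` simplification above: with the `Affinity.DEFAULT` short-circuit removed, a host-pinned `Policy` is now always built with whatever affinity lists were requested. The condensed sketch below captures the resulting logic; it assumes the same `dragon_policy` alias used in `dragonBackend.py` and that Dragon treats empty affinity lists the same as the old default affinity.

```python
import typing as t

import dragon.infrastructure.policy as dragon_policy


def policy_for(
    node_name: str,
    cpu_affinity: t.Optional[t.List[int]] = None,
    gpu_affinity: t.Optional[t.List[int]] = None,
) -> dragon_policy.Policy:
    # empty lists are assumed to behave like the former DEFAULT affinity
    return dragon_policy.Policy(
        placement=dragon_policy.Policy.Placement.HOST_NAME,
        host_name=node_name,
        cpu_affinity=cpu_affinity or [],
        gpu_affinity=gpu_affinity or [],
    )
```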
+import base64 import sys import typing as t @@ -55,7 +56,23 @@ def send(self, value: bytes) -> None: def recv(self) -> t.List[bytes]: """Receieve a message through the underlying communication channel + :returns: the received message""" with self._channel.recvh(timeout=None) as recvh: message_bytes: bytes = recvh.recv_bytes(timeout=None) return [message_bytes] + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "DragonCommChannel": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached DragonCommChannel""" + try: + return DragonCommChannel(base64.b64decode(descriptor)) + except: + logger.error(f"Failed to create dragon comm channel: {descriptor}") + raise diff --git a/smartsim/_core/mli/comm/channel/dragonfli.py b/smartsim/_core/mli/comm/channel/dragonfli.py index 28b4c2bf3..4636894bd 100644 --- a/smartsim/_core/mli/comm/channel/dragonfli.py +++ b/smartsim/_core/mli/comm/channel/dragonfli.py @@ -30,7 +30,7 @@ # isort: on -import sys +import base64 import typing as t import smartsim._core.mli.comm.channel.channel as cch @@ -43,7 +43,11 @@ class DragonFLIChannel(cch.CommChannelBase): """Passes messages by writing to a Dragon FLI Channel""" def __init__(self, fli_desc: bytes, sender_supplied: bool = True) -> None: - """Initialize the DragonFLIChannel instance""" + """Initialize the DragonFLIChannel instance + + :param fli_desc: the descriptor of the FLI channel to attach + :param sender_supplied: flag indicating if the FLI uses sender-supplied streams + """ super().__init__(fli_desc) # todo: do we need memory pool information to construct the channel correctly? self._fli: "fli" = fli.FLInterface.attach(fli_desc) @@ -53,12 +57,14 @@ def __init__(self, fli_desc: bytes, sender_supplied: bool = True) -> None: def send(self, value: bytes) -> None: """Send a message through the underlying communication channel + :param value: The value to send""" with self._fli.sendh(timeout=None, stream_channel=self._channel) as sendh: sendh.send_bytes(value) def recv(self) -> t.List[bytes]: """Receieve a message through the underlying communication channel + :returns: the received message""" messages = [] eot = False @@ -70,3 +76,21 @@ def recv(self) -> t.List[bytes]: except fli.FLIEOT as exc: eot = True return messages + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "DragonFLIChannel": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached DragonFLIChannel""" + try: + return DragonFLIChannel( + fli_desc=base64.b64decode(descriptor), + sender_supplied=True, + ) + except: + logger.error(f"Error while creating DragonFLIChannel: {descriptor}") + raise diff --git a/smartsim/_core/mli/infrastructure/control/workermanager.py b/smartsim/_core/mli/infrastructure/control/workermanager.py index 27f5bfc97..dcc35ae83 100644 --- a/smartsim/_core/mli/infrastructure/control/workermanager.py +++ b/smartsim/_core/mli/infrastructure/control/workermanager.py @@ -24,26 +24,16 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
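The two `from_descriptor` factories added above (for `DragonCommChannel` and `DragonFLIChannel`) are intended to mirror the string descriptor a producer publishes. A minimal producer/consumer sketch, assuming a Dragon environment and the same base64 convention used by the standalone worker manager:

```python
import base64

from dragon.channels import Channel
from dragon.fli import FLInterface

from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel

# producer: create the FLI and publish its descriptor as a string
main_channel = Channel.make_process_local()
to_worker_fli = FLInterface(main_ch=main_channel)
descriptor = base64.b64encode(to_worker_fli.serialize()).decode("utf-8")

# consumer: re-attach from the string form and send a request
queue = DragonFLIChannel.from_descriptor(descriptor)
queue.send(b"serialized-request-bytes")
```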
-import sys - -# isort: off -import dragon -from dragon import fli - -# isort: on - import time import typing as t -import numpy as np +from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore -from .....error import SmartSimError from .....log import get_logger from ....entrypoints.service import Service from ...comm.channel.channel import CommChannelBase from ...comm.channel.dragonchannel import DragonCommChannel from ...infrastructure.environmentloader import EnvironmentConfigLoader -from ...infrastructure.storage.featurestore import FeatureStore from ...infrastructure.worker.worker import ( InferenceReply, InferenceRequest, @@ -51,113 +41,24 @@ MachineLearningWorkerBase, ) from ...message_handler import MessageHandler -from ...mli_schemas.response.response_capnp import Response, ResponseBuilder +from ...mli_schemas.response.response_capnp import ResponseBuilder if t.TYPE_CHECKING: from dragon.fli import FLInterface - from smartsim._core.mli.mli_schemas.model.model_capnp import Model from smartsim._core.mli.mli_schemas.response.response_capnp import Status - from smartsim._core.mli.mli_schemas.tensor.tensor_capnp import TensorDescriptor logger = get_logger(__name__) -def deserialize_message( - data_blob: bytes, - channel_type: t.Type[CommChannelBase], - device: t.Literal["cpu", "gpu"], -) -> InferenceRequest: - """Deserialize a message from a byte stream into an InferenceRequest - :param data_blob: The byte stream to deserialize""" - # todo: consider moving to XxxCore and only making - # workers implement the inputs and model conversion? - - # alternatively, consider passing the capnproto models - # to this method instead of the data_blob... - - # something is definitely wrong here... client shouldn't have to touch - # callback (or batch size) - - request = MessageHandler.deserialize_request(data_blob) - # return request - model_key: t.Optional[str] = None - model_bytes: t.Optional[Model] = None - - if request.model.which() == "key": - model_key = request.model.key.key - elif request.model.which() == "data": - model_bytes = request.model.data - - callback_key = request.replyChannel.descriptor - - # todo: shouldn't this be `CommChannel.find` instead of `DragonCommChannel` - comm_channel = channel_type(callback_key) - # comm_channel = DragonCommChannel(request.replyChannel) - - input_keys: t.Optional[t.List[str]] = None - input_bytes: t.Optional[t.List[bytes]] = None - - output_keys: t.Optional[t.List[str]] = None - - input_meta: t.Optional[t.List[TensorDescriptor]] = None - - if request.input.which() == "keys": - input_keys = [input_key.key for input_key in request.input.keys] - elif request.input.which() == "descriptors": - input_meta = request.input.descriptors # type: ignore - - if request.output: - output_keys = [tensor_key.key for tensor_key in request.output] - - inference_request = InferenceRequest( - model_key=model_key, - callback=comm_channel, - raw_inputs=input_bytes, - input_keys=input_keys, - input_meta=input_meta, - output_keys=output_keys, - raw_model=model_bytes, - batch_size=0, - ) - return inference_request - - def build_failure_reply(status: "Status", message: str) -> ResponseBuilder: + """Build a response indicating a failure occurred + :param status: The status of the response + :param message: The error message to include in the response""" return MessageHandler.build_response( status=status, message=message, - result=[], - custom_attributes=None, - ) - - -def prepare_outputs(reply: InferenceReply) -> t.List[t.Any]: - prepared_outputs: 
t.List[t.Any] = [] - if reply.output_keys: - for key in reply.output_keys: - if not key: - continue - msg_key = MessageHandler.build_tensor_key(key) - prepared_outputs.append(msg_key) - elif reply.outputs: - for _ in reply.outputs: - msg_tensor_desc = MessageHandler.build_tensor_descriptor( - "c", - "float32", - [1], - ) - prepared_outputs.append(msg_tensor_desc) - return prepared_outputs - - -def build_reply(reply: InferenceReply) -> ResponseBuilder: - results = prepare_outputs(reply) - - return MessageHandler.build_response( - status=reply.status_enum, - message=reply.message, - result=results, + result=None, custom_attributes=None, ) @@ -194,73 +95,127 @@ def __init__( worker: MachineLearningWorkerBase, as_service: bool = False, cooldown: int = 0, - comm_channel_type: t.Type[CommChannelBase] = DragonCommChannel, device: t.Literal["cpu", "gpu"] = "cpu", ) -> None: """Initialize the WorkerManager + :param config_loader: Environment config loader that loads the task queue and feature store :param workers: A worker to manage :param as_service: Specifies run-once or run-until-complete behavior of service :param cooldown: Number of seconds to wait before shutting down after shutdown criteria are met - :param comm_channel_type: The type of communication channel used for callbacks + :param device: The type of hardware the workers must be executed on """ super().__init__(as_service, cooldown) self._task_queue: t.Optional[CommChannelBase] = config_loader.get_queue() """the queue the manager monitors for new tasks""" - self._feature_store: t.Optional[FeatureStore] = ( - config_loader.get_feature_store() - ) - """a feature store to retrieve models from""" self._worker = worker """The ML Worker implementation""" - self._comm_channel_type = comm_channel_type + self._callback_factory = config_loader._callback_factory """The type of communication channel to construct for callbacks""" self._device = device """Device on which workers need to run""" self._cached_models: dict[str, t.Any] = {} """Dictionary of previously loaded models""" + self._feature_stores: t.Dict[str, FeatureStore] = {} + """A collection of attached feature stores""" + self._featurestore_factory = config_loader._featurestore_factory + """A factory method to create a desired feature store client type""" + self._backbone: t.Optional[FeatureStore] = config_loader.get_backbone() + """A standalone, system-created feature store used to share internal + information among MLI components""" - def _validate_request(self, request: InferenceRequest) -> bool: - """Ensure the request can be processed. 
- :param request: The request to validate - :return: True if the request is valid, False otherwise""" - if not self._feature_store: - if request.model_key: - logger.error("Unable to load model by key without feature store") - return False - - if request.input_keys: - logger.error("Unable to load inputs by key without feature store") - return False - - if request.output_keys: - logger.error("Unable to persist outputs by key without feature store") - return False - - if not request.model_key and not request.raw_model: - logger.error("Unable to continue without model bytes or feature store key") - return False + def _check_feature_stores(self, request: InferenceRequest) -> bool: + """Ensures that all feature stores required by the request are available - if not request.input_keys and not request.raw_inputs: - logger.error("Unable to continue without input bytes or feature store keys") + :param request: The request to validate + :returns: False if feature store validation fails for the request, True otherwise + """ + # collect all feature stores required by the request + fs_model: t.Set[str] = set() + if request.model_key: + fs_model = {request.model_key.descriptor} + fs_inputs = {key.descriptor for key in request.input_keys} + fs_outputs = {key.descriptor for key in request.output_keys} + + # identify which feature stores are requested and unknown + fs_desired = fs_model.union(fs_inputs).union(fs_outputs) + fs_actual = {item.descriptor for item in self._feature_stores.values()} + fs_missing = fs_desired - fs_actual + + if self._featurestore_factory is None: + logger.error("No feature store factory configured") return False - if request.callback is None: - logger.error("No callback channel provided in request") - return False + # create the feature stores we need to service request + if fs_missing: + logger.debug(f"Adding feature store(s): {fs_missing}") + for descriptor in fs_missing: + feature_store = self._featurestore_factory(descriptor) + self._feature_stores[descriptor] = feature_store return True + def _check_model(self, request: InferenceRequest) -> bool: + """Ensure that a model is available for the request + + :param request: The request to validate + :returns: False if model validation fails for the request, True otherwise + """ + if request.model_key or request.raw_model: + return True + + logger.error("Unable to continue without model bytes or feature store key") + return False + + def _check_inputs(self, request: InferenceRequest) -> bool: + """Ensure that inputs are available for the request + + :param request: The request to validate + :returns: False if input validation fails for the request, True otherwise + """ + if request.input_keys or request.raw_inputs: + return True + + logger.error("Unable to continue without input bytes or feature store keys") + return False + + def _check_callback(self, request: InferenceRequest) -> bool: + """Ensure that a callback channel is available for the request + + :param request: The request to validate + :returns: False if callback validation fails for the request, True otherwise + """ + if request.callback is not None: + return True + + logger.error("No callback channel provided in request") + return False + + def _validate_request(self, request: InferenceRequest) -> bool: + """Ensure the request can be processed + + :param request: The request to validate + :return: False if the request fails any validation checks, True otherwise""" + checks = [ + self._check_feature_stores(request), + self._check_model(request), + 
self._check_inputs(request), + self._check_callback(request), + ] + + return all(checks) + def _on_iteration(self) -> None: """Executes calls to the machine learning worker implementation to complete + the inference pipeline""" logger.debug("executing worker manager pipeline") if self._task_queue is None: - logger.warning("No queue to check for tasks") + logger.error("No queue to check for tasks") return timings = [] # timing @@ -279,15 +234,19 @@ def _on_iteration(self) -> None: tensor_bytes_list = bytes_list[1:] interm = time.perf_counter() # timing - request = deserialize_message( - request_bytes, self._comm_channel_type, self._device + request = self._worker.deserialize_message( + request_bytes, self._callback_factory ) if request.input_meta and tensor_bytes_list: request.raw_inputs = tensor_bytes_list if not self._validate_request(request): - return + exception_handler( + ValueError("Error validating the request"), + request.callback, + "Error validating the request.", + ) timings.append(time.perf_counter() - interm) # timing interm = time.perf_counter() # timing @@ -302,17 +261,20 @@ def _on_iteration(self) -> None: "Could not find model key or model.", ) return - if request.model_key in self._cached_models: + + if request.model_key.key in self._cached_models: timings.append(time.perf_counter() - interm) # timing interm = time.perf_counter() # timing - model_result = LoadModelResult(self._cached_models[request.model_key]) + model_result = LoadModelResult( + self._cached_models[request.model_key.key] + ) else: timings.append(time.perf_counter() - interm) # timing interm = time.perf_counter() # timing try: fetch_model_result = self._worker.fetch_model( - request, self._feature_store + request, self._feature_stores ) except Exception as e: exception_handler( @@ -328,10 +290,12 @@ def _on_iteration(self) -> None: fetch_result=fetch_model_result, device=self._device, ) - self._cached_models[request.model_key] = model_result.model + self._cached_models[request.model_key.key] = model_result.model except Exception as e: exception_handler( - e, request.callback, "Failed while loading the model." + e, + request.callback, + "Failed while loading model from feature store.", ) return @@ -340,7 +304,7 @@ def _on_iteration(self) -> None: interm = time.perf_counter() # timing try: fetch_model_result = self._worker.fetch_model( - request, self._feature_store + request, self._feature_stores ) except Exception as e: exception_handler( @@ -356,14 +320,18 @@ def _on_iteration(self) -> None: ) except Exception as e: exception_handler( - e, request.callback, "Failed while loading the model." 
+ e, + request.callback, + "Failed while loading model from feature store.", ) return timings.append(time.perf_counter() - interm) # timing interm = time.perf_counter() # timing try: - fetch_input_result = self._worker.fetch_inputs(request, self._feature_store) + fetch_input_result = self._worker.fetch_inputs( + request, self._feature_stores + ) except Exception as e: exception_handler(e, request.callback, "Failed while fetching the inputs.") return @@ -407,9 +375,7 @@ def _on_iteration(self) -> None: if request.output_keys: try: reply.output_keys = self._worker.place_output( - request, - transformed_output, - self._feature_store, + request, transformed_output, self._feature_stores ) except Exception as e: exception_handler( @@ -427,7 +393,14 @@ def _on_iteration(self) -> None: else: reply.status_enum = "complete" reply.message = "Success" - response = build_reply(reply) + + results = self._worker.prepare_outputs(reply) + response = MessageHandler.build_response( + status=reply.status_enum, + message=reply.message, + result=results, + custom_attributes=None, + ) timings.append(time.perf_counter() - interm) # timing interm = time.perf_counter() # timing diff --git a/smartsim/_core/mli/infrastructure/environmentloader.py b/smartsim/_core/mli/infrastructure/environmentloader.py index 9f6770623..b4b9e565c 100644 --- a/smartsim/_core/mli/infrastructure/environmentloader.py +++ b/smartsim/_core/mli/infrastructure/environmentloader.py @@ -24,44 +24,82 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import base64 import os -import pickle import typing as t -from dragon.fli import FLInterface # pylint: disable=all - -from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel +from smartsim._core.mli.comm.channel.channel import CommChannelBase from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore +from smartsim.log import get_logger + +logger = get_logger(__name__) class EnvironmentConfigLoader: """ - Facilitates the loading of a FeatureStore and Queue - into the WorkerManager. + Facilitates the loading of a FeatureStore and Queue into the WorkerManager. """ - def __init__(self) -> None: - self._feature_store_descriptor: t.Optional[str] = os.getenv( - "SSFeatureStore", None - ) - self._queue_descriptor: t.Optional[str] = os.getenv("SSQueue", None) - self.feature_store: t.Optional[FeatureStore] = None - self.queue: t.Optional[DragonFLIChannel] = None - - def get_feature_store(self) -> t.Optional[FeatureStore]: - """Loads the Feature Store previously set in SSFeatureStore""" - if self._feature_store_descriptor is not None: - self.feature_store = pickle.loads( - base64.b64decode(self._feature_store_descriptor) - ) - return self.feature_store - - def get_queue(self, sender_supplied: bool = True) -> t.Optional[DragonFLIChannel]: - """Returns the Queue previously set in SSQueue""" - if self._queue_descriptor is not None: - self.queue = DragonFLIChannel( - fli_desc=base64.b64decode(self._queue_descriptor), - sender_supplied=sender_supplied, - ) + def __init__( + self, + featurestore_factory: t.Callable[[str], FeatureStore], + callback_factory: t.Callable[[bytes], CommChannelBase], + queue_factory: t.Callable[[str], CommChannelBase], + ) -> None: + """Initialize the config loader instance with the factories necessary for + creating additional objects. 
+ + :param featurestore_factory: A factory method that produces a feature store + given a descriptor + :param callback_factory: A factory method that produces a callback + channel given a descriptor + :param queue_factory: A factory method that produces a queue + channel given a descriptor""" + self.queue: t.Optional[CommChannelBase] = None + """The attached incoming event queue channel""" + self.backbone: t.Optional[FeatureStore] = None + """The attached backbone feature store""" + self._featurestore_factory = featurestore_factory + """A factory method to instantiate a FeatureStore""" + self._callback_factory = callback_factory + """A factory method to instantiate a concrete CommChannelBase + for inference callbacks""" + self._queue_factory = queue_factory + """A factory method to instantiate a concrete CommChannelBase + for inference requests""" + + def get_backbone(self) -> t.Optional[FeatureStore]: + """Attach to the backbone feature store using the descriptor found in + an environment variable. The backbone is a standalone, system-created + feature store used to share internal information among MLI components + + :returns: The attached feature store via SS_INFRA_BACKBONE""" + descriptor = os.getenv("SS_INFRA_BACKBONE", "") + + if not descriptor: + logger.warning("No backbone descriptor is configured") + return None + + if self._featurestore_factory is None: + logger.warning("No feature store factory is configured") + return None + + self.backbone = self._featurestore_factory(descriptor) + return self.backbone + + def get_queue(self) -> t.Optional[CommChannelBase]: + """Attach to a queue-like communication channel using the descriptor + found in an environment variable. + + :returns: The attached queue specified via `SS_REQUEST_QUEUE`""" + descriptor = os.getenv("SS_REQUEST_QUEUE", "") + + if not descriptor: + logger.warning("No queue descriptor is configured") + return None + + if self._queue_factory is None: + logger.warning("No queue factory is configured") + return None + + self.queue = self._queue_factory(descriptor) return self.queue diff --git a/smartsim/_core/mli/infrastructure/storage/dragonfeaturestore.py b/smartsim/_core/mli/infrastructure/storage/dragonfeaturestore.py index af592ed0a..e89abcd2a 100644 --- a/smartsim/_core/mli/infrastructure/storage/dragonfeaturestore.py +++ b/smartsim/_core/mli/infrastructure/storage/dragonfeaturestore.py @@ -26,13 +26,15 @@ import typing as t -import smartsim.error as sse -from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore -from smartsim.log import get_logger +# pylint: disable=import-error +# isort: off +import dragon.data.ddict.ddict as dragon_ddict -if t.TYPE_CHECKING: - from dragon.data.ddict.ddict import DDict +# isort: on +from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore +from smartsim.error import SmartSimError +from smartsim.log import get_logger logger = get_logger(__name__) @@ -40,32 +42,67 @@ class DragonFeatureStore(FeatureStore): """A feature store backed by a dragon distributed dictionary""" - def __init__(self, storage: "DDict") -> None: - """Initialize the DragonFeatureStore instance""" + def __init__(self, storage: "dragon_ddict.DDict") -> None: + """Initialize the DragonFeatureStore instance + + :param storage: A distributed dictionary to be used as the underlying + storage mechanism of the feature store""" self._storage = storage def __getitem__(self, key: str) -> t.Union[str, bytes]: """Retrieve an item using key - :param key: Unique key of an item to retrieve 
from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: The value identified by the supplied key + :raises KeyError: if the key is not found in the feature store + :raises SmartSimError: if retrieval from the feature store fails""" try: value: t.Union[str, bytes] = self._storage[key] return value - except KeyError as ex: - raise ex + except KeyError: + logger.warning(f"An unknown key was requested: {key}") + raise except Exception as ex: # note: explicitly avoid round-trip to check for key existence - raise sse.SmartSimError( + raise SmartSimError( f"Could not get value for existing key {key}, error:\n{ex}" ) from ex def __setitem__(self, key: str, value: t.Union[str, bytes]) -> None: """Assign a value using key + :param key: Unique key of an item to set in the feature store :param value: Value to persist in the feature store""" self._storage[key] = value def __contains__(self, key: str) -> bool: """Membership operator to test for a key existing within the feature store. - Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: `True` if the key is found, `False` otherwise""" return key in self._storage + + @property + def descriptor(self) -> str: + """A unique identifier enabling a client to connect to the feature store + + :returns: A descriptor encoded as a string""" + return str(self._storage.serialize()) + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "DragonFeatureStore": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached DragonFeatureStore + :raises SmartSimError: if attachment to DragonFeatureStore fails""" + try: + return DragonFeatureStore(dragon_ddict.DDict.attach(descriptor)) + except Exception as ex: + logger.error(f"Error creating dragon feature store: {descriptor}") + raise SmartSimError( + f"Error creating dragon feature store: {descriptor}" + ) from ex diff --git a/smartsim/_core/mli/infrastructure/storage/featurestore.py b/smartsim/_core/mli/infrastructure/storage/featurestore.py index 553e13b10..d511d588e 100644 --- a/smartsim/_core/mli/infrastructure/storage/featurestore.py +++ b/smartsim/_core/mli/infrastructure/storage/featurestore.py @@ -27,6 +27,21 @@ import typing as t from abc import ABC, abstractmethod +from pydantic import BaseModel, Field + +from smartsim.log import get_logger + +logger = get_logger(__name__) + + +class FeatureStoreKey(BaseModel): + """A key,descriptor pair enabling retrieval of an item from a feature store""" + + key: str = Field(min_length=1) + """The unique key of an item in a feature store""" + descriptor: str = Field(min_length=1) + """The unique identifier of the feature store containing the key""" + class FeatureStore(ABC): """Abstract base class providing the common interface for retrieving @@ -35,16 +50,26 @@ class FeatureStore(ABC): @abstractmethod def __getitem__(self, key: str) -> t.Union[str, bytes]: """Retrieve an item using key + :param key: Unique key of an item to retrieve from the feature store""" @abstractmethod def __setitem__(self, key: str, value: t.Union[str, bytes]) -> None: """Assign a value using key + :param key: Unique key of an item to set in the feature store :param value: Value to persist in the feature store""" @abstractmethod def __contains__(self, key: str) -> 
bool: """Membership operator to test for a key existing within the feature store. - Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: `True` if the key is found, `False` otherwise""" + + @property + @abstractmethod + def descriptor(self) -> str: + """Unique identifier enabling a client to connect to the feature store + + :returns: A descriptor encoded as a string""" diff --git a/smartsim/_core/mli/infrastructure/worker/worker.py b/smartsim/_core/mli/infrastructure/worker/worker.py index bb8d82231..89fb63524 100644 --- a/smartsim/_core/mli/infrastructure/worker/worker.py +++ b/smartsim/_core/mli/infrastructure/worker/worker.py @@ -30,11 +30,13 @@ from .....error import SmartSimError from .....log import get_logger from ...comm.channel.channel import CommChannelBase -from ...infrastructure.storage.featurestore import FeatureStore +from ...infrastructure.storage.featurestore import FeatureStore, FeatureStoreKey +from ...message_handler import MessageHandler from ...mli_schemas.model.model_capnp import Model if t.TYPE_CHECKING: from smartsim._core.mli.mli_schemas.response.response_capnp import Status + from smartsim._core.mli.mli_schemas.tensor.tensor_capnp import TensorDescriptor logger = get_logger(__name__) @@ -44,26 +46,32 @@ class InferenceRequest: def __init__( self, - model_key: t.Optional[str] = None, + model_key: t.Optional[FeatureStoreKey] = None, callback: t.Optional[CommChannelBase] = None, raw_inputs: t.Optional[t.List[bytes]] = None, - # todo: copying byte array is likely to create a copy of the data in - # capnproto and will be a performance issue later - input_keys: t.Optional[t.List[str]] = None, + input_keys: t.Optional[t.List[FeatureStoreKey]] = None, input_meta: t.Optional[t.List[t.Any]] = None, - output_keys: t.Optional[t.List[str]] = None, + output_keys: t.Optional[t.List[FeatureStoreKey]] = None, raw_model: t.Optional[Model] = None, batch_size: int = 0, ): """Initialize the object""" self.model_key = model_key + """A tuple containing a (key, descriptor) pair""" self.raw_model = raw_model + """Raw bytes of an ML model""" self.callback = callback + """The channel used for notification of inference completion""" self.raw_inputs = raw_inputs or [] + """Raw bytes of tensor inputs""" self.input_keys = input_keys or [] + """A list of tuples containing a (key, descriptor) pair""" self.input_meta = input_meta or [] + """Metadata about the input data""" self.output_keys = output_keys or [] + """A list of tuples containing a (key, descriptor) pair""" self.batch_size = batch_size + """The batch size to apply when batching""" class InferenceReply: @@ -72,13 +80,13 @@ class InferenceReply: def __init__( self, outputs: t.Optional[t.Collection[t.Any]] = None, - output_keys: t.Optional[t.Collection[str]] = None, + output_keys: t.Optional[t.Collection[FeatureStoreKey]] = None, status_enum: "Status" = "running", message: str = "In progress", ) -> None: """Initialize the object""" self.outputs: t.Collection[t.Any] = outputs or [] - self.output_keys: t.Collection[t.Optional[str]] = output_keys or [] + self.output_keys: t.Collection[t.Optional[FeatureStoreKey]] = output_keys or [] self.status_enum = status_enum self.message = message @@ -148,13 +156,88 @@ def __init__(self, result: bytes) -> None: class MachineLearningWorkerCore: """Basic functionality of ML worker that is shared across all worker types""" + @staticmethod + def 
deserialize_message( + data_blob: bytes, + callback_factory: t.Callable[[bytes], CommChannelBase], + ) -> InferenceRequest: + """Deserialize a message from a byte stream into an InferenceRequest + :param data_blob: The byte stream to deserialize + :param callback_factory: A factory method that can create an instance + of the desired concrete comm channel type + :returns: The raw input message deserialized into an InferenceRequest + """ + request = MessageHandler.deserialize_request(data_blob) + model_key: t.Optional[FeatureStoreKey] = None + model_bytes: t.Optional[Model] = None + + if request.model.which() == "key": + model_key = FeatureStoreKey( + key=request.model.key.key, + descriptor=request.model.key.featureStoreDescriptor, + ) + elif request.model.which() == "data": + model_bytes = request.model.data + + callback_key = request.replyChannel.descriptor + comm_channel = callback_factory(callback_key) + input_keys: t.Optional[t.List[FeatureStoreKey]] = None + input_bytes: t.Optional[t.List[bytes]] = None + output_keys: t.Optional[t.List[FeatureStoreKey]] = None + input_meta: t.Optional[t.List[TensorDescriptor]] = None + + if request.input.which() == "keys": + input_keys = [ + FeatureStoreKey(key=value.key, descriptor=value.featureStoreDescriptor) + for value in request.input.keys + ] + elif request.input.which() == "descriptors": + input_meta = request.input.descriptors # type: ignore + + if request.output: + output_keys = [ + FeatureStoreKey(key=value.key, descriptor=value.featureStoreDescriptor) + for value in request.output + ] + + inference_request = InferenceRequest( + model_key=model_key, + callback=comm_channel, + raw_inputs=input_bytes, + input_meta=input_meta, + input_keys=input_keys, + output_keys=output_keys, + raw_model=model_bytes, + batch_size=0, + ) + return inference_request + + @staticmethod + def prepare_outputs(reply: InferenceReply) -> t.List[t.Any]: + prepared_outputs: t.List[t.Any] = [] + if reply.output_keys: + for value in reply.output_keys: + if not value: + continue + msg_key = MessageHandler.build_tensor_key(value.key, value.descriptor) + prepared_outputs.append(msg_key) + elif reply.outputs: + for _ in reply.outputs: + msg_tensor_desc = MessageHandler.build_tensor_descriptor( + "c", + "float32", + [1], + ) + prepared_outputs.append(msg_tensor_desc) + return prepared_outputs + @staticmethod def fetch_model( - request: InferenceRequest, feature_store: t.Optional[FeatureStore] + request: InferenceRequest, feature_stores: t.Dict[str, FeatureStore] ) -> FetchModelResult: """Given a resource key, retrieve the raw model from a feature store :param request: The request that triggered the pipeline - :param feature_store: The feature store used for persistence + :param feature_stores: Available feature stores used for persistence :return: Raw bytes of the model""" if request.raw_model: @@ -164,7 +247,7 @@ def fetch_model( # short-circuit and return the directly supplied model return FetchModelResult(request.raw_model.data) - if not feature_store: + if not feature_stores: raise ValueError("Feature store is required for model retrieval") if not request.model_key: @@ -172,44 +255,47 @@ def fetch_model( "Key must be provided to retrieve model from feature store" ) + key, fsd = request.model_key.key, request.model_key.descriptor + try: - raw_bytes: bytes = t.cast(bytes, feature_store[request.model_key]) + feature_store = feature_stores[fsd] + raw_bytes: bytes = t.cast(bytes, feature_store[key]) return FetchModelResult(raw_bytes) except FileNotFoundError as ex: 
logger.exception(ex) - raise SmartSimError( - f"Model could not be retrieved with key {request.model_key}" - ) from ex + raise SmartSimError(f"Model could not be retrieved with key {key}") from ex @staticmethod def fetch_inputs( - request: InferenceRequest, feature_store: t.Optional[FeatureStore] + request: InferenceRequest, feature_stores: t.Dict[str, FeatureStore] ) -> FetchInputResult: """Given a collection of ResourceKeys, identify the physical location and input metadata :param request: The request that triggered the pipeline - :param feature_store: The feature store used for persistence + :param feature_stores: Available feature stores used for persistence :return: the fetched input""" if request.raw_inputs: return FetchInputResult(request.raw_inputs, request.input_meta) - if not feature_store: + if not feature_stores: raise ValueError("No input and no feature store provided") if request.input_keys: data: t.List[bytes] = [] - for input_ in request.input_keys: + + for fs_key in request.input_keys: try: - tensor_bytes = t.cast(bytes, feature_store[input_]) + feature_store = feature_stores[fs_key.descriptor] + tensor_bytes = t.cast(bytes, feature_store[fs_key.key]) data.append(tensor_bytes) except KeyError as ex: logger.exception(ex) raise SmartSimError( - f"Model could not be retrieved with key {input_}" + f"Model could not be retrieved with key {fs_key.key}" ) from ex return FetchInputResult( - data, None + data, meta=None ) # fixme: need to get both tensor and descriptor raise ValueError("No input source") @@ -231,25 +317,26 @@ def batch_requests( def place_output( request: InferenceRequest, transform_result: TransformOutputResult, - feature_store: t.Optional[FeatureStore], - ) -> t.Collection[t.Optional[str]]: + feature_stores: t.Dict[str, FeatureStore], + ) -> t.Collection[t.Optional[FeatureStoreKey]]: """Given a collection of data, make it available as a shared resource in the feature store :param request: The request that triggered the pipeline :param execute_result: Results from inference - :param feature_store: The feature store used for persistence + :param feature_stores: Available feature stores used for persistence :return: A collection of keys that were placed in the feature store""" - if not feature_store: + if not feature_stores: raise ValueError("Feature store is required for output persistence") - keys: t.List[t.Optional[str]] = [] + keys: t.List[t.Optional[FeatureStoreKey]] = [] # need to decide how to get back to original sub-batch inputs so they can be # accurately placed, datum might need to include this. # Consider parallelizing all PUT feature_store operations - for k, v in zip(request.output_keys, transform_result.outputs): - feature_store[k] = v - keys.append(k) + for fs_key, v in zip(request.output_keys, transform_result.outputs): + feature_store = feature_stores[fs_key.descriptor] + feature_store[fs_key.key] = v + keys.append(fs_key) return keys diff --git a/smartsim/_core/mli/message_handler.py b/smartsim/_core/mli/message_handler.py index 00670dce8..ee632e24e 100644 --- a/smartsim/_core/mli/message_handler.py +++ b/smartsim/_core/mli/message_handler.py @@ -92,16 +92,21 @@ def build_output_tensor_descriptor( return description @staticmethod - def build_tensor_key(key: str) -> data_references_capnp.TensorKey: + def build_tensor_key( + key: str, feature_store_descriptor: str + ) -> data_references_capnp.TensorKey: """ Builds a new TensorKey message with the provided key. 
:param key: String to set the TensorKey + :param feature_store_descriptor: A descriptor identifying the feature store + containing the key :raises ValueError: if building fails """ try: tensor_key = data_references_capnp.TensorKey.new_message() tensor_key.key = key + tensor_key.featureStoreDescriptor = feature_store_descriptor except Exception as e: raise ValueError("Error building tensor key.") from e return tensor_key @@ -126,16 +131,21 @@ def build_model(data: bytes, name: str, version: str) -> model_capnp.Model: return model @staticmethod - def build_model_key(key: str) -> data_references_capnp.ModelKey: + def build_model_key( + key: str, feature_store_descriptor: str + ) -> data_references_capnp.ModelKey: """ Builds a new ModelKey message with the provided key. :param key: String to set the ModelKey + :param feature_store_descriptor: A descriptor identifying the feature store + containing the key :raises ValueError: if building fails """ try: model_key = data_references_capnp.ModelKey.new_message() model_key.key = key + model_key.featureStoreDescriptor = feature_store_descriptor except Exception as e: raise ValueError("Error building model key.") from e return model_key @@ -433,6 +443,7 @@ def _assign_result( result: t.Union[ t.List[tensor_capnp.TensorDescriptor], t.List[data_references_capnp.TensorKey], + None, ], ) -> None: """ @@ -498,6 +509,7 @@ def build_response( result: t.Union[ t.List[tensor_capnp.TensorDescriptor], t.List[data_references_capnp.TensorKey], + None, ], custom_attributes: t.Union[ response_attributes_capnp.TorchResponseAttributes, diff --git a/smartsim/_core/mli/mli_schemas/data/data_references.capnp b/smartsim/_core/mli/mli_schemas/data/data_references.capnp index f37a95726..699abe5d2 100644 --- a/smartsim/_core/mli/mli_schemas/data/data_references.capnp +++ b/smartsim/_core/mli/mli_schemas/data/data_references.capnp @@ -28,8 +28,10 @@ struct ModelKey { key @0 :Text; + featureStoreDescriptor @1 :Text; } struct TensorKey { key @0 :Text; + featureStoreDescriptor @1 :Text; } diff --git a/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi index 6f775cf8f..bcf53e0a0 100644 --- a/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi +++ b/smartsim/_core/mli/mli_schemas/data/data_references_capnp.pyi @@ -36,6 +36,7 @@ from typing import Iterator class ModelKey: key: str + featureStoreDescriptor: str @staticmethod @contextmanager def from_bytes( @@ -71,6 +72,7 @@ class ModelKeyBuilder(ModelKey): class TensorKey: key: str + featureStoreDescriptor: str @staticmethod @contextmanager def from_bytes( diff --git a/tests/dragon/utils/featurestore.py b/tests/dragon/featurestore.py similarity index 69% rename from tests/dragon/utils/featurestore.py rename to tests/dragon/featurestore.py index 93b313431..d06035fd7 100644 --- a/tests/dragon/utils/featurestore.py +++ b/tests/dragon/featurestore.py @@ -29,6 +29,9 @@ import smartsim.error as sse from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore +from smartsim.log import get_logger + +logger = get_logger(__name__) class MemoryFeatureStore(FeatureStore): @@ -40,6 +43,7 @@ def __init__(self) -> None: def __getitem__(self, key: str) -> bytes: """Retrieve an item using key + :param key: Unique key of an item to retrieve from the feature store""" if key not in self._storage: raise sse.SmartSimError(f"{key} not found in feature store") @@ -47,28 +51,43 @@ def __getitem__(self, key: str) -> bytes: def __setitem__(self, key: str, 
value: bytes) -> None: """Membership operator to test for a key existing within the feature store. - Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: `True` if the key is found, `False` otherwise""" self._storage[key] = value def __contains__(self, key: str) -> bool: """Membership operator to test for a key existing within the feature store. - Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: `True` if the key is found, `False` otherwise""" return key in self._storage + @property + def descriptor(self) -> str: + """Unique identifier enabling a client to connect to the feature store + + :returns: A descriptor encoded as a string""" + return "file-system-fs" + class FileSystemFeatureStore(FeatureStore): """Alternative feature store implementation for testing. Stores all data on the file system""" - def __init__(self, storage_dir: t.Optional[pathlib.Path] = None) -> None: + def __init__( + self, storage_dir: t.Optional[t.Union[pathlib.Path, str]] = None + ) -> None: """Initialize the FileSystemFeatureStore instance + :param storage_dir: (optional) root directory to store all data relative to""" + if isinstance(storage_dir, str): + storage_dir = pathlib.Path(storage_dir) self._storage_dir = storage_dir def __getitem__(self, key: str) -> bytes: """Retrieve an item using key + :param key: Unique key of an item to retrieve from the feature store""" path = self._key_path(key) if not path.exists(): @@ -77,6 +96,7 @@ def __getitem__(self, key: str) -> bytes: def __setitem__(self, key: str, value: bytes) -> None: """Assign a value using key + :param key: Unique key of an item to set in the feature store :param value: Value to persist in the feature store""" path = self._key_path(key, create=True) @@ -84,14 +104,16 @@ def __setitem__(self, key: str, value: bytes) -> None: def __contains__(self, key: str) -> bool: """Membership operator to test for a key existing within the feature store. - Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" + + :param key: Unique key of an item to retrieve from the feature store + :returns: `True` if the key is found, `False` otherwise""" path = self._key_path(key) return path.exists() def _key_path(self, key: str, create: bool = False) -> pathlib.Path: """Given a key, return a path that is optionally combined with a base directory used by the FileSystemFeatureStore. 
+ :param key: Unique key of an item to retrieve from the feature store""" value = pathlib.Path(key) @@ -103,26 +125,32 @@ def _key_path(self, key: str, create: bool = False) -> pathlib.Path: return value - -class DragonDict: - """Mock implementation of a dragon dictionary""" - - def __init__(self) -> None: - """Initialize the mock DragonDict instance""" - self._storage: t.Dict[bytes, t.Any] = {} - - def __getitem__(self, key: bytes) -> t.Any: - """Retrieve an item using key - :param key: Unique key of an item to retrieve from the feature store""" - return self._storage[key] - - def __setitem__(self, key: bytes, value: t.Any) -> None: - """Assign a value using key - :param key: Unique key of an item to set in the feature store - :param value: Value to persist in the feature store""" - self._storage[key] = value - - def __contains__(self, key: bytes) -> bool: - """Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" - return key in self._storage + @property + def descriptor(self) -> str: + """Unique identifier enabling a client to connect to the feature store + + :returns: A descriptor encoded as a string""" + if not self._storage_dir: + raise ValueError("No storage path configured") + return self._storage_dir.as_posix() + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "FileSystemFeatureStore": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached FileSystemFeatureStore""" + try: + path = pathlib.Path(descriptor) + path.mkdir(parents=True, exist_ok=True) + if not path.is_dir(): + raise ValueError("FileSystemFeatureStore requires a directory path") + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return FileSystemFeatureStore(path) + except: + logger.error(f"Error while creating FileSystemFeatureStore: {descriptor}") + raise diff --git a/tests/dragon/test_environment_loader.py b/tests/dragon/test_environment_loader.py index 00db0a9d3..6ae5d2b30 100644 --- a/tests/dragon/test_environment_loader.py +++ b/tests/dragon/test_environment_loader.py @@ -24,10 +24,6 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
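To make the test-helper changes above concrete: both stores now expose a `descriptor`, and `FileSystemFeatureStore` can be re-created from one. A small sketch follows; the helper lives under `tests/dragon/featurestore.py`, so the import path assumes that directory is on `sys.path`.

```python
import tempfile

from featurestore import FileSystemFeatureStore  # tests/dragon/featurestore.py

store_dir = tempfile.mkdtemp()
store = FileSystemFeatureStore(store_dir)
store["my-tensor"] = b"\x00\x01\x02"

# the descriptor is simply the backing directory
clone = FileSystemFeatureStore.from_descriptor(store.descriptor)
assert clone["my-tensor"] == b"\x00\x01\x02"
```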
-import base64 -import os -import pickle - import pytest dragon = pytest.importorskip("dragon") @@ -37,13 +33,13 @@ from dragon.data.ddict.ddict import DDict from dragon.fli import DragonFLIError, FLInterface +from smartsim._core.mli.comm.channel.dragonchannel import DragonCommChannel +from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import ( DragonFeatureStore, ) -from .utils.featurestore import MemoryFeatureStore - # The tests in this file belong to the dragon group pytestmark = pytest.mark.dragon @@ -55,97 +51,80 @@ pytest.param(b"new byte string"), ], ) -def test_environment_loader_attach_FLI(content, monkeypatch): +def test_environment_loader_attach_fli(content: bytes, monkeypatch: pytest.MonkeyPatch): """A descriptor can be stored, loaded, and reattached""" chan = Channel.make_process_local() queue = FLInterface(main_ch=chan) - monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize())) + monkeypatch.setenv("SS_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize())) - config = EnvironmentConfigLoader() + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ) config_queue = config.get_queue() - new_sender = config_queue.send(content) + _ = config_queue.send(content) old_recv = queue.recvh() result, _ = old_recv.recv_bytes() assert result == content -def test_environment_loader_serialize_FLI(monkeypatch): +def test_environment_loader_serialize_fli(monkeypatch: pytest.MonkeyPatch): """The serialized descriptors of a loaded and unloaded queue are the same""" chan = Channel.make_process_local() queue = FLInterface(main_ch=chan) - monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize())) + monkeypatch.setenv("SS_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize())) - config = EnvironmentConfigLoader() + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ) config_queue = config.get_queue() assert config_queue._fli.serialize() == queue.serialize() -def test_environment_loader_FLI_fails(monkeypatch): +def test_environment_loader_flifails(monkeypatch: pytest.MonkeyPatch): """An incorrect serialized descriptor will fails to attach""" - monkeypatch.setenv("SSQueue", "randomstring") - config = EnvironmentConfigLoader() + monkeypatch.setenv("SS_REQUEST_QUEUE", "randomstring") + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=None, + queue_factory=DragonFLIChannel.from_descriptor, + ) with pytest.raises(DragonFLIError): - config_queue = config.get_queue() - - -@pytest.mark.parametrize( - "expected_keys, expected_values", - [ - pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]), - pytest.param(["another key"], ["another value"]), - ], -) -def test_environment_loader_memory_featurestore( - expected_keys, expected_values, monkeypatch -): - """MemoryFeatureStores can be correctly serialized and deserialized""" - feature_store = MemoryFeatureStore() - key_value_pairs = zip(expected_keys, expected_values) - for k, v in key_value_pairs: - feature_store[k] = v - monkeypatch.setenv( - "SSFeatureStore", 
base64.b64encode(pickle.dumps(feature_store)).decode("utf-8") - ) - config = EnvironmentConfigLoader() - config_feature_store = config.get_feature_store() + config.get_queue() - for k, _ in key_value_pairs: - assert config_feature_store[k] == feature_store[k] +def test_environment_loader_backbone_load_dfs(monkeypatch: pytest.MonkeyPatch): + """Verify the dragon feature store is loaded correctly by the + EnvironmentConfigLoader to demonstrate featurestore_factory correctness""" + feature_store = DragonFeatureStore(DDict()) + monkeypatch.setenv("SS_INFRA_BACKBONE", feature_store.descriptor) -@pytest.mark.parametrize( - "expected_keys, expected_values", - [ - pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]), - pytest.param(["another key"], ["another value"]), - ], -) -def test_environment_loader_dragon_featurestore( - expected_keys, expected_values, monkeypatch -): - """DragonFeatureStores can be correctly serialized and deserialized""" - storage = DDict() - feature_store = DragonFeatureStore(storage) - key_value_pairs = zip(expected_keys, expected_values) - for k, v in key_value_pairs: - feature_store[k] = v - monkeypatch.setenv( - "SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8") + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=None, + queue_factory=None, ) - config = EnvironmentConfigLoader() - config_feature_store = config.get_feature_store() - for k, _ in key_value_pairs: - assert config_feature_store[k] == feature_store[k] + print(f"calling config.get_backbone: `{feature_store.descriptor}`") + + backbone = config.get_backbone() + assert backbone is not None def test_environment_variables_not_set(): """EnvironmentConfigLoader getters return None when environment variables are not set""" - config = EnvironmentConfigLoader() - assert config.get_feature_store() == None - assert config.get_queue() == None + config = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=DragonCommChannel.from_descriptor, + queue_factory=DragonCommChannel.from_descriptor, + ) + assert config.get_backbone() is None + assert config.get_queue() is None diff --git a/tests/dragon/test_error_handling.py b/tests/dragon/test_error_handling.py index 151bdd2fc..208ab1e5e 100644 --- a/tests/dragon/test_error_handling.py +++ b/tests/dragon/test_error_handling.py @@ -24,8 +24,6 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-import base64 -import pickle from unittest.mock import MagicMock import pytest @@ -37,6 +35,7 @@ from dragon.data.ddict.ddict import DDict from dragon.fli import FLInterface +from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel from smartsim._core.mli.infrastructure.control.workermanager import ( WorkerManager, exception_handler, @@ -45,6 +44,7 @@ from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import ( DragonFeatureStore, ) +from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore from smartsim._core.mli.infrastructure.worker.worker import ( ExecuteResult, FetchInputResult, @@ -64,30 +64,51 @@ @pytest.fixture -def setup_worker_manager_model_bytes(test_dir, monkeypatch: pytest.MonkeyPatch): +def backbone_descriptor() -> str: + # create a shared backbone featurestore + feature_store = DragonFeatureStore(DDict()) + return feature_store.descriptor + + +@pytest.fixture +def app_feature_store() -> FeatureStore: + # create a standalone feature store to mimic a user application putting + # data into an application-owned resource (app should not access backbone) + app_fs = DragonFeatureStore(DDict()) + return app_fs + + +@pytest.fixture +def setup_worker_manager_model_bytes( + test_dir, + monkeypatch: pytest.MonkeyPatch, + backbone_descriptor: str, + app_feature_store: FeatureStore, +): integrated_worker = IntegratedTorchWorker() chan = Channel.make_process_local() queue = FLInterface(main_ch=chan) - monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize())) - storage = DDict() - feature_store = DragonFeatureStore(storage) - monkeypatch.setenv( - "SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8") - ) + monkeypatch.setenv("SS_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize())) + # Put backbone descriptor into env var for the `EnvironmentConfigLoader` + monkeypatch.setenv("SS_INFRA_BACKBONE", backbone_descriptor) worker_manager = WorkerManager( - EnvironmentConfigLoader(), + EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=FileSystemCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ), integrated_worker, as_service=False, cooldown=3, - comm_channel_type=FileSystemCommChannel, ) - tensor_key = MessageHandler.build_tensor_key("key") + tensor_key = MessageHandler.build_tensor_key("key", app_feature_store.descriptor) + output_key = MessageHandler.build_tensor_key("key", app_feature_store.descriptor) model = MessageHandler.build_model(b"model", "model name", "v 0.0.1") request = MessageHandler.build_request( - test_dir, model, [tensor_key], [tensor_key], [], None + test_dir, model, [tensor_key], [output_key], [], None ) ser_request = MessageHandler.serialize_request(request) worker_manager._task_queue.send(ser_request) @@ -96,30 +117,38 @@ def setup_worker_manager_model_bytes(test_dir, monkeypatch: pytest.MonkeyPatch): @pytest.fixture -def setup_worker_manager_model_key(test_dir, monkeypatch: pytest.MonkeyPatch): +def setup_worker_manager_model_key( + test_dir: str, + monkeypatch: pytest.MonkeyPatch, + backbone_descriptor: str, + app_feature_store: FeatureStore, +): integrated_worker = IntegratedTorchWorker() chan = Channel.make_process_local() queue = FLInterface(main_ch=chan) - monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize())) - storage = DDict() - feature_store = DragonFeatureStore(storage) - monkeypatch.setenv( - "SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8") - ) 
+ monkeypatch.setenv("SS_REQUEST_QUEUE", du.B64.bytes_to_str(queue.serialize())) + # Put backbone descriptor into env var for the `EnvironmentConfigLoader` + monkeypatch.setenv("SS_INFRA_BACKBONE", backbone_descriptor) worker_manager = WorkerManager( - EnvironmentConfigLoader(), + EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=FileSystemCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ), integrated_worker, as_service=False, cooldown=3, - comm_channel_type=FileSystemCommChannel, ) - tensor_key = MessageHandler.build_tensor_key("key") - model_key = MessageHandler.build_model_key("model key") + tensor_key = MessageHandler.build_tensor_key("key", app_feature_store.descriptor) + output_key = MessageHandler.build_tensor_key("key", app_feature_store.descriptor) + model_key = MessageHandler.build_model_key( + "model key", app_feature_store.descriptor + ) request = MessageHandler.build_request( - test_dir, model_key, [tensor_key], [tensor_key], [], None + test_dir, model_key, [tensor_key], [output_key], [], None ) ser_request = MessageHandler.serialize_request(request) worker_manager._task_queue.send(ser_request) @@ -162,7 +191,11 @@ def mock_exception_handler(exc, reply_channel, failure_message): pytest.param( "fetch_model", "Failed while fetching the model.", id="fetch model" ), - pytest.param("load_model", "Failed while loading the model.", id="load model"), + pytest.param( + "load_model", + "Failed while loading model from feature store.", + id="load model", + ), pytest.param( "fetch_inputs", "Failed while fetching the inputs.", id="fetch inputs" ), diff --git a/tests/dragon/test_reply_building.py b/tests/dragon/test_reply_building.py index d1c4d226b..5f179bbae 100644 --- a/tests/dragon/test_reply_building.py +++ b/tests/dragon/test_reply_building.py @@ -30,10 +30,7 @@ dragon = pytest.importorskip("dragon") -from smartsim._core.mli.infrastructure.control.workermanager import ( - build_failure_reply, - build_reply, -) +from smartsim._core.mli.infrastructure.control.workermanager import build_failure_reply from smartsim._core.mli.infrastructure.worker.worker import InferenceReply if t.TYPE_CHECKING: @@ -63,29 +60,3 @@ def test_build_failure_reply_fails(): response = build_failure_reply("not a status enum", "message") assert "Error assigning status to response" in ex.value.args[0] - - -@pytest.mark.parametrize( - "status, message", - [ - pytest.param("complete", "Success", id="complete"), - ], -) -def test_build_reply(status: "Status", message: str): - "Ensures replies can be built successfully" - reply = InferenceReply() - reply.status_enum = status - reply.message = message - response = build_reply(reply) - assert response.status == status - assert response.message == message - - -def test_build_reply_fails(): - "Ensures ValueError is raised if a Status Enum is not used" - with pytest.raises(ValueError) as ex: - reply = InferenceReply() - reply.status_enum = "not a status enum" - response = build_reply(reply) - - assert "Error assigning status to response" in ex.value.args[0] diff --git a/tests/mli/test_worker_manager.py b/tests/dragon/test_worker_manager.py similarity index 77% rename from tests/mli/test_worker_manager.py rename to tests/dragon/test_worker_manager.py index df4b0a637..864e14993 100644 --- a/tests/mli/test_worker_manager.py +++ b/tests/dragon/test_worker_manager.py @@ -35,46 +35,34 @@ torch = pytest.importorskip("torch") dragon = pytest.importorskip("dragon") +import base64 +import os + 
+import dragon.channels as dch +from dragon import fli + +from smartsim._core.mli.comm.channel.channel import CommChannelBase +from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel from smartsim._core.mli.infrastructure.control.workermanager import ( EnvironmentConfigLoader, WorkerManager, ) +from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import ( + DragonFeatureStore, +) from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore +from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker from smartsim._core.mli.message_handler import MessageHandler from smartsim.log import get_logger -from .channel import FileSystemCommChannel from .featurestore import FileSystemFeatureStore -from .worker import IntegratedTorchWorker +from .utils.channel import FileSystemCommChannel logger = get_logger(__name__) # The tests in this file belong to the dragon group pytestmark = pytest.mark.dragon -def mock_work(worker_manager_queue: "mp.Queue[bytes]") -> None: - """Mock event producer for triggering the inference pipeline""" - # todo: move to unit tests - while True: - time.sleep(1) - # 1. for demo, ignore upstream and just put stuff into downstream - # 2. for demo, only one downstream but we'd normally have to filter - # msg content and send to the correct downstream (worker) queue - timestamp = time.time_ns() - output_dir = "/lus/bnchlu1/mcbridch/code/ss/_tmp" - output_path = pathlib.Path(output_dir) - - mock_channel = output_path / f"brainstorm-{timestamp}.txt" - mock_model = output_path / "brainstorm.pt" - - output_path.mkdir(parents=True, exist_ok=True) - mock_channel.touch() - mock_model.touch() - - msg = f"PyTorch:{mock_model}:MockInputToReplace:{mock_channel}" - worker_manager_queue.put(msg.encode("utf-8")) - - def persist_model_file(model_path: pathlib.Path) -> pathlib.Path: """Create a simple torch model and persist to disk for testing purposes. 
@@ -94,7 +82,7 @@ def persist_model_file(model_path: pathlib.Path) -> pathlib.Path: def mock_messages( - worker_manager_queue: "mp.Queue[bytes]", + worker_manager_queue: CommChannelBase, feature_store: FeatureStore, feature_store_root_dir: pathlib.Path, comm_channel_root_dir: pathlib.Path, @@ -139,10 +127,11 @@ def mock_messages( tensor = torch.randn((1, 2), dtype=torch.float32) torch.save(tensor, buffer) feature_store[input_key] = buffer.getvalue() + fsd = feature_store.descriptor - message_tensor_output_key = MessageHandler.build_tensor_key(output_key) - message_tensor_input_key = MessageHandler.build_tensor_key(input_key) - message_model_key = MessageHandler.build_model_key(model_key) + message_tensor_output_key = MessageHandler.build_tensor_key(output_key, fsd) + message_tensor_input_key = MessageHandler.build_tensor_key(input_key, fsd) + message_model_key = MessageHandler.build_model_key(model_key, fsd) request = MessageHandler.build_request( reply_channel=callback_channel.descriptor, @@ -153,7 +142,7 @@ def mock_messages( custom_attributes=None, ) request_bytes = MessageHandler.serialize_request(request) - worker_manager_queue.put(request_bytes) + worker_manager_queue.send(request_bytes) @pytest.fixture @@ -171,30 +160,49 @@ def test_worker_manager(prepare_environment: pathlib.Path) -> None: fs_path = test_path / "feature_store" comm_path = test_path / "comm_store" - config_loader = EnvironmentConfigLoader() - integrated_worker = IntegratedTorchWorker() + to_worker_channel = dch.Channel.make_process_local() + to_worker_fli = fli.FLInterface(main_ch=to_worker_channel, manager_ch=None) + to_worker_fli_serialized = to_worker_fli.serialize() + + # NOTE: env vars should be set prior to instantiating EnvironmentConfigLoader + # or test environment may be unable to send messages w/queue + descriptor = base64.b64encode(to_worker_fli_serialized).decode("utf-8") + os.environ["SS_REQUEST_QUEUE"] = descriptor + + config_loader = EnvironmentConfigLoader( + featurestore_factory=DragonFeatureStore.from_descriptor, + callback_factory=FileSystemCommChannel.from_descriptor, + queue_factory=DragonFLIChannel.from_descriptor, + ) + integrated_worker = TorchWorker() worker_manager = WorkerManager( config_loader, integrated_worker, as_service=True, - cooldown=10, - comm_channel_type=FileSystemCommChannel, + cooldown=5, + device="cpu", ) + worker_queue = config_loader.get_queue() + if worker_queue is None: + logger.warn( + f"FLI input queue not loaded correctly from config_loader: {config_loader._queue_descriptor}" + ) + # create a mock client application to populate the request queue msg_pump = mp.Process( target=mock_messages, args=( - config_loader.get_queue(), - config_loader.get_feature_store(), + worker_queue, + FileSystemFeatureStore(fs_path), fs_path, comm_path, ), ) msg_pump.start() - # # create a process to process commands + # create a process to execute commands process = mp.Process(target=worker_manager.execute) process.start() process.join(timeout=5) diff --git a/tests/dragon/utils/channel.py b/tests/dragon/utils/channel.py index df76c484b..08b659c07 100644 --- a/tests/dragon/utils/channel.py +++ b/tests/dragon/utils/channel.py @@ -25,6 +25,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import pathlib +import threading import typing as t from smartsim._core.mli.comm.channel.channel import CommChannelBase @@ -37,7 +38,11 @@ class FileSystemCommChannel(CommChannelBase): """Passes messages by writing to a file""" def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None: - """Initialize the FileSystemCommChannel instance""" + """Initialize the FileSystemCommChannel instance + + :param key: a path to the root directory of the feature store""" + self._lock = threading.RLock() + if not isinstance(key, bytes): super().__init__(key.as_posix().encode("utf-8")) self._file_path = key @@ -52,13 +57,38 @@ def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None: def send(self, value: bytes) -> None: """Send a message throuh the underlying communication channel + :param value: The value to send""" logger.debug( f"Channel {self.descriptor.decode('utf-8')} sending message to {self._file_path}" ) - self._file_path.write_bytes(value) + with self._lock: + self._file_path.write_bytes(value) def recv(self) -> bytes: """Receieve a message through the underlying communication channel + :returns: the received message""" - ... + with self._lock: + if self._file_path.exists(): + incoming = self._file_path.read_bytes() + self._file_path.unlink() + return incoming + + @classmethod + def from_descriptor( + cls, + descriptor: t.Union[str, bytes], + ) -> "FileSystemCommChannel": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached FileSystemCommChannel""" + try: + if isinstance(descriptor, str): + path = pathlib.Path(descriptor) + else: + path = pathlib.Path(descriptor.decode("utf-8")) + return FileSystemCommChannel(path) + except: + print("failed to create FS comm channel: {descriptor}") diff --git a/tests/dragon/utils/worker.py b/tests/dragon/utils/worker.py index b1de28018..0582cae56 100644 --- a/tests/dragon/utils/worker.py +++ b/tests/dragon/utils/worker.py @@ -47,7 +47,7 @@ class IntegratedTorchWorker(mliw.MachineLearningWorkerBase): @staticmethod def load_model( - request: mliw.InferenceRequest, fetch_result: mliw.FetchModelResult + request: mliw.InferenceRequest, fetch_result: mliw.FetchModelResult, device: str ) -> mliw.LoadModelResult: model_bytes = fetch_result.model_bytes or request.raw_model if not model_bytes: @@ -61,6 +61,7 @@ def load_model( def transform_input( request: mliw.InferenceRequest, fetch_result: mliw.FetchInputResult, + device: str, ) -> mliw.TransformInputResult: # extra metadata for assembly can be found in request.input_meta raw_inputs = request.raw_inputs or fetch_result.inputs @@ -93,36 +94,11 @@ def execute( def transform_output( request: mliw.InferenceRequest, execute_result: mliw.ExecuteResult, + result_device: str, ) -> mliw.TransformOutputResult: - # transformed = [item.clone() for item in execute_result.predictions] - # return OutputTransformResult(transformed) - - # transformed = [item.bytes() for item in execute_result.predictions] - - # OutputTransformResult.transformed SHOULD be a list of - # capnproto Tensors Or tensor descriptors accompanying bytes - # send the original tensors... 
execute_result.predictions = [t.detach() for t in execute_result.predictions] # todo: solve sending all tensor metadata that coincisdes with each prediction return mliw.TransformOutputResult( execute_result.predictions, [1], "c", "float32" ) - # return OutputTransformResult(transformed) - - # @staticmethod - # def serialize_reply( - # request: InferenceRequest, results: OutputTransformResult - # ) -> t.Any: - # # results = IntegratedTorchWorker._prepare_outputs(results.outputs) - # # return results - # return None - # # response = MessageHandler.build_response( - # # status=200, # todo: are we satisfied with 0/1 (success, fail) - # # # todo: if not detailed messages, this shouldn't be returned. - # # message="success", - # # result=results, - # # custom_attributes=None, - # # ) - # # serialized_resp = MessageHandler.serialize_response(response) - # # return serialized_resp diff --git a/tests/mli/channel.py b/tests/mli/channel.py index 4bc2014ea..226e8683d 100644 --- a/tests/mli/channel.py +++ b/tests/mli/channel.py @@ -25,6 +25,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import pathlib +import threading import typing as t from smartsim._core.mli.comm.channel.channel import CommChannelBase @@ -37,7 +38,10 @@ class FileSystemCommChannel(CommChannelBase): """Passes messages by writing to a file""" def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None: - """Initialize the FileSystemCommChannel instance""" + """Initialize the FileSystemCommChannel instance + + :param key: a path to the root directory of the feature store""" + self._lock = threading.RLock() if not isinstance(key, bytes): super().__init__(key.as_posix().encode("utf-8")) self._file_path = key @@ -52,8 +56,36 @@ def __init__(self, key: t.Union[bytes, pathlib.Path]) -> None: def send(self, value: bytes) -> None: """Send a message throuh the underlying communication channel + :param value: The value to send""" logger.debug( f"Channel {self.descriptor.decode('utf-8')} sending message to {self._file_path}" ) - self._file_path.write_bytes(value) + with self._lock: + self._file_path.write_bytes(value) + + def recv(self) -> bytes: + """Receieve a message through the underlying communication channel + + :returns: the received message""" + with self._lock: + if self._file_path.exists(): + incoming = self._file_path.read_bytes() + self._file_path.unlink() + return incoming + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "FileSystemCommChannel": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached FileSystemCommChannel""" + try: + path = pathlib.Path(descriptor) + return FileSystemCommChannel(path) + except: + print(f"failed to create fs comm channel: {descriptor}") + raise diff --git a/tests/mli/featurestore.py b/tests/mli/featurestore.py index 93b313431..de748ae6e 100644 --- a/tests/mli/featurestore.py +++ b/tests/mli/featurestore.py @@ -29,6 +29,9 @@ import smartsim.error as sse from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore +from smartsim.log import get_logger + +logger = get_logger(__name__) class MemoryFeatureStore(FeatureStore): @@ -40,6 +43,7 @@ def __init__(self) -> None: def __getitem__(self, key: str) -> bytes: """Retrieve an item using key + :param key: Unique key of an item to retrieve from the feature store""" if key not in self._storage: raise sse.SmartSimError(f"{key} not found in feature store") @@ -47,8 +51,9 @@ 
def __getitem__(self, key: str) -> bytes:
 
     def __setitem__(self, key: str, value: bytes) -> None:
-        """Membership operator to test for a key existing within the feature store.
-        Return `True` if the key is found, `False` otherwise
-        :param key: Unique key of an item to retrieve from the feature store"""
+        """Assign a value using key
+
+        :param key: Unique key of an item to set in the feature store
+        :param value: Value to persist in the feature store"""
         self._storage[key] = value
 
     def __contains__(self, key: str) -> bool:
@@ -57,18 +62,31 @@ def __contains__(self, key: str) -> bool:
         :param key: Unique key of an item to retrieve from the feature store"""
         return key in self._storage
 
+    @property
+    def descriptor(self) -> str:
+        """Unique identifier enabling a client to connect to the feature store
+
+        :returns: A descriptor encoded as a string"""
+        return "in-memory-fs"
+
 
 class FileSystemFeatureStore(FeatureStore):
     """Alternative feature store implementation for testing. Stores all data
     on the file system"""
 
-    def __init__(self, storage_dir: t.Optional[pathlib.Path] = None) -> None:
+    def __init__(
+        self, storage_dir: t.Optional[t.Union[pathlib.Path, str]] = None
+    ) -> None:
         """Initialize the FileSystemFeatureStore instance
+
         :param storage_dir: (optional) root directory to store all data relative to"""
+        if isinstance(storage_dir, str):
+            storage_dir = pathlib.Path(storage_dir)
         self._storage_dir = storage_dir
 
     def __getitem__(self, key: str) -> bytes:
         """Retrieve an item using key
+
         :param key: Unique key of an item to retrieve from the feature store"""
         path = self._key_path(key)
         if not path.exists():
@@ -77,6 +95,7 @@ def __getitem__(self, key: str) -> bytes:
 
     def __setitem__(self, key: str, value: bytes) -> None:
         """Assign a value using key
+
         :param key: Unique key of an item to set in the feature store
         :param value: Value to persist in the feature store"""
         path = self._key_path(key, create=True)
@@ -84,14 +103,16 @@ def __setitem__(self, key: str, value: bytes) -> None:
 
     def __contains__(self, key: str) -> bool:
         """Membership operator to test for a key existing within the feature store.
-        Return `True` if the key is found, `False` otherwise
-        :param key: Unique key of an item to retrieve from the feature store"""
+
+        :param key: Unique key of an item to retrieve from the feature store
+        :returns: `True` if the key is found, `False` otherwise"""
         path = self._key_path(key)
         return path.exists()
 
     def _key_path(self, key: str, create: bool = False) -> pathlib.Path:
         """Given a key, return a path that is optionally combined with a base
         directory used by the FileSystemFeatureStore.
+ :param key: Unique key of an item to retrieve from the feature store""" value = pathlib.Path(key) @@ -103,26 +124,32 @@ def _key_path(self, key: str, create: bool = False) -> pathlib.Path: return value - -class DragonDict: - """Mock implementation of a dragon dictionary""" - - def __init__(self) -> None: - """Initialize the mock DragonDict instance""" - self._storage: t.Dict[bytes, t.Any] = {} - - def __getitem__(self, key: bytes) -> t.Any: - """Retrieve an item using key - :param key: Unique key of an item to retrieve from the feature store""" - return self._storage[key] - - def __setitem__(self, key: bytes, value: t.Any) -> None: - """Assign a value using key - :param key: Unique key of an item to set in the feature store - :param value: Value to persist in the feature store""" - self._storage[key] = value - - def __contains__(self, key: bytes) -> bool: - """Return `True` if the key is found, `False` otherwise - :param key: Unique key of an item to retrieve from the feature store""" - return key in self._storage + @property + def descriptor(self) -> str: + """Unique identifier enabling a client to connect to the feature store + + :returns: A descriptor encoded as a string""" + if not self._storage_dir: + raise ValueError("No storage path configured") + return self._storage_dir.as_posix() + + @classmethod + def from_descriptor( + cls, + descriptor: str, + ) -> "FileSystemFeatureStore": + """A factory method that creates an instance from a descriptor string + + :param descriptor: The descriptor that uniquely identifies the resource + :returns: An attached FileSystemFeatureStore""" + try: + path = pathlib.Path(descriptor) + path.mkdir(parents=True, exist_ok=True) + if not path.is_dir(): + raise ValueError("FileSystemFeatureStore requires a directory path") + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + return FileSystemFeatureStore(path) + except: + logger.error(f"Error while creating FileSystemFeatureStore: {descriptor}") + raise diff --git a/tests/mli/test_core_machine_learning_worker.py b/tests/mli/test_core_machine_learning_worker.py index cff02c9c1..6fa9f9944 100644 --- a/tests/mli/test_core_machine_learning_worker.py +++ b/tests/mli/test_core_machine_learning_worker.py @@ -31,6 +31,7 @@ import torch import smartsim.error as sse +from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStoreKey from smartsim._core.mli.infrastructure.worker.worker import ( InferenceRequest, MachineLearningWorkerCore, @@ -84,17 +85,18 @@ def persist_torch_tensor(test_dir: str) -> pathlib.Path: @pytest.mark.skipif(not torch_available, reason="Torch backend is not installed") -def test_fetch_model_disk(persist_torch_model: pathlib.Path) -> None: +def test_fetch_model_disk(persist_torch_model: pathlib.Path, test_dir: str) -> None: """Verify that the ML worker successfully retrieves a model when given a valid (file system) key""" worker = MachineLearningWorkerCore key = str(persist_torch_model) - feature_store = FileSystemFeatureStore() + feature_store = FileSystemFeatureStore(test_dir) + fsd = feature_store.descriptor feature_store[str(persist_torch_model)] = persist_torch_model.read_bytes() - request = InferenceRequest(model_key=key) + request = InferenceRequest(model_key=FeatureStoreKey(key=key, descriptor=fsd)) - fetch_result = worker.fetch_model(request, feature_store) + fetch_result = worker.fetch_model(request, {fsd: feature_store}) assert fetch_result.model_bytes assert fetch_result.model_bytes == persist_torch_model.read_bytes() @@ -104,13 +106,14 @@ def 
test_fetch_model_disk_missing() -> None: when given an invalid (file system) key""" worker = MachineLearningWorkerCore feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor key = "/path/that/doesnt/exist" - request = InferenceRequest(model_key=key) + request = InferenceRequest(model_key=FeatureStoreKey(key=key, descriptor=fsd)) with pytest.raises(sse.SmartSimError) as ex: - worker.fetch_model(request, feature_store) + worker.fetch_model(request, {fsd: feature_store}) # ensure the error message includes key-identifying information assert key in ex.value.args[0] @@ -127,10 +130,13 @@ def test_fetch_model_feature_store(persist_torch_model: pathlib.Path) -> None: # put model bytes into the feature store feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor feature_store[key] = persist_torch_model.read_bytes() - request = InferenceRequest(model_key=key) - fetch_result = worker.fetch_model(request, feature_store) + request = InferenceRequest( + model_key=FeatureStoreKey(key=key, descriptor=feature_store.descriptor) + ) + fetch_result = worker.fetch_model(request, {fsd: feature_store}) assert fetch_result.model_bytes assert fetch_result.model_bytes == persist_torch_model.read_bytes() @@ -140,17 +146,20 @@ def test_fetch_model_feature_store_missing() -> None: when given an invalid (feature store) key""" worker = MachineLearningWorkerCore - bad_key = "some-key" + key = "some-key" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor - request = InferenceRequest(model_key=bad_key) + request = InferenceRequest( + model_key=FeatureStoreKey(key=key, descriptor=feature_store.descriptor) + ) # todo: consider that raising this exception shows impl. replace... with pytest.raises(sse.SmartSimError) as ex: - worker.fetch_model(request, feature_store) + worker.fetch_model(request, {fsd: feature_store}) # ensure the error message includes key-identifying information - assert bad_key in ex.value.args[0] + assert key in ex.value.args[0] @pytest.mark.skipif(not torch_available, reason="Torch backend is not installed") @@ -161,11 +170,14 @@ def test_fetch_model_memory(persist_torch_model: pathlib.Path) -> None: key = "test-model" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor feature_store[key] = persist_torch_model.read_bytes() - request = InferenceRequest(model_key=key) + request = InferenceRequest( + model_key=FeatureStoreKey(key=key, descriptor=feature_store.descriptor) + ) - fetch_result = worker.fetch_model(request, feature_store) + fetch_result = worker.fetch_model(request, {fsd: feature_store}) assert fetch_result.model_bytes assert fetch_result.model_bytes == persist_torch_model.read_bytes() @@ -176,13 +188,16 @@ def test_fetch_input_disk(persist_torch_tensor: pathlib.Path) -> None: when given a valid (file system) key""" tensor_name = str(persist_torch_tensor) - request = InferenceRequest(input_keys=[tensor_name]) + feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor + request = InferenceRequest( + input_keys=[FeatureStoreKey(key=tensor_name, descriptor=fsd)] + ) worker = MachineLearningWorkerCore - feature_store = MemoryFeatureStore() feature_store[tensor_name] = persist_torch_tensor.read_bytes() - fetch_result = worker.fetch_inputs(request, feature_store) + fetch_result = worker.fetch_inputs(request, {fsd: feature_store}) assert fetch_result.inputs is not None @@ -191,16 +206,17 @@ def test_fetch_input_disk_missing() -> None: when given an invalid (file system) key""" worker = MachineLearningWorkerCore - key = 
"/path/that/doesnt/exist" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor + key = "/path/that/doesnt/exist" - request = InferenceRequest(input_keys=[key]) + request = InferenceRequest(input_keys=[FeatureStoreKey(key=key, descriptor=fsd)]) with pytest.raises(sse.SmartSimError) as ex: - worker.fetch_inputs(request, feature_store) + worker.fetch_inputs(request, {fsd: feature_store}) # ensure the error message includes key-identifying information - assert key in ex.value.args[0] + assert key[0] in ex.value.args[0] @pytest.mark.skipif(not torch_available, reason="Torch backend is not installed") @@ -211,13 +227,16 @@ def test_fetch_input_feature_store(persist_torch_tensor: pathlib.Path) -> None: tensor_name = "test-tensor" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor - request = InferenceRequest(input_keys=[tensor_name]) + request = InferenceRequest( + input_keys=[FeatureStoreKey(key=tensor_name, descriptor=fsd)] + ) # put model bytes into the feature store feature_store[tensor_name] = persist_torch_tensor.read_bytes() - fetch_result = worker.fetch_inputs(request, feature_store) + fetch_result = worker.fetch_inputs(request, {fsd: feature_store}) assert fetch_result.inputs assert list(fetch_result.inputs)[0][:10] == persist_torch_tensor.read_bytes()[:10] @@ -230,6 +249,7 @@ def test_fetch_multi_input_feature_store(persist_torch_tensor: pathlib.Path) -> tensor_name = "test-tensor" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor # put model bytes into the feature store body1 = persist_torch_tensor.read_bytes() @@ -242,10 +262,14 @@ def test_fetch_multi_input_feature_store(persist_torch_tensor: pathlib.Path) -> feature_store[tensor_name + "3"] = body3 request = InferenceRequest( - input_keys=[tensor_name + "1", tensor_name + "2", tensor_name + "3"] + input_keys=[ + FeatureStoreKey(key=tensor_name + "1", descriptor=fsd), + FeatureStoreKey(key=tensor_name + "2", descriptor=fsd), + FeatureStoreKey(key=tensor_name + "3", descriptor=fsd), + ] ) - fetch_result = worker.fetch_inputs(request, feature_store) + fetch_result = worker.fetch_inputs(request, {fsd: feature_store}) raw_bytes = list(fetch_result.inputs) assert raw_bytes @@ -259,15 +283,16 @@ def test_fetch_input_feature_store_missing() -> None: when given an invalid (feature store) key""" worker = MachineLearningWorkerCore - bad_key = "some-key" + key = "bad-key" feature_store = MemoryFeatureStore() - request = InferenceRequest(input_keys=[bad_key]) + fsd = feature_store.descriptor + request = InferenceRequest(input_keys=[FeatureStoreKey(key=key, descriptor=fsd)]) with pytest.raises(sse.SmartSimError) as ex: - worker.fetch_inputs(request, feature_store) + worker.fetch_inputs(request, {fsd: feature_store}) # ensure the error message includes key-identifying information - assert bad_key in ex.value.args[0] + assert key in ex.value.args[0] @pytest.mark.skipif(not torch_available, reason="Torch backend is not installed") @@ -276,12 +301,13 @@ def test_fetch_input_memory(persist_torch_tensor: pathlib.Path) -> None: when given a valid (file system) key""" worker = MachineLearningWorkerCore feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor - model_name = "test-model" - feature_store[model_name] = persist_torch_tensor.read_bytes() - request = InferenceRequest(input_keys=[model_name]) + key = "test-model" + feature_store[key] = persist_torch_tensor.read_bytes() + request = InferenceRequest(input_keys=[FeatureStoreKey(key=key, descriptor=fsd)]) - fetch_result = 
worker.fetch_inputs(request, feature_store) + fetch_result = worker.fetch_inputs(request, {fsd: feature_store}) assert fetch_result.inputs is not None @@ -304,18 +330,23 @@ def test_place_outputs() -> None: key_name = "test-model" feature_store = MemoryFeatureStore() + fsd = feature_store.descriptor # create a key to retrieve from the feature store - keys = [key_name + "1", key_name + "2", key_name + "3"] + keys = [ + FeatureStoreKey(key=key_name + "1", descriptor=fsd), + FeatureStoreKey(key=key_name + "2", descriptor=fsd), + FeatureStoreKey(key=key_name + "3", descriptor=fsd), + ] data = [b"abcdef", b"ghijkl", b"mnopqr"] - for k, v in zip(keys, data): - feature_store[k] = v + for fsk, v in zip(keys, data): + feature_store[fsk.key] = v request = InferenceRequest(output_keys=keys) transform_result = TransformOutputResult(data, [1], "c", "float32") - worker.place_output(request, transform_result, feature_store) + worker.place_output(request, transform_result, {fsd: feature_store}) for i in range(3): - assert feature_store[keys[i]] == data[i] + assert feature_store[keys[i].key] == data[i] diff --git a/tests/mli/test_torch_worker.py b/tests/mli/test_torch_worker.py index b73e4a31b..1e8bba7e3 100644 --- a/tests/mli/test_torch_worker.py +++ b/tests/mli/test_torch_worker.py @@ -26,12 +26,12 @@ import io -import numpy as np import pytest import torch from torch import nn from torch.nn import functional as F +from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStoreKey from smartsim._core.mli.infrastructure.worker.torch_worker import TorchWorker from smartsim._core.mli.infrastructure.worker.worker import ( ExecuteResult, @@ -102,7 +102,7 @@ def get_request() -> InferenceRequest: ] return InferenceRequest( - model_key="model", + model_key=FeatureStoreKey(key="model", descriptor="xyz"), callback=None, raw_inputs=tensor_numpy, input_keys=None, diff --git a/tests/mli/worker.py b/tests/mli/worker.py index b1de28018..0582cae56 100644 --- a/tests/mli/worker.py +++ b/tests/mli/worker.py @@ -47,7 +47,7 @@ class IntegratedTorchWorker(mliw.MachineLearningWorkerBase): @staticmethod def load_model( - request: mliw.InferenceRequest, fetch_result: mliw.FetchModelResult + request: mliw.InferenceRequest, fetch_result: mliw.FetchModelResult, device: str ) -> mliw.LoadModelResult: model_bytes = fetch_result.model_bytes or request.raw_model if not model_bytes: @@ -61,6 +61,7 @@ def load_model( def transform_input( request: mliw.InferenceRequest, fetch_result: mliw.FetchInputResult, + device: str, ) -> mliw.TransformInputResult: # extra metadata for assembly can be found in request.input_meta raw_inputs = request.raw_inputs or fetch_result.inputs @@ -93,36 +94,11 @@ def execute( def transform_output( request: mliw.InferenceRequest, execute_result: mliw.ExecuteResult, + result_device: str, ) -> mliw.TransformOutputResult: - # transformed = [item.clone() for item in execute_result.predictions] - # return OutputTransformResult(transformed) - - # transformed = [item.bytes() for item in execute_result.predictions] - - # OutputTransformResult.transformed SHOULD be a list of - # capnproto Tensors Or tensor descriptors accompanying bytes - # send the original tensors... 
execute_result.predictions = [t.detach() for t in execute_result.predictions] # todo: solve sending all tensor metadata that coincisdes with each prediction return mliw.TransformOutputResult( execute_result.predictions, [1], "c", "float32" ) - # return OutputTransformResult(transformed) - - # @staticmethod - # def serialize_reply( - # request: InferenceRequest, results: OutputTransformResult - # ) -> t.Any: - # # results = IntegratedTorchWorker._prepare_outputs(results.outputs) - # # return results - # return None - # # response = MessageHandler.build_response( - # # status=200, # todo: are we satisfied with 0/1 (success, fail) - # # # todo: if not detailed messages, this shouldn't be returned. - # # message="success", - # # result=results, - # # custom_attributes=None, - # # ) - # # serialized_resp = MessageHandler.serialize_response(response) - # # return serialized_resp diff --git a/tests/test_dragon_run_policy.py b/tests/test_dragon_run_policy.py index 1d8d069fa..c94ae375b 100644 --- a/tests/test_dragon_run_policy.py +++ b/tests/test_dragon_run_policy.py @@ -143,7 +143,6 @@ def test_create_run_policy_run_request_no_run_policy() -> None: assert policy.device == Policy.Device.DEFAULT assert set(policy.cpu_affinity) == set() assert policy.gpu_affinity == [] - assert policy.affinity == Policy.Affinity.DEFAULT @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @@ -167,7 +166,6 @@ def test_create_run_policy_run_request_default_run_policy() -> None: assert set(policy.cpu_affinity) == set() assert set(policy.gpu_affinity) == set() - assert policy.affinity == Policy.Affinity.DEFAULT @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @@ -192,7 +190,6 @@ def test_create_run_policy_run_request_cpu_affinity_no_device() -> None: assert set(policy.cpu_affinity) == affinity assert policy.gpu_affinity == [] - assert policy.affinity == Policy.Affinity.SPECIFIC @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @@ -216,7 +213,6 @@ def test_create_run_policy_run_request_cpu_affinity() -> None: assert set(policy.cpu_affinity) == affinity assert policy.gpu_affinity == [] - assert policy.affinity == Policy.Affinity.SPECIFIC @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @@ -240,7 +236,6 @@ def test_create_run_policy_run_request_gpu_affinity() -> None: assert policy.cpu_affinity == [] assert set(policy.gpu_affinity) == set(affinity) - assert policy.affinity == Policy.Affinity.SPECIFIC @pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") diff --git a/tests/test_message_handler/test_build_model_key.py b/tests/test_message_handler/test_build_model_key.py index 135e96798..c09c787fc 100644 --- a/tests/test_message_handler/test_build_model_key.py +++ b/tests/test_message_handler/test_build_model_key.py @@ -35,10 +35,13 @@ def test_build_model_key_successful(): - model_key = handler.build_model_key("tensor_key") + fsd = "mock-feature-store-descriptor" + model_key = handler.build_model_key("tensor_key", fsd) assert model_key.key == "tensor_key" + assert model_key.featureStoreDescriptor == fsd def test_build_model_key_unsuccessful(): with pytest.raises(ValueError): - model_key = handler.build_model_key(100) + fsd = "mock-feature-store-descriptor" + model_key = handler.build_model_key(100, fsd) diff --git a/tests/test_message_handler/test_build_tensor_key.py b/tests/test_message_handler/test_build_tensor_key.py index 7abe9e853..6a28b80c4 100644 --- 
a/tests/test_message_handler/test_build_tensor_key.py +++ b/tests/test_message_handler/test_build_tensor_key.py @@ -35,10 +35,12 @@ def test_build_tensor_key_successful(): - tensor_key = handler.build_tensor_key("tensor_key") + fsd = "mock-feature-store-descriptor" + tensor_key = handler.build_tensor_key("tensor_key", fsd) assert tensor_key.key == "tensor_key" def test_build_tensor_key_unsuccessful(): with pytest.raises(ValueError): - tensor_key = handler.build_tensor_key(100) + fsd = "mock-feature-store-descriptor" + tensor_key = handler.build_tensor_key(100, fsd) diff --git a/tests/test_message_handler/test_output_descriptor.py b/tests/test_message_handler/test_output_descriptor.py index fd21eeb0d..beb9a4765 100644 --- a/tests/test_message_handler/test_output_descriptor.py +++ b/tests/test_message_handler/test_output_descriptor.py @@ -33,7 +33,8 @@ handler = MessageHandler() -tensor_key = handler.build_tensor_key("key") +fsd = "mock-feature-store-descriptor" +tensor_key = handler.build_tensor_key("key", fsd) @pytest.mark.parametrize( diff --git a/tests/test_message_handler/test_request.py b/tests/test_message_handler/test_request.py index 4cfc11584..ea9b04d64 100644 --- a/tests/test_message_handler/test_request.py +++ b/tests/test_message_handler/test_request.py @@ -31,14 +31,16 @@ # The tests in this file belong to the group_a group pytestmark = pytest.mark.group_a -model_key = MessageHandler.build_model_key("model_key") +fsd = "mock-feature-store-descriptor" + +model_key = MessageHandler.build_model_key("model_key", fsd) model = MessageHandler.build_model(b"model data", "model_name", "v0.0.1") -input_key1 = MessageHandler.build_tensor_key("input_key1") -input_key2 = MessageHandler.build_tensor_key("input_key2") +input_key1 = MessageHandler.build_tensor_key("input_key1", fsd) +input_key2 = MessageHandler.build_tensor_key("input_key2", fsd) -output_key1 = MessageHandler.build_tensor_key("output_key1") -output_key2 = MessageHandler.build_tensor_key("output_key2") +output_key1 = MessageHandler.build_tensor_key("output_key1", fsd) +output_key2 = MessageHandler.build_tensor_key("output_key2", fsd) output_descriptor1 = MessageHandler.build_output_tensor_descriptor( "c", [output_key1, output_key2], "int64", [] diff --git a/tests/test_message_handler/test_response.py b/tests/test_message_handler/test_response.py index 03bd9ba73..d6894eb5c 100644 --- a/tests/test_message_handler/test_response.py +++ b/tests/test_message_handler/test_response.py @@ -31,9 +31,10 @@ # The tests in this file belong to the group_a group pytestmark = pytest.mark.group_a +fsd = "mock-feature-store-descriptor" -result_key1 = MessageHandler.build_tensor_key("result_key1") -result_key2 = MessageHandler.build_tensor_key("result_key2") +result_key1 = MessageHandler.build_tensor_key("result_key1", fsd) +result_key2 = MessageHandler.build_tensor_key("result_key2", fsd) torch_attributes = MessageHandler.build_torch_response_attributes() tf_attributes = MessageHandler.build_tf_response_attributes()
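The test changes above all follow the same pattern: infrastructure resources are published as string descriptors (via environment variables such as SS_INFRA_BACKBONE and SS_REQUEST_QUEUE) and re-attached through factory callables handed to EnvironmentConfigLoader. The following is a minimal sketch of how those pieces fit together, assuming a dragon-enabled environment in which both variables have already been populated by the driver; it uses only names that appear in the diffs above.

# Illustrative sketch only: dynamic feature store selection via descriptors
# and factory callables. Assumes a dragon runtime and that SS_INFRA_BACKBONE
# and SS_REQUEST_QUEUE were set by the driver as shown in the tests above.
from smartsim._core.mli.comm.channel.dragonchannel import DragonCommChannel
from smartsim._core.mli.comm.channel.dragonfli import DragonFLIChannel
from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import (
    DragonFeatureStore,
)

# The loader no longer unpickles concrete instances from the environment; it
# is handed factories that know how to attach to a resource from its descriptor.
config = EnvironmentConfigLoader(
    featurestore_factory=DragonFeatureStore.from_descriptor,
    callback_factory=DragonCommChannel.from_descriptor,
    queue_factory=DragonFLIChannel.from_descriptor,
)

backbone = config.get_backbone()  # attaches via SS_INFRA_BACKBONE, else None
request_queue = config.get_queue()  # attaches via SS_REQUEST_QUEUE, else None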
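Keys now travel with the descriptor of the feature store that owns them, both in the capnp messages and in worker-side InferenceRequest objects; the worker resolves the owning store through a {descriptor: feature_store} mapping. A minimal sketch under the same assumptions (dragon runtime available; the key name and payload bytes are placeholders, not values from the patch):

# Illustrative sketch only: every key is paired with the descriptor of the
# feature store that owns it. "my-model", "my-input", and the payload bytes
# are placeholders.
from dragon.data.ddict.ddict import DDict

from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import (
    DragonFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStoreKey
from smartsim._core.mli.infrastructure.worker.worker import (
    InferenceRequest,
    MachineLearningWorkerCore,
)
from smartsim._core.mli.message_handler import MessageHandler

feature_store = DragonFeatureStore(DDict())  # requires a dragon runtime
fsd = feature_store.descriptor
feature_store["my-model"] = b"serialized-model-bytes"  # placeholder payload

# capnp message builders take the owning store's descriptor alongside the key
model_key_msg = MessageHandler.build_model_key("my-model", fsd)
input_key_msg = MessageHandler.build_tensor_key("my-input", fsd)

# worker-side requests carry FeatureStoreKey objects; fetches look up the
# store to read from in a {descriptor: feature_store} mapping
request = InferenceRequest(model_key=FeatureStoreKey(key="my-model", descriptor=fsd))
fetch_result = MachineLearningWorkerCore.fetch_model(request, {fsd: feature_store})
assert fetch_result.model_bytes == b"serialized-model-bytes"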