Skip to content

Commit

Permalink
Enable environment variable based configuration for ML Worker Manager (
Browse files Browse the repository at this point in the history
…#621)

EnvironmentConfigLoader added for ML Worker Manager.
  • Loading branch information
AlyssaCote authored Jul 10, 2024
1 parent 8a2f173 commit 52abd32
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 22 deletions.
19 changes: 16 additions & 3 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: false
matrix:
subset: [backends, slow_tests, group_a, group_b]
subset: [backends, slow_tests, group_a, group_b, dragon]
os: [macos-12, macos-14, ubuntu-22.04] # Operating systems
compiler: [8] # GNU compiler version
rai: [1.2.7] # Redis AI versions
Expand Down Expand Up @@ -112,9 +112,15 @@ jobs:
python -m pip install .[dev,ml]
- name: Install ML Runtimes with Smart (with pt, tf, and onnx support)
if: contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')
if: (contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')) && ( matrix.subset != 'dragon' )
run: smart build --device cpu --onnx -v

- name: Install ML Runtimes with Smart (with pt, tf, dragon, and onnx support)
if: (contains( matrix.os, 'ubuntu' ) || contains( matrix.os, 'macos-12')) && ( matrix.subset == 'dragon' )
run: |
smart build --device cpu --onnx --dragon -v
echo "LD_LIBRARY_PATH=$(python -c 'import site; print(site.getsitepackages()[0])')/smartsim/_core/.dragon/dragon-0.9/lib:$LD_LIBRARY_PATH" >> $GITHUB_ENV
- name: Install ML Runtimes with Smart (no ONNX,TF on Apple Silicon)
if: contains( matrix.os, 'macos-14' )
run: smart build --device cpu --no_tf -v
Expand Down Expand Up @@ -143,9 +149,16 @@ jobs:
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ ./tests/backends
# Run pytest (dragon subtests)
- name: Run Dragon Pytest
if: (matrix.subset == 'dragon' && matrix.os == 'ubuntu-22.04')
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
dragon -s py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests
# Run pytest (test subsets)
- name: Run Pytest
if: "!contains(matrix.subset, 'backends')" # if not running backend tests
if: (matrix.subset != 'backends' && matrix.subset != 'dragon') # if not running backend tests or dragon tests
run: |
echo "SMARTSIM_LOG_LEVEL=debug" >> $GITHUB_ENV
py.test -s --import-mode=importlib -o log_cli=true --cov=$(smart site) --cov-report=xml --cov-config=./tests/test_configs/cov/local_cov.cfg --ignore=tests/full_wlm/ -m ${{ matrix.subset }} ./tests
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ tutorials-prod:
# help: test - Run all tests
.PHONY: test
test:
@python -m pytest --ignore=tests/full_wlm/
@python -m pytest --ignore=tests/full_wlm/ --ignore=tests/dragon

# help: test-verbose - Run all tests verbosely
.PHONY: test-verbose
Expand Down Expand Up @@ -192,3 +192,8 @@ test-full:
.PHONY: test-wlm
test-wlm:
@python -m pytest -vv tests/full_wlm/ tests/on_wlm

# help: test-dragon - Run dragon-specific tests
.PHONY: test-dragon
test-dragon:
@dragon pytest tests/dragon
3 changes: 2 additions & 1 deletion doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ Jump to:

Description

- Add EnvironmentConfigLoader for ML Worker Manager
- Add Model schema with model metadata included
- Removed device from schemas, MessageHandler and tests
- Add ML worker manager, sample worker, and feature store
- Added schemas and MessageHandler class for de/serialization of
- Add schemas and MessageHandler class for de/serialization of
inference requests and response messages


Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ markers = [
"group_a: fast test subset a",
"group_b: fast test subset b",
"slow_tests: tests that take a long duration to complete",
"dragon: tests that must be executed in a dragon runtime",
]

[tool.isort]
Expand Down
18 changes: 11 additions & 7 deletions smartsim/_core/mli/infrastructure/control/workermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from smartsim._core.entrypoints.service import Service
from smartsim._core.mli.comm.channel.channel import CommChannelBase
from smartsim._core.mli.comm.channel.dragonchannel import DragonCommChannel
from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore
from smartsim._core.mli.infrastructure.worker.worker import (
InferenceReply,
Expand All @@ -43,6 +44,8 @@
from smartsim.log import get_logger

if t.TYPE_CHECKING:
from dragon.fli import FLInterface

from smartsim._core.mli.mli_schemas.model.model_capnp import Model
from smartsim._core.mli.mli_schemas.response.response_capnp import StatusEnum

Expand Down Expand Up @@ -162,28 +165,29 @@ class WorkerManager(Service):

def __init__(
self,
task_queue: "mp.Queue[bytes]",
config_loader: EnvironmentConfigLoader,
worker: MachineLearningWorkerBase,
feature_store: t.Optional[FeatureStore] = None,
as_service: bool = False,
cooldown: int = 0,
comm_channel_type: t.Type[CommChannelBase] = DragonCommChannel,
) -> None:
"""Initialize the WorkerManager
:param task_queue: The queue to monitor for new tasks
:param config_loader: Environment config loader that loads the task queue and
feature store
:param workers: A worker to manage
:param feature_store: The persistence mechanism
:param as_service: Specifies run-once or run-until-complete behavior of service
:param cooldown: Number of seconds to wait before shutting down afer
:param cooldown: Number of seconds to wait before shutting down after
shutdown criteria are met
:param comm_channel_type: The type of communication channel used for callbacks
"""
super().__init__(as_service, cooldown)

"""a collection of workers the manager is controlling"""
self._task_queue: "mp.Queue[bytes]" = task_queue
self._task_queue: t.Optional["FLInterface"] = config_loader.get_queue()
"""the queue the manager monitors for new tasks"""
self._feature_store: t.Optional[FeatureStore] = feature_store
self._feature_store: t.Optional[FeatureStore] = (
config_loader.get_feature_store()
)
"""a feature store to retrieve models from"""
self._worker = worker
"""The ML Worker implementation"""
Expand Down
61 changes: 61 additions & 0 deletions smartsim/_core/mli/infrastructure/environmentloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import os
import pickle
import typing as t

from dragon.fli import FLInterface # pylint: disable=all

from smartsim._core.mli.infrastructure.storage.featurestore import FeatureStore


class EnvironmentConfigLoader:
"""
Facilitates the loading of a FeatureStore and Queue
into the WorkerManager.
"""

def __init__(self) -> None:
self._feature_store_descriptor = os.getenv("SSFeatureStore", None)
self._queue_descriptor = os.getenv("SSQueue", None)
self.feature_store: t.Optional[FeatureStore] = None
self.queue: t.Optional["FLInterface"] = None

def get_feature_store(self) -> t.Optional[FeatureStore]:
"""Loads the Feature Store previously set in SSFeatureStore"""
if self._feature_store_descriptor is not None:
self.feature_store = pickle.loads(
base64.b64decode(self._feature_store_descriptor)
)
return self.feature_store

def get_queue(self) -> t.Optional["FLInterface"]:
"""Returns the Queue previously set in SSQueue"""
if self._queue_descriptor is not None:
self.queue = FLInterface.attach(base64.b64decode(self._queue_descriptor))
return self.queue
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from smartsim.log import get_logger

if t.TYPE_CHECKING:
from dragon.data.distdictionary.dragon_dict import DragonDict
from dragon.data.ddict.ddict import DDict


logger = get_logger(__name__)
Expand All @@ -40,7 +40,7 @@
class DragonFeatureStore(FeatureStore):
"""A feature store backed by a dragon distributed dictionary"""

def __init__(self, storage: "DragonDict") -> None:
def __init__(self, storage: "DDict") -> None:
"""Initialize the DragonFeatureStore instance"""
self._storage = storage

Expand Down
Empty file added tests/dragon/__init__.py
Empty file.
152 changes: 152 additions & 0 deletions tests/dragon/test_environment_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import base64
import os
import pickle

import pytest

dragon = pytest.importorskip("dragon")

import dragon.utils as du
from dragon.channels import Channel
from dragon.data.ddict.ddict import DDict
from dragon.fli import DragonFLIError, FLInterface

from smartsim._core.mli.infrastructure.environmentloader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.dragonfeaturestore import (
DragonFeatureStore,
)

from .utils.featurestore import MemoryFeatureStore

# The tests in this file belong to the dragon group
pytestmark = pytest.mark.dragon


@pytest.mark.parametrize(
"content",
[
pytest.param(b"a"),
pytest.param(b"new byte string"),
],
)
def test_environment_loader_attach_FLI(content, monkeypatch):
"""A descriptor can be stored, loaded, and reattached"""
chan = Channel.make_process_local()
queue = FLInterface(main_ch=chan)
monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize()))

config = EnvironmentConfigLoader()
config_queue = config.get_queue()

new_sender = config_queue.sendh(use_main_as_stream_channel=True)
new_sender.send_bytes(content)

old_recv = queue.recvh(use_main_as_stream_channel=True)
result, _ = old_recv.recv_bytes()
assert result == content


def test_environment_loader_serialize_FLI(monkeypatch):
"""The serialized descriptors of a loaded and unloaded
queue are the same"""
chan = Channel.make_process_local()
queue = FLInterface(main_ch=chan)
monkeypatch.setenv("SSQueue", du.B64.bytes_to_str(queue.serialize()))

config = EnvironmentConfigLoader()
config_queue = config.get_queue()
assert config_queue.serialize() == queue.serialize()


def test_environment_loader_FLI_fails(monkeypatch):
"""An incorrect serialized descriptor will fails to attach"""
monkeypatch.setenv("SSQueue", "randomstring")
config = EnvironmentConfigLoader()

with pytest.raises(DragonFLIError):
config_queue = config.get_queue()


@pytest.mark.parametrize(
"expected_keys, expected_values",
[
pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]),
pytest.param(["another key"], ["another value"]),
],
)
def test_environment_loader_memory_featurestore(
expected_keys, expected_values, monkeypatch
):
"""MemoryFeatureStores can be correctly serialized and deserialized"""
feature_store = MemoryFeatureStore()
key_value_pairs = zip(expected_keys, expected_values)
for k, v in key_value_pairs:
feature_store[k] = v
monkeypatch.setenv(
"SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8")
)
config = EnvironmentConfigLoader()
config_feature_store = config.get_feature_store()

for k, _ in key_value_pairs:
assert config_feature_store[k] == feature_store[k]


@pytest.mark.parametrize(
"expected_keys, expected_values",
[
pytest.param(["key1", "key2", "key3"], ["value1", "value2", "value3"]),
pytest.param(["another key"], ["another value"]),
],
)
def test_environment_loader_dragon_featurestore(
expected_keys, expected_values, monkeypatch
):
"""DragonFeatureStores can be correctly serialized and deserialized"""
storage = DDict()
feature_store = DragonFeatureStore(storage)
key_value_pairs = zip(expected_keys, expected_values)
for k, v in key_value_pairs:
feature_store[k] = v
monkeypatch.setenv(
"SSFeatureStore", base64.b64encode(pickle.dumps(feature_store)).decode("utf-8")
)
config = EnvironmentConfigLoader()
config_feature_store = config.get_feature_store()

for k, _ in key_value_pairs:
assert config_feature_store[k] == feature_store[k]


def test_environment_variables_not_set():
"""EnvironmentConfigLoader getters return None when environment
variables are not set"""
config = EnvironmentConfigLoader()
assert config.get_feature_store() == None
assert config.get_queue() == None
Empty file added tests/dragon/utils/__init__.py
Empty file.
Loading

0 comments on commit 52abd32

Please sign in to comment.