Add integration of dragon-based event broadcasting #710

Merged · 40 commits · Oct 10, 2024
Commits (40)
c6a145a
Squash event integration
ankona Sep 18, 2024
49e0da4
Revert featurestorekey test changes
ankona Sep 18, 2024
fd0a5ec
capture dragon start method ex
ankona Sep 20, 2024
00a4496
Split message pump into separate module from dispatcher test
ankona Sep 20, 2024
a87aba7
extract msg_pump_factory for reuse in test_worker_manager.py
ankona Sep 23, 2024
4879122
add test verifying protoclient event raises
ankona Sep 24, 2024
6f1cba7
reduce timeouts & backoffs, share backbone across protoclient tests
ankona Sep 24, 2024
c0a1bca
Test eventing end-to-end in single process
ankona Sep 25, 2024
d81334d
docstrings & miscellaneous minor fixes (reuse descriptor code, add dr…
ankona Sep 25, 2024
f5b7b7d
fix infinite loop bug in consumer batch receive
ankona Sep 25, 2024
bd03711
Sort imports to solve dragon import issue in non-dragon tests
ankona Sep 25, 2024
98260f3
swap session scopes to module to destroy dragon resources
ankona Sep 26, 2024
5ba2a42
use make process local to avoid MPI issue, fix some test regressions,…
ankona Sep 26, 2024
1f4e6e3
more docstrings standard fixes
ankona Sep 26, 2024
9c8d127
reduce default worker connect timeout, fix test timeout issue due to …
ankona Sep 26, 2024
7442eb1
use constants in tests for env var strings, docstrings, remove commen…
ankona Sep 26, 2024
da20b5f
docstring formatting in tests
ankona Sep 27, 2024
f5ba5a6
docstrings
ankona Sep 27, 2024
d689754
parameterize ddict creation, add single ddict touchpoint util module,…
ankona Sep 27, 2024
7ddcd7c
remove completed todos, fix docstrings, remove obsolete/commented code
ankona Sep 27, 2024
762937c
extract notify listener from dragon backend, fix dragon import order,…
ankona Oct 1, 2024
51baf61
stringification bug fix
ankona Oct 1, 2024
3f4af8e
remove use of deprecated class
ankona Oct 1, 2024
979373c
review changes part 1, improve dragon errors handling, add unregister…
ankona Oct 1, 2024
5898005
import order follow-up
ankona Oct 1, 2024
af870f9
pr review updates
ankona Oct 4, 2024
27ab4f9
fix extra arg copypasta
ankona Oct 4, 2024
fac30bd
test bugs
ankona Oct 4, 2024
a1cf7ff
Remove from_sender_supplied_descriptor factory on FLI channel
ankona Oct 7, 2024
124a195
re-home protoclient into mli subpackage
ankona Oct 7, 2024
b2c4cb7
additional missed docstrings
ankona Oct 7, 2024
0c495f6
improve service param docstrings, avoid separate var/param descriptions
ankona Oct 8, 2024
1fc59e4
docstring fixes
ankona Oct 8, 2024
b316128
increase default memory per backbone node
ankona Oct 8, 2024
2ed47a4
fix fixture usage bug (worker queue preloaded into backbone)
ankona Oct 8, 2024
8ccebb5
extract dragon fixtures into dragon conftest.py
ankona Oct 8, 2024
608d6bd
remove unused import
ankona Oct 8, 2024
68d0d0c
fix send-multi with FLI after sender-supplied channel removal
ankona Oct 9, 2024
c28870f
Update dispatch tests to use dragon processes
ankona Oct 9, 2024
78d5598
Use cached FLI channel on single-send
ankona Oct 9, 2024
2 changes: 1 addition & 1 deletion conftest.py
@@ -93,6 +93,7 @@
test_hostlist = None
has_aprun = shutil.which("aprun") is not None


def get_account() -> str:
return test_account

@@ -227,7 +228,6 @@ def kill_all_test_spawned_processes() -> None:
print("Not all processes were killed after test")



def get_hostlist() -> t.Optional[t.List[str]]:
global test_hostlist
if not test_hostlist:
2 changes: 1 addition & 1 deletion doc/changelog.md
@@ -13,12 +13,12 @@ Jump to:

Description

- Implement asynchronous notifications for shared data
- Quick bug fix in _validate
- Add helper methods to MLI classes
- Update error handling for consistency
- Parameterize installation of dragon package with `smart build`
- Update docstrings
- Implement asynchronous notifications for shared data
- Filenames conform to snake case
- Update SmartSim environment variables using new naming convention
- Refactor `exception_handler`
125 changes: 39 additions & 86 deletions ex/high_throughput_inference/mock_app.py
@@ -37,102 +37,35 @@

import argparse
import io
import numpy
import os
import time

import torch

from mpi4py import MPI
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.message_handler import MessageHandler
from smartsim.log import get_logger
from smartsim._core.utils.timings import PerfTimer

torch.set_num_interop_threads(16)
torch.set_num_threads(1)

logger = get_logger("App")
logger.info("Started app")

CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False
from collections import OrderedDict

class ProtoClient:
def __init__(self, timing_on: bool):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]
self._ddict = DDict.attach(ddict_str)
self._backbone_descriptor = DragonFeatureStore(self._ddict).descriptor
to_worker_fli_str = None
while to_worker_fli_str is None:
try:
to_worker_fli_str = self._ddict["to_worker_fli"]
self._to_worker_fli = fli.FLInterface.attach(to_worker_fli_str)
except KeyError:
time.sleep(1)
self._from_worker_ch = Channel.make_process_local()
self._from_worker_ch_serialized = self._from_worker_ch.serialize()
self._to_worker_ch = Channel.make_process_local()

self.perf_timer: PerfTimer = PerfTimer(debug=False, timing_on=timing_on, prefix=f"a{rank}_")

def run_model(self, model: bytes | str, batch: torch.Tensor):
tensors = [batch.numpy()]
self.perf_timer.start_timings("batch_size", batch.shape[0])
built_tensor_desc = MessageHandler.build_tensor_descriptor(
"c", "float32", list(batch.shape)
)
self.perf_timer.measure_time("build_tensor_descriptor")
if isinstance(model, str):
model_arg = MessageHandler.build_model_key(model, self._backbone_descriptor)
else:
model_arg = MessageHandler.build_model(model, "resnet-50", "1.0")
request = MessageHandler.build_request(
reply_channel=self._from_worker_ch_serialized,
model=model_arg,
inputs=[built_tensor_desc],
outputs=[],
output_descriptors=[],
custom_attributes=None,
)
self.perf_timer.measure_time("build_request")
request_bytes = MessageHandler.serialize_request(request)
self.perf_timer.measure_time("serialize_request")
with self._to_worker_fli.sendh(timeout=None, stream_channel=self._to_worker_ch) as to_sendh:
to_sendh.send_bytes(request_bytes)
self.perf_timer.measure_time("send_request")
for tensor in tensors:
to_sendh.send_bytes(tensor.tobytes()) #TODO NOT FAST ENOUGH!!!
self.perf_timer.measure_time("send_tensors")
with self._from_worker_ch.recvh(timeout=None) as from_recvh:
resp = from_recvh.recv_bytes(timeout=None)
self.perf_timer.measure_time("receive_response")
response = MessageHandler.deserialize_response(resp)
self.perf_timer.measure_time("deserialize_response")
# list of data blobs? recv depending on the len(response.result.descriptors)?
data_blob: bytes = from_recvh.recv_bytes(timeout=None)
self.perf_timer.measure_time("receive_tensor")
result = torch.from_numpy(
numpy.frombuffer(
data_blob,
dtype=str(response.result.descriptors[0].dataType),
)
)
self.perf_timer.measure_time("deserialize_tensor")
from smartsim.log import get_logger, log_to_file
from smartsim._core.mli.client.protoclient import ProtoClient

self.perf_timer.end_timings()
return result
logger = get_logger("App")

def set_model(self, key: str, model: bytes):
self._ddict[key] = model

CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False


class ResNetWrapper:
"""Wrapper around a pre-rained ResNet model."""
def __init__(self, name: str, model: str):
"""Initialize the instance.

:param name: The name to use for the model
:param model: The path to the pre-trained PyTorch model"""
self._model = torch.jit.load(model)
self._name = name
buffer = io.BytesIO()
@@ -141,16 +74,28 @@ def __init__(self, name: str, model: str):
self._serialized_model = buffer.getvalue()

def get_batch(self, batch_size: int = 32):
"""Create a random batch of data with the correct dimensions to
invoke a ResNet model.

:param batch_size: The desired number of samples to produce
:returns: A PyTorch tensor"""
return torch.randn((batch_size, 3, 224, 224), dtype=torch.float32)

@property
def model(self):
def model(self) -> bytes:
"""The content of a model file.

:returns: The model bytes"""
return self._serialized_model

@property
def name(self):
def name(self) -> str:
"""The name applied to the model.

:returns: The name"""
return self._name


if __name__ == "__main__":

parser = argparse.ArgumentParser("Mock application")
@@ -166,24 +111,32 @@ def name(self):
if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
# TODO: adapt to non-Nvidia devices
torch_device = args.device.replace("gpu", "cuda")
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(torch_device)
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(
torch_device
)

TOTAL_ITERATIONS = 100

for log2_bsize in range(args.log_max_batchsize+1):
for log2_bsize in range(args.log_max_batchsize + 1):
b_size: int = 2**log2_bsize
logger.info(f"Batch size: {b_size}")
for iteration_number in range(TOTAL_ITERATIONS + int(b_size==1)):
for iteration_number in range(TOTAL_ITERATIONS + int(b_size == 1)):
logger.info(f"Iteration: {iteration_number}")
sample_batch = resnet.get_batch(b_size)
remote_result = client.run_model(resnet.name, sample_batch)
logger.info(client.perf_timer.get_last("total_time"))
if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
local_res = pt_model(sample_batch.to(torch_device))
err_norm = torch.linalg.vector_norm(torch.flatten(remote_result).to(torch_device)-torch.flatten(local_res), ord=1).cpu()
err_norm = torch.linalg.vector_norm(
torch.flatten(remote_result).to(torch_device)
- torch.flatten(local_res),
ord=1,
).cpu()
res_norm = torch.linalg.vector_norm(remote_result, ord=1).item()
local_res_norm = torch.linalg.vector_norm(local_res, ord=1).item()
logger.info(f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}")
logger.info(
f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}"
)
torch.cuda.synchronize()

client.perf_timer.print_timings(to_file=True)
client.perf_timer.print_timings(to_file=True)
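
The inline ProtoClient defined above has been removed in favor of the import from smartsim._core.mli.client.protoclient. Below is a minimal sketch (not part of the diff) of how the mock app drives the relocated client, assuming it keeps the timing_on constructor argument and the set_model/run_model methods of the removed in-file implementation, and that the launcher has already populated the dragon backbone environment variables.

import io

import torch

from smartsim._core.mli.client.protoclient import ProtoClient

# Serialize a tiny TorchScript model so this sketch is self-contained; the real
# app serializes a pre-trained ResNet loaded from disk via ResNetWrapper.
buffer = io.BytesIO()
torch.jit.save(torch.jit.script(torch.nn.Linear(4, 2)), buffer)

# Assumed API: constructor and methods mirror the removed in-file ProtoClient.
client = ProtoClient(timing_on=True)
client.set_model("toy-model", buffer.getvalue())  # key name is illustrative

batch = torch.randn((8, 4), dtype=torch.float32)
result = client.run_model("toy-model", batch)  # sends request + tensor, returns the result tensor
client.perf_timer.print_timings(to_file=True)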
61 changes: 28 additions & 33 deletions ex/high_throughput_inference/standalone_worker_manager.py
@@ -37,6 +37,7 @@
from dragon.globalservices.api_setup import connect_to_infrastructure
from dragon.managed_memory import MemoryPool
from dragon.utils import b64decode, b64encode

# pylint enable=import-error

# isort: off
@@ -46,33 +47,27 @@
import base64
import multiprocessing as mp
import os
import pickle
import socket
import sys
import time
import typing as t

import cloudpickle
import optparse
import os

from smartsim._core.entrypoints.service import Service
from smartsim._core.mli.comm.channel.channel import CommChannelBase
from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel
from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.comm.channel.dragon_util import create_local
from smartsim._core.mli.infrastructure.control.request_dispatcher import (
RequestDispatcher,
)
from smartsim._core.mli.infrastructure.control.worker_manager import WorkerManager
from smartsim._core.mli.infrastructure.environment_loader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
BackboneFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.infrastructure.worker.worker import MachineLearningWorkerBase

from smartsim.log import get_logger

logger = get_logger("Worker Manager Entry Point")
@@ -85,7 +80,6 @@
logger.info(f"CPUS: {os.cpu_count()}")



def service_as_dragon_proc(
service: Service, cpu_affinity: list[int], gpu_affinity: list[int]
) -> dragon_process.Process:
@@ -108,8 +102,6 @@ def service_as_dragon_proc(
)




if __name__ == "__main__":
parser = argparse.ArgumentParser("Worker Manager")
parser.add_argument(
@@ -143,27 +135,26 @@ def service_as_dragon_proc(
args = parser.parse_args()

connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]
ddict = DDict.attach(ddict_str)
ddict_str = os.environ[BackboneFeatureStore.MLI_BACKBONE]

backbone = BackboneFeatureStore.from_descriptor(ddict_str)

to_worker_channel = Channel.make_process_local()
to_worker_channel = create_local()
to_worker_fli = fli.FLInterface(main_ch=to_worker_channel, manager_ch=None)
to_worker_fli_serialized = to_worker_fli.serialize()
ddict["to_worker_fli"] = to_worker_fli_serialized
to_worker_fli_comm_ch = DragonFLIChannel(to_worker_fli)

backbone.worker_queue = to_worker_fli_comm_ch.descriptor

os.environ[BackboneFeatureStore.MLI_WORKER_QUEUE] = to_worker_fli_comm_ch.descriptor
os.environ[BackboneFeatureStore.MLI_BACKBONE] = backbone.descriptor

arg_worker_type = cloudpickle.loads(
base64.b64decode(args.worker_class.encode("ascii"))
)

dfs = DragonFeatureStore(ddict)
comm_channel = DragonFLIChannel(to_worker_fli_serialized)

descriptor = base64.b64encode(to_worker_fli_serialized).decode("utf-8")
os.environ["_SMARTSIM_REQUEST_QUEUE"] = descriptor

config_loader = EnvironmentConfigLoader(
featurestore_factory=DragonFeatureStore.from_descriptor,
callback_factory=DragonCommChannel,
callback_factory=DragonCommChannel.from_descriptor,
queue_factory=DragonFLIChannel.from_descriptor,
)
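
For context on the loader configured above (sketch, not part of the diff): channels and feature stores are passed between processes as string descriptors, and each from_descriptor factory re-attaches to the underlying dragon resource. A small sketch of that round-trip, assuming the environment variables written earlier in this script are already present in the consuming process.

import os

from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
    BackboneFeatureStore,
)

# Re-attach to the backbone feature store and the worker request queue from the
# descriptors published in the environment by standalone_worker_manager.py.
backbone = BackboneFeatureStore.from_descriptor(
    os.environ[BackboneFeatureStore.MLI_BACKBONE]
)
worker_queue = DragonFLIChannel.from_descriptor(
    os.environ[BackboneFeatureStore.MLI_WORKER_QUEUE]
)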

@@ -178,7 +169,7 @@ def service_as_dragon_proc(
worker_device = args.device
for wm_idx in range(args.num_workers):

worker_manager = WorkerManager(
worker_manager = WorkerManager(
config_loader=config_loader,
worker_type=arg_worker_type,
as_service=True,
@@ -196,21 +187,25 @@ def service_as_dragon_proc(
# the GPU-to-CPU mapping is taken from the nvidia-smi tool
# TODO can this be computed on the fly?
gpu_to_cpu_aff: dict[int, list[int]] = {}
gpu_to_cpu_aff[0] = list(range(48,64)) + list(range(112,128))
gpu_to_cpu_aff[1] = list(range(32,48)) + list(range(96,112))
gpu_to_cpu_aff[2] = list(range(16,32)) + list(range(80,96))
gpu_to_cpu_aff[3] = list(range(0,16)) + list(range(64,80))
gpu_to_cpu_aff[0] = list(range(48, 64)) + list(range(112, 128))
gpu_to_cpu_aff[1] = list(range(32, 48)) + list(range(96, 112))
gpu_to_cpu_aff[2] = list(range(16, 32)) + list(range(80, 96))
gpu_to_cpu_aff[3] = list(range(0, 16)) + list(range(64, 80))

worker_manager_procs = []
for worker_idx in range(args.num_workers):
wm_cpus = len(gpu_to_cpu_aff[worker_idx]) - 4
wm_affinity = gpu_to_cpu_aff[worker_idx][:wm_cpus]
disp_affinity.extend(gpu_to_cpu_aff[worker_idx][wm_cpus:])
worker_manager_procs.append(service_as_dragon_proc(
worker_manager_procs.append(
service_as_dragon_proc(
worker_manager, cpu_affinity=wm_affinity, gpu_affinity=[worker_idx]
))
)
)

dispatcher_proc = service_as_dragon_proc(dispatcher, cpu_affinity=disp_affinity, gpu_affinity=[])
dispatcher_proc = service_as_dragon_proc(
dispatcher, cpu_affinity=disp_affinity, gpu_affinity=[]
)

# TODO: use ProcessGroup and restart=True?
all_procs = [dispatcher_proc, *worker_manager_procs]
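
A worked example of the CPU pinning logic above (editor sketch, not part of the diff), under this script's assumption of a node with four GPUs and 128 CPUs: each worker manager is pinned to all but four of the CPUs associated with its GPU, and the four leftover CPUs from every GPU are pooled for the request dispatcher.

# Standalone illustration of the affinity split (4 GPUs and 128 CPUs assumed).
gpu_to_cpu_aff = {
    0: list(range(48, 64)) + list(range(112, 128)),
    1: list(range(32, 48)) + list(range(96, 112)),
    2: list(range(16, 32)) + list(range(80, 96)),
    3: list(range(0, 16)) + list(range(64, 80)),
}

disp_affinity: list[int] = []
for worker_idx in range(4):
    wm_cpus = len(gpu_to_cpu_aff[worker_idx]) - 4               # 28 CPUs per worker manager
    wm_affinity = gpu_to_cpu_aff[worker_idx][:wm_cpus]
    disp_affinity.extend(gpu_to_cpu_aff[worker_idx][wm_cpus:])  # last 4 CPUs of each GPU's set

print(len(wm_affinity), len(disp_affinity))  # prints: 28 16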
12 changes: 6 additions & 6 deletions smartsim/_core/_cli/scripts/dragon_install.py
@@ -57,7 +57,7 @@ def __init__(
def _check(self) -> None:
"""Perform validation of this instance

:raises: ValueError if any value fails validation"""
:raises ValueError: if any value fails validation"""
if not self.repo_name or len(self.repo_name.split("/")) != 2:
raise ValueError(
f"Invalid dragon repository name. Example: `dragonhpc/dragon`"
@@ -95,13 +95,13 @@ def get_auth_token(request: DragonInstallRequest) -> t.Optional[Token]:
def create_dotenv(dragon_root_dir: pathlib.Path, dragon_version: str) -> None:
"""Create a .env file with required environment variables for the Dragon runtime"""
dragon_root = str(dragon_root_dir)
dragon_inc_dir = str(dragon_root_dir / "include")
dragon_lib_dir = str(dragon_root_dir / "lib")
dragon_bin_dir = str(dragon_root_dir / "bin")
dragon_inc_dir = dragon_root + "/include"
dragon_lib_dir = dragon_root + "/lib"
dragon_bin_dir = dragon_root + "/bin"

dragon_vars = {
"DRAGON_BASE_DIR": dragon_root,
"DRAGON_ROOT_DIR": dragon_root, # note: same as base_dir
"DRAGON_ROOT_DIR": dragon_root,
"DRAGON_INCLUDE_DIR": dragon_inc_dir,
"DRAGON_LIB_DIR": dragon_lib_dir,
"DRAGON_VERSION": dragon_version,
@@ -286,7 +286,7 @@ def retrieve_asset(
:param request: details of a request for the installation of the dragon package
:param asset: GitHub release asset to retrieve
:returns: path to the directory containing the extracted release asset
:raises: SmartSimCLIActionCancelled if the asset cannot be downloaded or extracted
:raises SmartSimCLIActionCancelled: if the asset cannot be downloaded or extracted
"""
download_dir = request.working_dir / str(asset.id)

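
The :raises: corrections in this file follow the Sphinx field-list convention: the exception type belongs in the field name, and only the condition goes in the description. A minimal, made-up illustration of that convention:

def positive_sqrt(value: float) -> float:
    """Return the square root of a non-negative number.

    :param value: The number to take the square root of
    :returns: The square root of ``value``
    :raises ValueError: if ``value`` is negative
    """
    if value < 0:
        raise ValueError("value must be non-negative")
    return value**0.5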