Squash event integration
ankona committed Sep 18, 2024
1 parent b4798da commit 9b437cb
Showing 37 changed files with 2,874 additions and 559 deletions.
2 changes: 1 addition & 1 deletion doc/changelog.md
@@ -13,9 +13,9 @@ Jump to:

Description

- Implement asynchronous notifications for shared data
- Parameterize installation of dragon package with `smart build`
- Update docstrings
- Implement asynchronous notifications for shared data
- Filenames conform to snake case
- Update SmartSim environment variables using new naming convention
- Refactor `exception_handler`
109 changes: 23 additions & 86 deletions ex/high_throughput_inference/mock_app.py
@@ -37,98 +37,26 @@

import argparse
import io
import numpy
import os
import time

import torch

from mpi4py import MPI
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.message_handler import MessageHandler
from smartsim.log import get_logger
from smartsim._core.utils.timings import PerfTimer

torch.set_num_interop_threads(16)
torch.set_num_threads(1)

logger = get_logger("App")
logger.info("Started app")

CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False
from collections import OrderedDict

class ProtoClient:
def __init__(self, timing_on: bool):
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]
self._ddict = DDict.attach(ddict_str)
self._backbone_descriptor = DragonFeatureStore(self._ddict).descriptor
to_worker_fli_str = None
while to_worker_fli_str is None:
try:
to_worker_fli_str = self._ddict["to_worker_fli"]
self._to_worker_fli = fli.FLInterface.attach(to_worker_fli_str)
except KeyError:
time.sleep(1)
self._from_worker_ch = Channel.make_process_local()
self._from_worker_ch_serialized = self._from_worker_ch.serialize()
self._to_worker_ch = Channel.make_process_local()

self.perf_timer: PerfTimer = PerfTimer(debug=False, timing_on=timing_on, prefix=f"a{rank}_")

def run_model(self, model: bytes | str, batch: torch.Tensor):
tensors = [batch.numpy()]
self.perf_timer.start_timings("batch_size", batch.shape[0])
built_tensor_desc = MessageHandler.build_tensor_descriptor(
"c", "float32", list(batch.shape)
)
self.perf_timer.measure_time("build_tensor_descriptor")
if isinstance(model, str):
model_arg = MessageHandler.build_model_key(model, self._backbone_descriptor)
else:
model_arg = MessageHandler.build_model(model, "resnet-50", "1.0")
request = MessageHandler.build_request(
reply_channel=self._from_worker_ch_serialized,
model=model_arg,
inputs=[built_tensor_desc],
outputs=[],
output_descriptors=[],
custom_attributes=None,
)
self.perf_timer.measure_time("build_request")
request_bytes = MessageHandler.serialize_request(request)
self.perf_timer.measure_time("serialize_request")
with self._to_worker_fli.sendh(timeout=None, stream_channel=self._to_worker_ch) as to_sendh:
to_sendh.send_bytes(request_bytes)
self.perf_timer.measure_time("send_request")
for tensor in tensors:
to_sendh.send_bytes(tensor.tobytes()) #TODO NOT FAST ENOUGH!!!
self.perf_timer.measure_time("send_tensors")
with self._from_worker_ch.recvh(timeout=None) as from_recvh:
resp = from_recvh.recv_bytes(timeout=None)
self.perf_timer.measure_time("receive_response")
response = MessageHandler.deserialize_response(resp)
self.perf_timer.measure_time("deserialize_response")
# list of data blobs? recv depending on the len(response.result.descriptors)?
data_blob: bytes = from_recvh.recv_bytes(timeout=None)
self.perf_timer.measure_time("receive_tensor")
result = torch.from_numpy(
numpy.frombuffer(
data_blob,
dtype=str(response.result.descriptors[0].dataType),
)
)
self.perf_timer.measure_time("deserialize_tensor")
from smartsim.log import get_logger, log_to_file
from smartsim.protoclient import ProtoClient

self.perf_timer.end_timings()
return result
logger = get_logger("App", "DEBUG")

def set_model(self, key: str, model: bytes):
self._ddict[key] = model

CHECK_RESULTS_AND_MAKE_ALL_SLOWER = False


class ResNetWrapper:
@@ -151,6 +79,7 @@ def model(self):
def name(self):
return self._name


if __name__ == "__main__":

parser = argparse.ArgumentParser("Mock application")
@@ -160,30 +89,38 @@ def name(self):

resnet = ResNetWrapper("resnet50", f"resnet50.{args.device}.pt")

client = ProtoClient(timing_on=True)
client.set_model(resnet.name, resnet.model)
client = ProtoClient(timing_on=True, wait_timeout=0)
# client.set_model(resnet.name, resnet.model)

if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
# TODO: adapt to non-Nvidia devices
torch_device = args.device.replace("gpu", "cuda")
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(torch_device)
pt_model = torch.jit.load(io.BytesIO(initial_bytes=(resnet.model))).to(
torch_device
)

TOTAL_ITERATIONS = 100

for log2_bsize in range(args.log_max_batchsize+1):
for log2_bsize in range(args.log_max_batchsize + 1):
b_size: int = 2**log2_bsize
logger.info(f"Batch size: {b_size}")
for iteration_number in range(TOTAL_ITERATIONS + int(b_size==1)):
for iteration_number in range(TOTAL_ITERATIONS + int(b_size == 1)):
logger.info(f"Iteration: {iteration_number}")
sample_batch = resnet.get_batch(b_size)
remote_result = client.run_model(resnet.name, sample_batch)
logger.info(client.perf_timer.get_last("total_time"))
if CHECK_RESULTS_AND_MAKE_ALL_SLOWER:
local_res = pt_model(sample_batch.to(torch_device))
err_norm = torch.linalg.vector_norm(torch.flatten(remote_result).to(torch_device)-torch.flatten(local_res), ord=1).cpu()
err_norm = torch.linalg.vector_norm(
torch.flatten(remote_result).to(torch_device)
- torch.flatten(local_res),
ord=1,
).cpu()
res_norm = torch.linalg.vector_norm(remote_result, ord=1).item()
local_res_norm = torch.linalg.vector_norm(local_res, ord=1).item()
logger.info(f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}")
logger.info(
f"Avg norm of error {err_norm.item()/b_size} compared to result norm of {res_norm/b_size}:{local_res_norm/b_size}"
)
torch.cuda.synchronize()

client.perf_timer.print_timings(to_file=True)
client.perf_timer.print_timings(to_file=True)
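
Net effect of this file: mock_app.py no longer defines its own ProtoClient, it now imports the client from smartsim.protoclient while timing remains available through client.perf_timer. Below is a minimal sketch of the resulting call pattern, restricted to calls visible in this diff; the model key, batch shape, and wait_timeout value are illustrative, and a real run still requires a deployed worker manager and backbone feature store.

# Sketch only: assumes a worker manager and backbone feature store are already running.
import torch

from smartsim.log import get_logger
from smartsim.protoclient import ProtoClient

logger = get_logger("App", "DEBUG")

# wait_timeout=0 mirrors the value used in mock_app.py above.
client = ProtoClient(timing_on=True, wait_timeout=0)

batch = torch.randn(1, 3, 224, 224)            # illustrative input batch
result = client.run_model("resnet50", batch)   # illustrative model key
logger.info(client.perf_timer.get_last("total_time"))

client.perf_timer.print_timings(to_file=True)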
31 changes: 13 additions & 18 deletions ex/high_throughput_inference/standalone_worker_manager.py
@@ -37,6 +37,7 @@
from dragon.globalservices.api_setup import connect_to_infrastructure
from dragon.managed_memory import MemoryPool
from dragon.utils import b64decode, b64encode

# pylint enable=import-error

# isort: off
@@ -45,6 +46,7 @@
import argparse
import base64
import multiprocessing as mp
import optparse
import os
import pickle
import socket
@@ -53,26 +55,24 @@
import typing as t

import cloudpickle
import optparse
import os

from smartsim._core.entrypoints.service import Service
from smartsim._core.mli.comm.channel.channel import CommChannelBase
from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel
from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.infrastructure.control.request_dispatcher import (
RequestDispatcher,
)
from smartsim._core.mli.infrastructure.control.worker_manager import WorkerManager
from smartsim._core.mli.infrastructure.environment_loader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
BackboneFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
DragonFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.feature_store import ReservedKeys
from smartsim._core.mli.infrastructure.worker.worker import MachineLearningWorkerBase

from smartsim.log import get_logger

logger = get_logger("Worker Manager Entry Point")
@@ -85,7 +85,6 @@
logger.info(f"CPUS: {os.cpu_count()}")



def service_as_dragon_proc(
service: Service, cpu_affinity: list[int], gpu_affinity: list[int]
) -> dragon_process.Process:
@@ -108,8 +107,6 @@ def service_as_dragon_proc(
)




if __name__ == "__main__":
parser = argparse.ArgumentParser("Worker Manager")
parser.add_argument(
@@ -144,26 +141,24 @@ def service_as_dragon_proc(

connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]
ddict = DDict.attach(ddict_str)

backbone = BackboneFeatureStore.from_descriptor(ddict_str)

to_worker_channel = Channel.make_process_local()
to_worker_fli = fli.FLInterface(main_ch=to_worker_channel, manager_ch=None)
to_worker_fli_serialized = to_worker_fli.serialize()
ddict["to_worker_fli"] = to_worker_fli_serialized
to_worker_fli_comm_channel = DragonFLIChannel(to_worker_fli, True)

backbone.worker_queue = to_worker_fli_comm_channel.descriptor

arg_worker_type = cloudpickle.loads(
base64.b64decode(args.worker_class.encode("ascii"))
)

dfs = DragonFeatureStore(ddict)
comm_channel = DragonFLIChannel(to_worker_fli_serialized)

descriptor = base64.b64encode(to_worker_fli_serialized).decode("utf-8")
os.environ["_SMARTSIM_REQUEST_QUEUE"] = descriptor
os.environ["_SMARTSIM_REQUEST_QUEUE"] = to_worker_fli_comm_channel.descriptor

config_loader = EnvironmentConfigLoader(
featurestore_factory=DragonFeatureStore.from_descriptor,
callback_factory=DragonCommChannel,
callback_factory=DragonCommChannel.from_descriptor,
queue_factory=DragonFLIChannel.from_descriptor,
)
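
Taken together, the surviving lines of this hunk route the worker queue through the backbone feature store rather than a raw DDict key. The condensed sketch below keeps only the calls that appear above; the Dragon import paths are assumed from the API used elsewhere in this commit (the isort block is elided in the diff), so treat them as illustrative.

# Condensed sketch of the new wiring; only calls shown in this hunk are used.
import os

from dragon import fli                          # assumed import path
from dragon.channels import Channel             # assumed import path
from dragon.globalservices.api_setup import connect_to_infrastructure

from smartsim._core.mli.comm.channel.dragon_channel import DragonCommChannel
from smartsim._core.mli.comm.channel.dragon_fli import DragonFLIChannel
from smartsim._core.mli.infrastructure.environment_loader import EnvironmentConfigLoader
from smartsim._core.mli.infrastructure.storage.backbone_feature_store import (
    BackboneFeatureStore,
)
from smartsim._core.mli.infrastructure.storage.dragon_feature_store import (
    DragonFeatureStore,
)

connect_to_infrastructure()
ddict_str = os.environ["_SMARTSIM_INFRA_BACKBONE"]

# Attach to the backbone through its descriptor instead of DDict.attach().
backbone = BackboneFeatureStore.from_descriptor(ddict_str)

# Create the worker-facing FLI channel and publish its descriptor.
to_worker_channel = Channel.make_process_local()
to_worker_fli = fli.FLInterface(main_ch=to_worker_channel, manager_ch=None)
to_worker_fli_comm_channel = DragonFLIChannel(to_worker_fli, True)

backbone.worker_queue = to_worker_fli_comm_channel.descriptor
os.environ["_SMARTSIM_REQUEST_QUEUE"] = to_worker_fli_comm_channel.descriptor

# Downstream components resolve channels and feature stores from descriptors.
config_loader = EnvironmentConfigLoader(
    featurestore_factory=DragonFeatureStore.from_descriptor,
    callback_factory=DragonCommChannel.from_descriptor,
    queue_factory=DragonFLIChannel.from_descriptor,
)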

