Schema performance improvements (#632)
Reworks the MLI schemas for performance: the Cap'n Proto `Tensor` message (descriptor plus embedded blob) is replaced by a standalone `TensorDescriptor`, so request and response envelopes carry only tensor metadata. Raw tensor bytes are now streamed as separate messages over the communication channel, and `CommChannel.recv` returns a list of messages accordingly.

Committed by @AlyssaCote; approved by @al-rigazzi and @mellis13.
AlyssaCote authored Jul 18, 2024
1 parent 272a1d7 commit 7169f1c
Showing 21 changed files with 316 additions and 965 deletions.
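At a glance: tensor payloads no longer travel inside the Cap'n Proto envelope. Requests and responses carry only `TensorDescriptor` metadata (order, data type, dimensions), and the raw tensor bytes follow as separate messages on the same stream. A minimal sketch of the new send path, assuming the `MessageHandler` API shown in the diffs below and a `serialize_request` counterpart to the `deserialize_response` used in the mock app; `to_sendh` and `reply_channel_bytes` are placeholders for an open FLI send handle and a serialized reply channel:

import torch

from smartsim._core.mli.message_handler import MessageHandler

batch = torch.rand(32, 1)

# Envelope carries metadata only; no tensor blob is copied into the message.
descriptor = MessageHandler.build_tensor_descriptor("c", "float32", list(batch.shape))
request = MessageHandler.build_request(
    reply_channel=reply_channel_bytes,                 # placeholder
    model=MessageHandler.build_model_key("my-model"),  # hypothetical model key
    inputs=[descriptor],
    outputs=[],
    output_descriptors=[],
    custom_attributes=None,
)

# One small envelope first, then one raw-bytes message per input tensor.
to_sendh.send_bytes(MessageHandler.serialize_request(request))
to_sendh.send_bytes(batch.numpy().tobytes())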
1 change: 1 addition & 0 deletions doc/changelog.md
@@ -13,6 +13,7 @@ Jump to:
 
 Description
 
+- Adjust schemas for better performance
 - Add TorchWorker first implementation and mock inference app example
 - Add error handling in Worker Manager pipeline
 - Add EnvironmentConfigLoader for ML Worker Manager
18 changes: 12 additions & 6 deletions ex/high_throughput_inference/mock_app.py
@@ -108,10 +108,11 @@ def print_timings(self, to_file: bool = False):
 
 
     def run_model(self, model: bytes | str, batch: torch.Tensor):
+        tensors = [batch.numpy()]
         self.start_timings(batch.shape[0])
-        built_tensor = MessageHandler.build_tensor(
-            batch.numpy(), "c", "float32", list(batch.shape))
-        self.measure_time("build_tensor")
+        built_tensor_desc = MessageHandler.build_tensor_descriptor(
+            "c", "float32", list(batch.shape))
+        self.measure_time("build_tensor_descriptor")
         built_model = None
         if isinstance(model, str):
             model_arg = MessageHandler.build_model_key(model)
@@ -120,7 +121,7 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         request = MessageHandler.build_request(
             reply_channel=self._from_worker_ch_serialized,
             model=model_arg,
-            inputs=[built_tensor],
+            inputs=[built_tensor_desc],
             outputs=[],
             output_descriptors=[],
             custom_attributes=None,
@@ -130,6 +131,9 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         self.measure_time("serialize_request")
         with self._to_worker_fli.sendh(timeout=None, stream_channel=self._to_worker_ch) as to_sendh:
             to_sendh.send_bytes(request_bytes)
+            for t in tensors:
+                to_sendh.send_bytes(t.tobytes())  #TODO NOT FAST ENOUGH!!!
+                # to_sendh.send_bytes(bytes(t.data))
         logger.info(f"Message size: {len(request_bytes)} bytes")
 
         self.measure_time("send")
@@ -138,10 +142,12 @@ def run_model(self, model: bytes | str, batch: torch.Tensor):
         self.measure_time("receive")
         response = MessageHandler.deserialize_response(resp)
         self.measure_time("deserialize_response")
+        # list of data blobs? recv depending on the len(response.result.descriptors)?
+        data_blob = from_recvh.recv_bytes(timeout=None)
         result = torch.from_numpy(
             numpy.frombuffer(
-                response.result.data[0].blob,
-                dtype=str(response.result.data[0].tensorDescriptor.dataType),
+                data_blob,
+                dtype=str(response.result.descriptors[0].dataType),
             )
         )
         self.measure_time("deserialize_tensor")
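The receive path mirrors this framing: the response envelope arrives first, then one raw blob per entry in response.result.descriptors. The current mock app reads a single blob; a sketch of the general multi-output case hinted at by the TODO above, assuming from_recvh is the open receive handle and resp holds the envelope bytes as in the elided lines:

import numpy
import torch

response = MessageHandler.deserialize_response(resp)
results = []
for desc in response.result.descriptors:
    data_blob = from_recvh.recv_bytes(timeout=None)  # one raw message per descriptor
    results.append(
        torch.from_numpy(numpy.frombuffer(data_blob, dtype=str(desc.dataType)))
    )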
2 changes: 1 addition & 1 deletion smartsim/_core/mli/comm/channel/channel.py
@@ -45,7 +45,7 @@ def send(self, value: bytes) -> None:
         :param value: The value to send"""
 
     @abstractmethod
-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
5 changes: 3 additions & 2 deletions smartsim/_core/mli/comm/channel/dragonchannel.py
@@ -25,6 +25,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 import sys
+import typing as t
 
 import smartsim._core.mli.comm.channel.channel as cch
 from smartsim.log import get_logger
@@ -52,9 +53,9 @@ def send(self, value: bytes) -> None:
         with self._channel.sendh(timeout=None) as sendh:
             sendh.send_bytes(value)
 
-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
         with self._channel.recvh(timeout=None) as recvh:
             message_bytes: bytes = recvh.recv_bytes(timeout=None)
-            return message_bytes
+            return [message_bytes]
17 changes: 10 additions & 7 deletions smartsim/_core/mli/comm/channel/dragonfli.py
@@ -57,13 +57,16 @@ def send(self, value: bytes) -> None:
         with self._fli.sendh(timeout=None, stream_channel=self._channel) as sendh:
             sendh.send_bytes(value)
 
-    def recv(self) -> bytes:
+    def recv(self) -> t.List[bytes]:
         """Receive a message through the underlying communication channel
         :returns: the received message"""
+        messages = []
+        eot = False
         with self._fli.recvh(timeout=None) as recvh:
-            try:
-                request_bytes: bytes
-                request_bytes, _ = recvh.recv_bytes(timeout=None)
-                return request_bytes
-            except fli.FLIEOT as exc:
-                return b""
+            while not eot:
+                try:
+                    message, _ = recvh.recv_bytes(timeout=None)
+                    messages.append(message)
+                except fli.FLIEOT as exc:
+                    eot = True
+            return messages
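recv now drains an entire FLI stream instead of returning a single message: each send_bytes call on the sending side becomes one list entry, and fli.FLIEOT marks the end of the stream. A hypothetical consumer splits the result the same way the worker manager does below:

messages = channel.recv()  # channel: a DragonFLIChannel instance (placeholder)
if messages:  # guard against an empty stream, as the worker manager does
    request_bytes, *tensor_blobs = messages  # entry 0 is the envelope; the rest are raw tensors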
52 changes: 32 additions & 20 deletions smartsim/_core/mli/infrastructure/control/workermanager.py
@@ -58,6 +58,7 @@
 
 from smartsim._core.mli.mli_schemas.model.model_capnp import Model
 from smartsim._core.mli.mli_schemas.response.response_capnp import Status
+from smartsim._core.mli.mli_schemas.tensor.tensor_capnp import TensorDescriptor
 
 logger = get_logger(__name__)
 
@@ -88,25 +89,23 @@ def deserialize_message(
     elif request.model.which() == "data":
         model_bytes = request.model.data
 
-    callback_key = request.replyChannel.reply
+    callback_key = request.replyChannel.descriptor
 
+    # todo: shouldn't this be `CommChannel.find` instead of `DragonCommChannel`
     comm_channel = channel_type(callback_key)
     # comm_channel = DragonCommChannel(request.replyChannel)
 
     input_keys: t.Optional[t.List[str]] = None
-    input_bytes: t.Optional[t.List[bytes]] = (
-        None  # these will really be tensors already
-    )
+    input_bytes: t.Optional[t.List[bytes]] = None
 
     output_keys: t.Optional[t.List[str]] = None
 
-    input_meta: t.List[t.Any] = []
+    input_meta: t.Optional[t.List[TensorDescriptor]] = None
 
     if request.input.which() == "keys":
         input_keys = [input_key.key for input_key in request.input.keys]
-    elif request.input.which() == "data":
-        input_bytes = [data.blob for data in request.input.data]
-        input_meta = [data.tensorDescriptor for data in request.input.data]
+    elif request.input.which() == "descriptors":
+        input_meta = request.input.descriptors  # type: ignore
 
     if request.output:
         output_keys = [tensor_key.key for tensor_key in request.output]
@@ -142,20 +141,13 @@ def prepare_outputs(reply: InferenceReply) -> t.List[t.Any]:
             msg_key = MessageHandler.build_tensor_key(key)
             prepared_outputs.append(msg_key)
     elif reply.outputs:
-        arrays: t.List[np.ndarray[t.Any, np.dtype[t.Any]]] = [
-            output.numpy() for output in reply.outputs
-        ]
-        for tensor in arrays:
-            # todo: need to have the output attributes specified in the req?
-            # maybe, add `MessageHandler.dtype_of(tensor)`?
-            # can `build_tensor` do dtype and shape?
-            msg_tensor = MessageHandler.build_tensor(
-                tensor,
+        for _ in reply.outputs:
+            msg_tensor_desc = MessageHandler.build_tensor_descriptor(
                 "c",
                 "float32",
                 [1],
             )
-            prepared_outputs.append(msg_tensor)
+            prepared_outputs.append(msg_tensor_desc)
     return prepared_outputs
 
 
@@ -272,13 +264,28 @@ def _on_iteration(self) -> None:
             return
 
         timings = []  # timing
-        # perform default deserialization of the message envelope
-        request_bytes: bytes = self._task_queue.recv()
 
+        bytes_list: t.List[bytes] = self._task_queue.recv()
+
+        if not bytes_list:
+            exception_handler(
+                ValueError("No request data found"),
+                None,
+                "No request data found.",
+            )
+            return
+
+        request_bytes = bytes_list[0]
+        tensor_bytes_list = bytes_list[1:]
 
         interm = time.perf_counter()  # timing
         request = deserialize_message(
             request_bytes, self._comm_channel_type, self._device
         )
+
+        if request.input_meta and tensor_bytes_list:
+            request.raw_inputs = tensor_bytes_list
 
         if not self._validate_request(request):
             return
 
@@ -430,7 +437,12 @@ def _on_iteration(self) -> None:
         timings.append(time.perf_counter() - interm)  # timing
         interm = time.perf_counter()  # timing
         if request.callback:
+            # send serialized response
             request.callback.send(serialized_resp)
+            if reply.outputs:
+                # send tensor data after response
+                for output in reply.outputs:
+                    request.callback.send(output)
 
         timings.append(time.perf_counter() - interm)  # timing
         interm = time.perf_counter()  # timing
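The reply side applies the same framing in reverse: prepare_outputs (above) puts one TensorDescriptor per output into the envelope, and each output's raw bytes follow on the callback channel. A condensed sketch of that sequence; reply_status and status_message are placeholders for the worker manager's actual bookkeeping, and serialize_response is assumed to be the counterpart of the deserialize_response used in the mock app:

descriptors = [
    MessageHandler.build_tensor_descriptor("c", "float32", [1])  # shape is still a fixme
    for _ in reply.outputs
]
response = MessageHandler.build_response(
    status=reply_status,        # placeholder: a response_capnp.Status value
    message=status_message,     # placeholder status text
    result=descriptors,
    custom_attributes=None,
)
request.callback.send(MessageHandler.serialize_response(response))
for output in reply.outputs:    # raw bytes after transform_output
    request.callback.send(output)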
10 changes: 8 additions & 2 deletions smartsim/_core/mli/infrastructure/worker/torch_worker.py
@@ -110,10 +110,16 @@ def transform_output(
         result_device: str,
     ) -> TransformOutputResult:
         if result_device != "cpu":
-            transformed = [item.to("cpu") for item in execute_result.predictions]
+            transformed = [
+                item.to("cpu").numpy().tobytes() for item in execute_result.predictions
+            ]
 
+            # todo: need the shape from latest schemas added here.
             return TransformOutputResult(transformed, None, "c", "float32")  # fixme
 
         return TransformOutputResult(
-            execute_result.predictions, None, "c", "float32"
+            [item.numpy().tobytes() for item in execute_result.predictions],
+            None,
+            "c",
+            "float32",
         )  # fixme
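transform_output now flattens every prediction to raw bytes with tobytes(), which drops the shape (hence the fixme comments and the hard-coded "float32"). A small round-trip sketch of what a consumer can and cannot recover under those assumptions:

import numpy as np
import torch

pred = torch.rand(4, 2)                        # float32 by default
blob = pred.numpy().tobytes()                  # what transform_output now emits
flat = np.frombuffer(blob, dtype=np.float32)   # recovers a flat float32 array
restored = torch.from_numpy(flat.copy()).reshape(pred.shape)  # shape must come from elsewhere
assert torch.equal(pred, restored)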
2 changes: 1 addition & 1 deletion smartsim/_core/mli/infrastructure/worker/worker.py
@@ -59,7 +59,7 @@ def __init__(
         self.model_key = model_key
         self.raw_model = raw_model
         self.callback = callback
-        self.raw_inputs = raw_inputs
+        self.raw_inputs = raw_inputs or []
         self.input_keys = input_keys or []
         self.input_meta = input_meta or []
         self.output_keys = output_keys or []
46 changes: 21 additions & 25 deletions smartsim/_core/mli/message_handler.py
@@ -25,8 +25,6 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 import typing as t
 
-import numpy as np
-
 from .mli_schemas.data import data_references_capnp
 from .mli_schemas.model import model_capnp
 from .mli_schemas.request import request_capnp
@@ -38,17 +36,15 @@
 
 class MessageHandler:
     @staticmethod
-    def build_tensor(
-        tensor: np.ndarray[t.Any, np.dtype[t.Any]],
+    def build_tensor_descriptor(
         order: "tensor_capnp.Order",
         data_type: "tensor_capnp.NumericalType",
         dimensions: t.List[int],
-    ) -> tensor_capnp.Tensor:
+    ) -> tensor_capnp.TensorDescriptor:
         """
-        Builds a Tensor message using the provided data,
+        Builds a TensorDescriptor message using the provided
         order, data type, and dimensions.
-        :param tensor: Tensor to build the message around
         :param order: Order of the tensor, such as row-major (c) or column-major (f)
         :param data_type: Data type of the tensor
         :param dimensions: Dimensions of the tensor
@@ -59,15 +55,12 @@ def build_tensor(
             description.order = order
             description.dataType = data_type
             description.dimensions = dimensions
-            built_tensor = tensor_capnp.Tensor.new_message()
-            built_tensor.blob = tensor.tobytes()  # tensor channel instead?
-            built_tensor.tensorDescriptor = description
         except Exception as e:
             raise ValueError(
-                "Error building tensor."
+                "Error building tensor descriptor."
             ) from e  # TODO: create custom exception
 
-        return built_tensor
+        return description
 
     @staticmethod
     def build_output_tensor_descriptor(
@@ -240,15 +233,16 @@ def _assign_reply_channel(
         :raises ValueError: if building fails
         """
         try:
-            request.replyChannel.reply = reply_channel
+            request.replyChannel.descriptor = reply_channel
         except Exception as e:
             raise ValueError("Error building reply channel portion of request.") from e
 
     @staticmethod
     def _assign_inputs(
         request: request_capnp.Request,
         inputs: t.Union[
-            t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor]
+            t.List[data_references_capnp.TensorKey],
+            t.List[tensor_capnp.TensorDescriptor],
         ],
     ) -> None:
         """
@@ -262,14 +256,13 @@ def _assign_inputs(
             if inputs:
                 display_name = inputs[0].schema.node.displayName  # type: ignore
                 input_class_name = display_name.split(":")[-1]
-                if input_class_name == "Tensor":
-                    request.input.data = inputs  # type: ignore
+                if input_class_name == "TensorDescriptor":
+                    request.input.descriptors = inputs  # type: ignore
                 elif input_class_name == "TensorKey":
                     request.input.keys = inputs  # type: ignore
                 else:
-                    raise ValueError(
-                        "Invalid input class name. Expected 'Tensor' or 'TensorKey'."
-                    )
+                    raise ValueError("""Invalid input class name. Expected
+                        'TensorDescriptor' or 'TensorKey'.""")
         except Exception as e:
             raise ValueError("Error building inputs portion of request.") from e
 
@@ -351,7 +344,8 @@ def build_request(
         reply_channel: bytes,
         model: t.Union[data_references_capnp.ModelKey, model_capnp.Model],
         inputs: t.Union[
-            t.List[data_references_capnp.TensorKey], t.List[tensor_capnp.Tensor]
+            t.List[data_references_capnp.TensorKey],
+            t.List[tensor_capnp.TensorDescriptor],
        ],
         outputs: t.List[data_references_capnp.TensorKey],
         output_descriptors: t.List[tensor_capnp.OutputDescriptor],
@@ -437,7 +431,8 @@ def _assign_message(response: response_capnp.Response, message: str) -> None:
     def _assign_result(
         response: response_capnp.Response,
         result: t.Union[
-            t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey]
+            t.List[tensor_capnp.TensorDescriptor],
+            t.List[data_references_capnp.TensorKey],
         ],
     ) -> None:
         """
@@ -452,13 +447,13 @@ def _assign_result(
             first_result = result[0]
             display_name = first_result.schema.node.displayName  # type: ignore
             result_class_name = display_name.split(":")[-1]
-            if result_class_name == "Tensor":
-                response.result.data = result  # type: ignore
+            if result_class_name == "TensorDescriptor":
+                response.result.descriptors = result  # type: ignore
             elif result_class_name == "TensorKey":
                 response.result.keys = result  # type: ignore
             else:
                 raise ValueError("""Invalid custom attribute class name.
-                    Expected 'Tensor' or 'TensorKey'.""")
+                    Expected 'TensorDescriptor' or 'TensorKey'.""")
         except Exception as e:
             raise ValueError("Error assigning result to response.") from e
 
@@ -501,7 +496,8 @@ def build_response(
         status: "response_capnp.Status",
         message: str,
         result: t.Union[
-            t.List[tensor_capnp.Tensor], t.List[data_references_capnp.TensorKey]
+            t.List[tensor_capnp.TensorDescriptor],
+            t.List[data_references_capnp.TensorKey],
         ],
         custom_attributes: t.Union[
             response_attributes_capnp.TorchResponseAttributes,
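A note on the dispatch used by _assign_inputs and _assign_result: pycapnp exposes each message's schema node, and its displayName ends with the struct name, so splitting on ":" distinguishes TensorDescriptor from TensorKey without type checks. A quick illustration, assuming the generated tensor_capnp module is importable:

from smartsim._core.mli.mli_schemas.tensor import tensor_capnp

desc = tensor_capnp.TensorDescriptor.new_message()
display_name = desc.schema.node.displayName  # e.g. "tensor.capnp:TensorDescriptor"
print(display_name.split(":")[-1])           # -> "TensorDescriptor"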
4 changes: 2 additions & 2 deletions smartsim/_core/mli/mli_schemas/request/request.capnp
@@ -32,7 +32,7 @@ using DataRef = import "../data/data_references.capnp";
 using Models = import "../model/model.capnp";
 
 struct ChannelDescriptor {
-  reply @0 :Data;
+  descriptor @0 :Data;
 }
 
 struct Request {
@@ -43,7 +43,7 @@ struct Request {
   }
   input :union {
     keys @3 :List(DataRef.TensorKey);
-    data @4 :List(Tensors.Tensor);
+    descriptors @4 :List(Tensors.TensorDescriptor);
   }
   output @5 :List(DataRef.TensorKey);
   outputDescriptors @6 :List(Tensors.OutputDescriptor);
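With the union renamed from data to descriptors, readers switch on which() exactly as deserialize_message does above; a minimal sketch for a deserialized Request:

which = request.input.which()  # "keys" or "descriptors"
if which == "keys":
    input_keys = [tensor_key.key for tensor_key in request.input.keys]
elif which == "descriptors":
    input_meta = request.input.descriptors  # metadata only; blobs arrive separately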