From 7adafc757ae7a41d683bc304f26f7b4567ba2640 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 10 Apr 2024 13:36:12 +0000 Subject: [PATCH 01/21] add control message scaffolding to triton inference stage --- .../stages/inference_client_stage.hpp | 44 +++-- .../src/stages/inference_client_stage.cpp | 164 +++++++++++++----- morpheus/_lib/stages/__init__.pyi | 8 +- morpheus/_lib/stages/module.cpp | 21 ++- morpheus/pipeline/stage_base.py | 2 + morpheus/stages/inference/inference_stage.py | 3 +- .../inference/triton_inference_stage.py | 26 ++- 7 files changed, 201 insertions(+), 67 deletions(-) diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp index 24d142184d..cca6cfb3f2 100644 --- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp +++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp @@ -18,6 +18,7 @@ #pragma once #include "morpheus/export.h" +#include "morpheus/messages/control.hpp" #include "morpheus/messages/multi_inference.hpp" #include "morpheus/messages/multi_response.hpp" #include "morpheus/types.hpp" @@ -93,12 +94,13 @@ class MORPHEUS_EXPORT IInferenceClient * @brief Perform inference with Triton Inference Server. * This class specifies which inference implementation category (Ex: NLP/FIL) is needed for inferencing. */ +template class MORPHEUS_EXPORT InferenceClientStage - : public mrc::pymrc::AsyncioRunnable, std::shared_ptr> + : public mrc::pymrc::AsyncioRunnable, std::shared_ptr> { public: - using sink_type_t = std::shared_ptr; - using source_type_t = std::shared_ptr; + using sink_type_t = std::shared_ptr; + using source_type_t = std::shared_ptr; /** * @brief Construct a new Inference Client Stage object @@ -117,11 +119,11 @@ class MORPHEUS_EXPORT InferenceClientStage std::vector output_mapping); /** - * Process a single MultiInferenceMessage by running the constructor-provided inference client against it's Tensor, - * and yields the result as a MultiResponseMessage + * Process a single InputT by running the constructor-provided inference client against it's Tensor, + * and yields the result as a OutputT */ - mrc::coroutines::AsyncGenerator> on_data( - std::shared_ptr&& data, std::shared_ptr on) override; + mrc::coroutines::AsyncGenerator> on_data( + std::shared_ptr&& data, std::shared_ptr on) override; private: std::string m_model_name; @@ -142,7 +144,7 @@ class MORPHEUS_EXPORT InferenceClientStage struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy { /** - * @brief Create and initialize a InferenceClientStage, and return the result + * @brief Create and initialize a MultiMessage-based InferenceClientStage, and return the result * * @param builder : Pipeline context object reference * @param name : Name of a stage reference @@ -152,9 +154,31 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy * @param needs_logits : Determines if logits are required. * @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this * if the Morpheus names do not match the model. 
- * @return std::shared_ptr> + * @return std::shared_ptr>> */ - static std::shared_ptr> init( + static std::shared_ptr>> init_mm( + mrc::segment::Builder& builder, + const std::string& name, + std::string model_name, + std::string server_url, + bool needs_logits, + std::map input_mapping, + std::map output_mapping); + + /** + * @brief Create and initialize a ControlMessage-based InferenceClientStage, and return the result + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @param model_name : Name of the model specifies which model can handle the inference requests that are sent to + * Triton inference + * @param server_url : Triton server URL. + * @param needs_logits : Determines if logits are required. + * @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this + * if the Morpheus names do not match the model. + * @return std::shared_ptr>> + */ + static std::shared_ptr>> init_cm( mrc::segment::Builder& builder, const std::string& name, std::string model_name, diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index 069ccd557e..dfe9eb457f 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -17,8 +17,11 @@ #include "morpheus/stages/inference_client_stage.hpp" +#include "morpheus/messages/control.hpp" #include "morpheus/messages/memory/response_memory.hpp" #include "morpheus/messages/memory/tensor_memory.hpp" +#include "morpheus/messages/multi_inference.hpp" +#include "morpheus/messages/multi_response.hpp" #include "morpheus/objects/dev_mem_info.hpp" #include "morpheus/objects/dtype.hpp" #include "morpheus/objects/tensor.hpp" @@ -37,11 +40,19 @@ #include #include #include +#include #include namespace { -static morpheus::ShapeType get_seq_ids(const morpheus::InferenceClientStage::sink_type_t& message) +using namespace morpheus; + +using InferenceClientStageMM = + InferenceClientStage; // NOLINT(readability-identifier-naming) +using InferenceClientStageCM = + InferenceClientStage; // NOLINT(readability-identifier-naming) + +static ShapeType get_seq_ids(const InferenceClientStageMM::sink_type_t& message) { // Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe // The output tensors we store in `reponse_memory` will all be of the same length as the the @@ -49,7 +60,7 @@ static morpheus::ShapeType get_seq_ids(const morpheus::InferenceClientStage::sin auto seq_ids = message->get_input("seq_ids"); const auto item_size = seq_ids.dtype().item_size(); - morpheus::ShapeType host_seq_ids(message->count); + ShapeType host_seq_ids(message->count); MRC_CHECK_CUDA(cudaMemcpy2D(host_seq_ids.data(), item_size, seq_ids.data(), @@ -61,35 +72,43 @@ static morpheus::ShapeType get_seq_ids(const morpheus::InferenceClientStage::sin return host_seq_ids; } -static void reduce_outputs(const morpheus::InferenceClientStage::sink_type_t& x, morpheus::TensorMap& output_tensors) +static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) { + if (message->mess_count == message->count) + { + return; + } + // When our tensor lengths are longer than our dataframe we will need to use the seq_ids array to // lookup how the values should map back into the dataframe. 
- auto host_seq_ids = get_seq_ids(x); + auto host_seq_ids = get_seq_ids(message); for (auto& mapping : output_tensors) { auto& output_tensor = mapping.second; - morpheus::ShapeType shape = output_tensor.get_shape(); - morpheus::ShapeType stride = output_tensor.get_stride(); + ShapeType shape = output_tensor.get_shape(); + ShapeType stride = output_tensor.get_stride(); - morpheus::ShapeType reduced_shape{shape}; - reduced_shape[0] = x->mess_count; + ShapeType reduced_shape{shape}; + reduced_shape[0] = message->mess_count; - auto reduced_buffer = morpheus::MatxUtil::reduce_max( - morpheus::DevMemInfo{ - output_tensor.data(), output_tensor.dtype(), output_tensor.get_memory(), shape, stride}, + auto reduced_buffer = MatxUtil::reduce_max( + DevMemInfo{output_tensor.data(), output_tensor.dtype(), output_tensor.get_memory(), shape, stride}, host_seq_ids, 0, reduced_shape); - output_tensor.swap( - morpheus::Tensor::create(std::move(reduced_buffer), output_tensor.dtype(), reduced_shape, stride, 0)); + output_tensor.swap(Tensor::create(std::move(reduced_buffer), output_tensor.dtype(), reduced_shape, stride, 0)); } } -static void apply_logits(morpheus::TensorMap& output_tensors) +static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) +{ + throw std::runtime_error("reduce_outputs not implemented"); +} + +static void apply_logits(TensorMap& output_tensors) { for (auto& mapping : output_tensors) { @@ -110,11 +129,12 @@ static void apply_logits(morpheus::TensorMap& output_tensors) namespace morpheus { -InferenceClientStage::InferenceClientStage(std::unique_ptr&& client, - std::string model_name, - bool needs_logits, - std::vector input_mapping, - std::vector output_mapping) : +template +InferenceClientStage::InferenceClientStage(std::unique_ptr&& client, + std::string model_name, + bool needs_logits, + std::vector input_mapping, + std::vector output_mapping) : m_model_name(std::move(model_name)), m_client(std::move(client)), m_needs_logits(needs_logits), @@ -149,8 +169,44 @@ struct ExponentialBackoff } }; -mrc::coroutines::AsyncGenerator> InferenceClientStage::on_data( - std::shared_ptr&& x, std::shared_ptr on) +static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->memory->has_tensor(tensor_name); +} + +static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->tensors()->has_tensor(tensor_name); +} + +static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->get_input(tensor_name); +} + +static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->tensors()->get_tensor(tensor_name); +} + +static std::shared_ptr make_response(std::shared_ptr message, + TensorMap output_tensor_map) +{ + // Final output of all mini-batches + auto response_mem = std::make_shared(message->mess_count, std::move(output_tensor_map)); + + return std::make_shared( + message->meta, message->mess_offset, message->mess_count, std::move(response_mem), 0, response_mem->count); +} + +static std::shared_ptr make_response(std::shared_ptr message, TensorMap output_tensor_map) +{ + throw std::runtime_error("make_response not implemented"); +} + +template +mrc::coroutines::AsyncGenerator> InferenceClientStage::on_data( + std::shared_ptr&& message, std::shared_ptr on) { int32_t retry_count = 0; @@ -192,9 +248,9 @@ mrc::coroutines::AsyncGenerator> Inference for (auto mapping : 
message_session->get_input_mappings(m_input_mapping)) { - if (x->memory->has_tensor(mapping.tensor_field_name)) + if (has_tensor(message, mapping.tensor_field_name)) { - model_input_tensors[mapping.model_field_name].swap(x->get_input(mapping.tensor_field_name)); + model_input_tensors[mapping.model_field_name].swap(get_tensor(message, mapping.tensor_field_name)); } } @@ -202,10 +258,7 @@ mrc::coroutines::AsyncGenerator> Inference co_await on->yield(); - if (x->mess_count != x->count) - { - reduce_outputs(x, model_output_tensors); - } + reduce_outputs(message, model_output_tensors); // If we need to do logits, do that here if (m_needs_logits) @@ -228,13 +281,7 @@ mrc::coroutines::AsyncGenerator> Inference } } - // Final output of all mini-batches - auto response_mem = std::make_shared(x->mess_count, std::move(output_tensor_map)); - - auto response = std::make_shared( - x->meta, x->mess_offset, x->mess_count, std::move(response_mem), 0, response_mem->count); - - co_yield std::move(response); + co_yield make_response(message, output_tensor_map); co_return; @@ -260,14 +307,45 @@ mrc::coroutines::AsyncGenerator> Inference } // ************ InferenceClientStageInterfaceProxy********* // -std::shared_ptr> InferenceClientStageInterfaceProxy::init( - mrc::segment::Builder& builder, - const std::string& name, - std::string server_url, - std::string model_name, - bool needs_logits, - std::map input_mappings, - std::map output_mappings) +std::shared_ptr>> +InferenceClientStageInterfaceProxy::init_mm(mrc::segment::Builder& builder, + const std::string& name, + std::string server_url, + std::string model_name, + bool needs_logits, + std::map input_mappings, + std::map output_mappings) +{ + std::vector input_mappings_{}; + std::vector output_mappings_{}; + + for (auto& mapping : input_mappings) + { + input_mappings_.emplace_back(TensorModelMapping{mapping.first, mapping.second}); + } + + for (auto& mapping : output_mappings) + { + output_mappings_.emplace_back(TensorModelMapping{mapping.first, mapping.second}); + } + + auto triton_client = std::make_unique(server_url); + auto triton_inference_client = std::make_unique(std::move(triton_client), model_name); + auto stage = builder.construct_object>( + name, std::move(triton_inference_client), model_name, needs_logits, input_mappings_, output_mappings_); + + return stage; +} + +// ************ InferenceClientStageInterfaceProxy********* // +std::shared_ptr>> +InferenceClientStageInterfaceProxy::init_cm(mrc::segment::Builder& builder, + const std::string& name, + std::string server_url, + std::string model_name, + bool needs_logits, + std::map input_mappings, + std::map output_mappings) { std::vector input_mappings_{}; std::vector output_mappings_{}; @@ -284,7 +362,7 @@ std::shared_ptr> InferenceClientStage auto triton_client = std::make_unique(server_url); auto triton_inference_client = std::make_unique(std::move(triton_client), model_name); - auto stage = builder.construct_object( + auto stage = builder.construct_object>( name, std::move(triton_inference_client), model_name, needs_logits, input_mappings_, output_mappings_); return stage; diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index 3a5a442202..44968ade01 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -24,7 +24,8 @@ __all__ = [ "FilterDetectionsStage", "FilterSource", "HttpServerSourceStage", - "InferenceClientStage", + "InferenceClientStageCM", + "InferenceClientStageMM", "KafkaSourceStage", "PreallocateMessageMetaStage", 
"PreallocateMultiMessageStage", @@ -68,7 +69,10 @@ class FilterDetectionsStage(mrc.core.segment.SegmentObject): class HttpServerSourceStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', method: str = 'POST', accept_status: int = 201, sleep_time: float = 0.10000000149011612, queue_timeout: int = 5, max_queue_size: int = 1024, num_server_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30, lines: bool = False, stop_after: int = 0) -> None: ... pass -class InferenceClientStage(mrc.core.segment.SegmentObject): +class InferenceClientStageCM(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... + pass +class InferenceClientStageMM(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, server_url: str, model_name: str, needs_logits: bool, input_mapping: typing.Dict[str, str] = {}, output_mapping: typing.Dict[str, str] = {}) -> None: ... pass class KafkaSourceStage(mrc.core.segment.SegmentObject): diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 7b0d7ea293..4f3bbf5569 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -164,11 +164,24 @@ PYBIND11_MODULE(stages, _module) py::arg("filter_source"), py::arg("field_name") = "probs"); - py::class_, + py::class_>, mrc::segment::ObjectProperties, - std::shared_ptr>>( - _module, "InferenceClientStage", py::multiple_inheritance()) - .def(py::init<>(&InferenceClientStageInterfaceProxy::init), + std::shared_ptr>>>( + _module, "InferenceClientStageMM", py::multiple_inheritance()) + .def(py::init<>(&InferenceClientStageInterfaceProxy::init_mm), + py::arg("builder"), + py::arg("name"), + py::arg("server_url"), + py::arg("model_name"), + py::arg("needs_logits"), + py::arg("input_mapping") = py::dict(), + py::arg("output_mapping") = py::dict()); + + py::class_>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( + _module, "InferenceClientStageCM", py::multiple_inheritance()) + .def(py::init<>(&InferenceClientStageInterfaceProxy::init_cm), py::arg("builder"), py::arg("name"), py::arg("server_url"), diff --git a/morpheus/pipeline/stage_base.py b/morpheus/pipeline/stage_base.py index 3aa3b2f450..290ed83992 100644 --- a/morpheus/pipeline/stage_base.py +++ b/morpheus/pipeline/stage_base.py @@ -80,6 +80,8 @@ class StageBase(ABC, collections.abc.Hashable): __ID_COUNTER = AtomicInteger(0) + _schema: _pipeline.StageSchema + def __init__(self, config: Config): # Save the config self._config = config diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index e4111926e9..657ffee960 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -192,7 +192,7 @@ def accepted_types(self) -> typing.Tuple: typing.Tuple Tuple of input types. """ - return (MultiInferenceMessage, ) + return (MultiInferenceMessage, ControlMessage) def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MultiResponseMessage) @@ -366,6 +366,7 @@ def _split_batches(x: MultiInferenceMessage, max_batch_size: int) -> typing.List return out_resp + # TODO(cwharris): find out if this function is used. if not, delete it. 
@staticmethod def _convert_response( x: typing.Tuple[typing.List[MultiInferenceMessage], typing.List[TensorMemory]]) -> MultiResponseMessage: diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index e5901363f9..3c941b432a 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -32,6 +32,7 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage from morpheus.messages import MultiInferenceMessage from morpheus.messages.memory.tensor_memory import TensorMemory from morpheus.stages.inference.inference_stage import InferenceStage @@ -774,10 +775,21 @@ def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInfer needs_logits=self._needs_logits) def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject: - return _stages.InferenceClientStage(builder, - self.unique_name, - self._server_url, - self._model_name, - self._needs_logits, - self._input_mapping, - self._output_mapping) + + if self._schema.input_type == ControlMessage: + return _stages.InferenceClientControlCM(builder, + self.unique_name, + self._server_url, + self._model_name, + self._needs_logits, + self._input_mapping, + self._output_mapping) + + + return _stages.InferenceClientStageMM(builder, + self.unique_name, + self._server_url, + self._model_name, + self._needs_logits, + self._input_mapping, + self._output_mapping) From d5cf5043a425f5e57024e1f6d41a423e4a6d659c Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 10 Apr 2024 15:46:48 +0000 Subject: [PATCH 02/21] support ControlMessage for TritonInferenceStage --- .../src/stages/inference_client_stage.cpp | 67 ++++++++++++------- morpheus/_lib/src/stages/triton_inference.cpp | 12 ++-- .../stages/test_triton_inference_stage.cpp | 3 +- morpheus/stages/inference/inference_stage.py | 5 +- .../inference/triton_inference_stage.py | 14 ++-- .../preprocess/preprocess_base_stage.py | 14 ++-- tests/test_abp.py | 53 +++++++++------ 7 files changed, 101 insertions(+), 67 deletions(-) diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index dfe9eb457f..1fa6263969 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -72,6 +73,26 @@ static ShapeType get_seq_ids(const InferenceClientStageMM::sink_type_t& message) return host_seq_ids; } +static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->memory->has_tensor(tensor_name); +} + +static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->tensors()->has_tensor(tensor_name); +} + +static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->get_input(tensor_name); +} + +static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) +{ + return message->tensors()->get_tensor(tensor_name); +} + static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) { if (message->mess_count == message->count) @@ -105,7 +126,7 @@ static void reduce_outputs(std::shared_ptr const& message static void reduce_outputs(std::shared_ptr const& message, TensorMap& 
output_tensors) { - throw std::runtime_error("reduce_outputs not implemented"); + // throw std::runtime_error("reduce_outputs not implemented"); } static void apply_logits(TensorMap& output_tensors) @@ -169,28 +190,8 @@ struct ExponentialBackoff } }; -static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) -{ - return message->memory->has_tensor(tensor_name); -} - -static bool has_tensor(std::shared_ptr message, std::string const& tensor_name) -{ - return message->tensors()->has_tensor(tensor_name); -} - -static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) -{ - return message->get_input(tensor_name); -} - -static TensorObject get_tensor(std::shared_ptr message, std::string const& tensor_name) -{ - return message->tensors()->get_tensor(tensor_name); -} - static std::shared_ptr make_response(std::shared_ptr message, - TensorMap output_tensor_map) + TensorMap&& output_tensor_map) { // Final output of all mini-batches auto response_mem = std::make_shared(message->mess_count, std::move(output_tensor_map)); @@ -199,9 +200,10 @@ static std::shared_ptr make_response(std::shared_ptrmeta, message->mess_offset, message->mess_count, std::move(response_mem), 0, response_mem->count); } -static std::shared_ptr make_response(std::shared_ptr message, TensorMap output_tensor_map) +static std::shared_ptr make_response(std::shared_ptr message, TensorMap&& output_tensor_map) { - throw std::runtime_error("make_response not implemented"); + message->tensors()->set_tensors(std::move(output_tensor_map)); + return message; } template @@ -281,10 +283,25 @@ mrc::coroutines::AsyncGenerator> InferenceClientStage= 0 and ++retry_count > m_retry_max) + { + throw; + } + + LOG(WARNING) << "Exception while processing message for InferenceClientStage, attempting retry. ex.what(): " << ex.what(); } catch (...) { auto lock = std::unique_lock(m_session_mutex); diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp index 6464c3be5d..561506a150 100644 --- a/morpheus/_lib/src/stages/triton_inference.cpp +++ b/morpheus/_lib/src/stages/triton_inference.cpp @@ -36,6 +36,7 @@ #include // for min #include #include +#include #include #include #include @@ -112,6 +113,7 @@ struct TritonInferOperation { CHECK_TRITON(m_client.async_infer( [this, handle](triton::client::InferResult* result) { + std::cout << "resuming..." 
<< std::endl; m_result.reset(result); handle(); }, @@ -476,12 +478,14 @@ mrc::coroutines::Task TritonInferenceClientSession::infer(TensorMap&& const uint8_t* output_ptr = nullptr; size_t output_ptr_size = 0; + + CHECK_TRITON(results->RawData(model_output.name, &output_ptr, &output_ptr_size)); - DCHECK_EQ(stop - start, output_shape[0]); - DCHECK_EQ(output_tensor.bytes(), output_ptr_size); - DCHECK_NOTNULL(output_ptr); // NOLINT - DCHECK_NOTNULL(output_tensor.data()); // NOLINT + // DCHECK_EQ(stop - start, output_shape[0]); + // DCHECK_EQ(output_tensor.bytes(), output_ptr_size); + // DCHECK_NOTNULL(output_ptr); // NOLINT + // DCHECK_NOTNULL(output_tensor.data()); // NOLINT MRC_CHECK_CUDA(cudaMemcpy(output_tensor.data(), output_ptr, output_ptr_size, cudaMemcpyHostToDevice)); } diff --git a/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp b/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp index c7a566b011..df7785d259 100644 --- a/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp +++ b/morpheus/_lib/tests/stages/test_triton_inference_stage.cpp @@ -309,7 +309,8 @@ TEST_F(TestTritonInferenceStage, SingleRow) // create the fake triton client used for testing. auto triton_client = std::make_unique(); auto triton_inference_client = std::make_unique(std::move(triton_client), ""); - auto stage = morpheus::InferenceClientStage(std::move(triton_inference_client), "", false, {}, {}); + auto stage = morpheus::InferenceClientStage( + std::move(triton_inference_client), "", false, {}, {}); // manually invoke the stage and iterate through the inference responses auto on = std::make_shared(); diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 657ffee960..f3f618c5fb 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -195,7 +195,10 @@ def accepted_types(self) -> typing.Tuple: return (MultiInferenceMessage, ControlMessage) def compute_schema(self, schema: StageSchema): - schema.output_schema.set_type(MultiResponseMessage) + if schema.input_type == ControlMessage: + schema.output_schema.set_type(ControlMessage) + else: + schema.output_schema.set_type(MultiResponseMessage) def supports_cpp_node(self): # Default to False unless derived classes override this value diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index 3c941b432a..1121fd0c67 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -777,13 +777,13 @@ def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInfer def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject: if self._schema.input_type == ControlMessage: - return _stages.InferenceClientControlCM(builder, - self.unique_name, - self._server_url, - self._model_name, - self._needs_logits, - self._input_mapping, - self._output_mapping) + return _stages.InferenceClientStageCM(builder, + self.unique_name, + self._server_url, + self._model_name, + self._needs_logits, + self._input_mapping, + self._output_mapping) return _stages.InferenceClientStageMM(builder, diff --git a/morpheus/stages/preprocess/preprocess_base_stage.py b/morpheus/stages/preprocess/preprocess_base_stage.py index 3731912026..f115e38053 100644 --- a/morpheus/stages/preprocess/preprocess_base_stage.py +++ b/morpheus/stages/preprocess/preprocess_base_stage.py @@ -61,15 +61,15 @@ def compute_schema(self, schema: 
StageSchema): if (schema.input_type == ControlMessage): self._use_control_message = True out_type = ControlMessage + self._preprocess_fn = self._get_preprocess_fn() else: self._use_control_message = False - - self._preprocess_fn = self._get_preprocess_fn() - preproc_sig = inspect.signature(self._preprocess_fn) - # If the innerfunction returns a type annotation, update the output type - if (preproc_sig.return_annotation - and typing_utils.issubtype(preproc_sig.return_annotation, MultiInferenceMessage)): - out_type = preproc_sig.return_annotation + self._preprocess_fn = self._get_preprocess_fn() + preproc_sig = inspect.signature(self._preprocess_fn) + # If the innerfunction returns a type annotation, update the output type + if (preproc_sig.return_annotation + and typing_utils.issubtype(preproc_sig.return_annotation, MultiInferenceMessage)): + out_type = preproc_sig.return_annotation schema.output_schema.set_type(out_type) diff --git a/tests/test_abp.py b/tests/test_abp.py index 86778bfdb6..64d1b8e7f2 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -27,6 +27,7 @@ from morpheus.config import Config from morpheus.config import ConfigFIL from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiMessage @@ -119,7 +120,7 @@ def test_abp_cpp(config, tmp_path): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE - config.pipeline_batch_size = 1024 + config.pipeline_batch_size = 2048 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 config.num_threads = 1 @@ -131,28 +132,36 @@ def test_abp_cpp(config, tmp_path): out_file = os.path.join(tmp_path, 'results.csv') results_file_name = os.path.join(tmp_path, 'results.json') - pipe = LinearPipeline(config) - pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) - pipe.add_stage(DeserializeStage(config)) - pipe.add_stage(PreprocessFILStage(config)) - - # We are feeding TritonInferenceStage the port to the grpc server because that is what the validation tests do - # but the code under-the-hood replaces this with the port number of the http server - pipe.add_stage( - TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', - force_convert_inputs=True)) - pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) - pipe.add_stage(AddClassificationsStage(config)) - pipe.add_stage(AddScoresStage(config, prefix="score_")) - pipe.add_stage( - ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05)) - pipe.add_stage(SerializeStage(config)) - pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) - + def create_pipeline(message_type): + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) + pipe.add_stage(DeserializeStage(config, message_type=message_type)) + pipe.add_stage(PreprocessFILStage(config)) + + # We are feeding TritonInferenceStage the port to the grpc server because that is what the validation tests do + # but the code under-the-hood replaces this with the port number of the http server + pipe.add_stage( + TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', + force_convert_inputs=True)) + pipe.add_stage(MonitorStage(config, description="Inference Rate", 
smoothing=0.001, unit="inf")) + pipe.add_stage(AddClassificationsStage(config)) + pipe.add_stage(AddScoresStage(config, prefix="score_")) + pipe.add_stage( + ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05, overwrite=True)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=True)) + + return pipe + + # pipe = create_pipeline(MultiMessage) + # pipe.run() + + # compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) + # results = calc_error_val(results_file_name) + # assert results.diff_rows == 0 + + pipe = create_pipeline(ControlMessage) pipe.run() - compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 @pytest.mark.slow From 175d6c8be6cb8500234009568881e45a20f9bdd4 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 10 Apr 2024 16:19:20 +0000 Subject: [PATCH 03/21] test both MultiMessage and Control message in test_abp_cpp --- .../src/stages/inference_client_stage.cpp | 1 - .../stages/postprocess/validation_stage.py | 3 +- tests/test_abp.py | 56 +++++++++---------- 3 files changed, 27 insertions(+), 33 deletions(-) diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index 1fa6263969..21dc2fabf1 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -126,7 +126,6 @@ static void reduce_outputs(std::shared_ptr const& message static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) { - // throw std::runtime_error("reduce_outputs not implemented"); } static void apply_logits(TensorMap& output_tensors) diff --git a/morpheus/stages/postprocess/validation_stage.py b/morpheus/stages/postprocess/validation_stage.py index 1d62f18cab..c07fe27c80 100644 --- a/morpheus/stages/postprocess/validation_stage.py +++ b/morpheus/stages/postprocess/validation_stage.py @@ -24,6 +24,7 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages import MultiMessage +from morpheus.messages import ControlMessage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage logger = logging.getLogger(__name__) @@ -114,7 +115,7 @@ def accepted_types(self) -> typing.Tuple: Accepted input types. 
""" - return (MultiMessage, ) + return (MultiMessage, ControlMessage) def _do_comparison(self): results = self.get_results(clear=False) diff --git a/tests/test_abp.py b/tests/test_abp.py index 64d1b8e7f2..66b751693e 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -116,11 +116,12 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path): @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_abp_cpp(config, tmp_path): +@pytest.mark.parametrize("message_type", [MultiMessage, ControlMessage]) +def test_abp_cpp(config, tmp_path, message_type): config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE - config.pipeline_batch_size = 2048 + config.pipeline_batch_size = 1024 config.feature_length = FEATURE_LENGTH config.edge_buffer_size = 128 config.num_threads = 1 @@ -132,37 +133,30 @@ def test_abp_cpp(config, tmp_path): out_file = os.path.join(tmp_path, 'results.csv') results_file_name = os.path.join(tmp_path, 'results.json') - def create_pipeline(message_type): - pipe = LinearPipeline(config) - pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) - pipe.add_stage(DeserializeStage(config, message_type=message_type)) - pipe.add_stage(PreprocessFILStage(config)) - - # We are feeding TritonInferenceStage the port to the grpc server because that is what the validation tests do - # but the code under-the-hood replaces this with the port number of the http server - pipe.add_stage( - TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', - force_convert_inputs=True)) - pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) - pipe.add_stage(AddClassificationsStage(config)) - pipe.add_stage(AddScoresStage(config, prefix="score_")) - pipe.add_stage( - ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05, overwrite=True)) - pipe.add_stage(SerializeStage(config)) - pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=True)) - - return pipe - - # pipe = create_pipeline(MultiMessage) - # pipe.run() - - # compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) - # results = calc_error_val(results_file_name) - # assert results.diff_rows == 0 - - pipe = create_pipeline(ControlMessage) + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) + pipe.add_stage(DeserializeStage(config, message_type=message_type)) + pipe.add_stage(PreprocessFILStage(config)) + + # We are feeding TritonInferenceStage the port to the grpc server because that is what the validation tests do + # but the code under-the-hood replaces this with the port number of the http server + pipe.add_stage( + TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', + force_convert_inputs=True)) + pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) + pipe.add_stage(AddClassificationsStage(config)) + pipe.add_stage(AddScoresStage(config, prefix="score_")) + pipe.add_stage( + ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05, overwrite=True)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=True)) + pipe.run() + compare_class_to_scores(out_file, config.class_labels, '', 'score_', threshold=0.5) + 
results = calc_error_val(results_file_name) + assert results.diff_rows == 0 + @pytest.mark.slow @pytest.mark.use_python From 1f0746ca14e6d46793564eadcfe6c1723be62b2c Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 12 Apr 2024 17:02:37 +0000 Subject: [PATCH 04/21] add ControlMessage support to PreallocatorMixin --- .../include/morpheus/stages/preallocate.hpp | 8 ++++++ morpheus/_lib/stages/__init__.pyi | 4 +++ morpheus/_lib/stages/module.cpp | 9 +++++++ morpheus/pipeline/preallocator_mixin.py | 15 ++++++++--- tests/test_abp.py | 25 ++++++++++++------- 5 files changed, 49 insertions(+), 12 deletions(-) diff --git a/morpheus/_lib/include/morpheus/stages/preallocate.hpp b/morpheus/_lib/include/morpheus/stages/preallocate.hpp index 30b6b186c6..ab1cabdde0 100644 --- a/morpheus/_lib/include/morpheus/stages/preallocate.hpp +++ b/morpheus/_lib/include/morpheus/stages/preallocate.hpp @@ -17,6 +17,7 @@ #pragma once +#include "morpheus/messages/control.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" #include "morpheus/objects/dtype.hpp" // for TypeId @@ -51,11 +52,18 @@ void preallocate(std::shared_ptr msg, table.insert_missing_columns(columns); } +void preallocate(std::shared_ptr msg, + const std::vector>& columns) +{ + preallocate(msg->payload(), columns); +} + void preallocate(std::shared_ptr msg, const std::vector>& columns) { preallocate(msg->meta, columns); } + } // namespace /****** Component public implementations *******************/ diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index aa69aa3f40..85767bdcef 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -27,6 +27,7 @@ __all__ = [ "InferenceClientStageCM", "InferenceClientStageMM", "KafkaSourceStage", + "PreallocateControlMessageStage", "PreallocateMessageMetaStage", "PreallocateMultiMessageStage", "PreprocessFILControlMessageStage", @@ -81,6 +82,9 @@ class KafkaSourceStage(mrc.core.segment.SegmentObject): @typing.overload def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_size: int, topics: typing.List[str], batch_timeout_ms: int, config: typing.Dict[str, str], disable_commits: bool = False, disable_pre_filtering: bool = False, stop_after: int = 0, async_commits: bool = True, oauth_callback: typing.Optional[function] = None) -> None: ... pass +class PreallocateControlMessageStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... + pass class PreallocateMessageMetaStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, needed_columns: typing.List[typing.Tuple[str, morpheus._lib.common.TypeId]]) -> None: ... 
pass diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 4f3bbf5569..37e4d17475 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -219,6 +219,15 @@ PYBIND11_MODULE(stages, _module) py::arg("async_commits") = true, py::arg("oauth_callback") = py::none()); + py::class_>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( + _module, "PreallocateControlMessageStage", py::multiple_inheritance()) + .def(py::init<>(&PreallocateStageInterfaceProxy::init), + py::arg("builder"), + py::arg("name"), + py::arg("needed_columns")); + py::class_>, mrc::segment::ObjectProperties, std::shared_ptr>>>( diff --git a/morpheus/pipeline/preallocator_mixin.py b/morpheus/pipeline/preallocator_mixin.py index 61e9cd3702..c40ed6be04 100644 --- a/morpheus/pipeline/preallocator_mixin.py +++ b/morpheus/pipeline/preallocator_mixin.py @@ -28,6 +28,7 @@ from morpheus.common import TypeId from morpheus.common import typeid_to_numpy_str from morpheus.config import CppConfig +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.utils.type_aliases import DataFrameType @@ -85,6 +86,10 @@ def _preallocate_multi(self, msg: MultiMessage) -> MultiMessage: self._preallocate_meta(msg.meta) return msg + def _preallocate_control(self, msg: ControlMessage) -> ControlMessage: + self._preallocate_meta(msg.payload()) + return msg + def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) -> mrc.SegmentObject: out_type = self.output_ports[0].output_type pretty_type = pretty_print_type_name(out_type) @@ -92,17 +97,21 @@ def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) if len(self._needed_columns) > 0: node_name = f"{self.unique_name}-preallocate" - if issubclass(out_type, (MessageMeta, MultiMessage)): + if issubclass(out_type, (ControlMessage, MessageMeta, MultiMessage)): # Intentionally not using `_build_cpp_node` because `LinearBoundaryIngressStage` lacks a C++ impl if CppConfig.get_should_use_cpp(): import morpheus._lib.stages as _stages needed_columns = list(self._needed_columns.items()) - if issubclass(out_type, MessageMeta): + if issubclass(out_type, ControlMessage): + node = _stages.PreallocateControlMessageStage(builder, node_name, needed_columns) + elif issubclass(out_type, MessageMeta): node = _stages.PreallocateMessageMetaStage(builder, node_name, needed_columns) else: node = _stages.PreallocateMultiMessageStage(builder, node_name, needed_columns) else: - if issubclass(out_type, MessageMeta): + if issubclass(out_type, ControlMessage): + node = builder.make_node(node_name, ops.map(self._preallocate_control)) + elif issubclass(out_type, MessageMeta): node = builder.make_node(node_name, ops.map(self._preallocate_meta)) else: node = builder.make_node(node_name, ops.map(self._preallocate_multi)) diff --git a/tests/test_abp.py b/tests/test_abp.py index 66b751693e..35ecd5ec92 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -240,7 +240,14 @@ def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path): @pytest.mark.slow @pytest.mark.use_cpp @pytest.mark.usefixtures("launch_mock_triton") -def test_abp_multi_segment_cpp(config, tmp_path): +@pytest.mark.parametrize("message_type", [MultiMessage, ControlMessage]) +def test_abp_multi_segment_cpp(config, tmp_path, message_type): + + def get_boundary_type(boundary_type): + if message_type == ControlMessage: + return ControlMessage + return boundary_type 
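+    # get_boundary_type: with ControlMessage the Multi* message family is replaced end-to-end,
+    # so every segment boundary in this parametrization must also be typed as ControlMessage.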
+ config.mode = PipelineModes.FIL config.class_labels = ["mining"] config.model_max_batch_size = MODEL_MAX_BATCH_SIZE @@ -258,13 +265,13 @@ def test_abp_multi_segment_cpp(config, tmp_path): pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) - pipe.add_stage(DeserializeStage(config)) + pipe.add_stage(DeserializeStage(config, message_type=message_type)) - pipe.add_segment_boundary(MultiMessage) # Boundary 1 + pipe.add_segment_boundary(get_boundary_type(MultiMessage)) # Boundary 1 pipe.add_stage(PreprocessFILStage(config)) - pipe.add_segment_boundary(MultiInferenceMessage) # Boundary 2 + pipe.add_segment_boundary(get_boundary_type(MultiInferenceMessage)) # Boundary 2 # We are feeding TritonInferenceStage the port to the grpc server because that is what the validation tests do # but the code under-the-hood replaces this with the port number of the http server @@ -272,17 +279,17 @@ def test_abp_multi_segment_cpp(config, tmp_path): TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', force_convert_inputs=True)) - pipe.add_segment_boundary(MultiResponseMessage) # Boundary 3 + pipe.add_segment_boundary(get_boundary_type(MultiResponseMessage)) # Boundary 3 pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) pipe.add_stage(AddClassificationsStage(config)) - pipe.add_segment_boundary(MultiResponseMessage) # Boundary 4 + pipe.add_segment_boundary(get_boundary_type(MultiResponseMessage)) # Boundary 4 pipe.add_stage( ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05)) - pipe.add_segment_boundary(MultiResponseMessage) # Boundary 5 + pipe.add_segment_boundary(get_boundary_type(MultiResponseMessage)) # Boundary 5 pipe.add_stage(SerializeStage(config)) @@ -292,5 +299,5 @@ def test_abp_multi_segment_cpp(config, tmp_path): pipe.run() - results = calc_error_val(results_file_name) - assert results.diff_rows == 0 + # results = calc_error_val(results_file_name) + # assert results.diff_rows == 0 From f51d172c443db90b4a7f9a47de547935716e4a72 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 12 Apr 2024 17:10:10 +0000 Subject: [PATCH 05/21] styles --- .../morpheus/stages/inference_client_stage.hpp | 16 ++++++++-------- .../_lib/src/stages/inference_client_stage.cpp | 12 ++++++------ morpheus/_lib/src/stages/triton_inference.cpp | 1 - morpheus/_lib/stages/module.cpp | 7 ++++--- .../stages/inference/triton_inference_stage.py | 3 +-- morpheus/stages/postprocess/validation_stage.py | 2 +- tests/test_abp.py | 8 ++++++-- 7 files changed, 26 insertions(+), 23 deletions(-) diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp index cca6cfb3f2..71d6dd92af 100644 --- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp +++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp @@ -156,14 +156,14 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy * if the Morpheus names do not match the model. 
* @return std::shared_ptr>> */ - static std::shared_ptr>> init_mm( - mrc::segment::Builder& builder, - const std::string& name, - std::string model_name, - std::string server_url, - bool needs_logits, - std::map input_mapping, - std::map output_mapping); + static std::shared_ptr>> + init_mm(mrc::segment::Builder& builder, + const std::string& name, + std::string model_name, + std::string server_url, + bool needs_logits, + std::map input_mapping, + std::map output_mapping); /** * @brief Create and initialize a ControlMessage-based InferenceClientStage, and return the result diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index 21dc2fabf1..26afe41c94 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -124,9 +124,7 @@ static void reduce_outputs(std::shared_ptr const& message } } -static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) -{ -} +static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) {} static void apply_logits(TensorMap& output_tensors) { @@ -190,7 +188,7 @@ struct ExponentialBackoff }; static std::shared_ptr make_response(std::shared_ptr message, - TensorMap&& output_tensor_map) + TensorMap&& output_tensor_map) { // Final output of all mini-batches auto response_mem = std::make_shared(message->mess_count, std::move(output_tensor_map)); @@ -199,7 +197,8 @@ static std::shared_ptr make_response(std::shared_ptrmeta, message->mess_offset, message->mess_count, std::move(response_mem), 0, response_mem->count); } -static std::shared_ptr make_response(std::shared_ptr message, TensorMap&& output_tensor_map) +static std::shared_ptr make_response(std::shared_ptr message, + TensorMap&& output_tensor_map) { message->tensors()->set_tensors(std::move(output_tensor_map)); return message; @@ -300,7 +299,8 @@ mrc::coroutines::AsyncGenerator> InferenceClientStage TritonInferenceClientSession::infer(TensorMap&& const uint8_t* output_ptr = nullptr; size_t output_ptr_size = 0; - CHECK_TRITON(results->RawData(model_output.name, &output_ptr, &output_ptr_size)); // DCHECK_EQ(stop - start, output_shape[0]); diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 37e4d17475..d4b051b615 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -164,9 +164,10 @@ PYBIND11_MODULE(stages, _module) py::arg("filter_source"), py::arg("field_name") = "probs"); - py::class_>, - mrc::segment::ObjectProperties, - std::shared_ptr>>>( + py::class_< + mrc::segment::Object>, + mrc::segment::ObjectProperties, + std::shared_ptr>>>( _module, "InferenceClientStageMM", py::multiple_inheritance()) .def(py::init<>(&InferenceClientStageInterfaceProxy::init_mm), py::arg("builder"), diff --git a/morpheus/stages/inference/triton_inference_stage.py b/morpheus/stages/inference/triton_inference_stage.py index 1121fd0c67..4ee0bd90c4 100644 --- a/morpheus/stages/inference/triton_inference_stage.py +++ b/morpheus/stages/inference/triton_inference_stage.py @@ -775,7 +775,7 @@ def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInfer needs_logits=self._needs_logits) def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject: - + if self._schema.input_type == ControlMessage: return _stages.InferenceClientStageCM(builder, self.unique_name, @@ -785,7 +785,6 @@ def _get_cpp_inference_node(self, builder: mrc.Builder) -> mrc.SegmentObject: 
self._input_mapping, self._output_mapping) - return _stages.InferenceClientStageMM(builder, self.unique_name, self._server_url, diff --git a/morpheus/stages/postprocess/validation_stage.py b/morpheus/stages/postprocess/validation_stage.py index c07fe27c80..7ae46db06f 100644 --- a/morpheus/stages/postprocess/validation_stage.py +++ b/morpheus/stages/postprocess/validation_stage.py @@ -23,8 +23,8 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config -from morpheus.messages import MultiMessage from morpheus.messages import ControlMessage +from morpheus.messages import MultiMessage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage logger = logging.getLogger(__name__) diff --git a/tests/test_abp.py b/tests/test_abp.py index 35ecd5ec92..0afe86ce81 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -142,12 +142,16 @@ def test_abp_cpp(config, tmp_path, message_type): # but the code under-the-hood replaces this with the port number of the http server pipe.add_stage( TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001', - force_convert_inputs=True)) + force_convert_inputs=True)) pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) pipe.add_stage(AddClassificationsStage(config)) pipe.add_stage(AddScoresStage(config, prefix="score_")) pipe.add_stage( - ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05, overwrite=True)) + ValidationStage(config, + val_file_name=val_file_name, + results_file_name=results_file_name, + rel_tol=0.05, + overwrite=True)) pipe.add_stage(SerializeStage(config)) pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=True)) From 39281edc9c2546948c407a8855dbe9b1d110d13e Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 12 Apr 2024 17:11:58 +0000 Subject: [PATCH 06/21] styles --- .../_lib/include/morpheus/stages/inference_client_stage.hpp | 1 - morpheus/_lib/src/stages/inference_client_stage.cpp | 4 ++-- morpheus/_lib/src/stages/triton_inference.cpp | 2 +- morpheus/_lib/stages/module.cpp | 2 ++ 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp index 71d6dd92af..f650f2b30b 100644 --- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp +++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp index 26afe41c94..c5c7d84ddb 100644 --- a/morpheus/_lib/src/stages/inference_client_stage.cpp +++ b/morpheus/_lib/src/stages/inference_client_stage.cpp @@ -22,6 +22,7 @@ #include "morpheus/messages/memory/tensor_memory.hpp" #include "morpheus/messages/multi_inference.hpp" #include "morpheus/messages/multi_response.hpp" +#include "morpheus/objects/data_table.hpp" #include "morpheus/objects/dev_mem_info.hpp" #include "morpheus/objects/dtype.hpp" #include "morpheus/objects/tensor.hpp" @@ -29,15 +30,14 @@ #include "morpheus/stages/triton_inference.hpp" #include "morpheus/utilities/matx_util.hpp" -#include #include #include #include +#include #include #include #include -#include #include #include #include diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp index 
12b0254abe..d3bfae36db 100644 --- a/morpheus/_lib/src/stages/triton_inference.cpp +++ b/morpheus/_lib/src/stages/triton_inference.cpp @@ -36,8 +36,8 @@ #include // for min #include #include -#include #include +#include #include #include #include diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index d4b051b615..6cdba387f0 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -18,6 +18,8 @@ #include "morpheus/messages/control.hpp" #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" +#include "morpheus/messages/multi_inference.hpp" +#include "morpheus/messages/multi_response.hpp" #include "morpheus/objects/file_types.hpp" #include "morpheus/stages/add_classification.hpp" #include "morpheus/stages/add_scores.hpp" From ba8fbf79526542352f711d1637b09cd316d79987 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 12 Apr 2024 17:13:36 +0000 Subject: [PATCH 07/21] add assertion back to abp test --- tests/test_abp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_abp.py b/tests/test_abp.py index 0afe86ce81..729c14e70d 100755 --- a/tests/test_abp.py +++ b/tests/test_abp.py @@ -303,5 +303,5 @@ def get_boundary_type(boundary_type): pipe.run() - # results = calc_error_val(results_file_name) - # assert results.diff_rows == 0 + results = calc_error_val(results_file_name) + assert results.diff_rows == 0 From ef1c6d49258230380750938f9fa92bf523fbddc2 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 12 Apr 2024 17:29:22 +0000 Subject: [PATCH 08/21] remove unused function --- morpheus/stages/inference/inference_stage.py | 49 ---------------- tests/test_inference_stage.py | 61 -------------------- 2 files changed, 110 deletions(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index f3f618c5fb..03227d5c77 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -369,55 +369,6 @@ def _split_batches(x: MultiInferenceMessage, max_batch_size: int) -> typing.List return out_resp - # TODO(cwharris): find out if this function is used. if not, delete it. - @staticmethod - def _convert_response( - x: typing.Tuple[typing.List[MultiInferenceMessage], typing.List[TensorMemory]]) -> MultiResponseMessage: - - # Convert a MultiInferenceMessage into a MultiResponseMessage - in_message = x[0] - out_message = x[1] - - assert len(in_message) == len(out_message) - - # Get the total output size - total_mess_count = reduce(lambda y, z: y + z.mess_count, in_message, 0) - - # Create a message data to store the entire list - probs = cp.zeros((total_mess_count, out_message[0].get_tensor('probs').shape[1])) - - saved_offset = in_message[0].mess_offset - saved_count = 0 - - for inf, res in zip(in_message, out_message): - - # Ensure they all share the same meta object. Otherwise this doesn't work - # assert inf.meta is saved_meta - - # Make sure we have a continuous list - assert inf.mess_offset == saved_offset + saved_count - - assert inf.count == res.count - - # Two scenarios: - if (inf.mess_count == inf.count): - # In message and out message have same count. 
Just use probs as is - probs[inf.offset:inf.offset + inf.count, :] = res.get_output('probs') - else: - mess_ids = inf.get_tensor("seq_ids")[:, 0].get().tolist() - - # Out message has more reponses, so we have to do key based blending of probs - for i, idx in enumerate(mess_ids): - probs[idx, :] = cp.maximum(probs[idx, :], res.get_output('probs')[i, :]) - - saved_count += inf.mess_count - - assert saved_count == total_mess_count, "Did not set every element in output" - - memory = TensorMemory(count=total_mess_count, tensors={'probs': probs}) - - return MultiResponseMessage.from_message(in_message[0], mess_count=saved_count, memory=memory) - @staticmethod def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceMessage, res: TensorMemory): # Make sure we have a continuous list diff --git a/tests/test_inference_stage.py b/tests/test_inference_stage.py index ee1989f2f9..1ac9adffae 100755 --- a/tests/test_inference_stage.py +++ b/tests/test_inference_stage.py @@ -121,67 +121,6 @@ def test_split_batches(): mock_message.get_slice.assert_has_calls([mock.call(0, 3), mock.call(3, 7), mock.call(7, 10)]) -@pytest.mark.use_python -def test_convert_response(): - # Pylint currently fails to work with classmethod: https://github.com/pylint-dev/pylint/issues/981 - # pylint: disable=no-member - - message_sizes = [3, 2, 1, 7, 4] - total_size = sum(message_sizes) - - full_input = _mk_message(mess_count=total_size, count=total_size) - - input_messages = [ - full_input.get_slice(sum(message_sizes[:i]), sum(message_sizes[:i]) + size) for i, - size in enumerate(message_sizes) - ] - - full_output = cp.random.rand(total_size, 3) - output_memory = [] - - for i, count in enumerate(message_sizes): - output_memory.append( - ResponseMemory(count=count, - tensors={"probs": full_output[sum(message_sizes[:i]):sum(message_sizes[:i]) + count, :]})) - - resp = InferenceStageT._convert_response((input_messages, output_memory)) - assert isinstance(resp, MultiResponseMessage) - assert resp.meta == full_input.meta - assert resp.mess_offset == 0 - assert resp.mess_count == total_size - assert isinstance(resp.memory, TensorMemory) - assert resp.offset == 0 - assert resp.count == total_size - assert (resp.memory.get_tensor("probs") == full_output).all() - - -def test_convert_response_errors(): - # Length of input messages doesn't match length of output messages - with pytest.raises(AssertionError): - InferenceStageT._convert_response(([1, 2, 3], [1, 2])) - - # Message offst of the second message doesn't line up offset+count of the first - msg1 = _mk_message() - msg2 = _mk_message(mess_offset=12) - - out_msg1 = ResponseMemory(count=1, tensors={"probs": cp.random.rand(1, 3)}) - out_msg2 = ResponseMemory(count=1, tensors={"probs": cp.random.rand(1, 3)}) - - with pytest.raises(AssertionError): - InferenceStageT._convert_response(([msg1, msg2], [out_msg1, out_msg2])) - - # mess_coutn and count don't match for msg2, and msg2.count != out_msg2.count - msg = _mk_message(mess_count=2, count=2) - msg1 = msg.get_slice(0, 1) - msg2 = msg.get_slice(1, 2) - - out_msg1 = ResponseMemory(count=1, tensors={"probs": cp.random.rand(1, 3)}) - out_msg2 = ResponseMemory(count=2, tensors={"probs": cp.random.rand(2, 3)}) - - with pytest.raises(AssertionError): - InferenceStageT._convert_response(([msg1, msg2], [out_msg1, out_msg2])) - - @pytest.mark.use_python def test_convert_one_response(): # Pylint currently fails to work with classmethod: https://github.com/pylint-dev/pylint/issues/981 From 04e2bf5568c50644eb27ced5a9da4111de7121c5 Mon 
From 04e2bf5568c50644eb27ced5a9da4111de7121c5 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Fri, 12 Apr 2024 17:41:30 +0000
Subject: [PATCH 09/21] fix styles

---
 .../src/stages/inference_client_stage.cpp | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index c5c7d84ddb..fe63aee237 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -20,6 +20,7 @@
 #include "morpheus/messages/control.hpp"
 #include "morpheus/messages/memory/response_memory.hpp"
 #include "morpheus/messages/memory/tensor_memory.hpp"
+#include "morpheus/messages/meta.hpp"
 #include "morpheus/messages/multi_inference.hpp"
 #include "morpheus/messages/multi_response.hpp"
 #include "morpheus/objects/data_table.hpp"
@@ -38,6 +39,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -48,12 +50,7 @@ namespace {
 using namespace morpheus;
 
-using InferenceClientStageMM =
-    InferenceClientStage;  // NOLINT(readability-identifier-naming)
-using InferenceClientStageCM =
-    InferenceClientStage;  // NOLINT(readability-identifier-naming)
-
-static ShapeType get_seq_ids(const InferenceClientStageMM::sink_type_t& message)
+static ShapeType get_seq_ids(const std::shared_ptr& message)
 {
     // Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe
     // The output tensors we store in `reponse_memory` will all be of the same length as the the
@@ -124,7 +121,15 @@ static void reduce_outputs(std::shared_ptr const& message
 }
 }
 
-static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors) {}
+static void reduce_outputs(std::shared_ptr const& message, TensorMap& output_tensors)
+{
+    if (message->payload()->count() == message->tensors()->count)
+    {
+        return;
+    }
+
+    throw std::runtime_error("reduce_outputs not implemented");
+}
 
 static void apply_logits(TensorMap& output_tensors)
 {

From 9ba8bf8a088f3240e74624695d166c5d445aaadb Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 15 Apr 2024 17:34:41 +0000
Subject: [PATCH 10/21] add ControlMessage support to InferenceClientStage where reduce_outputs is necessary

---
 .../src/stages/inference_client_stage.cpp     | 49 +++++++++++++++++--
 morpheus/_lib/src/stages/triton_inference.cpp |  1 -
 tests/test_sid.py                             | 25 ++++++----
 3 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index fe63aee237..edc6351364 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -70,6 +70,26 @@ static ShapeType get_seq_ids(const std::shared_ptr& messa
     return host_seq_ids;
 }
 
+static ShapeType get_seq_ids(const std::shared_ptr& message)
+{
+    // Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe
+    // The output tensors we store in `reponse_memory` will all be of the same length as the the
+    // dataframe. seq_ids has three columns, but we are only interested in the first column.
+    auto seq_ids = message->tensors()->get_tensor("seq_ids");
+    const auto item_size = seq_ids.dtype().item_size();
+
+    ShapeType host_seq_ids(message->tensors()->count);
+    MRC_CHECK_CUDA(cudaMemcpy2D(host_seq_ids.data(),
+                                item_size,
+                                seq_ids.data(),
+                                seq_ids.stride(0) * item_size,
+                                item_size,
+                                host_seq_ids.size(),
+                                cudaMemcpyDeviceToHost));
+
+    return host_seq_ids;
+}
+
 static bool has_tensor(std::shared_ptr message, std::string const& tensor_name)
 {
     return message->memory->has_tensor(tensor_name);
@@ -128,7 +148,28 @@ static void reduce_outputs(std::shared_ptr const& message, Tenso
         return;
     }
 
-    throw std::runtime_error("reduce_outputs not implemented");
+    // When our tensor lengths are longer than our dataframe we will need to use the seq_ids array to
+    // lookup how the values should map back into the dataframe.
+    auto host_seq_ids = get_seq_ids(message);
+
+    for (auto& mapping : output_tensors)
+    {
+        auto& output_tensor = mapping.second;
+
+        ShapeType shape  = output_tensor.get_shape();
+        ShapeType stride = output_tensor.get_stride();
+
+        ShapeType reduced_shape{shape};
+        reduced_shape[0] = message->payload()->count();
+
+        auto reduced_buffer = MatxUtil::reduce_max(
+            DevMemInfo{output_tensor.data(), output_tensor.dtype(), output_tensor.get_memory(), shape, stride},
+            host_seq_ids,
+            0,
+            reduced_shape);
+
+        output_tensor.swap(Tensor::create(std::move(reduced_buffer), output_tensor.dtype(), reduced_shape, stride, 0));
+    }
 }
 
 static void apply_logits(TensorMap& output_tensors)
@@ -205,7 +246,7 @@ static std::shared_ptr make_response(std::shared_ptr
 make_response(std::shared_ptr message, TensorMap&& output_tensor_map)
 {
-    message->tensors()->set_tensors(std::move(output_tensor_map));
+    message->tensors(std::make_shared(message->payload()->count(), std::move(output_tensor_map)));
     return message;
 }
@@ -286,7 +327,9 @@ mrc::coroutines::AsyncGenerator> InferenceClientStage
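The reduce_outputs overload added above is the ControlMessage counterpart of the seq_ids blending that the MultiResponseMessage path (and the Python _convert_response helper removed in an earlier patch) already performed: every tensor row is folded back onto the dataframe row named by the first column of seq_ids, keeping the element-wise maximum per class. A minimal NumPy sketch of that reduction, for illustration only and not part of the patch (the C++ code performs the same reduction on device via MatxUtil::reduce_max):

    import numpy as np

    def reduce_probs_by_seq_ids(probs: np.ndarray, seq_ids: np.ndarray, num_rows: int) -> np.ndarray:
        # probs:    (num_tensor_rows, num_classes) model output, one row per inference input
        # seq_ids:  (num_tensor_rows, 3); column 0 holds the originating dataframe row index
        # num_rows: number of rows in the dataframe (message->payload()->count())
        reduced = np.zeros((num_rows, probs.shape[1]), dtype=probs.dtype)
        for tensor_row, df_row in enumerate(seq_ids[:, 0]):
            # Keep the element-wise maximum over all tensor rows that map to the same dataframe row
            reduced[df_row, :] = np.maximum(reduced[df_row, :], probs[tensor_row, :])
        return reduced

    # Example: four tensor rows collapse onto three dataframe rows
    probs = np.array([[0.1, 0.9], [0.8, 0.2], [0.4, 0.4], [0.0, 0.7]])
    seq_ids = np.array([[0, 0, 0], [1, 0, 0], [1, 0, 0], [2, 0, 0]])
    print(reduce_probs_by_seq_ids(probs, seq_ids, num_rows=3))
    # [[0.1 0.9]
    #  [0.8 0.4]
    #  [0.  0.7]]

With probabilities in [0, 1], starting from zeros and taking a running maximum reproduces the key-based blending the removed Python code did with cp.maximum.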
Date: Mon, 15 Apr 2024 18:41:06 +0000
Subject: [PATCH 11/21] fix missing probs tensor

---
 morpheus/stages/inference/inference_stage.py       | 4 ++--
 morpheus/stages/preprocess/preprocess_nlp_stage.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py
index 03227d5c77..e67bfb8998 100644
--- a/morpheus/stages/inference/inference_stage.py
+++ b/morpheus/stages/inference/inference_stage.py
@@ -285,10 +285,10 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future):
         if (isinstance(_message, ControlMessage)):
             _df = cudf.DataFrame(output_message.get_meta())
             if (_df is not None and not _df.empty):
-                embeddings = output_message.get_probs_tensor()
-                _df["embedding"] = embeddings.tolist()
                 _message_meta = CppMessageMeta(df=_df)
                 _message.payload(_message_meta)
+                _message.tensors().set_tensor("probs", output_message.get_probs_tensor())
+                print(_df)
             output_message = _message
 
         return output_message
diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index feace923dc..f495e11f00 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -35,7 +35,7 @@
 from morpheus.messages import MultiInferenceMessage
 from morpheus.messages import MultiInferenceNLPMessage
 from morpheus.messages import MultiMessage
-from morpheus.messages import TensorMemory as CppTensorMemory
+from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
 from morpheus.utils.cudf_subword_helper import tokenize_text_series
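Patches 10 and 11 together fix the contract for a ControlMessage flowing through inference: the dataframe rides in the payload and the tensors ride in a TensorMemory, with the model output stored under the "probs" key. A rough sketch of that contract, for illustration only; it assumes a GPU environment with cudf/cupy, that MessageMeta and TensorMemory are exposed from morpheus._lib.messages as used elsewhere in this series, and that ControlMessage can be default-constructed:

    import cupy as cp
    import cudf

    import morpheus._lib.messages as _messages
    from morpheus.messages import ControlMessage

    # The dataframe rides in the payload; the pre-processed tensors ride in a TensorMemory.
    df = cudf.DataFrame({"data": ["example one", "example two"]})

    message = ControlMessage()
    message.payload(_messages.MessageMeta(df=df))
    message.tensors(_messages.TensorMemory(count=2, tensors={"seq_ids": cp.zeros((2, 3), dtype=cp.int32)}))

    # After inference, the stage attaches the model output under the "probs" key,
    # which is what the "fix missing probs tensor" change above restores.
    message.tensors().set_tensor("probs", cp.random.rand(2, 3))

The preprocessing changes later in this series attach input_ids, input_mask and seq_ids the same way before the inference stage adds or replaces the probs tensor.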
From dbe77f9860a76c158ebf1a60e87c7feac518a1e6 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 15 Apr 2024 20:37:04 +0000
Subject: [PATCH 12/21] fix styles

---
 morpheus/_lib/src/stages/inference_client_stage.cpp | 2 +-
 morpheus/_lib/src/stages/triton_inference.cpp       | 1 -
 morpheus/stages/preprocess/preprocess_nlp_stage.py  | 2 +-
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index edc6351364..99a95314ad 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -167,7 +167,7 @@ static void reduce_outputs(std::shared_ptr const& message, Tenso
             host_seq_ids,
             0,
             reduced_shape);
-
+
         output_tensor.swap(Tensor::create(std::move(reduced_buffer), output_tensor.dtype(), reduced_shape, stride, 0));
     }
 }
diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp
index 2387dd1bf8..30f100e7ea 100644
--- a/morpheus/_lib/src/stages/triton_inference.cpp
+++ b/morpheus/_lib/src/stages/triton_inference.cpp
@@ -37,7 +37,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index f495e11f00..5f0377f38b 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -25,6 +25,7 @@
 import cudf
 
 import morpheus._lib.stages as _stages
+from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.cli.register_stage import register_stage
 from morpheus.cli.utils import MorpheusRelativePath
 from morpheus.cli.utils import get_package_relative_file
@@ -35,7 +36,6 @@
 from morpheus.messages import MultiInferenceMessage
 from morpheus.messages import MultiInferenceNLPMessage
 from morpheus.messages import MultiMessage
-from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
 from morpheus.utils.cudf_subword_helper import tokenize_text_series

From 3b4fac6f195aa843993738ab5a14975276d9be06 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 15 Apr 2024 20:39:30 +0000
Subject: [PATCH 13/21] fix styles

---
 morpheus/stages/inference/inference_stage.py       | 1 -
 morpheus/stages/preprocess/preprocess_nlp_stage.py | 2 +-
 tests/test_inference_stage.py                      | 1 -
 3 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py
index e67bfb8998..8b1fa75d3a 100644
--- a/morpheus/stages/inference/inference_stage.py
+++ b/morpheus/stages/inference/inference_stage.py
@@ -16,7 +16,6 @@
 import typing
 from abc import abstractmethod
 from functools import partial
-from functools import reduce
 
 import cupy as cp
 import mrc
diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index 5f0377f38b..355258c1fd 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -27,8 +27,8 @@
 import morpheus._lib.stages as _stages
 from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.cli.register_stage import register_stage
-from morpheus.cli.utils import MorpheusRelativePath
 from morpheus.cli.utils import get_package_relative_file
+from morpheus.cli.utils import MorpheusRelativePath
 from morpheus.config import Config
 from morpheus.config import PipelineModes
 from morpheus.messages import ControlMessage
diff --git a/tests/test_inference_stage.py b/tests/test_inference_stage.py
index 1ac9adffae..e34f5a5bd4 100755
--- a/tests/test_inference_stage.py
+++ b/tests/test_inference_stage.py
@@ -25,7 +25,6 @@
 from _utils.inference_worker import IW
 from morpheus.messages import ResponseMemory
 from morpheus.messages.memory.inference_memory import InferenceMemory
-from morpheus.messages.memory.tensor_memory import TensorMemory
 from morpheus.messages.message_meta import MessageMeta
 from morpheus.messages.multi_inference_message import MultiInferenceMessage
 from morpheus.messages.multi_response_message import MultiResponseMessage

From 843ee92f03f753e8aa02e950ca1d2d23c2105ab4 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 15 Apr 2024 20:50:25 +0000
Subject: [PATCH 14/21] style checks

---
 morpheus/stages/preprocess/preprocess_nlp_stage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index 355258c1fd..5f0377f38b 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -27,8 +27,8 @@
 import morpheus._lib.stages as _stages
 from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.cli.register_stage import register_stage
-from morpheus.cli.utils import get_package_relative_file
 from morpheus.cli.utils import MorpheusRelativePath
+from morpheus.cli.utils import get_package_relative_file
 from morpheus.config import Config
 from morpheus.config import PipelineModes
 from morpheus.messages import ControlMessage

From b8396cc39eb8ed9cd55ed93f1bc41a442769bf4d Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Tue, 16 Apr 2024 11:43:38 +0000
Subject: [PATCH 15/21] fix import

---
 morpheus/stages/preprocess/preprocess_nlp_stage.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index 5f0377f38b..19cdf26d52 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -25,7 +25,6 @@
 import cudf
 
 import morpheus._lib.stages as _stages
-from morpheus._lib.messages import TensorMemory as CppTensorMemory
 from morpheus.cli.register_stage import register_stage
 from morpheus.cli.utils import MorpheusRelativePath
 from morpheus.cli.utils import get_package_relative_file
@@ -36,6 +35,7 @@
 from morpheus.messages import MultiInferenceMessage
 from morpheus.messages import MultiInferenceNLPMessage
 from morpheus.messages import MultiMessage
+from morpheus.messages import TensorMemory
 from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
 from morpheus.utils.cudf_subword_helper import tokenize_text_series
@@ -205,12 +205,12 @@ def process_control_message(message: ControlMessage,
         del text_series
 
         message.tensors(
-            CppTensorMemory(count=tokenized.input_ids.shape[0],
-                            tensors={
-                                "input_ids": tokenized.input_ids,
-                                "input_mask": tokenized.input_mask,
-                                "seq_ids": tokenized.segment_ids
-                            }))
+            TensorMemory(count=tokenized.input_ids.shape[0],
+                         tensors={
+                             "input_ids": tokenized.input_ids,
+                             "input_mask": tokenized.input_mask,
+                             "seq_ids": tokenized.segment_ids
+                         }))
         message.set_metadata("inference_memory_params", {"inference_type": "nlp"})
 
         return message

From a0a3f2602f8a113b949ac21d54b0c0eb2ba3ef1e Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Tue, 16 Apr 2024 13:25:15 +0000
Subject: [PATCH 16/21] fix import

---
 morpheus/stages/preprocess/preprocess_nlp_stage.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/morpheus/stages/preprocess/preprocess_nlp_stage.py b/morpheus/stages/preprocess/preprocess_nlp_stage.py
index 19cdf26d52..ec73a0bade 100644
--- a/morpheus/stages/preprocess/preprocess_nlp_stage.py
+++ b/morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -24,6 +24,7 @@
 import cudf
 
+import morpheus._lib.messages as _messages
 import morpheus._lib.stages as _stages
 from morpheus.cli.register_stage import register_stage
 from morpheus.cli.utils import MorpheusRelativePath
 from morpheus.cli.utils import get_package_relative_file
@@ -35,7 +36,6 @@
 from morpheus.messages import MultiInferenceMessage
 from morpheus.messages import MultiInferenceNLPMessage
 from morpheus.messages import MultiMessage
-from morpheus.messages import TensorMemory
 from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
 from morpheus.utils.cudf_subword_helper import tokenize_text_series
@@ -205,12 +205,12 @@ def process_control_message(message: ControlMessage,
         del text_series
 
         message.tensors(
-            TensorMemory(count=tokenized.input_ids.shape[0],
-                         tensors={
-                             "input_ids": tokenized.input_ids,
-                             "input_mask": tokenized.input_mask,
-                             "seq_ids": tokenized.segment_ids
-                         }))
+            _messages.TensorMemory(count=tokenized.input_ids.shape[0],
+                                   tensors={
+                                       "input_ids": tokenized.input_ids,
+                                       "input_mask": tokenized.input_mask,
+                                       "seq_ids": tokenized.segment_ids
+                                   }))
         message.set_metadata("inference_memory_params", {"inference_type": "nlp"})
 
         return message

From 21ff88d65555c0e13931cb25f175f56ec72c243b Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 22 Apr 2024 18:31:19 +0000
Subject: [PATCH 17/21] explicitly instantiate InferenceClientStage templates

---
 morpheus/_lib/src/stages/inference_client_stage.cpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index 99a95314ad..26428aa159 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -432,4 +432,7 @@ InferenceClientStageInterfaceProxy::init_cm(mrc::segment::Builder& builder,
     return stage;
 }
 
+template class InferenceClientStage;
+template class InferenceClientStage;
+
 }  // namespace morpheus

From a98599e78234de81ad308be9f9c5c069b71fdd0b Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 22 Apr 2024 19:43:16 +0000
Subject: [PATCH 18/21] iwyu

---
 .../_lib/include/morpheus/stages/inference_client_stage.hpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
index f650f2b30b..3bc95fe3d1 100644
--- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
+++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
@@ -23,11 +23,13 @@
 #include "morpheus/messages/multi_response.hpp"
 #include "morpheus/types.hpp"
 
+#include
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include

From 8521f20804ac77687e3e8c5152c22fa83aafdc07 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 22 Apr 2024 20:04:44 +0000
Subject: [PATCH 19/21] iywu

---
 morpheus/_lib/src/stages/inference_client_stage.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index 26428aa159..403bb4a183 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -35,6 +35,7 @@
 #include
 #include
 #include
+#include
 #include
 #include

From ba11392a6a219d4ad839cb4a8366ff3ed1cad3aa Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 22 Apr 2024 20:18:02 +0000
Subject: [PATCH 20/21] iwyu

---
 morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp | 1 +
 morpheus/_lib/src/stages/inference_client_stage.cpp              | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
index 3bc95fe3d1..fd115de5af 100644
--- a/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
+++ b/morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
@@ -31,6 +31,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
diff --git a/morpheus/_lib/src/stages/inference_client_stage.cpp b/morpheus/_lib/src/stages/inference_client_stage.cpp
index 403bb4a183..26428aa159 100644
--- a/morpheus/_lib/src/stages/inference_client_stage.cpp
+++ b/morpheus/_lib/src/stages/inference_client_stage.cpp
@@ -35,7 +35,6 @@
 #include
 #include
 #include
-#include
 #include
 #include

From 25854d9cd407fc68fd4262ce97ce472b43462a06 Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Wed, 24 Apr 2024 19:55:22 +0000
Subject: [PATCH 21/21] styles

---
 tests/test_sid.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/test_sid.py b/tests/test_sid.py
index de578b1c65..b36903fd82 100755
--- a/tests/test_sid.py
+++ b/tests/test_sid.py
@@ -198,7 +198,11 @@ def test_minibert_no_trunc(config: Config, tmp_path: str, message_type: type, mo
 @pytest.mark.usefixtures("launch_mock_triton")
 @pytest.mark.parametrize("data_col_name", ["data", "definitely_not_data"])
 @pytest.mark.parametrize("message_type", [MultiMessage, ControlMessage])
-def test_minibert_truncated(config: Config, tmp_path: str, message_type: type, morpheus_log_level: int, data_col_name: str):
+def test_minibert_truncated(config: Config,
+                            tmp_path: str,
+                            message_type: type,
+                            morpheus_log_level: int,
+                            data_col_name: str):
     results = _run_minibert(config=config,
                             tmp_path=tmp_path,