Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ControlMessage support in TritonInferenceStage and PreallocatorMixin #1610

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7adafc7
add control message scaffolding to triton inference stage
cwharris Apr 10, 2024
d5cf504
support ControlMessage for TritonInferenceStage
cwharris Apr 10, 2024
175d6c8
test both MultiMessage and Control message in test_abp_cpp
cwharris Apr 10, 2024
6e7c2b8
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 10, 2024
c1e1d65
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 12, 2024
1f0746c
add ControlMessage support to PreallocatorMixin
cwharris Apr 12, 2024
f51d172
styles
cwharris Apr 12, 2024
39281ed
styles
cwharris Apr 12, 2024
ba8fbf7
add assertion back to abp test
cwharris Apr 12, 2024
ef1c6d4
remove unused function
cwharris Apr 12, 2024
04e2bf5
fix styles
cwharris Apr 12, 2024
9ba8bf8
add ControlMessage support to InferenceClientStage where reduce_outpu…
cwharris Apr 15, 2024
c8cf5dd
fix missing probs tensor
cwharris Apr 15, 2024
dbe77f9
fix styles
cwharris Apr 15, 2024
3b4fac6
fix styles
cwharris Apr 15, 2024
843ee92
style checks
cwharris Apr 15, 2024
b8396cc
fix import
cwharris Apr 16, 2024
a0a3f26
fix import
cwharris Apr 16, 2024
a5c57d5
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 17, 2024
c86f7eb
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 19, 2024
b05fd90
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 19, 2024
f228baf
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 22, 2024
21ff88d
explicitly instantiate InferenceClientStage templates
cwharris Apr 22, 2024
a98599e
iwyu
cwharris Apr 22, 2024
8521f20
iywu
cwharris Apr 22, 2024
ba11392
iwyu
cwharris Apr 22, 2024
30b2d99
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 23, 2024
2c653c7
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into t…
cwharris Apr 24, 2024
25854d9
styles
cwharris Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions morpheus/_lib/include/morpheus/stages/inference_client_stage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
#pragma once

#include "morpheus/export.h"
#include "morpheus/messages/control.hpp"
#include "morpheus/messages/multi_inference.hpp"
#include "morpheus/messages/multi_response.hpp"
#include "morpheus/types.hpp"

#include <boost/fiber/policy.hpp>
#include <mrc/coroutines/async_generator.hpp>
#include <mrc/coroutines/scheduler.hpp>
#include <mrc/coroutines/task.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pybind11/pybind11.h>
#include <pymrc/asyncio_runnable.hpp>
#include <rxcpp/rx.hpp>

#include <cstdint>
#include <map>
Expand Down Expand Up @@ -93,12 +96,13 @@ class MORPHEUS_EXPORT IInferenceClient
* @brief Perform inference with Triton Inference Server.
* This class specifies which inference implementation category (Ex: NLP/FIL) is needed for inferencing.
*/
template <typename InputT, typename OutputT>
class MORPHEUS_EXPORT InferenceClientStage
: public mrc::pymrc::AsyncioRunnable<std::shared_ptr<MultiInferenceMessage>, std::shared_ptr<MultiResponseMessage>>
: public mrc::pymrc::AsyncioRunnable<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>
{
public:
using sink_type_t = std::shared_ptr<MultiInferenceMessage>;
using source_type_t = std::shared_ptr<MultiResponseMessage>;
using sink_type_t = std::shared_ptr<InputT>;
using source_type_t = std::shared_ptr<OutputT>;

/**
* @brief Construct a new Inference Client Stage object
Expand All @@ -117,11 +121,11 @@ class MORPHEUS_EXPORT InferenceClientStage
std::vector<TensorModelMapping> output_mapping);

/**
* Process a single MultiInferenceMessage by running the constructor-provided inference client against it's Tensor,
* and yields the result as a MultiResponseMessage
* Process a single InputT by running the constructor-provided inference client against it's Tensor,
* and yields the result as a OutputT
*/
mrc::coroutines::AsyncGenerator<std::shared_ptr<MultiResponseMessage>> on_data(
std::shared_ptr<MultiInferenceMessage>&& data, std::shared_ptr<mrc::coroutines::Scheduler> on) override;
mrc::coroutines::AsyncGenerator<std::shared_ptr<OutputT>> on_data(
std::shared_ptr<InputT>&& data, std::shared_ptr<mrc::coroutines::Scheduler> on) override;

private:
std::string m_model_name;
Expand All @@ -142,7 +146,7 @@ class MORPHEUS_EXPORT InferenceClientStage
struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy
{
/**
* @brief Create and initialize a InferenceClientStage, and return the result
* @brief Create and initialize a MultiMessage-based InferenceClientStage, and return the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
Expand All @@ -152,9 +156,31 @@ struct MORPHEUS_EXPORT InferenceClientStageInterfaceProxy
* @param needs_logits : Determines if logits are required.
* @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this
* if the Morpheus names do not match the model.
* @return std::shared_ptr<mrc::segment::Object<InferenceClientStage>>
* @return std::shared_ptr<mrc::segment::Object<InferenceClientStage<MultiInferenceMessage, MultiResponseMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<InferenceClientStage>> init(
static std::shared_ptr<mrc::segment::Object<InferenceClientStage<MultiInferenceMessage, MultiResponseMessage>>>
init_mm(mrc::segment::Builder& builder,
const std::string& name,
std::string model_name,
std::string server_url,
bool needs_logits,
std::map<std::string, std::string> input_mapping,
std::map<std::string, std::string> output_mapping);

/**
* @brief Create and initialize a ControlMessage-based InferenceClientStage, and return the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param model_name : Name of the model specifies which model can handle the inference requests that are sent to
* Triton inference
* @param server_url : Triton server URL.
* @param needs_logits : Determines if logits are required.
* @param inout_mapping : Dictionary used to map pipeline input/output names to Triton input/output names. Use this
* if the Morpheus names do not match the model.
* @return std::shared_ptr<mrc::segment::Object<InferenceClientStage<ControlMessage, ControlMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<InferenceClientStage<ControlMessage, ControlMessage>>> init_cm(
mrc::segment::Builder& builder,
const std::string& name,
std::string model_name,
Expand Down
8 changes: 8 additions & 0 deletions morpheus/_lib/include/morpheus/stages/preallocate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp"
#include "morpheus/messages/multi.hpp"
#include "morpheus/objects/dtype.hpp" // for TypeId
Expand Down Expand Up @@ -51,11 +52,18 @@ void preallocate(std::shared_ptr<morpheus::MessageMeta> msg,
table.insert_missing_columns(columns);
}

void preallocate(std::shared_ptr<morpheus::ControlMessage> msg,
const std::vector<std::tuple<std::string, morpheus::DType>>& columns)
{
preallocate(msg->payload(), columns);
}

void preallocate(std::shared_ptr<morpheus::MultiMessage> msg,
const std::vector<std::tuple<std::string, morpheus::DType>>& columns)
{
preallocate(msg->meta, columns);
}

} // namespace

/****** Component public implementations *******************/
Expand Down
Loading
Loading