From b0c7e04c19091d1f6494e36222b0e38220c15064 Mon Sep 17 00:00:00 2001
From: Kris Hung
Date: Thu, 9 Nov 2023 11:58:31 -0800
Subject: [PATCH] Add support for request rescheduling (#319)

* Add support for request rescheduling

* Address comment

* Add documentation

* Fix up for doc

* Revert response sender changes

* Address comment
---
 README.md                                     |  97 +++++++++++++++++
 src/infer_request.cc                          |  18 ++-
 src/infer_request.h                           |   4 +
 src/pb_stub.cc                                |  49 ++++++---
 src/python_be.cc                              | 109 ++++++++++++++++---
 src/python_be.h                               |   4 +
 src/resources/triton_python_backend_utils.py  |   4 +-
 7 files changed, 249 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index 8a93dd07..70ebbe18 100644
--- a/README.md
+++ b/README.md
@@ -50,6 +50,7 @@ any C++ code.
   - [Decoupled mode](#decoupled-mode)
     - [Use Cases](#use-cases)
     - [Known Issues](#known-issues)
+  - [Request Rescheduling](#request-rescheduling)
 - [`finalize`](#finalize)
 - [Model Config File](#model-config-file)
 - [Inference Request Parameters](#inference-request-parameters)
@@ -623,6 +624,102 @@ for more details on how to host a decoupled model.
 
 * Currently, decoupled Python models can not make async infer requests.
 
+#### Request Rescheduling
+
+Starting from 23.11, the Python backend supports request rescheduling. By
+calling the `set_release_flags` function on the request object with the flag
+`pb_utils.TRITONSERVER_REQUEST_RELEASE_RESCHEDULE`, you can reschedule the
+request for further execution in a future batch. This feature is useful for
+handling generative sequences.
+
+The model config must be configured to enable generative sequence batching in
+order to use the request rescheduling API:
+
+```
+sequence_batching {
+  generative_sequence : true
+}
+```
+
+For non-decoupled models, there can only be one response for each request.
+Since the rescheduled request is the same as the original, you must append a
+`None` object to the response list for the rescheduled request. For example:
+
+```python
+import triton_python_backend_utils as pb_utils
+
+class TritonPythonModel:
+    ...
+
+    def execute(self, requests):
+        responses = []
+
+        for request in requests:
+            # Explicitly reschedule the first request
+            if self.idx == 0:
+                request.set_release_flags(
+                    pb_utils.TRITONSERVER_REQUEST_RELEASE_RESCHEDULE
+                )
+                responses.append(None)
+                self.idx += 1
+            else:
+                responses.append(inference_response)
+
+        return responses
+```
+
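+The example above elides its setup: `self.idx` and `inference_response` are
+just this example's own bookkeeping counter and output object, not part of the
+backend API. A minimal, purely illustrative way to set up the counter is an
+`initialize` function along the lines of the sketch below:
+
+```python
+class TritonPythonModel:
+    def initialize(self, args):
+        # Illustrative only: `self.idx` counts executions so that the first
+        # pass of a request is rescheduled and a later pass appends the real
+        # `inference_response` built by the model.
+        self.idx = 0
+```
+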
+For decoupled models, the request must be rescheduled *before* the `execute`
+function returns.
+Below is an example of a decoupled model using request rescheduling. This model
+takes 1 input tensor, an INT32 [ 1 ] input named "IN", and produces an output
+tensor "OUT" with the same shape as the input tensor. The input value indicates
+the total number of responses to be generated and the output value indicates
+the number of remaining responses. For example, if the request input has value
+2, the model will:
+ - Send a response with value 1.
+ - Release the request with the RESCHEDULE flag.
+ - When `execute` is called again on the rescheduled request, send the last
+   response with value 0.
+ - Release the request with the ALL flag.
+
+```python
+import numpy as np
+import triton_python_backend_utils as pb_utils
+
+class TritonPythonModel:
+    ...
+
+    def execute(self, requests):
+        responses = []
+
+        for request in requests:
+            in_input = pb_utils.get_input_tensor_by_name(request, "IN").as_numpy()
+
+            if self.reset_flag:
+                self.remaining_response = in_input[0]
+                self.reset_flag = False
+
+            response_sender = request.get_response_sender()
+
+            self.remaining_response -= 1
+
+            out_output = pb_utils.Tensor(
+                "OUT", np.array([self.remaining_response], np.int32)
+            )
+            response = pb_utils.InferenceResponse(output_tensors=[out_output])
+
+            if self.remaining_response <= 0:
+                response_sender.send(
+                    response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                )
+                self.reset_flag = True
+            else:
+                request.set_release_flags(
+                    pb_utils.TRITONSERVER_REQUEST_RELEASE_RESCHEDULE
+                )
+                response_sender.send(response)
+
+        return None
+```
+
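+Because this model is decoupled, a client has to use the streaming API to
+receive the responses produced across the rescheduled executions. The
+following is a minimal gRPC client sketch; the endpoint `localhost:8001` and
+the model name `request_rescheduling_example` are illustrative assumptions,
+not values defined by this backend:
+
+```python
+import time
+
+import numpy as np
+import tritonclient.grpc as grpcclient
+
+results = []
+
+def callback(result, error):
+    # Collect each streamed response; a real client should also handle errors.
+    if error is None:
+        results.append(result.as_numpy("OUT")[0])
+
+client = grpcclient.InferenceServerClient("localhost:8001")
+client.start_stream(callback=callback)
+
+infer_input = grpcclient.InferInput("IN", [1], "INT32")
+infer_input.set_data_from_numpy(np.array([2], dtype=np.int32))
+client.async_stream_infer("request_rescheduling_example", [infer_input])
+
+# An input of 2 should produce two responses (1, then 0) across the original
+# and the rescheduled execution.
+while len(results) < 2:
+    time.sleep(0.1)
+client.stop_stream()
+print(results)
+```
+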
 ### `finalize`
 
 Implementing `finalize` is optional. This function allows you to do any clean
diff --git a/src/infer_request.cc b/src/infer_request.cc
index 4c2d2575..d641526e 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -50,7 +50,7 @@ InferRequest::InferRequest(
       model_version_(model_version), parameters_(parameters), flags_(flags),
       timeout_(timeout), response_factory_address_(response_factory_address),
       request_address_(request_address), preferred_memory_(preferred_memory),
-      trace_(trace)
+      trace_(trace), request_release_flags_(TRITONSERVER_REQUEST_RELEASE_ALL)
 {
   for (auto& input : inputs) {
     if (!input) {
@@ -175,6 +175,20 @@ InferRequest::Trace()
   return trace_;
 }
 
+uint32_t
+InferRequest::ReleaseFlags()
+{
+  request_release_flags_ = infer_request_shm_ptr_->request_release_flags;
+  return request_release_flags_;
+}
+
+void
+InferRequest::SetReleaseFlags(const uint32_t& flags)
+{
+  request_release_flags_ = flags;
+  infer_request_shm_ptr_->request_release_flags = request_release_flags_;
+}
+
 void
 InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
 {
@@ -201,6 +215,7 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
   infer_request_shm_ptr_->timeout = timeout_;
   infer_request_shm_ptr_->preferred_memory = preferred_memory_;
   infer_request_shm_ptr_->trace = trace_;
+  infer_request_shm_ptr_->request_release_flags = request_release_flags_;
 
   output_names_handle_shm_ptr_ =
       reinterpret_cast<bi::managed_external_buffer::handle_t*>(
@@ -379,6 +394,7 @@ InferRequest::InferRequest(
   timeout_ = infer_request_shm_ptr_->timeout;
   preferred_memory_ = infer_request_shm_ptr_->preferred_memory;
   trace_ = infer_request_shm_ptr_->trace;
+  request_release_flags_ = infer_request_shm_ptr_->request_release_flags;
 
 #ifdef TRITON_PB_STUB
   pb_cancel_ =
diff --git a/src/infer_request.h b/src/infer_request.h
index bc6a2acf..3d81c5d2 100644
--- a/src/infer_request.h
+++ b/src/infer_request.h
@@ -73,6 +73,7 @@ struct InferRequestShm {
   int32_t timeout;
   PreferredMemory preferred_memory;
   InferenceTrace trace;
+  uint32_t request_release_flags;
 };
 
 class InferRequest {
@@ -104,6 +105,8 @@ class InferRequest {
   void SetIsDecoupled(const bool is_decoupled);
   PreferredMemory& GetPreferredMemory();
   InferenceTrace& Trace();
+  uint32_t ReleaseFlags();
+  void SetReleaseFlags(const uint32_t& flags);
 
 #ifdef TRITON_PB_STUB
   std::shared_ptr<InferResponse> Exec(const bool is_decoupled);
@@ -161,6 +164,7 @@ class InferRequest {
   bool is_decoupled_;
   PreferredMemory preferred_memory_;
   InferenceTrace trace_;
+  uint32_t request_release_flags_;
 
   // Shared Memory Data Structures
   AllocatedSharedMemory<char> infer_request_shm_;
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 123b2832..3d473101 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -793,26 +793,39 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
           std::to_string(response_size) + "\n";
       throw PythonBackendException(err);
     }
-    for (auto& response : responses) {
+
+    for (size_t i = 0; i < response_size; i++) {
       // Check the return type of execute function.
-      if (!py::isinstance<InferResponse>(response)) {
-        std::string str = py::str(response.get_type());
-        throw PythonBackendException(
-            std::string("Expected an 'InferenceResponse' object in the execute "
-                        "function return list, found type '") +
-            str + "'.");
+      InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
+      if (infer_request->ReleaseFlags() ==
+          TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+        if (!py::isinstance<py::none>(responses[i])) {
+          // When the request is rescheduled in a non-decoupled model, the
+          // response must be None.
+          std::string str = py::str(responses[i].get_type());
+          throw PythonBackendException(
+              "Expected a None object in the execute function return list for "
+              "rescheduled request, "
+              "found type '" +
+              str + "'.");
+        }
+      } else {
+        if (!py::isinstance<InferResponse>(responses[i])) {
+          std::string str = py::str(responses[i].get_type());
+          throw PythonBackendException(
+              std::string(
+                  "Expected an 'InferenceResponse' object in the execute "
+                  "function return list, found type '") +
+              str + "'.");
+        }
+        InferResponse* infer_response = responses[i].cast<InferResponse*>();
+        infer_response->PruneOutputTensors(
+            infer_request->RequestedOutputNames());
+        ProcessResponse(infer_response);
+        responses_shm_handle[i] = infer_response->ShmHandle();
       }
     }
     response_batch_shm_ptr->batch_size = response_size;
-
-    for (size_t i = 0; i < batch_size; i++) {
-      InferResponse* infer_response = responses[i].cast<InferResponse*>();
-      InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
-      infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
-
-      ProcessResponse(infer_response);
-      responses_shm_handle[i] = infer_response->ShmHandle();
-    }
   }
   catch (const PythonBackendException& pb_exception) {
     has_exception = true;
@@ -1675,7 +1688,9 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
           "requested_output_names", &InferRequest::RequestedOutputNames,
           py::return_value_policy::reference_internal)
       .def("get_response_sender", &InferRequest::GetResponseSender)
-      .def("is_cancelled", &InferRequest::IsCancelled);
+      .def("is_cancelled", &InferRequest::IsCancelled)
+      .def("set_release_flags", &InferRequest::SetReleaseFlags,
+           py::arg("flags").none(false));
 
   py::class_<PbTensor, std::shared_ptr<PbTensor>>(module, "Tensor")
       .def(py::init(&PbTensor::FromNumpy))
diff --git a/src/python_be.cc b/src/python_be.cc
index 1f5a2e34..cec2d18a 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -271,12 +271,12 @@ ModelInstanceState::IsStubProcessAlive()
 
 TRITONSERVER_Error*
 ModelInstanceState::SaveRequestsToSharedMemory(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
-    std::vector<std::unique_ptr<InferRequest>>& pb_inference_requests,
+    std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
     AllocatedSharedMemory<char>& request_batch,
     std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses)
 {
   // Clear any existing items in the requests vector
-  pb_inference_requests.clear();
+  pb_infer_requests.clear();
   ModelState* model_state = reinterpret_cast<ModelState*>(Model());
   RETURN_IF_EXCEPTION(
@@ -375,7 +375,22 @@ ModelInstanceState::SaveRequestsToSharedMemory(
     std::unique_ptr<InferRequest> infer_request;
     if (model_state->IsDecoupled()) {
       TRITONBACKEND_ResponseFactory* factory_ptr;
-      RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
+      // Reuse the response factory if there is already a response factory
+      // associated with the request
+      std::lock_guard<std::mutex> guard{response_factory_map_mutex_};
+      {
+        if (response_factory_map_.find(reinterpret_cast<intptr_t>(request)) !=
+            response_factory_map_.end()) {
+          factory_ptr =
+              response_factory_map_[reinterpret_cast<intptr_t>(request)];
+        } else {
+          RETURN_IF_ERROR(
+              TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
+          response_factory_map_[reinterpret_cast<intptr_t>(request)] =
+              factory_ptr;
+        }
+      }
+
       infer_request = std::make_unique<InferRequest>(
           id, correlation_id, pb_input_tensors, requested_output_names,
           model_state->Name(), model_state->Version(), parameters_string, flags,
@@ -393,7 +408,7 @@ ModelInstanceState::SaveRequestsToSharedMemory(
 
     RETURN_IF_EXCEPTION(infer_request->SaveToSharedMemory(Stub()->ShmPool()));
     requests_shm[r] = infer_request->ShmHandle();
-    pb_inference_requests.emplace_back(std::move(infer_request));
+    pb_infer_requests.emplace_back(std::move(infer_request));
   }
 
   return nullptr;  // success
@@ -1149,8 +1164,16 @@ ModelInstanceState::ResponseSendDecoupled(
       reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
          send_message_payload->response_factory_address);
   if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-    std::lock_guard<std::mutex> guard{closed_requests_mutex_};
-    closed_requests_.push_back(send_message_payload->request_address);
+    {
+      std::lock_guard<std::mutex> guard{closed_requests_mutex_};
+      closed_requests_.push_back(send_message_payload->request_address);
+    }
+
+    // Clean up the response factory map.
+    {
+      std::lock_guard<std::mutex> guard{response_factory_map_mutex_};
+      response_factory_map_.erase(send_message_payload->request_address);
+    }
   }
 
   if (send_message_payload->response != 0) {
@@ -1275,7 +1298,7 @@ ModelInstanceState::ResponseSendDecoupled(
 TRITONSERVER_Error*
 ModelInstanceState::ProcessRequestsDecoupled(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
-    std::vector<std::unique_ptr<InferRequest>>& pb_inference_requests,
+    std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
     PbMetricReporter& reporter)
 {
   NVTX_RANGE(nvtx_, "ProcessRequests " + Name());
@@ -1301,8 +1324,7 @@
   std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses;
 
   RETURN_IF_ERROR(SaveRequestsToSharedMemory(
-      requests, request_count, pb_inference_requests, request_batch,
-      responses));
+      requests, request_count, pb_infer_requests, request_batch, responses));
 
   uint64_t compute_start_ns = 0;
   SET_TIMESTAMP(compute_start_ns);
@@ -1342,6 +1364,11 @@
         TRITONSERVER_ERROR_INTERNAL, error->String().c_str());
   }
 
+    // Reset the release flags for all the requests.
+    for (auto& infer_request : pb_infer_requests) {
+      infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+    }
+
     return TRITONSERVER_ErrorNew(
         TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests.");
   }
@@ -1352,6 +1379,7 @@
 void
 ModelInstanceState::ProcessRequests(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
+    std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
     bool& restart)
 {
   NVTX_RANGE(nvtx_, "ProcessRequests " + Name());
@@ -1399,12 +1427,11 @@ ModelInstanceState::ProcessRequests(
   // Wait for all the pending BLS requests to be completed.
   ScopedDefer bls_defer([this] { WaitForBLSRequestsToFinish(); });
-  std::vector<std::unique_ptr<InferRequest>> pb_inference_requests;
   AllocatedSharedMemory<char> request_batch;
   RESPOND_ALL_AND_RETURN_IF_ERROR(
       responses, request_count,
       SaveRequestsToSharedMemory(
-          requests, request_count, pb_inference_requests, request_batch,
+          requests, request_count, pb_infer_requests, request_batch,
           responses));
 
   std::shared_ptr<IPCMessage> ipc_message =
@@ -1515,6 +1542,11 @@ ModelInstanceState::ProcessRequests(
       RespondErrorToAllRequests(
           error_message, responses, requests, request_count);
     }
+
+    // Reset the release flags for all the requests.
+    for (auto& infer_request : pb_infer_requests) {
+      infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+    }
     return;
   }
 
@@ -1542,6 +1574,15 @@ ModelInstanceState::ProcessRequests(
     shm_responses.emplace_back(nullptr);
     std::unique_ptr<InferResponse>& infer_response = shm_responses.back();
     try {
+      if (pb_infer_requests[r]->ReleaseFlags() ==
+          TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+        // For rescheduled requests, we do not need to send a response.
+        LOG_IF_ERROR(
+            TRITONBACKEND_ResponseDelete((*responses)[r]),
+            "failed to delete response");
+        (*responses)[r] = nullptr;
+        continue;
+      }
       infer_response = InferResponse::LoadFromSharedMemory(
           Stub()->ShmPool(), response_shm_handle[r],
           false /* open_cuda_handle */);
@@ -1557,6 +1598,9 @@
       TRITONSERVER_ErrorDelete(err);
       (*responses)[r] = nullptr;
 
+      // Reset the release flags for the request.
+      pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+
       // If has_error is true, we do not look at the response tensors.
       continue;
     }
@@ -1570,6 +1614,10 @@
           "failed sending response");
       TRITONSERVER_ErrorDelete(err);
       (*responses)[r] = nullptr;
+
+      // Reset the release flags for the request.
+      pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+
       continue;
     }
 
@@ -2385,8 +2433,10 @@ TRITONBACKEND_ModelInstanceExecute(
   bool restart = false;
   ModelState* model_state =
       reinterpret_cast<ModelState*>(instance_state->Model());
+  std::vector<std::unique_ptr<InferRequest>> infer_requests;
   if (!model_state->IsDecoupled()) {
-    instance_state->ProcessRequests(requests, request_count, restart);
+    instance_state->ProcessRequests(
+        requests, request_count, infer_requests, restart);
 
     if (restart) {
       LOG_MESSAGE(
@@ -2404,10 +2454,12 @@ TRITONBACKEND_ModelInstanceExecute(
           err,
           "Failed to restart the stub process: failed to launch "
           "the stub process.");
+      // Reset the release flags for all the requests.
+      for (auto& infer_request : infer_requests) {
+        infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
+      }
     }
   } else {
-    std::vector<std::unique_ptr<InferRequest>> infer_requests;
-
     uint64_t exec_start_ns = 0;
     SET_TIMESTAMP(exec_start_ns);
@@ -2456,11 +2508,34 @@ TRITONBACKEND_ModelInstanceExecute(
     }
   }
 
+  // The InferRequest object might not be created if an error occurs. Explicitly
+  // update the release flags here based on the number of InferRequest objects.
+  std::vector<uint32_t> request_release_flags(
+      request_count, TRITONSERVER_REQUEST_RELEASE_ALL);
+  for (size_t i = 0; i < infer_requests.size(); ++i) {
+    request_release_flags[i] = infer_requests[i]->ReleaseFlags();
+  }
+
   for (uint32_t r = 0; r < request_count; ++r) {
     TRITONBACKEND_Request* request = requests[r];
-    LOG_IF_ERROR(
-        TRITONBACKEND_RequestRelease(request, TRITONSERVER_REQUEST_RELEASE_ALL),
-        "failed releasing request");
+    try {
+      THROW_IF_TRITON_ERROR(
+          TRITONBACKEND_RequestRelease(request, request_release_flags[r]));
+    }
+    catch (const PythonBackendException& pb_exception) {
+      LOG_MESSAGE(
+          TRITONSERVER_LOG_ERROR,
+          (std::string("Failed to release request: ") + pb_exception.what())
+              .c_str());
+      if (request_release_flags[r] == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+        // If an error occurs during request rescheduling, release the request
+        // with the `TRITONSERVER_REQUEST_RELEASE_ALL` flag.
+        LOG_IF_ERROR(
+            TRITONBACKEND_RequestRelease(
+                request, TRITONSERVER_REQUEST_RELEASE_ALL),
+            "Failed to release request.");
+      }
+    }
   }
 
   LOG_MESSAGE(
diff --git a/src/python_be.h b/src/python_be.h
index f8ec8cfa..5504e0c9 100644
--- a/src/python_be.h
+++ b/src/python_be.h
@@ -288,6 +288,9 @@ class ModelInstanceState : public BackendModelInstance {
   std::unique_ptr<boost::asio::thread_pool> thread_pool_;
   std::unordered_map<intptr_t, std::shared_ptr<InferPayload>> infer_payload_;
   std::unique_ptr<RequestExecutor> request_executor_;
+  std::mutex response_factory_map_mutex_;
+  std::unordered_map<intptr_t, TRITONBACKEND_ResponseFactory*>
+      response_factory_map_;
 
  public:
   static TRITONSERVER_Error* Create(
@@ -338,6 +341,7 @@ class ModelInstanceState : public BackendModelInstance {
   // Process all the requests obtained from Triton.
   void ProcessRequests(
       TRITONBACKEND_Request** requests, const uint32_t request_count,
+      std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
       bool& restart);
 
   // Process all the requests in the decoupled mode.
diff --git a/src/resources/triton_python_backend_utils.py b/src/resources/triton_python_backend_utils.py
index b4732da6..de332cf7 100644
--- a/src/resources/triton_python_backend_utils.py
+++ b/src/resources/triton_python_backend_utils.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions
@@ -606,3 +606,5 @@ def set_model_transaction_policy(self, transaction_policy_dict):
 TRITONSERVER_REQUEST_FLAG_SEQUENCE_START = 1
 TRITONSERVER_REQUEST_FLAG_SEQUENCE_END = 2
 TRITONSERVER_RESPONSE_COMPLETE_FINAL = 1
+TRITONSERVER_REQUEST_RELEASE_ALL = 1
+TRITONSERVER_REQUEST_RELEASE_RESCHEDULE = 2