diff --git a/CMakeLists.txt b/CMakeLists.txt
index 213b1927..3659c7bd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -163,6 +163,8 @@ set(
   src/metric.cc
   src/metric_family.h
   src/metric_family.cc
+  src/gpu_buffers.cc
+  src/gpu_buffers.h
 )

 set(
diff --git a/src/gpu_buffers.cc b/src/gpu_buffers.cc
new file mode 100644
index 00000000..e060b618
--- /dev/null
+++ b/src/gpu_buffers.cc
@@ -0,0 +1,84 @@
+// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//  * Redistributions of source code must retain the above copyright
+//    notice, this list of conditions and the following disclaimer.
+//  * Redistributions in binary form must reproduce the above copyright
+//    notice, this list of conditions and the following disclaimer in the
+//    documentation and/or other materials provided with the distribution.
+//  * Neither the name of NVIDIA CORPORATION nor the names of its
+//    contributors may be used to endorse or promote products derived
+//    from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include "gpu_buffers.h"
+#include "pb_string.h"
+
+namespace triton { namespace backend { namespace python {
+GPUBufferTransporter::GPUBufferTransporter()
+{
+  completed_ = false;
+}
+
+void
+GPUBufferTransporter::AddBuffer(
+    const bi::managed_external_buffer::handle_t& handle)
+{
+  if (!completed_) {
+    buffers_.emplace_back(handle);
+  } else {
+    throw PythonBackendException(
+        "It is not possible to add buffers after 'Complete' has been called "
+        "on a GPUBufferTransporter.");
+  }
+}
+
+void
+GPUBufferTransporter::Complete(
+    std::unique_ptr<SharedMemoryManager>& shm_pool, bool success,
+    const std::string& message)
+{
+  if (completed_) {
+    return;
+  }
+  gpu_buffers_shm_ = shm_pool->Construct<GPUBuffersShm>();
+  if (success) {
+    buffers_handle_shm_ =
+        shm_pool->Construct<bi::managed_external_buffer::handle_t>(
+            buffers_.size());
+    gpu_buffers_shm_.data_->buffer_count = buffers_.size();
+    gpu_buffers_shm_.data_->success = true;
+    gpu_buffers_shm_.data_->buffers = buffers_handle_shm_.handle_;
+    for (size_t i = 0; i < buffers_.size(); ++i) {
+      buffers_handle_shm_.data_.get()[i] = buffers_[i];
+    }
+  } else {
+    // If there was an error, we won't look at the buffers.
+    gpu_buffers_shm_.data_->success = false;
+    error_shm_ = PbString::Create(shm_pool, message);
+    gpu_buffers_shm_.data_->error = error_shm_->ShmHandle();
+  }
+  completed_ = true;
+}
+
+
+bi::managed_external_buffer::handle_t
+GPUBufferTransporter::ShmHandle()
+{
+  return gpu_buffers_shm_.handle_;
+}
+
+}}}  // namespace triton::backend::python
\ No newline at end of file
diff --git a/src/gpu_buffers.h b/src/gpu_buffers.h
new file mode 100644
index 00000000..9d750bde
--- /dev/null
+++ b/src/gpu_buffers.h
@@ -0,0 +1,64 @@
+// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//  * Redistributions of source code must retain the above copyright
+//    notice, this list of conditions and the following disclaimer.
+//  * Redistributions in binary form must reproduce the above copyright
+//    notice, this list of conditions and the following disclaimer in the
+//    documentation and/or other materials provided with the distribution.
+//  * Neither the name of NVIDIA CORPORATION nor the names of its
+//    contributors may be used to endorse or promote products derived
+//    from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#pragma once
+
+#include "pb_string.h"
+#include "pb_utils.h"
+#include "scoped_defer.h"
+
+namespace triton { namespace backend { namespace python {
+
+/// \param success indicates whether the request was successful.
+/// \param error if success is false, the error object will be set.
+/// \param buffers handle to the list of buffer handles.
+/// \param buffer_count the number of buffers.
+struct GPUBuffersShm {
+  bool success;
+  bi::managed_external_buffer::handle_t error;
+  bi::managed_external_buffer::handle_t buffers;
+  uint32_t buffer_count;
+};
+
+class GPUBufferTransporter {
+ public:
+  GPUBufferTransporter();
+  void AddBuffer(const bi::managed_external_buffer::handle_t& handle);
+  void Complete(
+      std::unique_ptr<SharedMemoryManager>& shm_pool, bool success = true,
+      const std::string& message = "");
+  bi::managed_external_buffer::handle_t ShmHandle();
+
+ private:
+  AllocatedSharedMemory<GPUBuffersShm> gpu_buffers_shm_;
+  std::vector<bi::managed_external_buffer::handle_t> buffers_;
+  AllocatedSharedMemory<bi::managed_external_buffer::handle_t>
+      buffers_handle_shm_;
+  std::unique_ptr<PbString> error_shm_;
+  bool completed_;
+};
+
+}}};  // namespace triton::backend::python
diff --git a/src/infer_request.cc b/src/infer_request.cc
index 2a9799db..4586afcf 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -28,6 +28,7 @@

 #include <boost/interprocess/sync/interprocess_mutex.hpp>

+#include "gpu_buffers.h"
 #include "pb_utils.h"
 #include "scoped_defer.h"
 #ifdef TRITON_PB_STUB
@@ -481,12 +482,20 @@ InferRequest::Exec(const bool is_decoupled)
   // Additional round trip required for asking the stub process
   // to fill in the GPU tensor buffers
   if (has_gpu_tensor) {
+    AllocatedSharedMemory<GPUBuffersShm> gpu_buffers_shm =
+        shm_pool->Load<GPUBuffersShm>(
+            request_batch_shm_ptr->gpu_buffers_handle);
     AllocatedSharedMemory<bi::managed_external_buffer::handle_t>
         gpu_buffers_handle =
             shm_pool->Load<bi::managed_external_buffer::handle_t>(
-                request_batch_shm_ptr->gpu_buffers_handle);
+                gpu_buffers_shm.data_->buffers);
     try {
 #ifdef TRITON_ENABLE_GPU
+      if (!gpu_buffers_shm.data_->success) {
+        std::unique_ptr<PbString> error = PbString::LoadFromSharedMemory(
+            shm_pool, gpu_buffers_shm.data_->error);
+        throw PythonBackendException(error->String());
+      }
       size_t i = 0;
       for (auto& input_tensor : this->Inputs()) {
         if (!input_tensor->IsCPU()) {
diff --git a/src/infer_response.cc b/src/infer_response.cc
index 4e4ea228..d8bfa2a1 100644
--- a/src/infer_response.cc
+++ b/src/infer_response.cc
@@ -206,6 +206,7 @@ InferResponse::Send(
     TRITONBACKEND_Response* response, void* cuda_stream,
     bool& requires_deferred_callback, const uint32_t flags,
     std::unique_ptr<SharedMemoryManager>& shm_pool,
+    GPUBufferTransporter& gpu_buffer_transporter,
     std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>& output_buffers,
     const std::set<std::string>& requested_output_names)
 {
@@ -228,12 +229,20 @@ InferResponse::Send(
   // Moves the response sending callback so that it is not called until the stub
   // process fills in the GPU buffers.
-  ScopedDefer deferred_task(
-      [this, &requires_deferred_callback, &response_error_handling] {
-        if (requires_deferred_callback) {
-          deferred_send_callback_ = std::move(response_error_handling);
-        }
-      });
+  ScopedDefer deferred_task([this, &requires_deferred_callback,
+                             &response_error_handling, &gpu_buffer_transporter,
+                             response_error, &shm_pool] {
+    if (*response_error != nullptr) {
+      gpu_buffer_transporter.Complete(
+          shm_pool, false /* success */,
+          TRITONSERVER_ErrorMessage(*response_error));
+    } else {
+      gpu_buffer_transporter.Complete(shm_pool);
+    }
+    if (requires_deferred_callback) {
+      deferred_send_callback_ = std::move(response_error_handling);
+    }
+  });

   if (HasError()) {
     *response_error = TRITONSERVER_ErrorNew(
@@ -302,6 +311,7 @@ InferResponse::Send(
             output_tensor->ByteSize(), reinterpret_cast<char*>(buffer),
             true /* copy_gpu */));
       }
+      gpu_buffer_transporter.AddBuffer(output_buffer->ShmHandle());
       output_buffers.push_back({std::move(output_buffer), buffer});
 #endif
     }
@@ -316,6 +326,7 @@ InferResponse::Send(
           shm_pool, actual_memory_type, actual_memory_type_id,
           output_tensor->ByteSize(), nullptr /* data ptr */));

+      gpu_buffer_transporter.AddBuffer(output_buffer->ShmHandle());
       output_buffers.push_back({std::move(output_buffer), buffer});
     }
diff --git a/src/infer_response.h b/src/infer_response.h
index d5862278..3f795679 100644
--- a/src/infer_response.h
+++ b/src/infer_response.h
@@ -27,6 +27,7 @@
 #pragma once

 #include <future>
+#include "gpu_buffers.h"
 #include "pb_error.h"
 #include "pb_tensor.h"
 #include "pb_utils.h"
@@ -100,6 +101,7 @@ class InferResponse {
       TRITONBACKEND_Response* response, void* cuda_stream,
       bool& requires_deferred_callback, const uint32_t flags,
       std::unique_ptr<SharedMemoryManager>& shm_pool,
+      GPUBufferTransporter& gpu_buffer_transporter,
       std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>& output_buffers,
       const std::set<std::string>& requested_output_names = {});
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 9539a250..20b225b6 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -356,9 +356,10 @@ Stub::RunCommand()
         LoadGPUBuffers(ipc_message);
       }
       catch (const PythonBackendException& pb_exception) {
-        LOG_INFO << "An error occurred while trying to load GPU buffers in the "
-                    "Python backend stub: "
-                 << pb_exception.what() << std::endl;
+        LOG_ERROR
+            << "An error occurred while trying to load GPU buffers in the "
+               "Python backend stub: "
+            << pb_exception.what() << std::endl;
       }

       break;
@@ -539,43 +540,50 @@ Stub::ProcessResponse(InferResponse* response)
 void
 Stub::LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message)
 {
-  AllocatedSharedMemory<char> gpu_buffers_handle =
-      shm_pool_->Load<char>(ipc_message->Args());
+  ScopedDefer load_gpu_buffer_response([this] {
+    // LoadGPUBuffers must let the parent process know when loading the
+    // buffers has finished.
+    parent_message_queue_->Push(DUMMY_MESSAGE);
+    gpu_tensors_.clear();
+  });

-  uint64_t* gpu_buffer_count =
-      reinterpret_cast<uint64_t*>(gpu_buffers_handle.data_.get());
-  bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm =
-      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-          gpu_buffers_handle.data_.get() + sizeof(uint64_t));
+  AllocatedSharedMemory<GPUBuffersShm> gpu_buffers_handle =
+      shm_pool_->Load<GPUBuffersShm>(ipc_message->Args());
+
+  if (!gpu_buffers_handle.data_->success) {
+    std::unique_ptr<PbString> error = PbString::LoadFromSharedMemory(
+        shm_pool_, gpu_buffers_handle.data_->error);
+    LOG_ERROR << ("Failed to load GPU buffers: " + error->String());
+    return;
+  }
+
+  uint64_t gpu_buffer_count = gpu_buffers_handle.data_->buffer_count;
+  AllocatedSharedMemory<bi::managed_external_buffer::handle_t>
+      gpu_buffers_handle_shm =
+          shm_pool_->Load<bi::managed_external_buffer::handle_t>(
+              gpu_buffers_handle.data_->buffers);

-  if (gpu_tensors_.size() != *gpu_buffer_count) {
-    LOG_INFO
+  if (gpu_tensors_.size() != gpu_buffer_count) {
+    LOG_ERROR
         << (std::string(
                 "GPU buffers size does not match the provided buffers: ") +
             std::to_string(gpu_tensors_.size()) +
-            " != " + std::to_string(*gpu_buffer_count));
+            " != " + std::to_string(gpu_buffer_count));
     return;
   }

   std::vector<std::unique_ptr<PbMemory>> dst_buffers;
-
   for (size_t i = 0; i < gpu_tensors_.size(); i++) {
     std::unique_ptr<PbMemory> dst_buffer = PbMemory::LoadFromSharedMemory(
-        shm_pool_, gpu_buffers_handle_shm[i], true /* open_cuda_handle */);
+        shm_pool_, gpu_buffers_handle_shm.data_.get()[i],
+        true /* open_cuda_handle */);
     dst_buffers.emplace_back(std::move(dst_buffer));
   }

-  ScopedDefer load_gpu_buffer_response([this] {
-    // Push a dummy message to signal the thread to terminate.
-    parent_message_queue_->Push(DUMMY_MESSAGE);
-  });
-
   for (size_t i = 0; i < gpu_tensors_.size(); i++) {
     std::shared_ptr<PbTensor>& src_buffer = gpu_tensors_[i];
     PbMemory::CopyBuffer(dst_buffers[i], src_buffer->Memory());
   }
-
-  gpu_tensors_.clear();
 }

 py::list
diff --git a/src/pb_utils.h b/src/pb_utils.h
index 36d7e3c7..ad3b476c 100644
--- a/src/pb_utils.h
+++ b/src/pb_utils.h
@@ -212,23 +212,17 @@ struct ResponseSenderBase {
 struct ResponseSendMessage : ResponseSenderBase {
   bi::managed_external_buffer::handle_t response;

-  // GPU Buffers handle
+  // A handle to a GPUBuffersShm object.
   bi::managed_external_buffer::handle_t gpu_buffers_handle;

-  // GPU buffers count
-  uint32_t gpu_buffers_count;
-
   uint32_t flags;
 };

 struct RequestBatch {
   uint32_t batch_size;

-  // GPU Buffers handle
+  // A handle to a GPUBuffersShm object.
   bi::managed_external_buffer::handle_t gpu_buffers_handle;
-
-  // GPU buffers count
-  uint32_t gpu_buffers_count;
 };

 #ifdef TRITON_ENABLE_GPU
diff --git a/src/python_be.cc b/src/python_be.cc
index e65436d5..3ed88a4b 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -25,6 +25,7 @@
 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

 #include "python_be.h"

+#include "gpu_buffers.h"
 #include "infer_payload.h"
 #include "pb_log.h"

@@ -623,10 +624,9 @@ ModelInstanceState::ExecuteBLSRequest(
     is_response_batch_set = true;
     bool has_gpu_tensor = false;
+    GPUBufferTransporter gpu_buffer_transporter;

     PythonBackendException pb_exception(std::string{});
-
-    uint32_t gpu_buffers_count = 0;
     if (request_batch_shm_ptr->batch_size == 1) {
       std::shared_ptr<InferRequest> infer_request;
       bi::managed_external_buffer::handle_t* request_handle =
@@ -643,7 +643,6 @@ ModelInstanceState::ExecuteBLSRequest(
         for (auto& input_tensor : infer_request->Inputs()) {
           if (!input_tensor->IsCPU()) {
 #ifdef TRITON_ENABLE_GPU
-            gpu_buffers_count++;
             BackendMemory* backend_memory;
             std::unique_ptr<BackendMemory> lbackend_memory;
             has_gpu_tensor = true;
@@ -661,38 +660,26 @@ ModelInstanceState::ExecuteBLSRequest(
             lbackend_memory.reset(backend_memory);
             input_tensor->SetMemory(std::move(PbMemory::Create(
                 Stub()->ShmPool(), std::move(lbackend_memory))));
+            gpu_buffer_transporter.AddBuffer(
+                input_tensor->Memory()->ShmHandle());
 #endif  // TRITON_ENABLE_GPU
           }
         }
       }
       catch (const PythonBackendException& exception) {
+        gpu_buffer_transporter.Complete(
+            Stub()->ShmPool(), false /* success */, exception.what());
         pb_exception = exception;
       }
-      AllocatedSharedMemory<bi::managed_external_buffer::handle_t> gpu_handles;

       // Wait for the extra round trip to complete. The stub process will fill
       // in the data for the GPU tensors. If there is an error, the extra round
       // trip must be still completed, otherwise the stub process will always be
       // waiting for a message from the parent process.
       if (has_gpu_tensor) {
-        try {
-          gpu_handles = Stub()
-                            ->ShmPool()
-                            ->Construct<bi::managed_external_buffer::handle_t>(
-                                gpu_buffers_count);
-          request_batch_shm_ptr->gpu_buffers_count = gpu_buffers_count;
-          request_batch_shm_ptr->gpu_buffers_handle = gpu_handles.handle_;
-          size_t i = 0;
-          for (auto& input_tensor : infer_request->Inputs()) {
-            if (!input_tensor->IsCPU()) {
-              gpu_handles.data_.get()[i] = input_tensor->Memory()->ShmHandle();
-              ++i;
-            }
-          }
-        }
-        catch (const PythonBackendException& exception) {
-          pb_exception = exception;
-        }
+        gpu_buffer_transporter.Complete(Stub()->ShmPool());
+        request_batch_shm_ptr->gpu_buffers_handle =
+            gpu_buffer_transporter.ShmHandle();

         bi::scoped_lock<bi::interprocess_mutex> lock{
             *(ipc_message->ResponseMutex())};
@@ -700,7 +687,7 @@ ModelInstanceState::ExecuteBLSRequest(
         ipc_message->ResponseCondition()->wait(lock);
       }

-      if (pb_exception.what() != nullptr) {
+      if (pb_exception.what() != std::string{""}) {
         auto callback = std::bind(
             &ModelInstanceState::SendBLSDecoupledResponse, this,
             std::placeholders::_1);
@@ -1082,34 +1069,19 @@ ModelInstanceState::ResponseSendDecoupled(
   std::unique_ptr<
       TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
       response_factory_ptr;
+  GPUBufferTransporter gpu_buffer_transporter;
   if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
     response_factory_ptr.reset(
         reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
   }

   infer_response->Send(
       response, CudaStream(), requires_deferred_callback,
-      send_message_payload->flags, Stub()->ShmPool(), gpu_output_buffers);
+      send_message_payload->flags, Stub()->ShmPool(), gpu_buffer_transporter,
+      gpu_output_buffers);

   if (requires_deferred_callback) {
-    AllocatedSharedMemory<char> gpu_buffers_handle =
-        Stub()->ShmPool()->Construct<char>(
-            sizeof(uint64_t) +
-            gpu_output_buffers.size() *
-                sizeof(bi::managed_external_buffer::handle_t));
-    uint64_t* gpu_buffer_count =
-        reinterpret_cast<uint64_t*>(gpu_buffers_handle.data_.get());
-    *gpu_buffer_count = gpu_output_buffers.size();
-    bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm =
-        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-            gpu_buffers_handle.data_.get() + sizeof(uint64_t));
-    send_message_payload->gpu_buffers_handle = gpu_buffers_handle.handle_;
-
-    size_t index = 0;
-    for (auto& output_buffer_pair : gpu_output_buffers) {
-      std::unique_ptr<PbMemory>& pb_memory = output_buffer_pair.first;
-      gpu_buffers_handle_shm[index] = pb_memory->ShmHandle();
-      ++index;
-    }
+    send_message_payload->gpu_buffers_handle =
+        gpu_buffer_transporter.ShmHandle();

     // Additional round trip so that the stub can fill the GPU output buffers.
     {
@@ -1122,7 +1094,6 @@ ModelInstanceState::ResponseSendDecoupled(
       }
     }

-    index = 0;
     bool cuda_copy = false;
     for (auto& output_buffer_pair : gpu_output_buffers) {
       auto& pb_memory = output_buffer_pair.first;
@@ -1138,8 +1109,6 @@ ModelInstanceState::ResponseSendDecoupled(
             CudaStream(), &cuda_used);
         cuda_copy |= cuda_used;
       }
-      gpu_buffers_handle_shm[index] = pb_memory->ShmHandle();
-      ++index;
 #ifdef TRITON_ENABLE_GPU
       if (cuda_copy) {
         cudaStreamSynchronize(stream_);
@@ -1420,6 +1389,7 @@ ModelInstanceState::ProcessRequests(
   std::vector<std::unique_ptr<InferResponse>> shm_responses;
   std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
       gpu_output_buffers(request_count);
+  GPUBufferTransporter gpu_buffer_transporter;

   for (uint32_t r = 0; r < request_count; ++r) {
     NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
@@ -1482,7 +1452,7 @@ ModelInstanceState::ProcessRequests(
     infer_response->Send(
         response, CudaStream(), require_deferred_callback,
         TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(),
-        gpu_output_buffers[r], requested_output_names);
+        gpu_buffer_transporter, gpu_output_buffers[r], requested_output_names);

     requires_deferred_callback[r] = require_deferred_callback;

@@ -1497,39 +1467,14 @@ ModelInstanceState::ProcessRequests(
   // If the output tensor is in GPU, there will be a second round trip
   // required for filling the GPU buffers provided by the main process.
   if (has_gpu_output) {
-    size_t total_gpu_buffers_count = 0;
-    for (auto& gpu_output_buffer : gpu_output_buffers) {
-      total_gpu_buffers_count += gpu_output_buffer.size();
-    }
-    AllocatedSharedMemory<char> gpu_buffers_handle =
-        Stub()->ShmPool()->Construct<char>(
-            sizeof(uint64_t) +
-            total_gpu_buffers_count *
-                sizeof(bi::managed_external_buffer::handle_t));
-    uint64_t* gpu_buffer_count =
-        reinterpret_cast<uint64_t*>(gpu_buffers_handle.data_.get());
-    *gpu_buffer_count = total_gpu_buffers_count;
-    bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm =
-        reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-            gpu_buffers_handle.data_.get() + sizeof(uint64_t));
-
-    size_t index = 0;
-    for (auto& gpu_output_buffer : gpu_output_buffers) {
-      for (auto& buffer_memory_pair : gpu_output_buffer) {
-        gpu_buffers_handle_shm[index] = buffer_memory_pair.first->ShmHandle();
-        ++index;
-      }
-    }
-
     ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
-    ipc_message->Args() = gpu_buffers_handle.handle_;
+    ipc_message->Args() = gpu_buffer_transporter.ShmHandle();
     SendMessageAndReceiveResponse(
         ipc_message->ShmHandle(), response_message, restart, responses,
         requests, 0);

     bool cuda_copy = false;
-    index = 0;
     uint32_t response_index = 0;
     for (auto& gpu_output_buffer : gpu_output_buffers) {
       for (auto& buffer_memory_pair : gpu_output_buffer) {
@@ -1547,8 +1492,6 @@ ModelInstanceState::ProcessRequests(
               CudaStream(), &cuda_used));
           cuda_copy |= cuda_used;
         }
-        gpu_buffers_handle_shm[index] = pb_memory->ShmHandle();
-        ++index;
       }
       response_index++;
 #ifdef TRITON_ENABLE_GPU
diff --git a/src/scoped_defer.cc b/src/scoped_defer.cc
index 9c33bfd2..e9d86708 100644
--- a/src/scoped_defer.cc
+++ b/src/scoped_defer.cc
@@ -42,6 +42,12 @@ ScopedDefer::Complete()
   }
 }

+void
+ScopedDefer::SetReleaseCallback(std::function<void()> task)
+{
+  task_ = task;
+}
+
 ScopedDefer::~ScopedDefer()
 {
   if (!done_) {
diff --git a/src/scoped_defer.h b/src/scoped_defer.h
index eb52d6b6..02e79e22 100644
--- a/src/scoped_defer.h
+++ b/src/scoped_defer.h
@@ -31,6 +31,7 @@ namespace triton { namespace backend { namespace python {
 class ScopedDefer {
  public:
   ScopedDefer(std::function<void()> task);
+  void SetReleaseCallback(std::function<void()> task);
   ~ScopedDefer();
   void Complete();