feat: Add GRPC error codes to GRPC streaming if enabled by user. #7499

Merged: 35 commits, Aug 16, 2024
Changes from 18 commits
Commits (35)
33c1e93
Park
indrajit96 Jul 25, 2024
3affd3a
Park
indrajit96 Jul 30, 2024
7f86c6a
Working Set
indrajit96 Jul 31, 2024
b65fd74
Working Set
indrajit96 Aug 1, 2024
36e461f
Working Set
indrajit96 Aug 1, 2024
bd549b1
Working Set
indrajit96 Aug 1, 2024
85ccd72
Tests Added
indrajit96 Aug 2, 2024
faa742c
Merge branch 'main' into ibhosale_grpc_streaming
indrajit96 Aug 2, 2024
37b15e8
Clean up
indrajit96 Aug 2, 2024
a65f8c3
Tests updated
indrajit96 Aug 5, 2024
080985c
Zombie request fixed
indrajit96 Aug 6, 2024
cc34d41
Pre Commit fixed
indrajit96 Aug 6, 2024
40344d5
Review Comments cleaned up, crash fixed in multi threading
indrajit96 Aug 7, 2024
f40f695
Review Comments fixed
indrajit96 Aug 7, 2024
0792bc1
Pre-Commit fixed
indrajit96 Aug 7, 2024
cdd60bf
Park
indrajit96 Aug 8, 2024
6661f21
Simpler design piggyback on NotifywhenDone()
indrajit96 Aug 9, 2024
6342524
Merge branch 'main' into ibhosale_grpc_streaming
indrajit96 Aug 9, 2024
22e5359
Cleanup unwanted states from old design
indrajit96 Aug 9, 2024
a31ba09
Improved tests around triton_grpc_error mode
indrajit96 Aug 12, 2024
af451a2
Comments resolved
indrajit96 Aug 14, 2024
0cb7db0
Comments resolved
indrajit96 Aug 14, 2024
8ea2647
New class gRPCErrorTracker created
indrajit96 Aug 14, 2024
e8c3242
Docs Updated
indrajit96 Aug 14, 2024
72097e3
Pipeline test
indrajit96 Aug 15, 2024
350af25
Resolve Unused local variable warning
indrajit96 Aug 15, 2024
e473f29
GRPC Cleanup tests updated for triton grpc error
indrajit96 Aug 16, 2024
370c449
Revert "GRPC Cleanup tests updated for triton grpc error"
indrajit96 Aug 16, 2024
b87c3fc
GRPC Cleanup tests updated for triton grpc error
indrajit96 Aug 16, 2024
cb548ff
Pre-Commit format
indrajit96 Aug 16, 2024
1b6b3a7
Devel build fix
indrajit96 Aug 16, 2024
70ce279
Streamline new tests
indrajit96 Aug 16, 2024
631b352
Merge branch 'main' into ibhosale_grpc_streaming
indrajit96 Aug 16, 2024
887aaa2
PR comments fixed and main merged
indrajit96 Aug 16, 2024
0e7670c
DockerFile fixed
indrajit96 Aug 16, 2024
64 changes: 64 additions & 0 deletions qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -241,6 +241,70 @@ def test_infer_pymodel_error(self):
initial_metrics_value,
)

def test_triton_grpc_error_error_on(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 2
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
metadata = {"triton_grpc_error": "true"}

rmccorm4 (Contributor), Aug 7, 2024:
  1. Can we add docs on this feature?
  2. Was there any discussion on using headers vs request parameters etc for it? I guess it's at the stream level, but just wanted to double check if headers were the desired choice

indrajit96 (Contributor, Author), Aug 7, 2024:
There is a draft TEP (not required, but I thought it would be a good idea): https://docs.google.com/document/d/1TfNAMYLsPuLrduBtAKo64YqV55IWQhcn6zayc_CBY18/edit?pli=1
Headers seemed logical after discussing all possible options with the SA.
Not sure how we would do it via request parameters.
We need the triton_grpc_error flag set only once, when the stream starts, so headers seemed logical and also have no backward-compatibility issues.
Updated the PR with the TEP link.

triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)

with self._shm_leak_detector.Probe() as shm_probe:
input_datas = []
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
input_datas.append(input_data)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if i == 0:
# Stream is not killed
output_data = result.as_numpy("OUT")
self.assertIsNotNone(output_data, "error: expected 'OUT'")
elif i == 1:
# execute_grpc_error intentionally returns an error on the 2nd request; in triton_grpc_error mode it surfaces as StatusCode.INTERNAL
self.assertIsInstance(result, InferenceServerException)
self.assertEqual(str(result.status()), "StatusCode.INTERNAL")

def test_triton_grpc_error_error_off(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 4
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
triton_client.start_stream(callback=partial(callback, user_data))

with self._shm_leak_detector.Probe() as shm_probe:
input_datas = []
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
input_datas.append(input_data)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if i == 1 or i == 3:
# execute_grpc_error intentionally returns an error on every 2nd request
self.assertIsInstance(result, InferenceServerException)
# Existing behaviour: without triton_grpc_error the exception carries no gRPC status
self.assertEqual(str(result.status()), "None")
elif i == 0 or i == 2:
# Stream is not killed
output_data = result.as_numpy("OUT")
self.assertIsNotNone(output_data, "error: expected 'OUT'")


if __name__ == "__main__":
unittest.main()
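
As discussed in the review thread above, the mode is opt-in per stream: the client sends a triton_grpc_error header when the stream is opened, and once the core reports an error the server closes the stream with a real gRPC status code instead of an in-band error with no status. A minimal client sketch of that flow (illustrative only; it assumes a server listening on localhost:8001 and reuses the execute_grpc_error test model added in this PR):

import queue

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException, np_to_triton_dtype

results = queue.Queue()

def callback(result, error):
    # The streaming client delivers either a result or an error per response.
    results.put(error if error is not None else result)

client = grpcclient.InferenceServerClient("localhost:8001")
# Opt in to triton_grpc_error mode for this stream only.
client.start_stream(callback=callback, headers={"triton_grpc_error": "true"})

data = np.random.randn(2, 2).astype(np.float32)
inputs = [grpcclient.InferInput("IN", data.shape, np_to_triton_dtype(data.dtype))]
inputs[0].set_data_from_numpy(data)
client.async_stream_infer(model_name="execute_grpc_error", inputs=inputs)

response = results.get()
if isinstance(response, InferenceServerException):
    # With the header set, errors arrive as a gRPC status (e.g. StatusCode.INTERNAL)
    # and the stream is closed by the server.
    print("stream error:", response.status(), response.message())
else:
    print("output:", response.as_numpy("OUT"))

client.stop_stream()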
8 changes: 8 additions & 0 deletions qa/L0_backend_python/lifecycle/test.sh
@@ -52,6 +52,14 @@ cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 12000000 }" >> config.pbtxt)

mkdir -p models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/model.py ./models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/config.pbtxt ./models/execute_grpc_error/
(cd models/execute_grpc_error && \
sed -i "s/^name:.*/name: \"execute_grpc_error\"/" config.pbtxt && \
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 1200000 }" >> config.pbtxt)

mkdir -p models/execute_return_error/1/
cp ../../python_models/execute_return_error/model.py ./models/execute_return_error/1/
cp ../../python_models/execute_return_error/config.pbtxt ./models/execute_return_error/
51 changes: 51 additions & 0 deletions qa/python_models/execute_grpc_error/config.pbtxt
@@ -0,0 +1,51 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.

Member:
Is it possible to move the model to the L0_backend_python/lifecycle folder? I think it might be easier this way to track which test the models belong to.

indrajit96 (Contributor, Author):
We don't have any models or a models folder in
https://github.com/triton-inference-server/server/tree/main/qa/L0_backend_python/lifecycle
In test.sh we copy over from python_models and create the models folder with versions inside.
I have kept this new model alongside the existing models used in L0_backend_python/lifecycle.

Contributor:

you can always create a models subfolder under L0_* tests

Member:
thanks Olga, that's what I meant.

indrajit96 (Contributor, Author):
In the existing test.sh we run rm -fr *.log ./models before we start the test:

rm -fr *.log ./models

For me to add models to L0_backend_python/lifecycle I would need to remove this.
We clean up the models folder before every test, so I would need to remove every place where it is deleted.

Not sure I should change the existing design; it might impact other tests too.

Contributor:
Name the folder differently and copy from it into models?

indrajit96 (Contributor, Author):
@tanmayv25 suggested I move all the changes from L0_lifecycle to a new L0_ test dedicated to this feature going forward.
The reason I made the changes here is that L0_lifecycle/models already had models that return errors programmatically, which let me reuse them.
Will resolve this comment along with the original comment by @tanmayv25 in a new PR after the cherry-pick.
Keeping it unresolved for now; will mark it resolved after the new PR.

#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
max_batch_size: 64

input [
{
name: "IN"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

output [
{
name: "OUT"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

instance_group [
{
count: 1
kind : KIND_CPU
}
]
52 changes: 52 additions & 0 deletions qa/python_models/execute_grpc_error/model.py
@@ -0,0 +1,52 @@
# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
def __init__(self):
# Maintain a running inference count so that every 2nd request returns an error, simulating a model failure
self.inf_count = 1

def execute(self, requests):
"""This function is called on inference request."""
responses = []

# Return an error for every even-numbered request
for request in requests:
input_tensor = pb_utils.get_input_tensor_by_name(request, "IN")
out_tensor = pb_utils.Tensor("OUT", input_tensor.as_numpy())
if self.inf_count % 2:
# Every odd request succeeds
responses.append(pb_utils.InferenceResponse([out_tensor]))
else:
# Every even request fails
error = pb_utils.TritonError("An error occurred during execution")
responses.append(pb_utils.InferenceResponse([out_tensor], error))
self.inf_count += 1

return responses
11 changes: 11 additions & 0 deletions src/grpc/grpc_utils.h
@@ -76,6 +76,17 @@ typedef enum {
PARTIAL_COMPLETION
} Steps;

typedef enum {
// No error from CORE seen yet
NONE,
// Error from CORE encountered, waiting to be picked up by completion queue to
// initiate cancellation
ERROR_WAITING,
// Error from CORE encountered, stream closed
// This state is added to avoid double cancellation
ERROR_CANCELED
} Triton_grpc_error_steps;

// Debugging helper
std::ostream& operator<<(std::ostream& out, const Steps& step);

3 changes: 2 additions & 1 deletion src/grpc/infer_handler.cc
@@ -720,7 +720,8 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// single thread scenario.
StartNewRequest();
}
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
bool resume = state->context_->HandleCancellation(
state, rpc_ok, Name(), false /* is_triton_grpc_error */);
return resume;
}

87 changes: 79 additions & 8 deletions src/grpc/infer_handler.h
@@ -642,7 +642,8 @@ class InferHandlerState {
::grpc::ServerCompletionQueue* cq, const uint64_t unique_id = 0)
: cq_(cq), unique_id_(unique_id), ongoing_requests_(0),
step_(Steps::START), finish_ok_(true), ongoing_write_(false),
received_notification_(false)
received_notification_(false), triton_grpc_error_(false),
grpc_stream_error_state_(Triton_grpc_error_steps::NONE)
{
ctx_.reset(new ::grpc::ServerContext());
responder_.reset(new ServerResponderType(ctx_.get()));
@@ -664,11 +665,56 @@

bool ReceivedNotification() { return received_notification_; }

// Returns true ONLY when GRPC_ERROR from CORE is waiting to be processed.
bool IsGRPCError()
{
if (grpc_stream_error_state_ == Triton_grpc_error_steps::ERROR_WAITING) {
// Change the state to ERROR_CANCELED as we have called
// HandleCancellation
grpc_stream_error_state_ = Triton_grpc_error_steps::ERROR_CANCELED;
return true;
}
return false;
}

bool IsCancelled()
{
return received_notification_ ? ctx_->IsCancelled() : false;
return received_notification_ ? (ctx_->IsCancelled() || IsGRPCError())
: false;
}

// Extracts headers from GRPC request and updates state
void ExtractStateFromHeaders(InferHandlerStateType* state)
{
const auto& metadata = state->context_->ctx_->client_metadata();
for (const auto& pair : metadata) {
auto& key = pair.first;
auto& value = pair.second;
std::string param_key = std::string(key.begin(), key.end());
std::string value_key = std::string(value.begin(), value.end());
std::string triton_grpc_error_key = "triton_grpc_error";
if (param_key == triton_grpc_error_key) {
if (value_key == "true") {
LOG_VERBOSE(2)
<< "GRPC: triton_grpc_error mode detected in new grpc stream";
state->context_->triton_grpc_error_ = true;
}
}
}
}

void SendGRPCStrictResponse(InferHandlerStateType* state)
{
std::lock_guard<std::recursive_mutex> lock(state->context_->mu_);
// Respond only if an error has not already been sent for this stream,
// to avoid closing the connection twice on multiple errors from core
if (!state->context_->IsGRPCStrictError()) {
state->step_ = Steps::COMPLETE;
state->context_->responder_->Finish(state->status_, state);
// Mark error for this stream
state->context_->MarkGRPCStrictError();
}
}
// Increments the ongoing request counter
void IncrementRequestCounter() { ongoing_requests_++; }

@@ -746,7 +792,7 @@

// Issues the cancellation for all inflight requests
// being tracked by this context.
void IssueRequestCancellation()
void IssueRequestCancellation(bool is_triton_grpc_error)
{
{
std::lock_guard<std::recursive_mutex> lock(mu_);
@@ -779,6 +825,7 @@
// The RPC is complete and no callback will be invoked to retrieve
// the object. Hence, need to explicitly place the state on the
// completion queue.
// Check for write readiness
PutTaskBackToQueue(state);
}
}
@@ -791,9 +838,11 @@
// Returns whether or not to continue cycling through the gRPC
// completion queue or not.
bool HandleCancellation(
InferHandlerStateType* state, bool rpc_ok, const std::string& name)
InferHandlerStateType* state, bool rpc_ok, const std::string& name,
bool is_triton_grpc_error)
{
if (!IsCancelled()) {
// Check to avoid early exit in case of triton_grpc_error
if (!IsCancelled() && !(is_triton_grpc_error)) {
LOG_ERROR
<< "[INTERNAL] HandleCancellation called even when the context was "
"not cancelled for "
@@ -813,10 +862,9 @@
// issue cancellation request to all the inflight
// states belonging to the context.
if (state->context_->step_ != Steps::CANCELLED) {
IssueRequestCancellation();
IssueRequestCancellation(is_triton_grpc_error);
// Mark the context as cancelled
state->context_->step_ = Steps::CANCELLED;

// The state returns true because the CancelExecution
// call above would have raised alarm objects on all
// pending inflight states objects. This state will
@@ -941,6 +989,21 @@
return false;
}

// Marks error after it has been responded to
void MarkGRPCStrictError()
{
grpc_stream_error_state_ = Triton_grpc_error_steps::ERROR_WAITING;
}

// Checks if error already responded to in triton_grpc_error mode
bool IsGRPCStrictError()
{
if (grpc_stream_error_state_ == Triton_grpc_error_steps::NONE) {
return false;
}
return true;
}

// Return true if this context has completed all reads and writes.
bool IsRequestsCompleted()
{
@@ -999,6 +1062,15 @@
// Tracks whether the async notification has been delivered by
// completion queue.
bool received_notification_;

// True if set by user via header
// Can be accessed without a lock, as set only once in startstream
std::atomic<bool> triton_grpc_error_;

// Tracks whether the stream has already encountered an error and closed the
// connection (one of Triton_grpc_error_steps). Maintained to avoid writes on a
// closed stream. Need to acquire the lock before access.
int grpc_stream_error_state_;
};

// This constructor is used to build a wrapper state object
@@ -1090,7 +1162,6 @@

void MarkAsAsyncNotifyState() { async_notify_state_ = true; }
bool IsAsyncNotifyState() { return async_notify_state_; }

// Needed in the response handle for classification outputs.
TRITONSERVER_Server* tritonserver_;

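
Taken together, ExtractStateFromHeaders, the Triton_grpc_error_steps enum, SendGRPCStrictResponse, and HandleCancellation implement a small per-stream state machine: the first error from the core finishes the responder with a gRPC status and moves the stream out of NONE; when the completion queue later observes the stream, the pending error is reported exactly once and the state advances to ERROR_CANCELED so inflight requests are cancelled without a second Finish. A toy Python sketch of those transitions (illustrative only; it mirrors the names in the C++ above but is not part of the server code, and it collapses MarkGRPCStrictError/IsGRPCStrictError into one helper):

from enum import Enum, auto

class GrpcErrorStep(Enum):
    # Mirrors Triton_grpc_error_steps in src/grpc/grpc_utils.h
    NONE = auto()            # no error from core seen yet
    ERROR_WAITING = auto()   # error sent to the client, waiting for the completion queue
    ERROR_CANCELED = auto()  # cancellation issued, stream already closed

class StreamContext:
    def __init__(self, triton_grpc_error_enabled):
        self.triton_grpc_error = triton_grpc_error_enabled
        self.state = GrpcErrorStep.NONE
        self.finished = False

    def send_grpc_strict_response(self, status):
        # Analogue of SendGRPCStrictResponse: only the first error closes the stream.
        if self.triton_grpc_error and self.state == GrpcErrorStep.NONE:
            self.finished = True  # stands in for responder_->Finish(status, state)
            self.state = GrpcErrorStep.ERROR_WAITING

    def is_grpc_error(self):
        # Analogue of IsGRPCError: report the pending error exactly once and
        # advance to ERROR_CANCELED so cancellation is not issued twice.
        if self.state == GrpcErrorStep.ERROR_WAITING:
            self.state = GrpcErrorStep.ERROR_CANCELED
            return True
        return False

# Two back-to-back core errors on the same stream close it only once.
ctx = StreamContext(triton_grpc_error_enabled=True)
ctx.send_grpc_strict_response("INTERNAL")   # closes the stream
ctx.send_grpc_strict_response("INTERNAL")   # ignored, stream already closed
assert ctx.is_grpc_error() is True          # completion queue triggers cancellation once
assert ctx.is_grpc_error() is False         # subsequent checks are no-ops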