feat: Add GRPC error codes to GRPC streaming if enabled by user. (#7499) #7555

Merged 1 commit on Aug 21, 2024
6 changes: 6 additions & 0 deletions Dockerfile.QA
@@ -267,6 +267,12 @@ RUN cp -r qa/L0_decoupled/models qa/L0_decoupled/python_models/ && \
cp /workspace/tritonbuild/python/examples/decoupled/square_config.pbtxt \
qa/L0_decoupled/python_models/square_int32/.

RUN mkdir -p qa/L0_decoupled_grpc_error && \
cp -r qa/L0_decoupled/. qa/L0_decoupled_grpc_error

RUN mkdir -p qa/L0_grpc_error_state_cleanup && \
cp -r qa/L0_grpc_state_cleanup/. qa/L0_grpc_error_state_cleanup

RUN mkdir -p qa/L0_repoagent_checksum/models/identity_int32/1 && \
cp tritonbuild/identity/install/backends/identity/libtriton_identity.so \
qa/L0_repoagent_checksum/models/identity_int32/1/.
10 changes: 10 additions & 0 deletions docs/customization_guide/inference_protocols.md
@@ -115,6 +115,16 @@ These options can be used to configure the KeepAlive settings:

For client-side documentation, see [Client-Side GRPC KeepAlive](https://github.com/triton-inference-server/client/blob/main/README.md#grpc-keepalive).

#### GRPC Status Codes

Triton implements GRPC error handling for streaming requests when a specific flag is enabled through request headers. Upon encountering an error, Triton returns the appropriate GRPC error code and subsequently closes the stream.

* `triton_grpc_error` : Set this header to `true` when starting the stream.

GRPC status codes can be used for better visibility and monitoring. For more details, see [gRPC Status Codes](https://grpc.io/docs/guides/status-codes/).

For client-side documentation, see [Client-Side GRPC Status Codes](https://github.com/triton-inference-server/client/tree/main#GRPC-Status-Codes).
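
A minimal client-side sketch of enabling this mode with the Python `tritonclient` gRPC API, mirroring the tests added in this PR; the server address and the `execute_grpc_error` model name are placeholders, and `callback` is a user-defined function:

```python
import numpy as np
import tritonclient.grpc as grpcclient


def callback(result, error):
    # With "triton_grpc_error" enabled, a failed request surfaces here as an
    # InferenceServerException carrying a GRPC status code (e.g. StatusCode.INTERNAL),
    # and the server closes the stream afterwards.
    if error is not None:
        print(f"Stream error: {error.status()} - {error.message()}")
    else:
        print("OUT:", result.as_numpy("OUT"))


client = grpcclient.InferenceServerClient("localhost:8001")  # placeholder address
# The header must be supplied when the stream is started.
client.start_stream(callback=callback, headers={"triton_grpc_error": "true"})

data = np.random.randn(2, 2).astype(np.float32)
inputs = [grpcclient.InferInput("IN", data.shape, "FP32")]
inputs[0].set_data_from_numpy(data)
client.async_stream_infer(model_name="execute_grpc_error", inputs=inputs)  # placeholder model
client.stop_stream()
```

When the header is not set, the stream remains open after an error and subsequent requests continue to be processed, as exercised by `test_triton_grpc_error_error_off` in this PR.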

### Limit Endpoint Access (BETA)

Triton users may want to restrict access to protocols or APIs that are
130 changes: 130 additions & 0 deletions qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -35,6 +35,7 @@
sys.path.append("../../common")

import queue
import threading
import time
import unittest
from functools import partial
@@ -241,6 +242,135 @@ def test_infer_pymodel_error(self):
initial_metrics_value,
)

    # Test grpc stream behavior when triton_grpc_error is set to true.
    # Expected to close the stream and return a GRPC error when the model returns an error.
def test_triton_grpc_error_error_on(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 2
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
stream_end = False
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
try:
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if type(result) == InferenceServerException:
# execute_grpc_error intentionally returns error with StatusCode.INTERNAL status on 2nd request
self.assertEqual(str(result.status()), "StatusCode.INTERNAL")
stream_end = True
else:
# Stream is not killed
output_data = result.as_numpy("OUT")
self.assertIsNotNone(output_data, "error: expected 'OUT'")
            except Exception as e:
                if stream_end:
                    # The stream is expected to be closed after the error,
                    # so an exception on a subsequent request is acceptable.
                    pass
                else:
                    self.fail("Unexpected stream closure without an error from CORE")

    # Test grpc stream behavior when triton_grpc_error is set to true on multiple open streams.
    # Each stream is expected to close and return a GRPC error when the model returns an error.
def test_triton_grpc_error_multithreaded(self):
thread1 = threading.Thread(target=self.test_triton_grpc_error_error_on)
thread2 = threading.Thread(target=self.test_triton_grpc_error_error_on)
# Start the threads
thread1.start()
thread2.start()
# Wait for both threads to finish
thread1.join()
thread2.join()

    # Test grpc stream behavior when triton_grpc_error is set to true and the stream is subsequently cancelled.
    # Cancellation is expected to succeed.
def test_triton_grpc_error_cancel(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 1
        user_data = UserData()
        stream_end = False
        triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")

metadata = {"triton_grpc_error": "true"}

triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)

for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
try:
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
result = user_data._completed_requests.get()
if type(result) == InferenceServerException:
stream_end = True
if i == 0:
triton_client.stop_stream(cancel_requests=True)
            except Exception as e:
                if stream_end:
                    # The stream is expected to be closed after the error,
                    # so cancellation succeeding here is the expected outcome.
                    pass
                else:
                    self.fail("Unexpected stream closure without an error from CORE")

    # Test grpc stream behavior when triton_grpc_error is not enabled (default false).
    # The stream is expected to stay open when an error is reported from CORE.
def test_triton_grpc_error_error_off(self):
model_name = "execute_grpc_error"
shape = [2, 2]
number_of_requests = 4
response_counter = 0
user_data = UserData()
triton_client = grpcclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8001")
triton_client.start_stream(callback=partial(callback, user_data))
for i in range(number_of_requests):
input_data = np.random.randn(*shape).astype(np.float32)
inputs = [
grpcclient.InferInput(
"IN", input_data.shape, np_to_triton_dtype(input_data.dtype)
)
]
inputs[0].set_data_from_numpy(input_data)
triton_client.async_stream_infer(model_name=model_name, inputs=inputs)
_ = user_data._completed_requests.get()
response_counter += 1
        # We expect response_counter == number_of_requests, which indicates that
        # the stream did NOT close after the first reported error because
        # triton_grpc_error mode is not enabled.
self.assertEqual(response_counter, number_of_requests)


if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions qa/L0_backend_python/lifecycle/test.sh
@@ -52,6 +52,14 @@ cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 12000000 }" >> config.pbtxt)

mkdir -p models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/model.py ./models/execute_grpc_error/1/
cp ../../python_models/execute_grpc_error/config.pbtxt ./models/execute_grpc_error/
(cd models/execute_grpc_error && \
sed -i "s/^name:.*/name: \"execute_grpc_error\"/" config.pbtxt && \
sed -i "s/^max_batch_size:.*/max_batch_size: 8/" config.pbtxt && \
echo "dynamic_batching { preferred_batch_size: [8], max_queue_delay_microseconds: 1200000 }" >> config.pbtxt)

mkdir -p models/execute_return_error/1/
cp ../../python_models/execute_return_error/model.py ./models/execute_return_error/1/
cp ../../python_models/execute_return_error/config.pbtxt ./models/execute_return_error/
16 changes: 14 additions & 2 deletions qa/L0_decoupled/decoupled_test.py
@@ -116,7 +116,13 @@ def _stream_infer_with_params(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(callback=partial(callback, user_data))
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
else:
triton_client.start_stream(callback=partial(callback, user_data))
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -175,7 +181,13 @@ def _stream_infer(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(callback=partial(callback, user_data))
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data), headers=metadata
)
else:
triton_client.start_stream(callback=partial(callback, user_data))
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
2 changes: 1 addition & 1 deletion qa/L0_decoupled/test.sh
@@ -176,4 +176,4 @@ else
echo -e "\n***\n*** Test Failed\n***"
fi

exit $RET
exit $RET
42 changes: 33 additions & 9 deletions qa/L0_grpc_state_cleanup/cleanup_test.py
Expand Up @@ -161,9 +161,17 @@ def _stream_infer_with_params(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data),
stream_timeout=stream_timeout,
headers=metadata,
)
else:
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -229,9 +237,17 @@ def _stream_infer(
url="localhost:8001", verbose=True
) as triton_client:
# Establish stream
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
triton_client.start_stream(
callback=partial(callback, user_data),
stream_timeout=stream_timeout,
headers=metadata,
)
else:
triton_client.start_stream(
callback=partial(callback, user_data), stream_timeout=stream_timeout
)
# Send specified many requests in parallel
for i in range(request_count):
time.sleep((request_delay / 1000))
@@ -608,9 +624,17 @@ def test_non_decoupled_streaming_multi_response(self):
url="localhost:8001", verbose=True
) as client:
# Establish stream
client.start_stream(
callback=partial(callback, user_data), stream_timeout=16
)
if "TRITONSERVER_GRPC_STATUS_FLAG" in os.environ:
metadata = {"triton_grpc_error": "true"}
client.start_stream(
callback=partial(callback, user_data),
stream_timeout=16,
headers=metadata,
)
else:
client.start_stream(
callback=partial(callback, user_data), stream_timeout=16
)
# Send a request
client.async_stream_infer(
model_name=self.repeat_non_decoupled_model_name,
51 changes: 51 additions & 0 deletions qa/python_models/execute_grpc_error/config.pbtxt
@@ -0,0 +1,51 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
max_batch_size: 64

input [
{
name: "IN"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

output [
{
name: "OUT"
data_type: TYPE_FP32
dims: [ -1 ]
}
]

instance_group [
{
count: 1
kind : KIND_CPU
}
]
52 changes: 52 additions & 0 deletions qa/python_models/execute_grpc_error/model.py
@@ -0,0 +1,52 @@
# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
def __init__(self):
        # Maintain a running inference count so that every second request returns an error, simulating a model failure.
self.inf_count = 1

def execute(self, requests):
"""This function is called on inference request."""
responses = []

        # Return an error for every even-numbered request
for request in requests:
input_tensor = pb_utils.get_input_tensor_by_name(request, "IN")
out_tensor = pb_utils.Tensor("OUT", input_tensor.as_numpy())
if self.inf_count % 2:
# Every odd request is success
responses.append(pb_utils.InferenceResponse([out_tensor]))
else:
# Every even request is failure
error = pb_utils.TritonError("An error occurred during execution")
responses.append(pb_utils.InferenceResponse([out_tensor], error))
self.inf_count += 1

return responses