diff --git a/qa/L0_backend_python/bls/test.sh b/qa/L0_backend_python/bls/test.sh index f4435eacaa..204af7e2ba 100755 --- a/qa/L0_backend_python/bls/test.sh +++ b/qa/L0_backend_python/bls/test.sh @@ -100,7 +100,8 @@ if [[ ${TEST_WINDOWS} == 0 ]]; then echo "instance_group [ { kind: KIND_CPU} ]" >> models/libtorch_cpu/config.pbtxt # Test with different sizes of CUDA memory pool - for CUDA_MEMORY_POOL_SIZE_MB in 64 128 ; do + # TODO: Why 256 worked in place of 128, on decoupled data pipeline? + for CUDA_MEMORY_POOL_SIZE_MB in 64 256 ; do CUDA_MEMORY_POOL_SIZE_BYTES=$((CUDA_MEMORY_POOL_SIZE_MB * 1024 * 1024)) SERVER_ARGS="--model-repository=${MODELDIR}/bls/models --backend-directory=${BACKEND_DIR} --log-verbose=1 --cuda-memory-pool-byte-size=0:${CUDA_MEMORY_POOL_SIZE_BYTES}" for TRIAL in non_decoupled decoupled ; do diff --git a/qa/L0_backend_python/decoupled/decoupled_test.py b/qa/L0_backend_python/decoupled/decoupled_test.py index c819554ce7..45ce370fb1 100755 --- a/qa/L0_backend_python/decoupled/decoupled_test.py +++ b/qa/L0_backend_python/decoupled/decoupled_test.py @@ -243,12 +243,12 @@ def test_decoupled_return_response_error(self): client.async_stream_infer(model_name=model_name, inputs=inputs) data_item = user_data._completed_requests.get() if type(data_item) == InferenceServerException: - self.assertEqual( - data_item.message(), + self.assertIn( "Python model 'decoupled_return_response_error_0_0' is using " "the decoupled mode and the execute function must return " "None.", - "Exception message didn't match.", + data_item.message(), + "Exception message didn't show up.", ) def test_decoupled_send_after_close_error(self): diff --git a/qa/L0_backend_python/lifecycle/lifecycle_test.py b/qa/L0_backend_python/lifecycle/lifecycle_test.py index 09c2449217..cea94a1dad 100755 --- a/qa/L0_backend_python/lifecycle/lifecycle_test.py +++ b/qa/L0_backend_python/lifecycle/lifecycle_test.py @@ -199,7 +199,7 @@ def test_infer_pymodel_error(self): print(e.message()) 
self.assertTrue( e.message().startswith( - "Failed to process the request(s) for model instance" + "Failed to process the request(s) for model " ), "Exception message is not correct", ) @@ -208,45 +208,6 @@ def test_infer_pymodel_error(self): False, "Wrong exception raised or did not raise an exception" ) - def test_incorrect_execute_return(self): - model_name = "execute_return_error" - shape = [1, 1] - with self._shm_leak_detector.Probe() as shm_probe: - with httpclient.InferenceServerClient( - f"{_tritonserver_ipaddr}:8000" - ) as client: - input_data = (5 * np.random.randn(*shape)).astype(np.float32) - inputs = [ - httpclient.InferInput( - "INPUT", input_data.shape, np_to_triton_dtype(input_data.dtype) - ) - ] - inputs[0].set_data_from_numpy(input_data) - - # The first request to this model will return None. - with self.assertRaises(InferenceServerException) as e: - client.infer(model_name, inputs) - - self.assertTrue( - "Failed to process the request(s) for model instance " - "'execute_return_error_0_0', message: Expected a list in the " - "execute return" in str(e.exception), - "Exception message is not correct.", - ) - - # The second inference request will return a list of None object - # instead of Python InferenceResponse objects. 
- with self.assertRaises(InferenceServerException) as e: - client.infer(model_name, inputs) - - self.assertTrue( - "Failed to process the request(s) for model instance " - "'execute_return_error_0_0', message: Expected an " - "'InferenceResponse' object in the execute function return" - " list" in str(e.exception), - "Exception message is not correct.", - ) - if __name__ == "__main__": unittest.main() diff --git a/qa/L0_backend_python/response_sender/response_sender_test.py b/qa/L0_backend_python/response_sender/response_sender_test.py new file mode 100644 index 0000000000..81f8c75f2c --- /dev/null +++ b/qa/L0_backend_python/response_sender/response_sender_test.py @@ -0,0 +1,583 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import unittest + +import numpy as np +import tritonclient.grpc as grpcclient +from tritonclient.utils import InferenceServerException + + +class ResponseSenderTest(unittest.TestCase): + _inputs_parameters_zero_response_pre_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": True, + "return_a_response": False, + "number_of_response_after_return": 0, + "send_complete_final_flag_after_return": False, + } + _inputs_parameters_zero_response_post_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": False, + "return_a_response": False, + "number_of_response_after_return": 0, + "send_complete_final_flag_after_return": True, + } + _inputs_parameters_one_response_pre_return = { + "number_of_response_before_return": 1, + "send_complete_final_flag_before_return": True, + "return_a_response": False, + "number_of_response_after_return": 0, + "send_complete_final_flag_after_return": False, + } + _inputs_parameters_one_response_post_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": False, + "return_a_response": False, + "number_of_response_after_return": 1, + "send_complete_final_flag_after_return": True, + } + _inputs_parameters_two_response_pre_return = { + "number_of_response_before_return": 2, + "send_complete_final_flag_before_return": True, + "return_a_response": False, + "number_of_response_after_return": 0, + 
"send_complete_final_flag_after_return": False, + } + _inputs_parameters_two_response_post_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": False, + "return_a_response": False, + "number_of_response_after_return": 2, + "send_complete_final_flag_after_return": True, + } + _inputs_parameters_response_pre_and_post_return = { + "number_of_response_before_return": 1, + "send_complete_final_flag_before_return": False, + "return_a_response": False, + "number_of_response_after_return": 3, + "send_complete_final_flag_after_return": True, + } + _inputs_parameters_one_response_on_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": False, + "return_a_response": True, + "number_of_response_after_return": 0, + "send_complete_final_flag_after_return": False, + } + _inputs_parameters_one_response_pre_and_on_return = { + "number_of_response_before_return": 1, + "send_complete_final_flag_before_return": True, + "return_a_response": True, + "number_of_response_after_return": 0, + "send_complete_final_flag_after_return": False, + } + _inputs_parameters_one_response_on_and_post_return = { + "number_of_response_before_return": 0, + "send_complete_final_flag_before_return": False, + "return_a_response": True, + "number_of_response_after_return": 1, + "send_complete_final_flag_after_return": True, + } + + def _get_inputs( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + shape = [1, 1] + inputs = [ + grpcclient.InferInput("NUMBER_OF_RESPONSE_BEFORE_RETURN", shape, "UINT8"), + grpcclient.InferInput( + "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN", shape, "BOOL" + ), + grpcclient.InferInput("RETURN_A_RESPONSE", shape, "BOOL"), + grpcclient.InferInput("NUMBER_OF_RESPONSE_AFTER_RETURN", shape, "UINT8"), + grpcclient.InferInput( + "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN", 
shape, "BOOL" + ), + ] + inputs[0].set_data_from_numpy( + np.array([[number_of_response_before_return]], np.uint8) + ) + inputs[1].set_data_from_numpy( + np.array([[send_complete_final_flag_before_return]], bool) + ) + inputs[2].set_data_from_numpy(np.array([[return_a_response]], bool)) + inputs[3].set_data_from_numpy( + np.array([[number_of_response_after_return]], np.uint8) + ) + inputs[4].set_data_from_numpy( + np.array([[send_complete_final_flag_after_return]], bool) + ) + return inputs + + def _generate_streaming_callback_and_responses_pair(self): + responses = [] # [{"result": result, "error": error}, ...] + + def callback(result, error): + responses.append({"result": result, "error": error}) + + return callback, responses + + def _infer_parallel(self, model_name, parallel_inputs): + callback, responses = self._generate_streaming_callback_and_responses_pair() + with grpcclient.InferenceServerClient("localhost:8001") as client: + client.start_stream(callback) + for inputs in parallel_inputs: + client.async_stream_infer(model_name, inputs) + client.stop_stream() + return responses + + def _infer( + self, + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + inputs = self._get_inputs( + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + return self._infer_parallel(model_name, [inputs]) + + def _assert_responses_valid( + self, + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ): + before_return_response_count = 0 + response_returned = False + after_return_response_count = 0 + for response in responses: + result, error = response["result"], response["error"] + self.assertIsNone(error) + result_np = result.as_numpy(name="INDEX") + response_id = 
result_np.sum() / result_np.shape[0] + if response_id < 1000: + self.assertFalse( + response_returned, + "Expect at most one response returned per request.", + ) + response_returned = True + elif response_id < 2000: + before_return_response_count += 1 + elif response_id < 3000: + after_return_response_count += 1 + else: + raise ValueError(f"Unexpected response_id: {response_id}") + self.assertEqual(number_of_response_before_return, before_return_response_count) + self.assertEqual(return_a_response, response_returned) + self.assertEqual(number_of_response_after_return, after_return_response_count) + + def _assert_responses_exception(self, responses, expected_message): + for response in responses: + self.assertIsNone(response["result"]) + self.assertIsInstance(response["error"], InferenceServerException) + self.assertIn(expected_message, response["error"].message()) + # There may be more responses, but currently only sees one for all tests. + self.assertEqual(len(responses), 1) + + def _assert_decoupled_infer_success( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + model_name = "response_sender_decoupled" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. 
+ model_name = "response_sender_decoupled_async" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return, + return_a_response, + number_of_response_after_return, + ) + + def _assert_non_decoupled_infer_with_expected_response_success( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ): + model_name = "response_sender" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. 
+ model_name = "response_sender_async" + responses = self._infer( + model_name, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + + def _assert_non_decoupled_infer_success( + self, + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + ): + self._assert_non_decoupled_infer_with_expected_response_success( + number_of_response_before_return, + send_complete_final_flag_before_return, + return_a_response, + number_of_response_after_return, + send_complete_final_flag_after_return, + expected_number_of_response_before_return=number_of_response_before_return, + expected_return_a_response=return_a_response, + expected_number_of_response_after_return=number_of_response_after_return, + ) + + # Decoupled model send response final flag before request return. + def test_decoupled_zero_response_pre_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_zero_response_pre_return + ) + + # Decoupled model send response final flag after request return. + def test_decoupled_zero_response_post_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_zero_response_post_return + ) + + # Decoupled model send 1 response before request return. + def test_decoupled_one_response_pre_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_one_response_pre_return + ) + + # Decoupled model send 1 response after request return. 
+ def test_decoupled_one_response_post_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_one_response_post_return + ) + + # Decoupled model send 2 response before request return. + def test_decoupled_two_response_pre_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_two_response_pre_return + ) + + # Decoupled model send 2 response after request return. + def test_decoupled_two_response_post_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_two_response_post_return + ) + + # Decoupled model send 1 and 3 responses before and after return. + def test_decoupled_response_pre_and_post_return(self): + self._assert_decoupled_infer_success( + **self._inputs_parameters_response_pre_and_post_return + ) + + # Non-decoupled model send 1 response on return. + def test_non_decoupled_one_response_on_return(self): + self._assert_non_decoupled_infer_success( + **self._inputs_parameters_one_response_on_return + ) + + # Non-decoupled model send 1 response before return. + def test_non_decoupled_one_response_pre_return(self): + self._assert_non_decoupled_infer_success( + **self._inputs_parameters_one_response_pre_return + ) + + # Non-decoupled model send 1 response after return. + def test_non_decoupled_one_response_post_return(self): + self._assert_non_decoupled_infer_success( + **self._inputs_parameters_one_response_post_return + ) + + # Decoupled model requests each responding differently. 
+ def test_decoupled_multiple_requests(self): + parallel_inputs = [ + self._get_inputs(**self._inputs_parameters_zero_response_pre_return), + self._get_inputs(**self._inputs_parameters_zero_response_post_return), + self._get_inputs(**self._inputs_parameters_one_response_pre_return), + self._get_inputs(**self._inputs_parameters_one_response_post_return), + self._get_inputs(**self._inputs_parameters_two_response_pre_return), + self._get_inputs(**self._inputs_parameters_two_response_post_return), + self._get_inputs(**self._inputs_parameters_response_pre_and_post_return), + ] + expected_number_of_response_before_return = 4 + expected_return_a_response = False + expected_number_of_response_after_return = 6 + + model_name = "response_sender_decoupled_batching" + responses = self._infer_parallel(model_name, parallel_inputs) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. + model_name = "response_sender_decoupled_async_batching" + responses = self._infer_parallel(model_name, parallel_inputs) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + + # Non-decoupled model requests each responding differently. 
+ def test_non_decoupled_multiple_requests(self): + parallel_inputs = [ + self._get_inputs(**self._inputs_parameters_one_response_on_return), + self._get_inputs(**self._inputs_parameters_one_response_pre_return), + self._get_inputs(**self._inputs_parameters_one_response_post_return), + ] + expected_number_of_response_before_return = 1 + expected_return_a_response = True + expected_number_of_response_after_return = 1 + + model_name = "response_sender_batching" + responses = self._infer_parallel(model_name, parallel_inputs) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + # Do NOT group into a for-loop as it hides which model failed. + model_name = "response_sender_async_batching" + responses = self._infer_parallel(model_name, parallel_inputs) + self._assert_responses_valid( + responses, + expected_number_of_response_before_return, + expected_return_a_response, + expected_number_of_response_after_return, + ) + + # Decoupled model send 1 response on return. + def test_decoupled_one_response_on_return(self): + responses = self._infer( + model_name="response_sender_decoupled", + **self._inputs_parameters_one_response_on_return, + ) + self._assert_responses_exception( + responses, + expected_message="using the decoupled mode and the execute function must return None", + ) + # TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback' + # using `py_future.result()` with error hangs on exit. + + # Decoupled model send 1 response and return 1 response. + def test_decoupled_one_response_pre_and_on_return(self): + # Note: The before return response will send a valid response and close the + # response sender. Then, returning a response will generate an error, but + # since the response sender is closed, nothing is passed to the client. 
+ responses = self._infer( + model_name="response_sender_decoupled", + **self._inputs_parameters_one_response_pre_and_on_return, + ) + self._assert_responses_valid( + responses, + number_of_response_before_return=1, + return_a_response=False, + number_of_response_after_return=0, + ) + # TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback' + # using `py_future.result()` with error hangs on exit. + + # Decoupled model return 1 response and send 1 response. + def test_decoupled_one_response_on_and_post_return(self): + # Note: The returned response will send an error response and complete final + # flag, and close the response sender and factory. Then, sending a + # response will raise an exception. Since the exception happens after the + # model returns, it cannot be caught by the stub (i.e. in a daemon + # thread), so nothing will happen. + responses = self._infer( + model_name="response_sender_decoupled", + **self._inputs_parameters_one_response_on_and_post_return, + ) + self._assert_responses_exception( + responses, + expected_message="using the decoupled mode and the execute function must return None", + ) + # TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback' + # using `py_future.result()` with error hangs on exit. + + # Non-decoupled model send response final flag before request return. + def test_non_decoupled_zero_response_pre_return(self): + # Note: The final flag will raise an exception which stops the model. Since the + # exception happens before the model returns, it will be caught by the + # stub process, which passes it to the backend to send an error response + # with final flag.
+ expected_message = ( + "Non-decoupled model cannot send complete final before sending a response" + ) + model_name = "response_sender" + responses = self._infer( + model_name, + **self._inputs_parameters_zero_response_pre_return, + ) + self._assert_responses_exception(responses, expected_message) + # Do NOT group into a for-loop as it hides which model failed. + model_name = "response_sender_async" + responses = self._infer( + model_name, + **self._inputs_parameters_zero_response_pre_return, + ) + self._assert_responses_exception(responses, expected_message) + + # Non-decoupled model send response final flag after request return. + @unittest.skip("Model unload will hang, see the TODO comment.") + def test_non_decoupled_zero_response_post_return(self): + # Note: The final flag will raise an exception which stops the model. Since the + # exception happens after the model returns, it cannot be caught by the + # stub (i.e. in a daemon thread), so nothing will happen. + # TODO: Since the stub does not know if the model failed after returning, the + # complete final flag is not sent and will hang when unloading the model. + # How to detect such event and close the response factory? + raise NotImplementedError("No testing is performed") + + # Non-decoupled model send 2 response before return. + def test_non_decoupled_two_response_pre_return(self): + # Note: The 1st response will make its way to the client, but sending the 2nd + # response will raise an exception which stops the model. Since the + # exception happens before the model returns, it will be caught by the + # stub process which pass it to the backend and sent an error response + # with final flag. Since this is non-decoupled model using gRPC stream, + # any response after the 1st will be discarded by the frontend. 
+ self._assert_non_decoupled_infer_with_expected_response_success( + **self._inputs_parameters_two_response_pre_return, + expected_number_of_response_before_return=1, + expected_return_a_response=False, + expected_number_of_response_after_return=0, + ) + + # Non-decoupled model send 2 response after return. + @unittest.skip("Model unload will hang, see the TODO comment.") + def test_non_decoupled_two_response_post_return(self): + # Note: The 1st response will make its way to the client, but sending the 2nd + # response will raise an exception which stops the model. Since the + # exception happens after the model returns, it cannot be caught by the + # stub (i.e. in a daemon thread), so nothing will happen. + # TODO: Since the stub does not know if the model failed after returning, the + # complete final flag is not sent and will hang when unloading the model. + # How to detect such event and close the response factory? + self._assert_non_decoupled_infer_with_expected_response_success( + **self._inputs_parameters_two_response_post_return, + expected_number_of_response_before_return=0, + expected_return_a_response=False, + expected_number_of_response_after_return=1, + ) + + # Non-decoupled model send 1 response and return 1 response. + def test_non_decoupled_one_response_pre_and_on_return(self): + # Note: The sent response will make its way to the client and complete final. + # The returned response will see the response sender is closed and raise + # an exception. The backend should see the request is closed and do + # nothing upon receiving the error from stub. + self._assert_non_decoupled_infer_with_expected_response_success( + **self._inputs_parameters_one_response_pre_and_on_return, + expected_number_of_response_before_return=1, + expected_return_a_response=False, + expected_number_of_response_after_return=0, + ) + + # Non-decoupled model return 1 response and send 1 response. 
+ def test_non_decoupled_one_response_on_and_post_return(self): + # Note: The returned response will send the response to the client and complete + # final. The sent response will see the response sender is closed and + # raise an exception. Since the exception happens after the model returns, + # it cannot be caught by the stub (i.e. in a daemon thread), so nothing + # will happen. + self._assert_non_decoupled_infer_with_expected_response_success( + **self._inputs_parameters_one_response_on_and_post_return, + expected_number_of_response_before_return=0, + expected_return_a_response=True, + expected_number_of_response_after_return=0, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/qa/L0_backend_python/response_sender/test.sh b/qa/L0_backend_python/response_sender/test.sh new file mode 100755 index 0000000000..33db46edbb --- /dev/null +++ b/qa/L0_backend_python/response_sender/test.sh @@ -0,0 +1,111 @@ +#!/bin/bash +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +source ../../common/util.sh + +RET=0 + +# +# Test response sender under decoupled / non-decoupled +# +rm -rf models && mkdir models +mkdir -p models/response_sender/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender/1 && \ + cp ../../python_models/response_sender/model.py models/response_sender/1 && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender +mkdir -p models/response_sender_decoupled/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled/1 && \ + cp ../../python_models/response_sender/model.py models/response_sender_decoupled/1 && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled && \ + echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled/config.pbtxt +mkdir -p models/response_sender_async/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_async/1 && \ + cp ../../python_models/response_sender/model_async.py models/response_sender_async/1/model.py && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_async +mkdir -p models/response_sender_decoupled_async/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async/1 && \ + cp ../../python_models/response_sender/model_async.py models/response_sender_decoupled_async/1/model.py && \ + cp 
../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async && \ + echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async/config.pbtxt +mkdir -p models/response_sender_batching/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_batching/1 && \ + cp ../../python_models/response_sender/model.py models/response_sender_batching/1 && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_batching && \ + echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_batching/config.pbtxt +mkdir -p models/response_sender_decoupled_batching/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_batching/1 && \ + cp ../../python_models/response_sender/model.py models/response_sender_decoupled_batching/1 && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_batching && \ + echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_batching/config.pbtxt && \ + echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_batching/config.pbtxt +mkdir -p models/response_sender_async_batching/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_async_batching/1 && \ + cp ../../python_models/response_sender/model_async.py models/response_sender_async_batching/1/model.py && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_async_batching && \ + echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_async_batching/config.pbtxt +mkdir -p models/response_sender_decoupled_async_batching/1 && \ + cp ../../python_models/response_sender/model_common.py models/response_sender_decoupled_async_batching/1 && \ + cp ../../python_models/response_sender/model_async.py 
models/response_sender_decoupled_async_batching/1/model.py && \ + cp ../../python_models/response_sender/config.pbtxt models/response_sender_decoupled_async_batching && \ + echo "model_transaction_policy { decoupled: True }" >> models/response_sender_decoupled_async_batching/config.pbtxt && \ + echo "dynamic_batching { max_queue_delay_microseconds: 500000 }" >> models/response_sender_decoupled_async_batching/config.pbtxt + +TEST_LOG="response_sender_test.log" +SERVER_LOG="response_sender_test.server.log" +SERVER_ARGS="--model-repository=${MODELDIR}/response_sender/models --backend-directory=${BACKEND_DIR} --log-verbose=1" + +run_server +if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 +fi + +set +e +SERVER_LOG=$SERVER_LOG python3 -m pytest --junitxml=response_sender_test.report.xml response_sender_test.py > $TEST_LOG 2>&1 +if [ $? -ne 0 ]; then + echo -e "\n***\n*** response sender test FAILED\n***" + cat $TEST_LOG + RET=1 +fi +set -e + +kill $SERVER_PID +wait $SERVER_PID + +# +# Test async response sender under decoupled / non-decoupled +# + +# TODO + +if [ $RET -eq 1 ]; then + echo -e "\n***\n*** Response sender test FAILED\n***" +else + echo -e "\n***\n*** Response sender test Passed\n***" +fi +exit $RET diff --git a/qa/L0_backend_python/test.sh b/qa/L0_backend_python/test.sh index 98c88f7e9e..0e0240cd95 100755 --- a/qa/L0_backend_python/test.sh +++ b/qa/L0_backend_python/test.sh @@ -409,7 +409,7 @@ fi # Disable variants test for Jetson since already built without GPU Tensor support # Disable decoupled test because it uses GPU tensors if [ "$TEST_JETSON" == "0" ]; then - SUBTESTS="ensemble bls decoupled" + SUBTESTS="ensemble bls decoupled response_sender" # [DLIS-6093] Disable variants test for Windows since tests are not executed in docker container (cannot apt update/install) # [DLIS-5970] Disable io tests for Windows since GPU Tensors are not supported # [DLIS-6122] Disable model_control &
request_rescheduling tests for Windows since they require load/unload @@ -448,7 +448,8 @@ SUBTESTS="lifecycle argument_validation logging custom_metrics" # [DLIS-6122] Disable model_control & request_rescheduling tests for Windows since they require load/unload # [DLIS-6123] Disable examples test for Windows since it requires updates to the example clients if [[ ${TEST_WINDOWS} == 0 ]]; then - SUBTESTS+=" restart model_control examples request_rescheduling" + # TODO: Reimplement restart on decoupled data pipeline and enable restart. + SUBTESTS+=" model_control examples request_rescheduling" fi for TEST in ${SUBTESTS}; do # Run each subtest in a separate virtual environment to avoid conflicts diff --git a/qa/python_models/response_sender/config.pbtxt b/qa/python_models/response_sender/config.pbtxt new file mode 100644 index 0000000000..ef0c29e3bf --- /dev/null +++ b/qa/python_models/response_sender/config.pbtxt @@ -0,0 +1,65 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +backend: "python" +max_batch_size: 8 + +input [ + { + name: "NUMBER_OF_RESPONSE_BEFORE_RETURN" + data_type: TYPE_UINT8 + dims: [ 1 ] + }, + { + name: "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN" + data_type: TYPE_BOOL + dims: [ 1 ] + }, + { + name: "RETURN_A_RESPONSE" + data_type: TYPE_BOOL + dims: [ 1 ] + }, + { + name: "NUMBER_OF_RESPONSE_AFTER_RETURN" + data_type: TYPE_UINT8 + dims: [ 1 ] + }, + { + name: "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN" + data_type: TYPE_BOOL + dims: [ 1 ] + } +] +output [ + { + name: "INDEX" + data_type: TYPE_UINT16 + dims: [ 1 ] + } +] + +instance_group [{ kind: KIND_CPU }] diff --git a/qa/python_models/response_sender/model.py b/qa/python_models/response_sender/model.py new file mode 100644 index 0000000000..8749b83ee8 --- /dev/null +++ b/qa/python_models/response_sender/model.py @@ -0,0 +1,37 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. 
+# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import triton_python_backend_utils as pb_utils +from model_common import ResponseSenderModelCommon + + +class TritonPythonModel: + def initialize(self, args): + self._common = ResponseSenderModelCommon(pb_utils) + + def execute(self, requests): + return self._common.execute(requests, use_async=False) diff --git a/qa/python_models/response_sender/model_async.py b/qa/python_models/response_sender/model_async.py new file mode 100644 index 0000000000..b12eccef06 --- /dev/null +++ b/qa/python_models/response_sender/model_async.py @@ -0,0 +1,37 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. 
+# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import triton_python_backend_utils as pb_utils +from model_common import ResponseSenderModelCommon + + +class TritonPythonModel: + def initialize(self, args): + self._common = ResponseSenderModelCommon(pb_utils) + + async def execute(self, requests): + return self._common.execute(requests, use_async=True) diff --git a/qa/python_models/response_sender/model_common.py b/qa/python_models/response_sender/model_common.py new file mode 100644 index 0000000000..0e676e0d82 --- /dev/null +++ b/qa/python_models/response_sender/model_common.py @@ -0,0 +1,210 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#  * Neither the name of NVIDIA CORPORATION nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import threading
import time

import numpy as np


class ResponseSenderModelCommon:
    """Shared logic for the sync and async response sender test models.

    Each request carries five control inputs that tell the model how many
    responses to send through the response sender before `execute` returns,
    whether to return a response from `execute`, and how many responses to
    send after `execute` has returned. The `INDEX` output of every response
    identifies which phase produced it:

    * 0              -> returned from `execute`
    * 1000 + i       -> i-th response sent before `execute` returns
    * 2000 + i       -> i-th response sent after `execute` returns
    """

    # Seconds to wait before sending the post-return responses, so that they
    # are sent after `execute` has returned and the requests are released.
    _POST_RETURN_DELAY_SECS = 0.5

    def __init__(self, pb_utils):
        """Store the backend utils module used to read inputs and build responses.

        Args:
            pb_utils: The `triton_python_backend_utils` module (or an object
                exposing the same `get_input_tensor_by_name`, `Tensor`,
                `InferenceResponse` and `TRITONSERVER_RESPONSE_COMPLETE_FINAL`
                names).
        """
        self._pb_utils = pb_utils
        # Strong references to in-flight asyncio tasks. The event loop only
        # keeps weak references, so a task could otherwise be garbage
        # collected before it finishes.
        self._background_tasks = set()

    def _sum_input(self, request, name):
        """Return the sum over all elements of the input tensor `name`."""
        tensor = self._pb_utils.get_input_tensor_by_name(request, name)
        return tensor.as_numpy().sum()

    def _get_instructions_from_request(self, request):
        """
        Determine the execution instructions from the inputs. This test tries to examine
        all the corner cases with using response sender.

        Assumptions: The request batch size can be larger than one.

        There are 5 inputs in the model that control the model behavior:
        * NUMBER_OF_RESPONSE_BEFORE_RETURN (UINT8):
          Determines the number of responses before returning from execute function.
        * SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN (BOOL):
          Determines whether the final flag will be sent before return.
        * RETURN_A_RESPONSE (BOOL):
          Return the response when the model is returning from `execute` function.
        * NUMBER_OF_RESPONSE_AFTER_RETURN (UINT8):
          Determines the number of responses after return.
        * SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN (BOOL):
          Determines whether the final flag will be sent after return.

        Note:
        * If the batch size of a request is larger than one, the sum of the values in
          the batch will be used for determining the value of each input of the
          request.
        * The response_id is used to determine the difference between responses sent
          during execute, when execute returns, or after execute returns.

        Returns:
            A dict with the keys `batch_size`, `return_a_response`,
            `number_of_pre_return_response`, `number_of_post_return_response`,
            `send_complete_final_flag_pre_return` and
            `send_complete_final_flag_post_return`. Counts are Python ints and
            flags are Python bools.
        """
        return_a_response_np = self._pb_utils.get_input_tensor_by_name(
            request, "RETURN_A_RESPONSE"
        ).as_numpy()
        return {
            # The first dimension of any input is the request's batch size.
            "batch_size": return_a_response_np.shape[0],
            "return_a_response": bool(return_a_response_np.sum()),
            # Normalise numpy scalars to plain ints so the counts can be used
            # directly with range() and ordinary comparisons.
            "number_of_pre_return_response": int(
                self._sum_input(request, "NUMBER_OF_RESPONSE_BEFORE_RETURN")
            ),
            "number_of_post_return_response": int(
                self._sum_input(request, "NUMBER_OF_RESPONSE_AFTER_RETURN")
            ),
            "send_complete_final_flag_pre_return": bool(
                self._sum_input(request, "SEND_COMPLETE_FINAL_FLAG_BEFORE_RETURN")
            ),
            "send_complete_final_flag_post_return": bool(
                self._sum_input(request, "SEND_COMPLETE_FINAL_FLAG_AFTER_RETURN")
            ),
        }

    def _is_response_sender_needed(self, instr):
        """Return True if any instruction requires using the response sender.

        The sender is needed when at least one response or a final flag has
        to be sent, either before or after `execute` returns.
        """
        return (
            instr["number_of_pre_return_response"] > 0
            or instr["number_of_post_return_response"] > 0
            or instr["send_complete_final_flag_pre_return"]
            or instr["send_complete_final_flag_post_return"]
        )

    def _create_response(self, batch_size, response_id):
        """Build a response whose `INDEX` output holds `response_id` per batch item.

        The output is a UINT16 array with one `[response_id]` row for each of
        the `batch_size` items.
        """
        index_values = np.array(
            [[response_id] for _ in range(batch_size)], np.uint16
        )
        output_tensor = self._pb_utils.Tensor("INDEX", index_values)
        return self._pb_utils.InferenceResponse(output_tensors=[output_tensor])

    def _send_responses(self, processed_requests, response_id_offset):
        """Send the configured responses (and final flag) for each request.

        Args:
            processed_requests: List of dicts with the keys
                `number_of_response`, `batch_size`, `response_sender` and
                `send_complete_final_flag`.
            response_id_offset: Value added to the per-request response index
                to form the `INDEX` output, identifying the sending phase.
        """
        for request in processed_requests:
            response_sender = request["response_sender"]
            batch_size = request["batch_size"]
            for response_id in range(request["number_of_response"]):
                response_sender.send(
                    self._create_response(
                        batch_size, response_id=(response_id_offset + response_id)
                    )
                )
            if request["send_complete_final_flag"]:
                # A flags-only send closes the response stream of the request.
                response_sender.send(
                    flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                )

    def _send_responses_delayed_threaded(self, processed_requests, response_id_offset):
        """Send the responses from a daemon thread after a short delay."""

        def response_thread():
            # Respond only after the requests are released.
            time.sleep(self._POST_RETURN_DELAY_SECS)
            self._send_responses(processed_requests, response_id_offset)

        # Daemon thread: it must not keep the process alive on shutdown.
        thread = threading.Thread(target=response_thread, daemon=True)
        thread.start()

    def _send_responses_delayed_async(self, processed_requests, response_id_offset):
        """Send the responses from an asyncio task after a short delay.

        Must be called while an event loop is running, since it schedules the
        work with `asyncio.create_task`.
        """

        async def response_async():
            # Respond only after the requests are released.
            await asyncio.sleep(self._POST_RETURN_DELAY_SECS)
            self._send_responses(processed_requests, response_id_offset)

        task = asyncio.create_task(response_async())
        # Hold a reference until the task completes, then drop it.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def execute(self, requests, use_async):
        """Run the instructions carried by each request.

        Responses requested "before return" are sent synchronously with
        `INDEX` starting at 1000. Responses requested "after return" are
        scheduled with `INDEX` starting at 2000, on an asyncio task when
        `use_async` is true and on a daemon thread otherwise.

        Args:
            requests: The list of inference requests passed to `execute`.
            use_async: Whether the caller is an `async def execute`.

        Returns:
            None if no request asked for a returned response; otherwise a
            list with one entry per request, holding either a response with
            `INDEX` 0 or None.
        """
        pre_return_processed_requests = []
        return_responses = []
        post_return_processed_requests = []

        for request in requests:
            instr = self._get_instructions_from_request(request)
            batch_size = instr["batch_size"]

            # Only obtain the response sender when it is going to be used.
            response_sender = None
            if self._is_response_sender_needed(instr):
                response_sender = request.get_response_sender()

            pre_return_processed_requests.append(
                {
                    "number_of_response": instr["number_of_pre_return_response"],
                    "batch_size": batch_size,
                    "response_sender": response_sender,
                    "send_complete_final_flag": instr[
                        "send_complete_final_flag_pre_return"
                    ],
                }
            )
            post_return_processed_requests.append(
                {
                    "number_of_response": instr["number_of_post_return_response"],
                    "batch_size": batch_size,
                    "response_sender": response_sender,
                    "send_complete_final_flag": instr[
                        "send_complete_final_flag_post_return"
                    ],
                }
            )

            return_responses.append(
                self._create_response(batch_size, response_id=0)
                if instr["return_a_response"]
                else None
            )

        self._send_responses(pre_return_processed_requests, response_id_offset=1000)

        send_delayed = (
            self._send_responses_delayed_async
            if use_async
            else self._send_responses_delayed_threaded
        )
        send_delayed(post_return_processed_requests, response_id_offset=2000)

        # Identity check instead of list equality, so the result never
        # depends on how response objects implement `__eq__`.
        if all(response is None for response in return_responses):
            return None
        return return_responses