From d6ab1efaa9f45d80ff9644bd03e23c8c310cd101 Mon Sep 17 00:00:00 2001
From: Jacky <18255193+kthui@users.noreply.github.com>
Date: Fri, 16 Feb 2024 17:37:55 -0800
Subject: [PATCH] Add response statistics (#6869)

* Add response statistics

* Add L0_response_statistics

* Enable http vs grpc statistics comparison

* Add docs for response statistics protocol

* Add more comments for response statistics test

* Remove model name from config

* Improve docs wordings

* [Continue] Improve docs wordings

* [Continue] Add more comments for response statistics test

* [Continue 2] Improve docs wordings

* Fix typo

* Remove mentioning decoupled from docs

* [Continue 3] Improve docs wordings

* [Continue 4] Improve docs wordings

Co-authored-by: Ryan McCormick

---------

Co-authored-by: Ryan McCormick
---
 docs/protocol/extension_statistics.md         |  64 +++++-
 .../response_statistics_test.py               | 202 ++++++++++++++++++
 qa/L0_response_statistics/test.sh             |  88 ++++++++
 src/grpc/grpc_server.cc                       |  92 +++++++-
 4 files changed, 441 insertions(+), 5 deletions(-)
 create mode 100755 qa/L0_response_statistics/response_statistics_test.py
 create mode 100755 qa/L0_response_statistics/test.sh

diff --git a/docs/protocol/extension_statistics.md b/docs/protocol/extension_statistics.md
index 040f165dde..4da4140657 100644
--- a/docs/protocol/extension_statistics.md
+++ b/docs/protocol/extension_statistics.md
@@ -78,7 +78,8 @@ $model_stat =
   "inference_count" : $number,
   "execution_count" : $number,
   "inference_stats" : $inference_stats,
-  "batch_stats" : [ $batch_stat, ... ],
+  "response_stats" : { $string : $response_stats, ... },
+  "batch_stats" : [ $batch_stats, ... ],
   "memory_usage" : [ $memory_usage, ...]
 }
 ```
@@ -111,8 +112,15 @@ $model_stat =
   DOES NOT include cache hits.

 - "inference_stats" : The aggregate statistics for the
-  model/version. So, for example, "inference_stats":"success"
-  indicates the number of successful inference requests for the model.
+  model. So, for example, "inference_stats":"success" indicates the number of
+  successful inference requests for the model.
+
+- "response_stats" : The aggregate response statistics for the model. For
+  example, { "key" : { "response_stats" : "success" } } indicates the aggregate
+  statistics of successful responses at "key" for the model, where "key"
+  identifies each response generated by the model across different requests. For
+  example, given a model that generates three responses, the keys can be "0",
+  "1" and "2", identifying the three responses in order.

 - "batch_stats" : The aggregate statistics for each different batch
   size that is executed in the model. The batch statistics indicate
@@ -180,6 +188,28 @@ $inference_stats =
   from the response object to the Response Cache.

+```
+$response_stats =
+{
+  "compute_infer" : $duration_stat,
+  "compute_output" : $duration_stat,
+  "success" : $duration_stat,
+  "fail" : $duration_stat,
+  "empty_response" : $duration_stat
+}
+```
+
+- "compute_infer" : The count and cumulative duration to compute a response.
+- "compute_output" : The count and cumulative duration to extract the output
+  tensors of a computed response.
+- "success" : The count and cumulative duration of successful responses. The
+  duration is the sum of the infer and output durations.
+- "fail" : The count and cumulative duration of failed responses. The duration
+  is the sum of the infer and output durations.
+- "empty_response" : The count and cumulative duration of responses that carry
+  no output. The duration only includes the infer duration.
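
To make the schema above concrete, here is a minimal client-side sketch (not part of the patch) of reading these per-response statistics through the HTTP statistics endpoint with `tritonclient`. It assumes a Triton server on `localhost:8000` and the decoupled `square_int32` model that the new test below creates; any server or model name would work the same way.

```python
# Minimal sketch: read per-response statistics over HTTP.
# Assumptions: local Triton server on port 8000 and a decoupled model named
# "square_int32" (the model used by the new L0_response_statistics test).
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient("localhost:8000")
statistics = client.get_inference_statistics(model_name="square_int32")

# "model_stats" is a list with one entry per reported model/version.
model_stats = statistics["model_stats"][0]
for key, stats in model_stats["response_stats"].items():
    # 'key' identifies a response position across requests (e.g. "0", "1", "2").
    # Each field is a $duration_stat with a "count" and a cumulative "ns" value.
    print(
        key,
        stats["success"]["count"],
        stats["fail"]["count"],
        stats["empty_response"]["count"],
        stats["compute_infer"]["ns"],
        stats["compute_output"]["ns"],
    )
```

Since the durations are cumulative, dividing `ns` by `count` for a given key gives the average time spent on that response position.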
+
+
 ```
 $batch_stats =
 {
@@ -343,7 +373,7 @@ message ModelStatistics
   // The "execution_count" value DOES NOT include cache hits.
   uint64 execution_count = 5;

-  // The aggregate statistics for the model/version.
+  // The aggregate statistics for the model.
   InferStatistics inference_stats = 6;

   // The aggregate statistics for each different batch size that is
@@ -360,6 +390,12 @@ message ModelStatistics
   // point, the GPU memory usage for models in ONNX Runtime backend and TensorRT
   // backend is usually aligned.
   repeated MemoryUsage memory_usage = 8;
+
+  // The key and value pairs for all decoupled responses statistics. The key is
+  // a string identifying a set of response statistics aggregated together (i.e.
+  // index of the response sent). The value is the aggregated response
+  // statistics.
+  map<string, InferResponseStatistics> response_stats = 9;
 }

 // Inference statistics.
@@ -428,6 +464,26 @@ message InferStatistics
   StatisticDuration cache_miss = 8;
 }

+// Statistics per decoupled response.
+message InferResponseStatistics
+{
+  // The count and cumulative duration to compute a response.
+  StatisticDuration compute_infer = 1;
+
+  // The count and cumulative duration to extract the output tensors of a
+  // response.
+  StatisticDuration compute_output = 2;
+
+  // The count and cumulative duration for successful responses.
+  StatisticDuration success = 3;
+
+  // The count and cumulative duration for failed responses.
+  StatisticDuration fail = 4;
+
+  // The count and cumulative duration for empty responses.
+  StatisticDuration empty_response = 5;
+}
+
 // Inference batch statistics.
 message InferBatchStatistics
 {
diff --git a/qa/L0_response_statistics/response_statistics_test.py b/qa/L0_response_statistics/response_statistics_test.py
new file mode 100755
index 0000000000..b04403bfb3
--- /dev/null
+++ b/qa/L0_response_statistics/response_statistics_test.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python3
+
+# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+import unittest
+
+import numpy as np
+import tritonclient.grpc as grpcclient
+import tritonclient.http as httpclient
+
+
+class TestResponseStatistics(unittest.TestCase):
+    def setUp(self):
+        self._model_name = "square_int32"
+        self._min_infer_delay_ns = 400000000
+        self._min_output_delay_ns = 200000000
+        self._number_of_fail_responses = 2
+        self._number_of_empty_responses = 1
+        self._statistics_counts = []
+        self._grpc_client = grpcclient.InferenceServerClient(
+            "localhost:8001", verbose=True
+        )
+        self._http_client = httpclient.InferenceServerClient("localhost:8000")
+
+    # Return a coupled (callback, response) pair for gRPC stream infer.
+    def _generate_streaming_callback_and_response_pair(self):
+        # [{"result": result, "error": error}, ...]
+        response = []
+
+        def callback(result, error):
+            response.append({"result": result, "error": error})
+
+        return callback, response
+
+    # Send an infer request and return its responses. 'number_of_responses' is the sum
+    # of success, fail and empty responses the model should return for this request.
+    # This function waits until all success and fail responses are received.
+    def _stream_infer(self, number_of_responses):
+        callback, responses = self._generate_streaming_callback_and_response_pair()
+        self._grpc_client.start_stream(callback)
+        input_data = np.array([number_of_responses], dtype=np.int32)
+        inputs = [grpcclient.InferInput("IN", input_data.shape, "INT32")]
+        inputs[0].set_data_from_numpy(input_data)
+        outputs = [grpcclient.InferRequestedOutput("OUT")]
+        self._grpc_client.async_stream_infer(
+            model_name=self._model_name, inputs=inputs, outputs=outputs
+        )
+        while len(responses) < (number_of_responses - self._number_of_empty_responses):
+            time.sleep(0.1)  # poll until all expected responses are received
+        self._grpc_client.stop_stream()
+        return responses
+
+    # Update expected statistics counts for the response at 'current_index'.
+    # 'number_of_responses' is the sum of success, fail and empty responses expected
+    # from this inference request.
+    def _update_statistics_counts(self, current_index, number_of_responses):
+        if current_index >= len(self._statistics_counts):
+            self._statistics_counts.append(
+                {
+                    "compute_infer": 0,
+                    "compute_output": 0,
+                    "success": 0,
+                    "fail": 0,
+                    "empty_response": 0,
+                }
+            )
+        if (
+            current_index
+            + self._number_of_fail_responses
+            + self._number_of_empty_responses
+            < number_of_responses
+        ):
+            # success
+            self._statistics_counts[current_index]["compute_infer"] += 1
+            self._statistics_counts[current_index]["compute_output"] += 1
+            self._statistics_counts[current_index]["success"] += 1
+        elif current_index + self._number_of_empty_responses < number_of_responses:
+            # fail
+            self._statistics_counts[current_index]["compute_infer"] += 1
+            self._statistics_counts[current_index]["compute_output"] += 1
+            self._statistics_counts[current_index]["fail"] += 1
+        else:
+            # empty
+            self._statistics_counts[current_index]["compute_infer"] += 1
+            self._statistics_counts[current_index]["empty_response"] += 1
+
+    # Check the 'response_stats' at 'current_index' for 'stats_name' is valid.
+    def _check_statistics_count_and_duration(
+        self, response_stats, current_index, stats_name
+    ):
+        expected_count = self._statistics_counts[current_index][stats_name]
+        if stats_name == "compute_infer" or stats_name == "empty_response":
+            delay_ns = self._min_infer_delay_ns
+        elif stats_name == "compute_output":
+            delay_ns = self._min_output_delay_ns
+        else:  # success or fail
+            delay_ns = self._min_infer_delay_ns + self._min_output_delay_ns
+        upper_bound_ns = 1.1 * delay_ns * expected_count
+        lower_bound_ns = 0.9 * delay_ns * expected_count
+        stats = response_stats[str(current_index)][stats_name]
+        self.assertEqual(stats["count"], expected_count)
+        self.assertLessEqual(stats["ns"], upper_bound_ns)
+        self.assertGreaterEqual(stats["ns"], lower_bound_ns)
+
+    # Fetch and return the response statistics from both gRPC and HTTP endpoints, and
+    # check they are equivalent before returning.
+    def _get_response_statistics(self):
+        # http response statistics
+        statistics_http = self._http_client.get_inference_statistics(
+            model_name=self._model_name
+        )
+        model_stats_http = statistics_http["model_stats"][0]
+        self.assertEqual(model_stats_http["name"], self._model_name)
+        response_stats_http = model_stats_http["response_stats"]
+        # grpc response statistics
+        statistics_grpc = self._grpc_client.get_inference_statistics(
+            model_name=self._model_name, as_json=True
+        )
+        model_stats_grpc = statistics_grpc["model_stats"][0]
+        self.assertEqual(model_stats_grpc["name"], self._model_name)
+        response_stats_grpc = model_stats_grpc["response_stats"]
+        # check equivalence between http and grpc statistics
+        self.assertEqual(len(response_stats_http), len(response_stats_grpc))
+        for idx, statistics_http in response_stats_http.items():
+            self.assertIn(idx, response_stats_grpc)
+            statistics_grpc = response_stats_grpc[idx]
+            for name, stats_http in statistics_http.items():
+                self.assertIn(name, statistics_grpc)
+                stats_grpc = statistics_grpc[name]
+                # normalize gRPC statistics to http
+                stats_grpc["count"] = (
+                    int(stats_grpc["count"]) if ("count" in stats_grpc) else 0
+                )
+                stats_grpc["ns"] = int(stats_grpc["ns"]) if ("ns" in stats_grpc) else 0
+                # check equal
+                self.assertEqual(stats_http, stats_grpc)
+        return response_stats_http
+
+    # Check that the response statistics are valid for a given infer request, providing
+    # its 'responses' and 'number_of_responses'.
+    def _check_response_stats(self, responses, number_of_responses):
+        response_stats = self._get_response_statistics()
+        self.assertGreaterEqual(len(response_stats), number_of_responses)
+        for i in range(number_of_responses):
+            self._update_statistics_counts(i, number_of_responses)
+            self._check_statistics_count_and_duration(
+                response_stats, i, "compute_infer"
+            )
+            self._check_statistics_count_and_duration(
+                response_stats, i, "compute_output"
+            )
+            self._check_statistics_count_and_duration(response_stats, i, "success")
+            self._check_statistics_count_and_duration(response_stats, i, "fail")
+            self._check_statistics_count_and_duration(
+                response_stats, i, "empty_response"
+            )
+
+    # Test response statistics. The statistics must be valid over two or more infers.
+    def test_response_statistics(self):
+        # Send a request that generates 4 responses.
+        number_of_responses = 4
+        responses = self._stream_infer(number_of_responses)
+        self._check_response_stats(responses, number_of_responses)
+        # Send a request that generates 6 responses, and make sure the
+        # statistics are aggregated with the previous request.
+        number_of_responses = 6
+        responses = self._stream_infer(number_of_responses)
+        self._check_response_stats(responses, number_of_responses)
+        # Send a request that generates 3 responses, and make sure the
+        # statistics are aggregated with the previous requests.
+        number_of_responses = 3
+        responses = self._stream_infer(number_of_responses)
+        self._check_response_stats(responses, number_of_responses)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/qa/L0_response_statistics/test.sh b/qa/L0_response_statistics/test.sh
new file mode 100755
index 0000000000..eae900a9e9
--- /dev/null
+++ b/qa/L0_response_statistics/test.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+REPO_VERSION=${NVIDIA_TRITON_SERVER_VERSION}
+if [ "$#" -ge 1 ]; then
+    REPO_VERSION=$1
+fi
+if [ -z "$REPO_VERSION" ]; then
+    echo -e "Repository version must be specified"
+    echo -e "\n***\n*** Test Failed\n***"
+    exit 1
+fi
+if [ ! -z "$TEST_REPO_ARCH" ]; then
+    REPO_VERSION=${REPO_VERSION}_${TEST_REPO_ARCH}
+fi
+
+export CUDA_VISIBLE_DEVICES=0
+
+SERVER=/opt/tritonserver/bin/tritonserver
+source ../common/util.sh
+
+RET=0
+
+rm -rf models && mkdir models
+mkdir -p models/square_int32/1 && (cd models/square_int32 && \
+    echo 'backend: "square"' >> config.pbtxt && \
+    echo 'max_batch_size: 0' >> config.pbtxt && \
+    echo 'model_transaction_policy { decoupled: True }' >> config.pbtxt && \
+    echo -e 'input [{ name: "IN" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
+    echo -e 'output [{ name: "OUT" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_INFER_DELAY_NS" \n value: { string_value: "400000000" } }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" \n value: { string_value: "200000000" } }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_FAIL_COUNT" \n value: { string_value: "2" } }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_EMPTY_COUNT" \n value: { string_value: "1" } }]' >> config.pbtxt)
+
+TEST_LOG="response_statistics_test.log"
+SERVER_LOG="./response_statistics_test.server.log"
+
+SERVER_ARGS="--model-repository=`pwd`/models"
+run_server
+if [ "$SERVER_PID" == "0" ]; then
+    echo -e "\n***\n*** Failed to start $SERVER\n***"
+    cat $SERVER_LOG
+    exit 1
+fi
+
+set +e
+python response_statistics_test.py > $TEST_LOG 2>&1
+if [ $? -ne 0 ]; then
+    echo -e "\n***\n*** Failed response statistics test\n***"
+    cat $TEST_LOG
+    RET=1
+fi
+set -e
+
+kill $SERVER_PID
+wait $SERVER_PID
+
+if [ $RET -eq 0 ]; then
+    echo -e "\n***\n*** Test Passed\n***"
+else
+    echo -e "\n***\n*** Test FAILED\n***"
+fi
+exit $RET
diff --git a/src/grpc/grpc_server.cc b/src/grpc/grpc_server.cc
index ebe53c82e0..187272217d 100644
--- a/src/grpc/grpc_server.cc
+++ b/src/grpc/grpc_server.cc
@@ -1,4 +1,4 @@
-// Copyright 2019-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright 2019-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -1100,6 +1100,96 @@ CommonHandler::RegisterModelStatistics()
               ucnt);
       }

+      {
+        triton::common::TritonJson::Value responses_json;
+        err = model_stat.MemberAsObject("response_stats", &responses_json);
+        GOTO_IF_ERR(err, earlyexit);
+
+        std::vector<std::string> keys;
+        err = responses_json.Members(&keys);
+        GOTO_IF_ERR(err, earlyexit);
+
+        for (const auto& key : keys) {
+          triton::common::TritonJson::Value res_json;
+          err = responses_json.MemberAsObject(key.c_str(), &res_json);
+          GOTO_IF_ERR(err, earlyexit);
+
+          inference::InferResponseStatistics res;
+
+          {
+            triton::common::TritonJson::Value stat_json;
+            err = res_json.MemberAsObject("compute_infer", &stat_json);
+            GOTO_IF_ERR(err, earlyexit);
+
+            uint64_t val;
+            err = stat_json.MemberAsUInt("count", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_compute_infer()->set_count(val);
+            err = stat_json.MemberAsUInt("ns", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_compute_infer()->set_ns(val);
+          }
+
+          {
+            triton::common::TritonJson::Value stat_json;
+            err = res_json.MemberAsObject("compute_output", &stat_json);
+            GOTO_IF_ERR(err, earlyexit);
+
+            uint64_t val;
+            err = stat_json.MemberAsUInt("count", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_compute_output()->set_count(val);
+            err = stat_json.MemberAsUInt("ns", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_compute_output()->set_ns(val);
+          }
+
+          {
+            triton::common::TritonJson::Value stat_json;
+            err = res_json.MemberAsObject("success", &stat_json);
+            GOTO_IF_ERR(err, earlyexit);
+
+            uint64_t val;
+            err = stat_json.MemberAsUInt("count", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_success()->set_count(val);
+            err = stat_json.MemberAsUInt("ns", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_success()->set_ns(val);
+          }
+
+          {
+            triton::common::TritonJson::Value stat_json;
+            err = res_json.MemberAsObject("fail", &stat_json);
+            GOTO_IF_ERR(err, earlyexit);
+
+            uint64_t val;
+            err = stat_json.MemberAsUInt("count", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_fail()->set_count(val);
+            err = stat_json.MemberAsUInt("ns", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_fail()->set_ns(val);
+          }
+
+          {
+            triton::common::TritonJson::Value stat_json;
+            err = res_json.MemberAsObject("empty_response", &stat_json);
+            GOTO_IF_ERR(err, earlyexit);
+
+            uint64_t val;
+            err = stat_json.MemberAsUInt("count", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_empty_response()->set_count(val);
+            err = stat_json.MemberAsUInt("ns", &val);
+            GOTO_IF_ERR(err, earlyexit);
+            res.mutable_empty_response()->set_ns(val);
+          }
+
+          (*statistics->mutable_response_stats())[key] = std::move(res);
+        }
+      }
+
       triton::common::TritonJson::Value batches_json;
       err = model_stat.MemberAsArray("batch_stats", &batches_json);
       GOTO_IF_ERR(err, earlyexit);
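
For reference, here is a minimal sketch (not part of the patch) of consuming the `response_stats` map that the gRPC conversion code above fills in. It assumes a local server built with this change and the decoupled `square_int32` model that the new test creates.

```python
# Minimal sketch: read the new response_stats map over gRPC.
# Assumptions: local Triton server on port 8001 that includes this change,
# and a decoupled model named "square_int32".
import tritonclient.grpc as grpcclient

client = grpcclient.InferenceServerClient("localhost:8001")

# as_json=False returns the ModelStatisticsResponse protobuf, so the
# map<string, InferResponseStatistics> field can be accessed directly.
stats_pb = client.get_inference_statistics(model_name="square_int32", as_json=False)
for model_stat in stats_pb.model_stats:
    for key, res in model_stat.response_stats.items():
        # Each value is an InferResponseStatistics message; every field is a
        # StatisticDuration with a count and a cumulative ns duration.
        print(key, res.success.count, res.success.ns, res.empty_response.count)

# as_json=True mirrors the HTTP representation of the same data.
stats_json = client.get_inference_statistics(model_name="square_int32", as_json=True)
print(stats_json["model_stats"][0]["response_stats"])
```

Note that the JSON form renders uint64 fields as strings and may omit zero-valued fields, which is why the test above normalizes `count` and `ns` before comparing the gRPC output against the HTTP output.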