Skip to content

Commit

Permalink
Add cancellation into response statistics (#6904)
Browse files Browse the repository at this point in the history
* Add cancellation into response statistics

* Add test for response statistics cancel

* Remove debugging print

* Use is None comparison

* Fix docs

* Use default args None

* Refactor RegisterModelStatistics()
  • Loading branch information
kthui authored Feb 23, 2024
1 parent 5732163 commit 48cf6b7
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 282 deletions.
5 changes: 4 additions & 1 deletion docs/protocol/extension_statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ $response_stats =
"compute_output" : $duration_stat,
"success" : $duration_stat,
"fail" : $duration_stat,
"empty_response" : $duration_stat
"empty_response" : $duration_stat,
"cancel" : $duration_stat
}
```

Expand All @@ -208,6 +209,8 @@ $response_stats =
is the sum of infer and output durations.
- "empty_response" : The count and cumulative duration of an inference with an
empty / no response. The duration is infer durations.
- "cancel" : The count and cumulative duration of an inference cancellation. The
  duration is for cleaning up resources held by cancelled inference requests.


```
Expand Down
104 changes: 82 additions & 22 deletions qa/L0_response_statistics/response_statistics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@

class TestResponseStatistics(unittest.TestCase):
def setUp(self):
self._model_name = "square_int32"
self._min_infer_delay_ns = 400000000
self._min_output_delay_ns = 200000000
self._number_of_fail_responses = 2
self._number_of_empty_responses = 1
self._model_name = "set_by_test_case"
self._min_infer_delay_ns = 0
self._min_output_delay_ns = 0
self._min_cancel_delay_ns = 0
self._number_of_fail_responses = 0
self._number_of_empty_responses = 0
self._statistics_counts = []
self._grpc_client = grpcclient.InferenceServerClient(
"localhost:8001", verbose=True
Expand All @@ -59,8 +60,10 @@ def callback(result, error):

# Send an infer request and return its responses. 'number_of_responses' is the sum
# of success, fail and empty responses the model should return for this request.
# This function waits until all success and fail responses are received.
def _stream_infer(self, number_of_responses):
# 'cancel_at_response_size' will cancel the stream when the number of responses
# received equals the size, set to None if cancellation is not required. This
# function waits until all success and fail responses are received, or cancelled.
def _stream_infer(self, number_of_responses, cancel_at_response_size=None):
callback, responses = self._generate_streaming_callback_and_response_pair()
self._grpc_client.start_stream(callback)
input_data = np.array([number_of_responses], dtype=np.int32)
Expand All @@ -70,15 +73,27 @@ def _stream_infer(self, number_of_responses):
self._grpc_client.async_stream_infer(
model_name=self._model_name, inputs=inputs, outputs=outputs
)
while len(responses) < (number_of_responses - self._number_of_empty_responses):
time.sleep(0.1) # poll until all expected responses are received
self._grpc_client.stop_stream()
if cancel_at_response_size is None:
# poll until all expected responses are received
while len(responses) < (
number_of_responses - self._number_of_empty_responses
):
time.sleep(0.1)
self._grpc_client.stop_stream(cancel_requests=False)
else:
# poll until cancellation response size is reached
while len(responses) < cancel_at_response_size:
time.sleep(0.1)
self._grpc_client.stop_stream(cancel_requests=True)
return responses

# Update expected statistics counts for the response at 'current_index'.
# 'number_of_responses' is the sum of success, fail and empty responses expected
# from this inference request.
def _update_statistics_counts(self, current_index, number_of_responses):
# from this inference request. 'cancel_at_index' is the index at which the request
# should be cancelled.
def _update_statistics_counts(
self, current_index, number_of_responses, cancel_at_index
):
if current_index >= len(self._statistics_counts):
self._statistics_counts.append(
{
Expand All @@ -87,9 +102,13 @@ def _update_statistics_counts(self, current_index, number_of_responses):
"success": 0,
"fail": 0,
"empty_response": 0,
"cancel": 0,
}
)
if (
if current_index == cancel_at_index:
# cancel
self._statistics_counts[current_index]["cancel"] += 1
elif (
current_index
+ self._number_of_fail_responses
+ self._number_of_empty_responses
Expand Down Expand Up @@ -118,10 +137,16 @@ def _check_statistics_count_and_duration(
delay_ns = self._min_infer_delay_ns
elif stats_name == "compute_output":
delay_ns = self._min_output_delay_ns
elif stats_name == "cancel":
delay_ns = self._min_cancel_delay_ns
else: # success or fail
delay_ns = self._min_infer_delay_ns + self._min_output_delay_ns
upper_bound_ns = 1.1 * delay_ns * expected_count
lower_bound_ns = 0.9 * delay_ns * expected_count
if delay_ns == 0:
upper_bound_ns = 10000000 * expected_count
lower_bound_ns = 0
else:
upper_bound_ns = 1.1 * delay_ns * expected_count
lower_bound_ns = 0.9 * delay_ns * expected_count
stats = response_stats[str(current_index)][stats_name]
self.assertEqual(stats["count"], expected_count)
self.assertLessEqual(stats["ns"], upper_bound_ns)
Expand Down Expand Up @@ -162,12 +187,14 @@ def _get_response_statistics(self):
return response_stats_http

# Check the response statistics is valid for a given infer request, providing its
# 'responses' and 'number_of_responses'.
def _check_response_stats(self, responses, number_of_responses):
# 'responses', expected 'number_of_responses' and 'cancel_at_index'.
def _check_response_stats(
self, responses, number_of_responses, cancel_at_index=None
):
response_stats = self._get_response_statistics()
self.assertGreaterEqual(len(response_stats), number_of_responses)
for i in range(number_of_responses):
self._update_statistics_counts(i, number_of_responses)
self._update_statistics_counts(i, number_of_responses, cancel_at_index)
self._check_statistics_count_and_duration(
response_stats, i, "compute_infer"
)
Expand All @@ -179,24 +206,57 @@ def _check_response_stats(self, responses, number_of_responses):
self._check_statistics_count_and_duration(
response_stats, i, "empty_response"
)
self._check_statistics_count_and_duration(response_stats, i, "cancel")

# Test response statistics. The statistics must be valid over two or more infers.
def test_response_statistics(self):
    """Verify response statistics stay valid and aggregate across requests.

    Sends three streaming requests producing 4, 6 and 3 responses
    respectively, checking after each one that the reported per-response
    statistics reflect the running totals over all requests so far.
    """
    self._model_name = "square_int32"
    self._min_infer_delay_ns = 400000000
    self._min_output_delay_ns = 200000000
    self._number_of_fail_responses = 2
    self._number_of_empty_responses = 1
    # Each request generates a different number of responses; the server's
    # statistics must aggregate with all previously issued requests.
    for expected_responses in (4, 6, 3):
        received = self._stream_infer(expected_responses)
        self._check_response_stats(received, expected_responses)

# Test response statistics when a request is cancelled mid-stream.
def test_response_statistics_cancel(self):
    """Verify cancellation counts/durations appear in response statistics.

    Runs one request to completion, then cancels a second request after a
    single response and checks that the cancellation is recorded at the
    correct response index and aggregated with the first request.
    """
    self._model_name = "square_int32_slow"
    self._min_infer_delay_ns = 1200000000
    self._min_output_delay_ns = 800000000
    self._min_cancel_delay_ns = 400000000

    # First, a request that completes normally with 4 responses.
    expected_responses = 4
    received = self._stream_infer(expected_responses)
    self._check_response_stats(received, expected_responses)

    # Next, a 4-response request cancelled once the 1st response arrives.
    # Its statistics must aggregate with the previous request.
    received = self._stream_infer(number_of_responses=4, cancel_at_response_size=1)
    # The 1st and 2nd responses each incur the infer + output delay; the
    # cancellation adds the cancel delay at the 3rd response slot.
    per_response_delay_ns = self._min_infer_delay_ns + self._min_output_delay_ns
    min_total_delay_ns = 2 * per_response_delay_ns + self._min_cancel_delay_ns
    # Give the server enough wall-clock time to finish the inference and the
    # cancellation cleanup before polling the statistics endpoint.
    time.sleep(min_total_delay_ns * 1.5 / 1000000000)
    # The request is cancelled while the 2nd response is computing, so the
    # cancellation is recorded at the 3rd response (index 2), for a total of
    # 3 responses in the statistics.
    self._check_response_stats(received, number_of_responses=3, cancel_at_index=2)


# Allow the test suite to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()
9 changes: 9 additions & 0 deletions qa/L0_response_statistics/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ mkdir -p models/square_int32/1 && (cd models/square_int32 && \
echo -e 'parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" \n value: { string_value: "200000000" } }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "CUSTOM_FAIL_COUNT" \n value: { string_value: "2" } }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "CUSTOM_EMPTY_COUNT" \n value: { string_value: "1" } }]' >> config.pbtxt)
mkdir -p models/square_int32_slow/1 && (cd models/square_int32_slow && \
echo 'backend: "square"' >> config.pbtxt && \
echo 'max_batch_size: 0' >> config.pbtxt && \
echo 'model_transaction_policy { decoupled: True }' >> config.pbtxt && \
echo -e 'input [{ name: "IN" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
echo -e 'output [{ name: "OUT" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "CUSTOM_INFER_DELAY_NS" \n value: { string_value: "1200000000" } }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" \n value: { string_value: "800000000" } }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "CUSTOM_CANCEL_DELAY_NS" \n value: { string_value: "400000000" } }]' >> config.pbtxt)

TEST_LOG="response_statistics_test.log"
SERVER_LOG="./response_statistics_test.server.log"
Expand Down
Loading

0 comments on commit 48cf6b7

Please sign in to comment.