Test preserve_ordering for oldest strategy sequence batcher (#6185) (#6191)

Co-authored-by: Ryan McCormick <[email protected]>
tanmayv25 and rmccorm4 authored Aug 15, 2023
1 parent bb880ba commit eb51807
Showing 4 changed files with 409 additions and 0 deletions.
182 changes: 182 additions & 0 deletions qa/L0_sequence_batcher/sequence_batcher_test.py
@@ -31,10 +31,12 @@
sys.path.append("../common")

import os
import random
import threading
import time
import unittest
from builtins import str
from functools import partial

import numpy as np
import sequence_util as su
@@ -3432,5 +3434,185 @@ def test_send_request_after_timeout(self):
raise last_err


class SequenceBatcherPreserveOrderingTest(su.SequenceBatcherTestUtil):
    def setUp(self):
        super().setUp()
        # By default, find tritonserver on "localhost", but this can be
        # overridden with the TRITONSERVER_IPADDR envvar
        self.server_address_ = (
            os.environ.get("TRITONSERVER_IPADDR", "localhost") + ":8001"
        )

        # Prepare the input and expected output based on the model and the
        # inference sequence sent for testing. If the test is extended to
        # other sequences or models, proper grouping should be added.
        self.model_name_ = "sequence_py"
        self.tensor_data_ = np.ones(shape=[1, 1], dtype=np.int32)
        self.inputs_ = [grpcclient.InferInput("INPUT0", [1, 1], "INT32")]
        self.inputs_[0].set_data_from_numpy(self.tensor_data_)
        self.triton_client = grpcclient.InferenceServerClient(self.server_address_)

        # Atomic request ID for multi-threaded inference
        self.request_id_lock = threading.Lock()
        self.request_id = 1

    def send_sequence(self, seq_id, seq_id_map, req_id_map):
        if seq_id not in seq_id_map:
            seq_id_map[seq_id] = []

        start, middle, end = (True, False), (False, False), (False, True)
        # Send sequence with 1 start, 1 middle, and 1 end request
        seq_flags = [start, middle, end]
        for start_flag, end_flag in seq_flags:
            # Introduce a random sleep to better interleave requests from
            # different sequences
            time.sleep(random.uniform(0.0, 1.0))

            # Serialize sending requests to ensure ordered request IDs
            with self.request_id_lock:
                req_id = self.request_id
                self.request_id += 1

                # Store metadata to validate results later
                req_id_map[req_id] = seq_id
                seq_id_map[seq_id].append(req_id)

                self.triton_client.async_stream_infer(
                    self.model_name_,
                    self.inputs_,
                    sequence_id=seq_id,
                    sequence_start=start_flag,
                    sequence_end=end_flag,
                    timeout=None,
                    request_id=str(req_id),
                )

    def _test_sequence_ordering(self, preserve_ordering, decoupled):
        # 1. Send a few grpc streaming sequence requests to the model.
        # 2. With grpc streaming, the model should receive the requests in
        #    the same order they are sent from the client, and the client
        #    should receive the responses in the same order they are sent
        #    back by the model/server. With the sequence scheduler, the
        #    requests for each sequence should be routed to the same model
        #    instance, and no two requests from the same sequence should be
        #    batched together.
        # 3. With preserve_ordering=False, responses may come back in a
        #    different order than the requests, but with grpc streaming the
        #    responses within each sequence should still be ordered.
        # 4. Assert that the sequence values are ordered and that the
        #    response IDs per sequence are ordered.
        class SequenceResult:
            def __init__(self, seq_id, result, request_id):
                self.seq_id = seq_id
                self.result = result
                self.request_id = int(request_id)

        def full_callback(sequence_dict, sequence_list, result, error):
            # We expect no model errors for this test
            if error:
                self.assertTrue(False, error)

            # Gather all the necessary metadata for validation
            request_id = int(result.get_response().id)
            sequence_id = request_id_map[request_id]
            # Overall list of results in the order received, regardless of
            # sequence ID
            sequence_list.append(SequenceResult(sequence_id, result, request_id))
            # Ordered results organized by their sequence IDs
            sequence_dict[sequence_id].append(result)

        # Store the ordered list in which responses are received by the client
        sequence_list = []
        # Store mapping of sequence ID to response results
        sequence_dict = {}
        # Store mapping of sequence ID to request IDs and vice versa
        sequence_id_map = {}
        request_id_map = {}

        # Start stream
        seq_callback = partial(full_callback, sequence_dict, sequence_list)
        self.triton_client.start_stream(callback=seq_callback)

        # Send N sequences concurrently
        threads = []
        num_sequences = 10
        for i in range(num_sequences):
            # Sequence IDs are 1-indexed
            sequence_id = i + 1
            # Add a result list and callback for each sequence
            sequence_dict[sequence_id] = []
            threads.append(
                threading.Thread(
                    target=self.send_sequence,
                    args=(sequence_id, sequence_id_map, request_id_map),
                )
            )

        # Start all sequence threads
        for t in threads:
            t.start()

        # Wait for threads to return
        for t in threads:
            t.join()

        # Block until all requests are completed
        self.triton_client.stop_stream()

        # Make sure some inferences occurred and metadata was collected
        self.assertGreater(len(sequence_dict), 0)
        self.assertGreater(len(sequence_list), 0)

        # Validate model results are sorted per sequence ID (model-specific logic)
        print(f"=== {preserve_ordering=} {decoupled=} ===")
        print("Outputs per Sequence:")
        for seq_id, sequence in sequence_dict.items():
            seq_outputs = [
                result.as_numpy("OUTPUT0").flatten().tolist() for result in sequence
            ]
            print(f"{seq_id}: {seq_outputs}")
            self.assertEqual(seq_outputs, sorted(seq_outputs))

        # Validate that the request/response IDs for each response in a
        # sequence are sorted. This should be true regardless of
        # preserve_ordering.
        print("Request IDs per Sequence:")
        for seq_id in sequence_id_map:
            per_seq_request_ids = sequence_id_map[seq_id]
            print(f"{seq_id}: {per_seq_request_ids}")
            self.assertEqual(per_seq_request_ids, sorted(per_seq_request_ids))

        # Validate results are sorted in request order if preserve_ordering is True
        if preserve_ordering:
            request_ids = [s.request_id for s in sequence_list]
            print(f"Request IDs overall:\n{request_ids}")
            sequence_ids = [s.seq_id for s in sequence_list]
            print(f"Sequence IDs overall:\n{sequence_ids}")
            self.assertEqual(request_ids, sorted(request_ids))

        # Assert some dynamic batching of requests was done
        stats = self.triton_client.get_inference_statistics(
            model_name=self.model_name_, headers={}, as_json=True
        )
        model_stats = stats["model_stats"][0]
        self.assertEqual(model_stats["name"], self.model_name_)
        self.assertLess(
            int(model_stats["execution_count"]), int(model_stats["inference_count"])
        )

    def test_sequence_with_preserve_ordering(self):
        self.model_name_ = "seqpy_preserve_ordering_nondecoupled"
        self._test_sequence_ordering(preserve_ordering=True, decoupled=False)

    def test_sequence_without_preserve_ordering(self):
        self.model_name_ = "seqpy_no_preserve_ordering_nondecoupled"
        self._test_sequence_ordering(preserve_ordering=False, decoupled=False)

    # FIXME [DLIS-5280]: This may fail for decoupled models if writes to GRPC
    # stream are done out of order in server, so disable test for now.
    # def test_sequence_with_preserve_ordering_decoupled(self):
    #     self.model_name_ = "seqpy_preserve_ordering_decoupled"
    #     self._test_sequence_ordering(preserve_ordering=True, decoupled=True)

    # FIXME [DLIS-5280]
    # def test_sequence_without_preserve_ordering_decoupled(self):
    #     self.model_name_ = "seqpy_no_preserve_ordering_decoupled"
    #     self._test_sequence_ordering(preserve_ordering=False, decoupled=True)


if __name__ == "__main__":
    unittest.main()
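
For context, the gRPC streaming pattern this test exercises can be reduced to a small standalone sketch. The snippet below is illustrative only: it assumes a Triton server is already running at localhost:8001 with the sequence_py model loaded, sends one three-request sequence (start, middle, end) over a single stream, and prints the response IDs in the order the client receives them.

import numpy as np
import tritonclient.grpc as grpcclient

received = []

def callback(result, error):
    # Record response IDs (or errors) in the order the client receives them
    received.append(error if error else result.get_response().id)

client = grpcclient.InferenceServerClient("localhost:8001")
client.start_stream(callback=callback)

inputs = [grpcclient.InferInput("INPUT0", [1, 1], "INT32")]
inputs[0].set_data_from_numpy(np.ones(shape=[1, 1], dtype=np.int32))

# One sequence: 1 start, 1 middle, and 1 end request, all on the same stream
for req_id, (start, end) in enumerate([(True, False), (False, False), (False, True)], start=1):
    client.async_stream_infer(
        "sequence_py",
        inputs,
        sequence_id=1,
        sequence_start=start,
        sequence_end=end,
        request_id=str(req_id),
    )

client.stop_stream()  # blocks until all responses on the stream have arrived
print(received)       # e.g. ['1', '2', '3'] when responses arrive in request order

With preserve_ordering enabled in the model config, the same arrival-order check extends across multiple concurrent sequences, which is what the test class above automates.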
81 changes: 81 additions & 0 deletions qa/L0_sequence_batcher/test.sh
@@ -90,12 +90,14 @@ TF_VERSION=${TF_VERSION:=2}
# On windows the paths invoked by the script (running in WSL) must use
# /mnt/c when needed but the paths on the tritonserver command-line
# must be C:/ style.
WINDOWS=0
if [[ "$(< /proc/sys/kernel/osrelease)" == *microsoft* ]]; then
MODELDIR=${MODELDIR:=C:/models}
DATADIR=${DATADIR:="/mnt/c/data/inferenceserver/${REPO_VERSION}"}
BACKEND_DIR=${BACKEND_DIR:=C:/tritonserver/backends}
SERVER=${SERVER:=/mnt/c/tritonserver/bin/tritonserver.exe}
export WSLENV=$WSLENV:TRITONSERVER_DELAY_SCHEDULER:TRITONSERVER_BACKLOG_DELAY_SCHEDULER
WINDOWS=1
else
MODELDIR=${MODELDIR:=`pwd`}
DATADIR=${DATADIR:="/data/inferenceserver/${REPO_VERSION}"}
@@ -800,6 +802,85 @@ if [ "$TEST_SYSTEM_SHARED_MEMORY" -ne 1 ] && [ "$TEST_CUDA_SHARED_MEMORY" -ne 1
set -e
fi

### Start Preserve Ordering Tests ###

# Test is not currently run on Windows due to the use of python backend models
if [ ${WINDOWS} -ne 1 ]; then
    # Test preserve ordering true/false and decoupled/non-decoupled
    TEST_CASE=SequenceBatcherPreserveOrderingTest
    MODEL_PATH=preserve_ordering_models
    BASE_MODEL="../python_models/sequence_py"
    rm -rf ${MODEL_PATH}

    # FIXME [DLIS-5280]: This may fail for decoupled models if writes to the GRPC
    # stream are done out of order in the server, so decoupled tests are disabled.
    MODES="decoupled nondecoupled"
    for mode in $MODES; do
        NO_PRESERVE="${MODEL_PATH}/seqpy_no_preserve_ordering_${mode}"
        mkdir -p ${NO_PRESERVE}/1
        cp ${BASE_MODEL}/config.pbtxt ${NO_PRESERVE}
        cp ${BASE_MODEL}/model.py ${NO_PRESERVE}/1

        PRESERVE="${MODEL_PATH}/seqpy_preserve_ordering_${mode}"
        cp -r ${NO_PRESERVE} ${PRESERVE}
        sed -i "s/^preserve_ordering: False/preserve_ordering: True/" ${PRESERVE}/config.pbtxt

        if [ ${mode} == "decoupled" ]; then
            echo -e "\nmodel_transaction_policy { decoupled: true }" >> ${NO_PRESERVE}/config.pbtxt
            echo -e "\nmodel_transaction_policy { decoupled: true }" >> ${PRESERVE}/config.pbtxt
        fi
    done

    SERVER_ARGS="--model-repository=$MODELDIR/$MODEL_PATH ${SERVER_ARGS_EXTRA}"
    SERVER_LOG="./$TEST_CASE.$MODEL_PATH.server.log"

    if [ "$TEST_VALGRIND" -eq 1 ]; then
        LEAKCHECK_LOG="./$TEST_CASE.$MODEL_PATH.valgrind.log"
        LEAKCHECK_ARGS="$LEAKCHECK_ARGS_BASE --log-file=$LEAKCHECK_LOG"
        run_server_leakcheck
    else
        run_server
    fi

    if [ "$SERVER_PID" == "0" ]; then
        echo -e "\n***\n*** Failed to start $SERVER\n***"
        cat $SERVER_LOG
        exit 1
    fi

    echo "Test: $TEST_CASE, repository $MODEL_PATH" >>$CLIENT_LOG

    set +e
    python3 $BATCHER_TEST $TEST_CASE >>$CLIENT_LOG 2>&1
    if [ $? -ne 0 ]; then
        echo -e "\n***\n*** Test $TEST_CASE Failed\n***" >>$CLIENT_LOG
        echo -e "\n***\n*** Test $TEST_CASE Failed\n***"
        RET=1
    else
        # 2 test cases: preserve_ordering = True/False
        check_test_results $TEST_RESULT_FILE 2
        if [ $? -ne 0 ]; then
            cat $CLIENT_LOG
            echo -e "\n***\n*** Test Result Verification Failed\n***"
            RET=1
        fi
    fi
    set -e

    kill_server

    set +e
    if [ "$TEST_VALGRIND" -eq 1 ]; then
        python3 ../common/check_valgrind_log.py -f $LEAKCHECK_LOG
        if [ $? -ne 0 ]; then
            RET=1
        fi
    fi
    set -e
fi

### End Preserve Ordering Tests ###

if [ $RET -eq 0 ]; then
    echo -e "\n***\n*** Test Passed\n***"
else
53 changes: 53 additions & 0 deletions qa/python_models/sequence_py/config.pbtxt
@@ -0,0 +1,53 @@
# Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
max_batch_size: 4

input [
{
name: "INPUT0"
data_type: TYPE_INT32
dims: [ 1 ]
}
]
output [
{
name: "OUTPUT0"
data_type: TYPE_INT32
dims: [ 1 ]
}
]

sequence_batching {
oldest {
max_candidate_sequences: 4
max_queue_delay_microseconds: 1000000
preserve_ordering: False
}
max_sequence_idle_microseconds: 10000000
}
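
For reference, test.sh above derives the other model variants from this base config with a sed edit and an appended line. Assuming only those edits, the tail of the generated seqpy_preserve_ordering_decoupled/config.pbtxt would look roughly like:

sequence_batching {
oldest {
max_candidate_sequences: 4
max_queue_delay_microseconds: 1000000
preserve_ordering: True
}
max_sequence_idle_microseconds: 10000000
}

model_transaction_policy { decoupled: true }

The non-decoupled variants omit the model_transaction_policy line, and the no_preserve variants keep preserve_ordering: False.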
93 changes: 93 additions & 0 deletions qa/python_models/sequence_py/model.py
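
The diff for qa/python_models/sequence_py/model.py is not rendered in this view, so its exact contents are not reproduced here. As a rough, hypothetical sketch only: a Python backend model consistent with the test's per-sequence check (OUTPUT0 values within a sequence are non-decreasing) could accumulate INPUT0 per correlation ID, as below. The accumulator approach and the correlation-ID bookkeeping are assumptions, not the committed implementation, and the sketch ignores the decoupled transaction policy.

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def initialize(self, args):
        # Running total per sequence, keyed by correlation/sequence ID (assumption)
        self.accumulator = {}

    def execute(self, requests):
        responses = []
        for request in requests:
            seq_id = request.correlation_id()
            value = pb_utils.get_input_tensor_by_name(request, "INPUT0").as_numpy()

            # Accumulate INPUT0 per sequence so OUTPUT0 is non-decreasing
            # within a sequence, which is what the test asserts.
            self.accumulator[seq_id] = self.accumulator.get(seq_id, 0) + int(value.flatten()[0])

            out = np.array([[self.accumulator[seq_id]]], dtype=np.int32)
            responses.append(
                pb_utils.InferenceResponse(
                    output_tensors=[pb_utils.Tensor("OUTPUT0", out)]
                )
            )
        return responses

A real sequence model would also typically reset or drop per-sequence state when it receives the sequence-end flag.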
