diff --git a/qa/L0_trace/models/input_all_required/1/model.py b/qa/L0_trace/models/input_all_required/1/model.py
new file mode 100644
index 0000000000..8d51130d06
--- /dev/null
+++ b/qa/L0_trace/models/input_all_required/1/model.py
@@ -0,0 +1,49 @@
+# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import json
+import time
+
+import numpy as np
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    def initialize(self, args):
+        self.model_config = json.loads(args["model_config"])
+
+    def execute(self, requests):
+        """This function is called on an inference request."""
+        # Sleep in the compute phase (for less than the 10s collector timeout) so a cancellation can arrive mid-compute
+        time.sleep(2)
+        responses = []
+        for _ in requests:
+            # Return a single FP32 value per request
+            out_0 = np.array([1], dtype=np.float32)
+            out_tensor_0 = pb_utils.Tensor("OUTPUT0", out_0)
+            responses.append(pb_utils.InferenceResponse([out_tensor_0]))
+
+        return responses
diff --git a/qa/L0_trace/models/input_all_required/config.pbtxt b/qa/L0_trace/models/input_all_required/config.pbtxt
new file mode 100644
index 0000000000..1426af2b65
--- /dev/null
+++ b/qa/L0_trace/models/input_all_required/config.pbtxt
@@ -0,0 +1,55 @@
+# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+name: "input_all_required"
+backend: "python"
+input [
+  {
+    name: "INPUT0"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  },
+  {
+    name: "INPUT1"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  },
+  {
+    name: "INPUT2"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+output [
+  {
+    name: "OUTPUT0"
+    data_type: TYPE_FP32
+    dims: [ 1 ]
+  }
+]
+
+instance_group [{ kind: KIND_CPU }]
\ No newline at end of file
diff --git a/qa/L0_trace/opentelemetry_unittest.py b/qa/L0_trace/opentelemetry_unittest.py
index 04a82d157c..93056e613d 100644
--- a/qa/L0_trace/opentelemetry_unittest.py
+++ b/qa/L0_trace/opentelemetry_unittest.py
@@ -27,6 +27,7 @@
 import sys
 
 sys.path.append("../common")
+import concurrent.futures
 import json
 import queue
 import re
@@ -82,6 +83,11 @@ def send_bls_request(model_name="simple", headers=None):
     client.infer("bls_simple", inputs, headers=headers)
 
 
+class UserData:
+    def __init__(self):
+        self._completed_requests = queue.Queue()
+
+
 class OpenTelemetryTest(tu.TestResultCollector):
     def setUp(self):
         self.collector_subprocess = subprocess.Popen(
@@ -104,14 +110,27 @@ def setUp(self):
         )
         self.simple_model_name = "simple"
         self.ensemble_model_name = "ensemble_add_sub_int32_int32_int32"
+        self.input_all_required_model_name = "input_all_required"
+        self.cancel_queue_model_name = "dynamic_batch"
         self.bls_model_name = "bls_simple"
         self.trace_context_model = "trace_context"
+        self.non_decoupled_model_name_ = "repeat_int32"
         self.test_models = [
             self.simple_model_name,
             self.ensemble_model_name,
             self.bls_model_name,
+            self.non_decoupled_model_name_,
+            self.cancel_queue_model_name,
         ]
         self.root_span = "InferRequest"
+        self._user_data = UserData()
+        self._callback = partial(callback, self._user_data)
+        self._outputs = []
+        self.input_data = {
+            "IN": np.array([1], dtype=np.int32),
+            "DELAY": np.array([0], dtype=np.uint32),
+            "WAIT": np.array([0], dtype=np.uint32),
+        }
 
     def tearDown(self):
         self.collector_subprocess.kill()
@@ -120,6 +139,22 @@ def tearDown(self):
         test_name = unittest.TestCase.id(self).split(".")[-1]
         shutil.copyfile(self.filename, self.filename + "_" + test_name + ".log")
 
+    def _get_inputs(self, batch_size):
+        shape = [batch_size, 8]
+        inputs = [grpcclient.InferInput("INPUT0", shape, "FP32")]
+        inputs[0].set_data_from_numpy(np.ones(shape, dtype=np.float32))
+        return inputs
+
+    def _generate_callback_and_response_pair(self):
+        response = {"responded": False, "result": None, "error": None}
+
+        def callback_queue(result, error):
+            response["responded"] = True
+            response["result"] = result
+            response["error"] = error
+
+        return callback_queue, response
+
     def _parse_trace_log(self, trace_log):
         """
         Helper function that parses file, containing collected traces.
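The helpers added above (_get_inputs and _generate_callback_and_response_pair) are exercised by the new cancellation tests further down. As a rough, self-contained sketch of that flow, assuming a Triton server on localhost:8001 serving the dynamic_batch model that test.sh generates (illustrative only, not part of the patch):

    import time

    import numpy as np
    import tritonclient.grpc as grpcclient

    client = grpcclient.InferenceServerClient("localhost:8001")

    # One [1, 8] FP32 input, matching what _get_inputs(batch_size=1) builds.
    inputs = [grpcclient.InferInput("INPUT0", [1, 8], "FP32")]
    inputs[0].set_data_from_numpy(np.ones([1, 8], dtype=np.float32))

    # Callback/response pair in the spirit of _generate_callback_and_response_pair().
    response = {"responded": False, "result": None, "error": None}

    def callback(result, error):
        # Record whether the (possibly cancelled) request ever produced a result.
        response.update(responded=True, result=result, error=error)

    future = client.async_infer("dynamic_batch", inputs, callback)
    time.sleep(0.2)  # give the request time to reach the server
    future.cancel()  # ask Triton to cancel the in-flight request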
@@ -138,7 +173,7 @@ def _parse_trace_log(self, trace_log):
 
         return traces
 
-    def _check_events(self, span_name, events):
+    def _check_events(self, span_name, events, is_cancelled):
         """
         Helper function that verifies passed events contain expected entries.
 
@@ -160,6 +195,14 @@ def _check_events(self, span_name, events):
             "GRPC_SEND_START",
             "GRPC_SEND_END",
         ]
+        cancel_root_events_http = [
+            "HTTP_RECV_START",
+            "HTTP_RECV_END",
+        ]
+        cancel_root_events_grpc = [
+            "GRPC_WAITREAD_START",
+            "GRPC_WAITREAD_END",
+        ]
         request_events = ["REQUEST_START", "QUEUE_START", "REQUEST_END"]
         compute_events = [
             "COMPUTE_START",
@@ -180,6 +223,10 @@ def _check_events(self, span_name, events):
         elif span_name == self.root_span:
             # Check that root span has INFER_RESPONSE_COMPLETE, _RECV/_WAITREAD
             # and _SEND events (and only them)
+            if is_cancelled:
+                root_events_http = cancel_root_events_http
+                root_events_grpc = cancel_root_events_grpc
+
             if "HTTP" in events:
                 self.assertTrue(all(entry in events for entry in root_events_http))
                 self.assertFalse(all(entry in events for entry in root_events_grpc))
@@ -187,8 +234,10 @@ def _check_events(self, span_name, events):
             elif "GRPC" in events:
                 self.assertTrue(all(entry in events for entry in root_events_grpc))
                 self.assertFalse(all(entry in events for entry in root_events_http))
-                self.assertFalse(all(entry in events for entry in request_events))
-                self.assertFalse(all(entry in events for entry in compute_events))
+
+            if not is_cancelled:
+                self.assertFalse(all(entry in events for entry in request_events))
+                self.assertFalse(all(entry in events for entry in compute_events))
 
         elif span_name in self.test_models:
             # Check that all request related events (and only them)
@@ -232,7 +281,7 @@ def _test_resource_attributes(self, attributes):
             ),
         )
 
-    def _verify_contents(self, spans, expected_counts):
+    def _verify_contents(self, spans, expected_counts, is_cancelled):
         """
         Helper function that:
          * iterates over `spans` and for every span it verifies that proper events are collected
@@ -247,6 +296,7 @@ def _verify_contents(self, spans, expected_counts):
              and `events` are required.
             expected_counts (dict): dictionary, containing expected spans in the form:
                 span_name : #expected_number_of_entries
+            is_cancelled (bool): True if the trace was collected from a cancelled request workflow
         """
 
         span_names = []
@@ -256,7 +306,7 @@ def _verify_contents(self, spans, expected_counts):
             span_names.append(span_name)
             span_events = span["events"]
             event_names_only = [event["name"] for event in span_events]
-            self._check_events(span_name, event_names_only)
+            self._check_events(span_name, event_names_only, is_cancelled)
 
         self.assertEqual(
             len(span_names),
@@ -339,6 +389,24 @@ def _verify_headers_propagated_from_client_if_any(self, root_span, headers):
             ),
         )
 
+    def _test_trace_cancel(self, is_queued):
+        # We want to capture traces for a request that is cancelled WHILE the inference is in the COMPUTE stage.
+        # The "input_all_required" model sleeps in its compute phase, so the cancellation request can be sent while the request is still computing.
+        # Wait before reading the traces from the file so the collector has time to export them.
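+        # For reference (illustrative, abridged): _parse_trace_log returns OTLP JSON, which is
+        # navigated below as traces[0]["resourceSpans"][0]["scopeSpans"][0]["spans"]; each span
+        # carries a "name" (e.g. "compute") and an "events" list whose entries have a "name"
+        # such as "COMPUTE_START", which is what _verify_contents checks against expected_counts.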
+        time.sleep(2 * COLLECTOR_TIMEOUT)
+        traces = self._parse_trace_log(self.filename)
+        if not is_queued:
+            expected_counts = dict(
+                {"compute": 1, self.input_all_required_model_name: 1, self.root_span: 1}
+            )
+        else:
+            # No compute span is expected: the request is cancelled while still in the queue
+            expected_counts = dict(
+                {"compute": 0, self.cancel_queue_model_name: 1, self.root_span: 1}
+            )
+        parsed_spans = traces[0]["resourceSpans"][0]["scopeSpans"][0]["spans"]
+        self._verify_contents(parsed_spans, expected_counts, is_cancelled=True)
+
     def _test_trace(
         self,
         headers,
@@ -396,8 +464,7 @@ def _test_trace(
             entry for entry in parsed_spans if entry["name"] == "InferRequest"
         ][0]
         self.assertEqual(len(parsed_spans), expected_number_of_spans)
-
-        self._verify_contents(parsed_spans, expected_counts)
+        self._verify_contents(parsed_spans, expected_counts, is_cancelled=False)
         self._verify_nesting(parsed_spans, expected_parent_span_dict)
         self._verify_headers_propagated_from_client_if_any(root_span, headers)
 
@@ -420,6 +487,24 @@ def _test_simple_trace(self, headers=None):
             expected_parent_span_dict=expected_parent_span_dict,
         )
 
+    def _test_non_decoupled_trace(self, headers=None):
+        """
+        Helper function that collects and verifies a trace for a non-decoupled model.
+        """
+        expected_number_of_spans = 3
+        expected_counts = dict(
+            {"compute": 1, self.non_decoupled_model_name_: 1, self.root_span: 1}
+        )
+        expected_parent_span_dict = dict(
+            {"InferRequest": ["repeat_int32"], "repeat_int32": ["compute"]}
+        )
+        self._test_trace(
+            headers=headers,
+            expected_number_of_spans=expected_number_of_spans,
+            expected_counts=expected_counts,
+            expected_parent_span_dict=expected_parent_span_dict,
+        )
+
     def _test_bls_trace(self, headers=None):
         """
         Helper function, that specifies expected parameters to evaluate trace,
@@ -527,6 +612,86 @@ def test_grpc_trace_simple_model(self):
 
         self._test_simple_trace()
 
+    def test_grpc_trace_all_input_required_model_cancel(self):
+        """
+        Tests trace collected from executing one inference request and cancelling it
+        mid-execution for a model and GRPC client. Expects only 2 GRPC stage events
+        """
+        triton_client_grpc = grpcclient.InferenceServerClient(
+            "localhost:8001", verbose=True
+        )
+        inputs = []
+        inputs.append(grpcclient.InferInput("INPUT0", [1], "FP32"))
+        inputs[0].set_data_from_numpy(np.arange(1, dtype=np.float32))
+        inputs.append(grpcclient.InferInput("INPUT1", [1], "FP32"))
+        inputs[1].set_data_from_numpy(np.arange(1, dtype=np.float32))
+        inputs.append(grpcclient.InferInput("INPUT2", [1], "FP32"))
+        inputs[2].set_data_from_numpy(np.arange(1, dtype=np.float32))
+        future = triton_client_grpc.async_infer(
+            model_name=self.input_all_required_model_name,
+            inputs=inputs,
+            callback=self._callback,
+            outputs=self._outputs,
+        )
+        time.sleep(2)  # ensure the inference has started
+        future.cancel()
+        time.sleep(0.1)  # allow a context switch so the cancellation is processed
+        self._test_trace_cancel(is_queued=False)
+
+    # Test that queued requests on the dynamic batch scheduler can be cancelled
+    def test_grpc_trace_model_cancel_in_queue(self):
+        """
+        Tests trace collected from executing one inference request and cancelling it
+        for a model and GRPC client while the request is still in the queue. Expects 0 compute stage traces
+        """
+        model_name = self.cancel_queue_model_name
+        triton_client_grpc = grpcclient.InferenceServerClient(
+            "localhost:8001", verbose=True
+        )
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            # Saturate the slots on the model
+            saturate_thread = pool.submit(
+                triton_client_grpc.infer, model_name, self._get_inputs(batch_size=1)
+            )
+            time.sleep(2)  # ensure the slots are filled
+            # The next request should be queued
+            callback, response = self._generate_callback_and_response_pair()
+            future = triton_client_grpc.async_infer(
+                model_name, self._get_inputs(batch_size=1), callback
+            )
+            time.sleep(0.2)  # ensure the request is queued
+            future.cancel()
+            # Join the saturating thread
+            saturate_thread.result()
+        self._test_trace_cancel(is_queued=True)
+
+    def test_non_decoupled(self):
+        """
+        Tests trace collected from executing one inference request against a non-decoupled model.
+        """
+        inputs = [
+            grpcclient.InferInput("IN", [1], "INT32").set_data_from_numpy(
+                self.input_data["IN"]
+            ),
+            grpcclient.InferInput("DELAY", [1], "UINT32").set_data_from_numpy(
+                self.input_data["DELAY"]
+            ),
+            grpcclient.InferInput("WAIT", [1], "UINT32").set_data_from_numpy(
+                self.input_data["WAIT"]
+            ),
+        ]
+
+        triton_client = grpcclient.InferenceServerClient(
+            url="localhost:8001", verbose=True
+        )
+        # Expect the inference to succeed
+        res = triton_client.infer(
+            model_name=self.non_decoupled_model_name_, inputs=inputs
+        )
+        self._test_non_decoupled_trace()
+        self.assertEqual(1, res.as_numpy("OUT")[0])
+        self.assertEqual(0, res.as_numpy("IDX")[0])
+
     def test_grpc_trace_simple_model_context_propagation(self):
         """
         Tests trace, collected from executing one inference request
diff --git a/qa/L0_trace/test.sh b/qa/L0_trace/test.sh
index 9cfa480e04..8a9172b02f 100755
--- a/qa/L0_trace/test.sh
+++ b/qa/L0_trace/test.sh
@@ -52,16 +52,28 @@ export CUDA_VISIBLE_DEVICES=0
 DATADIR=/data/inferenceserver/${REPO_VERSION}/qa_model_repository
 ENSEMBLEDIR=$DATADIR/../qa_ensemble_model_repository/qa_model_repository/
 BLSDIR=../python_models/bls_simple
+CANCELDIR=models/
 MODELBASE=onnx_int32_int32_int32
 
 MODELSDIR=`pwd`/trace_models
 SERVER=/opt/tritonserver/bin/tritonserver
 source ../common/util.sh
-
 rm -f *.log
 rm -f *.log.*
 rm -fr $MODELSDIR && mkdir -p $MODELSDIR
 
+# set up dynamic_batch model with an execute delay so a follow-up request waits in the queue
+mkdir -p trace_models/dynamic_batch/1 && (cd trace_models/dynamic_batch && \
+    echo 'backend: "identity"' >> config.pbtxt && \
+    echo 'max_batch_size: 1' >> config.pbtxt && \
+    echo -e 'input [{ name: "INPUT0" \n data_type: TYPE_FP32 \n dims: [ -1 ] }]' >> config.pbtxt && \
+    echo -e 'output [{ name: "OUTPUT0" \n data_type: TYPE_FP32 \n dims: [ -1 ] }]' >> config.pbtxt && \
+    echo -e 'instance_group [{ count: 1 \n kind: KIND_CPU }]' >> config.pbtxt && \
+    echo -e 'dynamic_batching {' >> config.pbtxt && \
+    echo -e '    preferred_batch_size: [ 1 ]' >> config.pbtxt && \
+    echo -e '    default_queue_policy { timeout_action: REJECT \n default_timeout_microseconds: 1000000 \n max_queue_size: 8 }' >> config.pbtxt && \
+    echo -e '}' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "8000" } }]' >> config.pbtxt)
 
 # set up simple and global_simple model using MODELBASE
 cp -r $DATADIR/$MODELBASE $MODELSDIR/simple && \
@@ -72,6 +84,8 @@ cp -r $DATADIR/$MODELBASE $MODELSDIR/simple && \
     (cd $MODELSDIR/global_simple && \
        sed -i "s/^name:.*/name: \"global_simple\"/" config.pbtxt) && \
     cp -r $ENSEMBLEDIR/simple_onnx_int32_int32_int32 $MODELSDIR/ensemble_add_sub_int32_int32_int32 && \
+    # set up the input_all_required model used by the cancellation test
+    cp -r $CANCELDIR/input_all_required $MODELSDIR/input_all_required && \
     rm -r $MODELSDIR/ensemble_add_sub_int32_int32_int32/2 && \
     rm -r $MODELSDIR/ensemble_add_sub_int32_int32_int32/3 && \
     (cd $MODELSDIR/ensemble_add_sub_int32_int32_int32 && \
@@ -79,6 +93,10 @@ cp -r $DATADIR/$MODELBASE $MODELSDIR/simple && \
        sed -i "s/model_name:.*/model_name: \"simple\"/" config.pbtxt) && \
     mkdir -p $MODELSDIR/bls_simple/1 && cp $BLSDIR/bls_simple.py $MODELSDIR/bls_simple/1/model.py
 
+# set up repeat_int32 model (decoupled disabled) for the non-decoupled trace test
+cp -r ../L0_decoupled/models/repeat_int32 $MODELSDIR
+sed -i "s/decoupled: True/decoupled: False/" $MODELSDIR/repeat_int32/config.pbtxt
+
 RET=0
 
 # Helpers =======================================
@@ -740,7 +758,7 @@ rm collected_traces.json*
 # Unittests then check that produced spans have expected format and events
 OPENTELEMETRY_TEST=opentelemetry_unittest.py
 OPENTELEMETRY_LOG="opentelemetry_unittest.log"
-EXPECTED_NUM_TESTS="14"
+EXPECTED_NUM_TESTS="17"
 
 # Set up repo and args for SageMaker
 export SAGEMAKER_TRITON_DEFAULT_MODEL_NAME="simple"
@@ -757,6 +775,8 @@ mkdir -p $MODELSDIR/trace_context/1 && cp ./trace_context.py $MODELSDIR/trace_co
 
 SERVER_ARGS="--allow-sagemaker=true --model-control-mode=explicit \
             --load-model=simple --load-model=ensemble_add_sub_int32_int32_int32 \
+            --load-model=repeat_int32 \
+            --load-model=input_all_required \
             --load-model=bls_simple --trace-config=level=TIMESTAMPS \
             --load-model=trace_context --trace-config=rate=1 \
             --trace-config=count=-1 --trace-config=mode=opentelemetry \