Tests for Top Level Request Caching for Ensemble Models #7074

Merged
merged 24 commits on May 9, 2024
Changes from 4 commits
10 changes: 2 additions & 8 deletions qa/L0_perf_analyzer_report/test.sh
@@ -73,13 +73,7 @@ function check_cache_output {
ERROR_STRING="Cache hit count: 0"
num_cache_hit_lines=$(cat ${CLIENT_LOG} | grep -i "${CACHE_STRING}" | wc -l)
num_cache_hit_zero_lines=$(cat ${CLIENT_LOG} | grep -i "${ERROR_STRING}" | wc -l)
# Top-level ensemble model requests do not currently support caching and
# will always report a cache hit count of zero if any composing model
# has caching enabled. So we check that at least one model reports
# non-zero cache hits for now.
# TODO: When ensemble models support cache hits, this should just fail
# for any occurrence of ERROR_STRING
if [ ${num_cache_hit_lines} -eq ${num_cache_hit_zero_lines} ]; then
if [ ${num_cache_hit_zero_lines} -eq ${num_cache_hit_lines} ]; then
cat ${CLIENT_LOG}
echo "ERROR: All cache hit counts were zero, expected a non-zero number of cache hits"
echo -e "\n***\n*** Test Failed\n***"
@@ -166,7 +160,7 @@ set -e

# Cleanup
kill $SERVER_PID
wait $SERVER_PID


if [ $RET -eq 0 ]; then
echo -e "\n***\n*** Test Passed\n***"
238 changes: 238 additions & 0 deletions qa/L0_response_cache/ensemble_cache_test.py
@@ -0,0 +1,238 @@
#!/usr/bin/env python3

# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import re
import sys

sys.path.append("../common")
sys.path.append("../clients")

import logging
import unittest

import numpy as np
import test_util as tu
import tritonclient.grpc as grpcclient
from tritonclient.utils import *


class EnsembleCacheTest(tu.TestResultCollector):
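    """Tests for top-level response caching of ensemble model requests.

    Assumes a running Triton server at localhost:8001 whose model repository
    contains the ensemble model and its composing model, with a response
    cache available on the server.
    """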
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ensemble_model_name = "simple_graphdef_float32_float32_float32"
self.composing_model_name = "graphdef_float32_float32_float32"
self.model_directory = os.path.join(os.getcwd(), "models", "ensemble_models")
self.input_tensors = self._get_input_tensors()
self.triton_client = grpcclient.InferenceServerClient(
"localhost:8001", verbose=True
)
self.ensemble_config_file = os.path.join(
self.model_directory, self.ensemble_model_name, "config.pbtxt"
)
self.composing_config_file = os.path.join(
self.model_directory, self.composing_model_name, "config.pbtxt"
)
self.response_cache_pattern = "response_cache"
self.response_cache_config = "response_cache {\n enable:true\n}\n"
self.decoupled_pattern = "decoupled:true"
self.decoupled_config = "model_transaction_policy {\n decoupled:true\n}\n"

def _get_input_tensors(self):
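        """Build two identical 1x16 float32 input tensors filled with ones.

        The same inputs are reused across requests so that repeated
        inferences are eligible to be served from the response cache.
        """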
input0_data = np.ones((1, 16), dtype=np.float32)
input1_data = np.ones((1, 16), dtype=np.float32)
input_tensors = [
grpcclient.InferInput(
"INPUT0", input0_data.shape, np_to_triton_dtype(input0_data.dtype)
),
grpcclient.InferInput(
"INPUT1", input1_data.shape, np_to_triton_dtype(input0_data.dtype)
),
]
input_tensors[0].set_data_from_numpy(input0_data)
input_tensors[1].set_data_from_numpy(input1_data)
return input_tensors

def _get_infer_stats(self):
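        """Return the ensemble model's inference statistics as a dict.

        Note: indexes model_stats[1], which relies on the ordering of the
        statistics entries returned by the server.
        """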
stats = self.triton_client.get_inference_statistics(
model_name=self.ensemble_model_name, as_json=True
)
return stats["model_stats"][1]["inference_stats"]

def _infer(self):
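        """Send one request to the ensemble model and return OUTPUT0 and OUTPUT1."""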
output = self.triton_client.infer(
model_name=self.ensemble_model_name, inputs=self.input_tensors
)
output0 = output.as_numpy("OUTPUT0")
output1 = output.as_numpy("OUTPUT1")
outputs = [output0, output1]
return outputs

def _check_valid_output(self, output):
if output is None:
self.assertTrue(False, "unexpected error in inference")

def _check_valid_output_inference(self, inference_outputs, cached_outputs):
if not np.array_equal(inference_outputs, cached_outputs):
self.assertTrue(False, "mismtached outputs")

def _check_zero_stats_baseline(self, model_inference_stats):
if "count" in model_inference_stats["success"]:
self.assertTrue(False, "unexpected non-zero inference statistics")

def _check_valid_stats(self, model_inference_stats):
if "count" not in model_inference_stats["success"]:
self.assertTrue(False, "unexpected error while retrieving statistics")

def _check_single_cache_miss_success_inference(self, model_inference_stats):
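        """Check that the first request produced exactly one cache miss and no cache hit."""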
if (
"count" not in model_inference_stats["cache_miss"]
and "count" not in model_inference_stats["cache_hit"]
):
self.assertTrue(False, "unexpected error with response cache")
if (
model_inference_stats["cache_miss"]["count"] == "0"
and int(model_inference_stats["cache_hit"]["count"]) > 0
):
self.assertTrue(False, "unexpected cache hit")
if int(model_inference_stats["cache_miss"]["count"]) > 1:
self.assertTrue(False, "unexpected multiple cache miss")

def _run_ensemble(self):
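        """Run the ensemble twice and return the latest inference statistics.

        Verifies a zero-statistics baseline, issues a first request (expected
        to be a single cache miss), then repeats the identical request and
        confirms both responses match. Cache-hit expectations are asserted by
        the callers.
        """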
model_inference_stats = self._get_infer_stats()
self._check_zero_stats_baseline(model_inference_stats)
inference_outputs = self._infer()
self._check_valid_output(inference_outputs)
model_inference_stats = self._get_infer_stats()
self._check_valid_stats(model_inference_stats)
self._check_single_cache_miss_success_inference(model_inference_stats)
cached_outputs = self._infer()
self._check_valid_output(cached_outputs)
self._check_valid_output_inference(inference_outputs, cached_outputs)
model_inference_stats = self._get_infer_stats()
self._check_valid_stats(model_inference_stats)
return model_inference_stats

def _update_config(self, config_file, config_pattern, config_to_add):
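        """Append config_to_add to config_file if config_pattern is not already present."""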
with open(config_file, "r") as f:
config_data = f.read()
if config_pattern not in config_data:
with open(config_file, "w") as f:
config_data = config_data + config_to_add
f.write(config_data)

def _remove_extra_config(self, config_file, config_to_remove):
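        """Remove config_to_remove from config_file."""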
with open(config_file, "r") as f:
config_data = f.read()
updated_config_data = re.sub(config_to_remove, "", config_data)
with open(config_file, "w") as f:
f.write(updated_config_data)

def _get_all_config_files(self):
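        """Return the config.pbtxt path for every model folder under the ensemble model directory."""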
config_files = []
contents = os.listdir(self.model_directory)
folders = [
folder
for folder in contents
if os.path.isdir(os.path.join(self.model_directory, folder))
]
for model_dir in folders:
config_file_path = os.path.join(
self.model_directory, str(model_dir), "config.pbtxt"
)
config_files.append(config_file_path)
return config_files

def _enable_cache_ensemble_model(self):
self._update_config(
self.ensemble_config_file,
self.response_cache_pattern,
self.response_cache_config,
)

def _enable_decoupled_ensemble_model(self):
self._update_config(
self.ensemble_config_file, self.decoupled_pattern, self.decoupled_config
)

def _enable_decoupled_composing_model(self):
self._update_config(
self.composing_config_file, self.decoupled_pattern, self.decoupled_config
)

def _remove_decoupled_ensemble_model(self):
self._remove_extra_config(self.ensemble_config_file, self.decoupled_config)

def _remove_decoupled_composing_model(self):
self._remove_extra_config(self.composing_config_file, self.decoupled_config)

def _enable_cache_for_all_models(self):
config_files = self._get_all_config_files()
for config_file in config_files:
self._update_config(
config_file, self.response_cache_pattern, self.response_cache_config
)

def _reset_config_files(self):
config_files = self._get_all_config_files()
for config_file in config_files:
self._remove_extra_config(config_file, self.response_cache_config)

def test_ensemble_top_level_cache(self):
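        """Enable the response cache on the ensemble model only.

        The repeated request should be reported as a single top-level cache hit.
        """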
self._enable_cache_ensemble_model()
model_inference_stats = self._run_ensemble()
        if (
            "count" not in model_inference_stats["cache_hit"]
            or model_inference_stats["cache_hit"]["count"] == "0"
        ):
            self.assertTrue(
                False, "unexpected error in top-level ensemble response caching"
            )
if int(model_inference_stats["cache_hit"]["count"]) > 1:
self.assertTrue(False, "unexpected multiple cache hits")

def test_all_models_with_cache_enabled(self):
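        """Enable the response cache on the ensemble model and all composing models.

        Cache hits should be reported and should not exceed the success count.
        """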
self._enable_cache_for_all_models()
model_inference_stats = self._run_ensemble()
print(model_inference_stats)
if (
"count" not in model_inference_stats["cache_hit"]
or int(model_inference_stats["cache_hit"]["count"]) == 0
):
self.assertTrue(
False, "unexpected error in top-level ensemble request caching"
)
if int(model_inference_stats["cache_hit"]["count"]) > int(
model_inference_stats["success"]["count"]
):
self.assertTrue(False, "unexpected composing model cache hits")


if __name__ == "__main__":
logging.basicConfig(stream=sys.stderr)
unittest.main()
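
For context, a minimal sketch of how this test could be driven from a harness such as qa/L0_response_cache/test.sh. The --cache-config value, model repository path, sleep duration, and log file names below are illustrative assumptions, not taken from this diff:

# Hypothetical harness sketch (assumptions noted above), not part of this PR.
MODEL_REPO="$(pwd)/models/ensemble_models"
SERVER_ARGS="--model-repository=${MODEL_REPO} --cache-config local,size=1048576"

tritonserver ${SERVER_ARGS} > server.log 2>&1 &
SERVER_PID=$!
sleep 10  # crude wait for the server to come up

# Run the new ensemble cache tests against the running server.
python3 ensemble_cache_test.py > ensemble_cache_test.log 2>&1
RET=$?

kill ${SERVER_PID}
wait ${SERVER_PID}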
Binary file added qa/L0_response_cache/response_cache_test
Binary file not shown.