Tests for Top Level Request Caching for Ensemble Models #7074

Merged
merged 24 commits on May 9, 2024
Changes from 2 commits
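For context, "top-level request caching" here means the response cache is enabled only on the ensemble's own config.pbtxt, not on its composing model. A minimal sketch of the setup these tests exercise, using the model names and cache flag that appear elsewhere in this PR (the exact repository path below is illustrative, not taken from the diff):

```bash
# Sketch only: append a response_cache block to the top-level ensemble config
# if it is not already present (mirrors what _update_config() does in the test).
MODEL_DIR="models/ensemble_models/simple_graphdef_float32_float32_float32"
grep -q "response_cache" "${MODEL_DIR}/config.pbtxt" || \
  printf 'response_cache {\n enable:true\n}\n' >> "${MODEL_DIR}/config.pbtxt"

# The server is then started with a local cache backend, as test.sh does:
# tritonserver --model-repository=<model-repo> --cache-config local,size=1048576000
```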
222 changes: 139 additions & 83 deletions qa/L0_response_cache/ensemble_cache_test.py
@@ -36,10 +36,10 @@
import logging
import unittest

import infer_util as iu
import numpy as np
import test_util as tu
import tritonclient.grpc as grpcclient
from tritonclient.utils import *


class EnsembleCacheTest(tu.TestResultCollector):
@@ -48,38 +48,113 @@ def __init__(self, *args, **kwargs):
self.ensemble_model_name = "simple_graphdef_float32_float32_float32"
self.composing_model_name = "graphdef_float32_float32_float32"
self.model_directory = os.path.join(os.getcwd(), "models", "ensemble_models")
self.input_tensors = self._get_input_tensors()
self.triton_client = grpcclient.InferenceServerClient(
"localhost:8001", verbose=True
)
self.ensemble_config_file = os.path.join(
self.model_directory, self.ensemble_model_name, "config.pbtxt"
)
self.composing_config_file = os.path.join(
self.model_directory, self.composing_model_name, "config.pbtxt"
)
self.response_cache_pattern = "response_cache"
self.response_cache_config = "response_cache {\n enable:true\n}\n"
self.decoupled_pattern = "decoupled:true"
self.decoupled_config = "model_transaction_policy {\n decoupled:true\n}\n"

def _get_input_tensors(self):
input0_data = np.ones((1, 16), dtype=np.float32)
input1_data = np.ones((1, 16), dtype=np.float32)
self.input_tensors = [
grpcclient.InferInput("INPUT0", [1, 16], "FP32"),
grpcclient.InferInput("INPUT1", [1, 16], "FP32"),
input_tensors = [
grpcclient.InferInput(
"INPUT0", input0_data.shape, np_to_triton_dtype(input0_data.dtype)
),
grpcclient.InferInput(
"INPUT1", input1_data.shape, np_to_triton_dtype(input0_data.dtype)
),
]
self.input_tensors[0].set_data_from_numpy(input0_data)
self.input_tensors[1].set_data_from_numpy(input1_data)
input_tensors[0].set_data_from_numpy(input0_data)
input_tensors[1].set_data_from_numpy(input1_data)
return input_tensors

def _get_infer_result(self):
triton_client = grpcclient.InferenceServerClient("localhost:8001", verbose=True)
results = triton_client.infer(
model_name=self.ensemble_model_name, inputs=self.input_tensors
)
stats = triton_client.get_inference_statistics(
def _get_infer_stats(self):
stats = self.triton_client.get_inference_statistics(
model_name=self.ensemble_model_name, as_json=True
)
return stats

def _run_ensemble(self):
stats = self._get_infer_result()
stats = self._get_infer_result()
return stats["model_stats"][1]["inference_stats"]

def _update_config(self, config_file_path, config_pattern, config_to_add):
with open(config_file_path, "r") as f:
def _infer(self):
output = self.triton_client.infer(
model_name=self.ensemble_model_name, inputs=self.input_tensors
)
output0 = output.as_numpy("OUTPUT0")
output1 = output.as_numpy("OUTPUT1")
outputs = [output0, output1]
return outputs

def _check_valid_output(self, output):
if output is None:
self.assertTrue(False, "unexpected error in inference")

def _check_valid_output_inference(self, inference_outputs, cached_outputs):
if not np.array_equal(inference_outputs, cached_outputs):
self.assertTrue(False, "mismatched outputs")

def _check_zero_stats_baseline(self, model_inference_stats):
if "count" in model_inference_stats["success"]:
self.assertTrue(False, "unexpected non-zero inference statistics")

def _check_valid_stats(self, model_inference_stats):
if "count" not in model_inference_stats["success"]:
self.assertTrue(False, "unexpected error while retrieving statistics")

def _check_single_cache_miss_success_inference(self, model_inference_stats):
if (
"count" not in model_inference_stats["cache_miss"]
and "count" not in model_inference_stats["cache_hit"]
):
self.assertTrue(False, "unexpected error with response cache")
if (
model_inference_stats["cache_miss"]["count"] == "0"
and int(model_inference_stats["cache_hit"]["count"]) > 0
):
self.assertTrue(False, "unexpected cache hit")
if int(model_inference_stats["cache_miss"]["count"]) > 1:
self.assertTrue(False, "unexpected multiple cache miss")

def _run_ensemble(self):
model_inference_stats = self._get_infer_stats()
self._check_zero_stats_baseline(model_inference_stats)
inference_outputs = self._infer()
self._check_valid_output(inference_outputs)
model_inference_stats = self._get_infer_stats()
self._check_valid_stats(model_inference_stats)
self._check_single_cache_miss_success_inference(model_inference_stats)
cached_outputs = self._infer()
self._check_valid_output(cached_outputs)
self._check_valid_output_inference(inference_outputs, cached_outputs)
model_inference_stats = self._get_infer_stats()
self._check_valid_stats(model_inference_stats)
return model_inference_stats

def _update_config(self, config_file, config_pattern, config_to_add):
with open(config_file, "r") as f:
config_data = f.read()
if config_pattern not in config_data:
with open(config_file_path, "a") as f:
f.write(config_to_add)
with open(config_file, "w") as f:
config_data = config_data + config_to_add
f.write(config_data)

def _remove_extra_config(self, config_file, config_to_remove):
with open(config_file, "r") as f:
config_data = f.read()
updated_config_data = re.sub(config_to_remove, "", config_data)
with open(config_file, "w") as f:
f.write(updated_config_data)

def _enable_response_cache_for_all_models(self):
def _get_all_config_files(self):
config_files = []
contents = os.listdir(self.model_directory)
folders = [
folder
@@ -90,29 +165,59 @@ def _enable_response_cache_for_all_models(self):
config_file_path = os.path.join(
self.model_directory, str(model_dir), "config.pbtxt"
)
config_pattern = "response_cache"
config_to_add = "response_cache {\n enable:true\n}\n"
self._update_config(config_file_path, config_pattern, config_to_add)
config_files.append(config_file_path)
return config_files

def _enable_cache_ensemble_model(self):
self._update_config(
self.ensemble_config_file,
self.response_cache_pattern,
self.response_cache_config,
)

def _remove_extra_config(self, config_file_path, config_to_remove):
with open(config_file_path, "r") as f:
config_data = f.read()
updated_config_data = re.sub(config_to_remove, "", config_data)
with open(config_file_path, "w") as f:
f.write(updated_config_data)
def _enable_decoupled_ensemble_model(self):
self._update_config(
self.ensemble_config_file, self.decoupled_pattern, self.decoupled_config
)

def _enable_decoupled_composing_model(self):
self._update_config(
self.composing_config_file, self.decoupled_pattern, self.decoupled_config
)

def _remove_decoupled_ensemble_model(self):
self._remove_extra_config(self.ensemble_config_file, self.decoupled_config)

def _remove_decoupled_composing_model(self):
self._remove_extra_config(self.composing_config_file, self.decoupled_config)

def _enable_cache_for_all_models(self):
config_files = self._get_all_config_files()
for config_file in config_files:
self._update_config(
config_file, self.response_cache_pattern, self.response_cache_config
)

def _reset_config_files(self):
config_files = self._get_all_config_files()
for config_file in config_files:
self._remove_extra_config(config_file, self.response_cache_config)

def test_ensemble_top_level_cache(self):
self._enable_cache_ensemble_model()
model_inference_stats = self._run_ensemble()
if (
"count" not in model_inference_stats["cache_hit"]
or int(model_inference_stats["cache_hit"]["count"]) == 0
or model_inference_stats["cache_hit"]["count"] != "0"
):
self.assertFalse(
False, "unexpected error in top-level ensemble request caching"
False, "unexpected error in top-level ensemble response caching"
)
if int(model_inference_stats["cache_hit"]["count"]) > 1:
self.assertTrue(False, "unexpected multiple cache hits")

def test_all_models_with_cache_enabled(self):
self._enable_response_cache_for_all_models()
self._enable_cache_for_all_models()
model_inference_stats = self._run_ensemble()
print(model_inference_stats)
if (
@@ -127,55 +232,6 @@ def test_all_models_with_cache_enabled(self):
):
self.assertTrue(False, "unexpected composing model cache hits")

def enable_cache_and_decoupled_ensemble_model(self):
config_file_path = os.path.join(
self.model_directory, self.ensemble_model_name, "config.pbtxt"
)
config_pattern = "decoupled:true"
config_to_add = "model_transaction_policy {\n decoupled:true\n}\n"
self._update_config(config_file_path, config_pattern, config_to_add)
config_pattern = "response_cache"
config_to_add = "response_cache {\n enable:true\n}\n"
self._update_config(config_file_path, config_pattern, config_to_add)

def enable_composing_model_decoupled(self):
config_file_path = os.path.join(
self.model_directory, self.ensemble_model_name, "config.pbtxt"
)
config_to_remove = (
r"model_transaction_policy\s*\{[^}]*decoupled\s*:\s*true[^}]*\}\n*"
)
self._remove_extra_config(config_file_path, config_to_remove)
config_file_path = os.path.join(
self.model_directory, self.composing_model_name, "config.pbtxt"
)
config_pattern = "decoupled:true"
config_to_add = "model_transaction_policy {\n decoupled:true\n}\n"
self._update_config(config_file_path, config_pattern, config_to_add)

def remove_decoupled_config(self):
config_file_path = os.path.join(
self.model_directory, self.composing_model_name, "config.pbtxt"
)
config_to_remove = (
r"model_transaction_policy\s*\{[^}]*decoupled\s*:\s*true[^}]*\}\n*"
)
self._remove_extra_config(config_file_path, config_to_remove)

def reset_config_files(self):
contents = os.listdir(self.model_directory)
folders = [
folder
for folder in contents
if os.path.isdir(os.path.join(self.model_directory, folder))
]
for model_dir in folders:
config_file_path = os.path.join(
self.model_directory, str(model_dir), "config.pbtxt"
)
config_to_remove = r"response_cache\s*\{[^}]*\}\n*"
self._remove_extra_config(config_file_path, config_to_remove)


if __name__ == "__main__":
logging.basicConfig(stream=sys.stderr)
72 changes: 36 additions & 36 deletions qa/L0_response_cache/test.sh
@@ -197,26 +197,36 @@ function check_server_failure_decoupled_model {
fi
}

function test_ensemble_model_with_cache_util {
function run_server_ensemble_model {
SERVER_ARGS="--model-repository="${3}" --cache-config local,size=1048576000"
if [ "${SERVER_PID}" == "0" ]; then
run_server
run_server
set +e
python ${ENSEMBLE_CACHE_TEST_PY} "${1}" >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
RET=1
else
set +e
python ${ENSEMBLE_CACHE_TEST_PY} "${1}" >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
RET=1
else
check_test_results $TEST_RESULT_FILE 1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "${2}"
RET=1
fi
fi
set -e

check_test_results $TEST_RESULT_FILE 1
if [ $? -ne 0 ]; then
cat $CLIENT_LOG
echo -e "${2}"
RET=1
fi
fi
set -e
}

function test_ensemble_model_cache_and_decoupled {
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._enable_cache_ensemble_model >>$CLIENT_LOG 2>&1
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._enable_decoupled_ensemble_model >>$CLIENT_LOG 2>&1
check_server_failure_decoupled_model "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._remove_decoupled_ensemble_model >>$CLIENT_LOG 2>&1
}

function test_ensemble_cache_composing_decoupled {
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._enable_cache_ensemble_model >>$CLIENT_LOG 2>&1
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._enable_decoupled_composing_model >>$CLIENT_LOG 2>&1
check_server_failure_decoupled_model "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._remove_decoupled_composing_model >>$CLIENT_LOG 2>&1
}

# Check that server fails to start for a "decoupled" model with cache enabled
@@ -330,36 +340,26 @@ stop_redis


#Test Ensemble Model With Cache Enabled and Decoupled Mode in Ensemble Config
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest.enable_cache_and_decoupled_ensemble_model >>$CLIENT_LOG 2>&1
check_server_failure_decoupled_model "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"
test_ensemble_model_cache_and_decoupled "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"

# Test Ensemble Model With Cache Enabled and Decoupled Mode in Composing Model
Contributor:
ditto here and similar following comments to the tests: I'd prefer capitalization of only the first letter

Contributor:
and a space after #
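
For illustration only (not part of the diff), the suggested comment style applied to one of the new comment/call pairs might look like:

```bash
# Test ensemble model with cache enabled and decoupled mode in composing model
test_ensemble_cache_composing_decoupled "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"
```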

python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest.enable_composing_model_decoupled >>$CLIENT_LOG 2>&1
check_server_failure_decoupled_model "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"

#Remove decoupled Config
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest.remove_decoupled_config >> $CLIENT_LOG 2>&1
test_ensemble_cache_composing_decoupled "${ENSEMBLE_MODEL_DIR}" "explicit" "${ENSEMBLE_MODEL}"

#Test Ensemble Model with Top Level Caching Enabled
FUNCTION_NAME="EnsembleCacheTest.test_ensemble_top_level_cache"
ERROR_MESSAGE="\n***\n*** Failed: Expected Top Level Request Caching\n***"
test_ensemble_model_with_cache_util "${FUNCTION_NAME}" "${ERROR_MESSAGE}" ${ENSEMBLE_MODEL_DIR}
#check_server_success_and_kill
run_server_ensemble_model "${FUNCTION_NAME}" "${ERROR_MESSAGE}" ${ENSEMBLE_MODEL_DIR}
check_server_success_and_kill

# Test Ensemble Model with cache enabled in all models
#Test Ensemble Model with cache enabled in all models
FUNCTION_NAME="EnsembleCacheTest.test_all_models_with_cache_enabled"
ERROR_MESSAGE="\n***\n*** Failed: Expected Cache to return Top-Level request's response\n***"
test_ensemble_model_with_cache_util "${FUNCTION_NAME}" "${ERROR_MESSAGE}" ${ENSEMBLE_MODEL_DIR}
#check_server_success_and_kill

# Cleanup extra configuration for next iteration
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest.reset_config_files >>$CLIENT_LOG 2>&1
run_server_ensemble_model "${FUNCTION_NAME}" "${ERROR_MESSAGE}" ${ENSEMBLE_MODEL_DIR}
check_server_success_and_kill

#Cleanup extra configuration for next iteration
python ${ENSEMBLE_CACHE_TEST_PY} EnsembleCacheTest._reset_config_files >>$CLIENT_LOG 2>&1

if [ $SERVER_PID != "0" ]; then
kill $SERVER_PID
#wait $SERVER_PID
fi

if [ $RET -eq 0 ]; then
echo -e "\n***\n*** Test Passed\n***"