Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handling grpc cancellation edge-case: Cancelling at step START #7325

Merged
merged 9 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions qa/L0_request_cancellation/grpc_cancellation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ def callback(user_data, result, error):
user_data._completed_requests.put(result)


def prepare_inputs_outputs():
    """Build the INPUT0 tensor and OUTPUT0 request shared by the tests.

    Returns:
        (inputs, outputs): single-element lists holding one [1, 1] INT32
        InferInput (populated with the value 5) and one InferRequestedOutput.
    """
    infer_input = grpcclient.InferInput("INPUT0", [1, 1], "INT32")
    infer_input.set_data_from_numpy(np.array([[5]], dtype=np.int32))
    requested_output = grpcclient.InferRequestedOutput("OUTPUT0")
    return [infer_input], [requested_output]


def grpc_async_infer_request_with_instant_cancellation(model_name):
    """Issue an async gRPC inference and cancel it shortly afterwards.

    Invoked from test.sh (via ``python3 -c``) to try to reproduce the case
    where the cancellation notification reaches the server while the request
    is still at step START.

    Args:
        model_name: name of the model to send the request to.
    """
    request_inputs, request_outputs = prepare_inputs_outputs()
    user_data = UserData()
    with grpcclient.InferenceServerClient(url="localhost:8001") as client:
        pending = client.async_infer(
            model_name=model_name,
            inputs=request_inputs,
            callback=partial(callback, user_data),
            outputs=request_outputs,
        )
        # Give the request a moment to reach the server, then cancel it.
        time.sleep(2)
        pending.cancel()


class GrpcCancellationTest(unittest.IsolatedAsyncioTestCase):
_model_name = "custom_identity_int32"
_model_delay = 10.0 # seconds
Expand All @@ -68,11 +90,7 @@ def tearDown(self):
self._assert_max_duration()

def _prepare_request(self):
self._inputs = []
self._inputs.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32"))
self._outputs = []
self._outputs.append(grpcclient.InferRequestedOutput("OUTPUT0"))
self._inputs[0].set_data_from_numpy(np.array([[10]], dtype=np.int32))
self._inputs, self._outputs = prepare_inputs_outputs()

def _assert_max_duration(self):
max_duration = self._model_delay * 0.5 # seconds
Expand Down
53 changes: 53 additions & 0 deletions qa/L0_request_cancellation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,59 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
wait $SERVER_PID
done

#
# gRPC cancellation on step START test
#
# Repro for the race where a client cancels while its request is still at
# step START (PR 7325). Uses the identity backend with a small execute
# delay so the cancellation can arrive before execution completes.
rm -rf models && mkdir models
mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \
    echo 'name: "custom_identity_int32"' >> config.pbtxt && \
    echo 'backend: "identity"' >> config.pbtxt && \
    echo 'max_batch_size: 1024' >> config.pbtxt && \
    echo -e 'input [{ name: "INPUT0" \n data_type: TYPE_INT32 \n dims: [ -1 ] }]' >> config.pbtxt && \
    echo -e 'output [{ name: "OUTPUT0" \n data_type: TYPE_INT32 \n dims: [ -1 ] }]' >> config.pbtxt && \
    echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \
    echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "500" } }]' >> config.pbtxt)

TEST_LOG="./grpc_cancellation_stress_test.log"
SERVER_LOG="grpc_cancellation_stress_test.server.log"

SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=2"
# TRITONSERVER_DELAY_GRPC_PROCESS delays the gRPC Process step server-side,
# widening the window in which the cancellation lands at step START.
export TRITONSERVER_DELAY_GRPC_PROCESS=10000
run_server
if [ "$SERVER_PID" == "0" ]; then
    echo -e "\n***\n*** Failed to start $SERVER\n***"
    cat $SERVER_LOG
    exit 1
fi

# Baseline count of handler creations before any test request is sent.
# NOTE: "|| true" keeps a zero-match grep (exit status 1) from aborting the
# script while "set -e" may still be in effect; grep still prints "0" to
# stdout, so the captured value is unchanged.
INIT_NEW_REQ_HANDL_COUNT=$(grep -c "New request handler for ModelInferHandler" $SERVER_LOG || true)

set +e
# Keep issuing cancel-immediately requests until the server log records a
# cancellation observed at step START, then verify that a new request
# handler was created afterwards (the original bug: no new handler).
while true; do
    python3 -c "import grpc_cancellation_test; grpc_cancellation_test.grpc_async_infer_request_with_instant_cancellation(model_name=\"custom_identity_int32\")" > $TEST_LOG 2>&1
    sleep 30
    CANCEL_AT_START_COUNT=$(grep -c 'Cancellation notification received for ModelInferHandler, rpc_ok=1, context [0-9]*, [0-9]* step START' $SERVER_LOG)
    if [[ $CANCEL_AT_START_COUNT == 0 ]]; then
        # Cancellation did not land at START this round; refresh the
        # baseline count and retry.
        INIT_NEW_REQ_HANDL_COUNT=$(grep -c "New request handler for ModelInferHandler" $SERVER_LOG)
        continue
    fi
    NEW_REQ_HANDL_COUNT=$(grep -c "New request handler for ModelInferHandler" $SERVER_LOG)
    if [[ $NEW_REQ_HANDL_COUNT == $INIT_NEW_REQ_HANDL_COUNT ]]; then
        echo -e "\n***\n*** gRPC Cancellation on step START Test Failed: New request handler for ModelInferHandler was not created \n***"
        cat $TEST_LOG
        RET=1
        break
    else
        break
    fi
done
kthui marked this conversation as resolved.
Show resolved Hide resolved

# Restore exit-on-error now that the grep/stress section is finished.
set -e
# Shut down the server started for this test.
kill $SERVER_PID
wait $SERVER_PID

# Remove the test-only delay so later sections run without it.
unset TRITONSERVER_DELAY_GRPC_PROCESS

#
# End-to-end scheduler tests
#
Expand Down
26 changes: 26 additions & 0 deletions src/grpc/infer_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,35 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// Need to protect the state transitions for these cases.
std::lock_guard<std::recursive_mutex> lock(state->step_mtx_);

if (state->delay_process_ms_ != 0) {
// Will delay the Process execution by the specified time.
// This can be used to test the flow when cancellation request
// issued for the request, which is still at START step.
LOG_INFO << "Delaying the write of the response by "
<< state->delay_process_ms_ << " ms...";
std::this_thread::sleep_for(
std::chrono::milliseconds(state->delay_process_ms_));
}

// Handle notification for cancellation which can be raised
// asynchronously if detected on the network.
if (state->IsGrpcContextCancelled()) {
if (rpc_ok && (state->step_ == Steps::START) &&
(state->context_->step_ != Steps::CANCELLED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what does the second clause here imply?

!= Steps::Cancelled ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid calling StartNewRequest twice, at first we fall into HandleCancellation and go through this block, which returns true for resume, so we will go into if (state->IsGrpcContextCancelled()) loop for the second time but this time state->context_->step_ is CANCELLED

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Late to the game, but what is the reasoning of not moving the original "StartNewRequest() if at START" to before handling the cancellation? Although I think other code needs to be moved around as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% aware of all underlying processes, meaning state->step_ and state->context_->step_ combinations. This change helps to address the bug with known symptoms. Refactoring if the Process logic needs proper time and testing IMHO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kthui thoughts? If feasible, this can be done as follow-up and by someone else. Want to make sure if there is room for improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think there is definitely room for improvement/refactoring, i.e. I think the if (shutdown) { ... } could also be moved into the if (state->step_ == Steps::START) { ... } else ... block, so all procedures for Steps::START would be inside the if (state->step_ == Steps::START) { ... } block, but it can be done as a follow-up later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jira ticket: DLIS-6831

#ifdef TRITON_ENABLE_TRACING
// Can't create trace as we don't know the model to be requested,
// track timestamps in 'state'
state->trace_timestamps_.emplace_back(std::make_pair(
"GRPC_WAITREAD_END", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING
// Need to create a new request object here explicitly for step START,
// because we will never leave this if body. Refer to PR 7325.
// This is a special case for ModelInferHandler, since we have 2 threads,
// and each of them can process cancellation. ModelStreamInfer has only 1
// thread, and cancellation at step START was not reproducible in a
// single thread scenario.
StartNewRequest();
}
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
return resume;
}
Expand Down
6 changes: 6 additions & 0 deletions src/grpc/infer_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,11 @@ class InferHandlerState {
if (cstr != nullptr) {
delay_complete_ms_ = atoi(cstr);
}
const char* pstr = getenv("TRITONSERVER_DELAY_GRPC_PROCESS");
delay_process_ms_ = 0;
if (pstr != nullptr) {
delay_process_ms_ = atoi(pstr);
}

response_queue_.reset(new ResponseQueue<ResponseType>());
Reset(context, start_step);
Expand Down Expand Up @@ -1120,6 +1125,7 @@ class InferHandlerState {
// For testing and debugging
int delay_response_ms_;
int delay_complete_ms_;
int delay_process_ms_;

// For inference requests the allocator payload, unused for other
// requests.
Expand Down
Loading