Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handling grpc cancellation edge-case:: Cancelling at step START #7325

Merged
merged 9 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions qa/L0_request_cancellation/grpc_cancellation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ def callback(user_data, result, error):
user_data._completed_requests.put(result)


def prepare_inputs_outputs():
inputs, outputs = [], []
inputs.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32"))
outputs.append(grpcclient.InferRequestedOutput("OUTPUT0"))
inputs[0].set_data_from_numpy(np.array([[5]], dtype=np.int32))
return inputs, outputs


def grpc_async_infer_request_with_instant_cancellation(model_name):
inputs_, outputs_ = prepare_inputs_outputs()
user_data = UserData()
with grpcclient.InferenceServerClient(url="localhost:8001") as client:
t_end = time.time() + 60
while time.time() < t_end:
tanmayv25 marked this conversation as resolved.
Show resolved Hide resolved
future = client.async_infer(
model_name=model_name,
inputs=inputs_,
callback=partial(callback, user_data),
outputs=outputs_,
)
future.cancel()


class GrpcCancellationTest(unittest.IsolatedAsyncioTestCase):
_model_name = "custom_identity_int32"
_model_delay = 10.0 # seconds
Expand All @@ -68,11 +91,7 @@ def tearDown(self):
self._assert_max_duration()

def _prepare_request(self):
self._inputs = []
self._inputs.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32"))
self._outputs = []
self._outputs.append(grpcclient.InferRequestedOutput("OUTPUT0"))
self._inputs[0].set_data_from_numpy(np.array([[10]], dtype=np.int32))
self._inputs, self._outputs = prepare_inputs_outputs()

def _assert_max_duration(self):
max_duration = self._model_delay * 0.5 # seconds
Expand Down
69 changes: 67 additions & 2 deletions qa/L0_request_cancellation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,74 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc
done

#
# End-to-end scheduler tests
# gRPC cancellation stress test
#
rm -rf models && mkdir models
mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \
echo 'name: "custom_identity_int32"' >> config.pbtxt && \
echo 'backend: "identity"' >> config.pbtxt && \
echo 'max_batch_size: 1024' >> config.pbtxt && \
echo -e 'input [{ name: "INPUT0" \n data_type: TYPE_INT32 \n dims: [ -1 ] }]' >> config.pbtxt && \
echo -e 'output [{ name: "OUTPUT0" \n data_type: TYPE_INT32 \n dims: [ -1 ] }]' >> config.pbtxt && \
echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \
echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "500" } }]' >> config.pbtxt)

TEST_LOG="./grpc_cancellation_stress_test.log"
SERVER_LOG="grpc_cancellation_stress_test.server.log"

SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=2"
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
cat $SERVER_LOG
exit 1
fi

set +e
python -c "import grpc_cancellation_test; grpc_cancellation_test.grpc_async_infer_request_with_instant_cancellation(model_name=\"custom_identity_int32\")" > $TEST_LOG 2>&1 &
PYTHON_CLIENT_PID=$!
PREV_NEW_REQ_HANDL_COUNT=-1
NUMBER_RUNS=10
while true; do
if ps -p $PYTHON_CLIENT_PID > /dev/null; then
echo "Python process stopped. Restarting..."
python -c "import grpc_cancellation_test; grpc_cancellation_test.grpc_async_infer_request_with_instant_cancellation(model_name=\"custom_identity_int32\")" > $TEST_LOG 2>&1 &
PYTHON_CLIENT_PID=$!
(( NUMBER_RUNS -= 1 ))
fi
CUR_NEW_REQ_HANDL_COUNT=$(cat $SERVER_LOG | grep -c "New request handler for ModelInferHandler")
echo $CUR_NEW_REQ_HANDL_COUNT
sleep 1
if [[ $CUR_NEW_REQ_HANDL_COUNT -gt $PREV_NEW_REQ_HANDL_COUNT ]]; then
# Update the previous count
PREV_NEW_REQ_HANDL_COUNT=$CUR_NEW_REQ_HANDL_COUNT
else
# Kill the Python process if the count hasn't increased
kill $PYTHON_CLIENT_PID
wait $PYTHON_CLIENT_PID
echo "Python process killed. Final 'New request handler' count: $CUR_NEW_REQ_HANDL_COUNT"
echo "Cancellation notification received count: $(grep -c 'Cancellation notification received for ModelInferHandler, rpc_ok=1, context 0, [0-9]* step' $SERVER_LOG)"
echo "Cancellation notification received for START count: $(grep -c 'Cancellation notification received for ModelInferHandler, rpc_ok=1, context 0, [0-9]* step START' $SERVER_LOG)"
RET=1
break
fi
if [ "$NUMBER_RUNS" -le 0 ]; then
kill $PYTHON_CLIENT_PID
wait $PYTHON_CLIENT_PID
echo "Python process killed. Final 'New request handler' count: $CUR_NEW_REQ_HANDL_COUNT"
echo "Cancellation notification received count: $(grep -c 'Cancellation notification received for ModelInferHandler, rpc_ok=1, context 0, [0-9]* step' $SERVER_LOG)"
echo "Cancellation notification received for START count: $(grep -c 'Cancellation notification received for ModelInferHandler, rpc_ok=1, context 0, [0-9]* step START' $SERVER_LOG)"
break
fi
sleep 20
done

set -e
kill $SERVER_PID
wait $SERVER_PID
#
# End-to-end scheduler tests

rm -rf models && mkdir models
mkdir -p models/dynamic_batch/1 && (cd models/dynamic_batch && \
echo 'name: "dynamic_batch"' >> config.pbtxt && \
Expand Down Expand Up @@ -174,7 +240,6 @@ set -e

kill $SERVER_PID
wait $SERVER_PID

#
# Implicit state tests
#
Expand Down
10 changes: 10 additions & 0 deletions src/grpc/infer_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,16 @@ ModelInferHandler::Process(InferHandler::State* state, bool rpc_ok)
// Handle notification for cancellation which can be raised
// asynchronously if detected on the network.
if (state->IsGrpcContextCancelled()) {
if (rpc_ok && (state->step_ == Steps::START) &&
(state->context_->step_ != Steps::CANCELLED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what does the second clause here imply?

!= Steps::Cancelled ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid calling StartNewRequest twice, at first we fall into HandleCancellation and go through this block, which returns true for resume, so we will go into if (state->IsGrpcContextCancelled()) loop for the second time but this time state->context_->step_ is CANCELLED

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Late to the game, but what is the reasoning of not moving the original "StartNewRequest() if at START" to before handling the cancellation? Although I think other code needs to be moved around as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% aware of all underlying processes, meaning state->step_ and state->context_->step_ combinations. This change helps to address the bug with known symptoms. Refactoring if the Process logic needs proper time and testing IMHO

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kthui thoughts? If feasible, this can be done as follow-up and by someone else. Want to make sure if there is room for improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think there is definitely room for improvement/refactoring, i.e. I think the if (shutdown) { ... } could also be moved into the if (state->step_ == Steps::START) { ... } else ... block, so all procedures for Steps::START would be inside the if (state->step_ == Steps::START) { ... } block, but it can be done as a follow-up later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jira ticket: DLIS-6831

#ifdef TRITON_ENABLE_TRACING
// Can't create trace as we don't know the model to be requested,
// track timestamps in 'state'
state->trace_timestamps_.emplace_back(std::make_pair(
"GRPC_WAITREAD_END", TraceManager::CaptureTimestamp()));
#endif // TRITON_ENABLE_TRACING
StartNewRequest();
}
bool resume = state->context_->HandleCancellation(state, rpc_ok, Name());
return resume;
}
Expand Down
Loading