Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistage] Handle Stream Cancellations for Unstarted Streams #10425

Closed
wants to merge 2 commits into from

Conversation

ankitsultana
Copy link
Contributor

We detected this issue due to a flaky test and there was a discussion on this in: #10417

Context: When there's an error, we try to send the error-block via MailboxSendOperator, and after it returns a cancellation is issued for the OpChain, which issues a cancellation for the MailboxSendOperator as well.

On looking into the flaky test (which can be reproduced on local by running it separately in IntelliJ), I found that the receiving mailbox is never initialized for the broker's receiver. On looking further I saw that calling StreamObserver#onNext doesn't guarantee that the stream has been created. The onNext operation is queued in a executor (see DelayedClientCall), and if there's a concurrent cancellation before the queued request is processed, the stream would never get created and the receiver will be stuck waiting to be initialized because neither onNext, onError or onCompleted would be called on its end.

In this patch I have introduced a stop-gap solution for this.

For the full fix I have created this issue: #10424. I have been punting writing the design doc for this. I'll try to finish it this week.

@codecov-commenter
Copy link

codecov-commenter commented Mar 15, 2023

Codecov Report

Merging #10425 (1872ce6) into master (3bad67d) will decrease coverage by 38.28%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master   #10425       +/-   ##
=============================================
- Coverage     70.32%   32.05%   -38.28%     
+ Complexity     6105      273     -5832     
=============================================
  Files          2055     2053        -2     
  Lines        111389   111405       +16     
  Branches      16939    16940        +1     
=============================================
- Hits          78337    35706    -42631     
- Misses        27566    72511    +44945     
+ Partials       5486     3188     -2298     
Flag Coverage Δ
integration1 24.37% <0.00%> (-0.04%) ⬇️
integration2 ?
unittests1 ?
unittests2 13.85% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 0.00% <0.00%> (-81.58%) ⬇️
...uery/runtime/executor/OpChainSchedulerService.java 0.00% <0.00%> (-96.83%) ⬇️

... and 1286 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

}
DELAYED_CANCEL_SCHEDULER.schedule(() -> {
try {
_mailboxContentStreamObserver.onError(Status.CANCELLED.asRuntimeException());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to simply cancelled since when sender issues onError, a rst_stream packet is sent on the stream and no additional data is sent to the receiver. In other words, receiver only sees a cancellation on its end.

However when a server calls its outbound onError then the trailers have information about the Status which contains the exception message, so in those cases we should continue to pass the detailed Status.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the onError call be propagate to the receiving side? and populate the errorContent block? how come the receiving end is still error out on timeout?

@@ -185,4 +187,13 @@ private void cancelOpChain(OpChain opChain, Throwable e) {
_scheduler.deregister(opChain);
}
}

private String getErrorMessage(TransferableBlock block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put this in transferable block util. there's already something similar in DataBlockUtils as well

Comment on lines +298 to +300
// Do cancellations immediately for this test
grpcSendingMailbox = Mockito.spy(grpcSendingMailbox);
Mockito.doReturn(0L).when(grpcSendingMailbox).getCancellationDelayMs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any test we can add for this cancellation?

}
DELAYED_CANCEL_SCHEDULER.schedule(() -> {
try {
_mailboxContentStreamObserver.onError(Status.CANCELLED.asRuntimeException());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the onError call be propagate to the receiving side? and populate the errorContent block? how come the receiving end is still error out on timeout?

@ankitsultana
Copy link
Contributor Author

Talked to Rong offline. There are some reservations against delaying the cancellation and #10432 is an alternative. Both options are imperfect.

Closing this one and we can go with #10432 for now. I'll follow-up on #10424 this week itself. 🚀

@walterddr
Copy link
Contributor

Thank you for the thorough investigation and follow-up discussion with #10424 @ankitsultana

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants