Skip to content

Commit

Permalink
[multistage] Handle Stream Cancellations for Unstarted Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Mar 15, 2023
1 parent 3bad67d commit 4adde96
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
Expand All @@ -41,6 +45,10 @@
*/
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
private static final long DEFAULT_CANCELLATION_DELAY_MS = 1_000;
private static final ScheduledExecutorService DELAYED_CANCEL_SCHEDULER =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
new TracedThreadFactory(Thread.NORM_PRIORITY, true, "grpc-send-cancel-%d"));
private final String _mailboxId;
private final AtomicBoolean _initialized = new AtomicBoolean(false);

Expand Down Expand Up @@ -84,14 +92,15 @@ public boolean isInitialized() {
public void cancel(Throwable t) {
if (_initialized.get() && !_statusObserver.isFinished()) {
LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
try {
_mailboxContentStreamObserver.onError(Status.fromThrowable(
new RuntimeException("Cancelled by the sender")).asRuntimeException());
} catch (Exception e) {
// TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
// anyways as info for now so we can see how frequently this happens.
LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage());
}
DELAYED_CANCEL_SCHEDULER.schedule(() -> {
try {
_mailboxContentStreamObserver.onError(Status.CANCELLED.asRuntimeException());
} catch (Exception e) {
// TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
// anyways as info for now so we can see how frequently this happens.
LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage());
}
}, DEFAULT_CANCELLATION_DELAY_MS, TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pinot.query.runtime.executor;

import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
Expand Down Expand Up @@ -83,7 +86,6 @@ protected void run()
@Override
public void runJob() {
boolean isFinished = false;
boolean returnedErrorBlock = false;
Throwable thrown = null;
try {
LOGGER.trace("({}): Executing", operatorChain);
Expand All @@ -107,7 +109,7 @@ public void runJob() {
} else {
isFinished = true;
if (result.isErrorBlock()) {
returnedErrorBlock = true;
thrown = new RuntimeException(getErrorMessage(result));
LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
result.getDataBlock().getExceptions());
} else {
Expand All @@ -119,7 +121,7 @@ public void runJob() {
LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
thrown = e;
} finally {
if (returnedErrorBlock || thrown != null) {
if (thrown != null) {
cancelOpChain(operatorChain, thrown);
} else if (isFinished) {
closeOpChain(operatorChain);
Expand Down Expand Up @@ -185,4 +187,13 @@ private void cancelOpChain(OpChain opChain, Throwable e) {
_scheduler.deregister(opChain);
}
}

private String getErrorMessage(TransferableBlock block) {
Preconditions.checkState(block.isErrorBlock());
DataBlock dataBlock = block.getDataBlock();
if (MapUtils.isNotEmpty(dataBlock.getExceptions())) {
return dataBlock.getExceptions().values().iterator().next();
}
return "No message found in error-block";
}
}

0 comments on commit 4adde96

Please sign in to comment.