diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index 69c21aba30fe..79672239db9c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.mailbox; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import io.grpc.Status; @@ -100,7 +101,7 @@ public void cancel(Throwable t) { // anyways as info for now so we can see how frequently this happens. LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage()); } - }, DEFAULT_CANCELLATION_DELAY_MS, TimeUnit.MILLISECONDS); + }, getCancellationDelayMs(), TimeUnit.MILLISECONDS); } } @@ -109,6 +110,11 @@ public String getMailboxId() { return _mailboxId; } + @VisibleForTesting + public long getCancellationDelayMs() { + return DEFAULT_CANCELLATION_DELAY_MS; + } + private void open() { _mailboxContentStreamObserver = _mailboxContentStreamObserverSupplier.apply(_deadlineMs); _initialized.set(true); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java index 7d0843960156..82b737f703a6 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.util.TestUtils; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -294,6 +295,10 @@ public void testStreamCancellationBySender() GrpcSendingMailbox grpcSendingMailbox = (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); + // Do cancellations immediately for this test + grpcSendingMailbox = Mockito.spy(grpcSendingMailbox); + Mockito.doReturn(0L).when(grpcSendingMailbox).getCancellationDelayMs(); + GrpcReceivingMailbox grpcReceivingMailbox = (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);