diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index eb39fe53f1ab6..9c6dcfa0effd8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -141,6 +141,9 @@ public class ClientCnx extends PulsarHandler { private final CompletableFuture connectionFuture = new CompletableFuture(); private final ConcurrentLinkedQueue requestTimeoutQueue = new ConcurrentLinkedQueue<>(); + + @VisibleForTesting + @Getter(AccessLevel.PACKAGE) private final Semaphore pendingLookupRequestSemaphore; private final Semaphore maxLookupRequestSemaphore; private final EventLoopGroup eventLoopGroup; @@ -776,6 +779,11 @@ public CompletableFuture newLookup(ByteBuf request, long reque TimedCompletableFuture future = new TimedCompletableFuture<>(); if (pendingLookupRequestSemaphore.tryAcquire()) { + future.whenComplete((lookupDataResult, throwable) -> { + if (throwable instanceof ConnectException) { + pendingLookupRequestSemaphore.release(); + } + }); addPendingLookupRequests(requestId, future); ctx.writeAndFlush(request).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index c6eba43fb7a16..63aa7b7048be0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -34,6 +34,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; @@ -50,6 +51,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class ClientCnxTest { @@ -80,6 +82,70 @@ public void testClientCnxTimeout() throws Exception { eventLoop.shutdownGracefully(); } + @Test + public void testPendingLookupRequestSemaphore() throws Exception { + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout")); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(10_000); + conf.setKeepAliveIntervalSeconds(0); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + ChannelFuture listenerFuture = mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + cnx.channelActive(ctx); + CountDownLatch countDownLatch = new CountDownLatch(1); + CompletableFuture completableFuture = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(1_000); + CompletableFuture future = + cnx.newLookup(null, 123); + countDownLatch.countDown(); + future.get(); + } catch (Exception e) { + completableFuture.complete(e); + } + }).start(); + countDownLatch.await(); + cnx.channelInactive(ctx); + assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.ConnectException); + // wait for subsequent calls over + Awaitility.await().untilAsserted(() -> { + assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest()); + }); + eventLoop.shutdownGracefully(); + } + + @Test + public void testPendingWaitingLookupRequestSemaphore() throws Exception { + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout")); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setOperationTimeoutMs(10_000); + conf.setKeepAliveIntervalSeconds(0); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + ChannelFuture listenerFuture = mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + cnx.channelActive(ctx); + for (int i = 0; i < 5001; i++) { + cnx.newLookup(null, i); + } + cnx.channelInactive(ctx); + // wait for subsequent calls over + Awaitility.await().untilAsserted(() -> { + assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest()); + }); + eventLoop.shutdownGracefully(); + } + @Test public void testReceiveErrorAtSendConnectFrameState() throws Exception { ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");