Skip to content

Commit

Permalink
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when cha…
Browse files Browse the repository at this point in the history
…nnel inactive (apache#17856)

### Motivation
https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297
The `pendingLookupRequestSemaphore` will leak when channel inactive. There are `LookUpRequestSemaphore` not released when removing it from `pendingRequests`

### Modifications
We can't easily release the semaphore in `channelInactive`, because there are not only `LookUpRequest`. So release the semaphore when connectionException

### Verifying this change
Add unit test case to cover this change

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc-required`
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed`
bug fixs, no need doc

- [ ] `doc`
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)

(cherry picked from commit b451880)
(cherry picked from commit 28e4b9c)
  • Loading branch information
hezhangjian authored and nicoloboschi committed Jan 10, 2023
1 parent da57cdf commit cae167b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public class ClientCnx extends PulsarHandler {

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final Semaphore pendingLookupRequestSemaphore;
private final Semaphore maxLookupRequestSemaphore;
private final EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -754,6 +757,11 @@ public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long reque
TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>();

if (pendingLookupRequestSemaphore.tryAcquire()) {
future.whenComplete((lookupDataResult, throwable) -> {
if (throwable instanceof ConnectException) {
pendingLookupRequestSemaphore.release();
}
});
addPendingLookupRequests(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -44,6 +45,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class ClientCnxTest {
Expand Down Expand Up @@ -74,6 +76,70 @@ public void testClientCnxTimeout() throws Exception {
eventLoop.shutdownGracefully();
}

@Test
public void testPendingLookupRequestSemaphore() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
ClientCnx cnx = new ClientCnx(conf, eventLoop);

ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.channel()).thenReturn(channel);
ChannelFuture listenerFuture = mock(ChannelFuture.class);
when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
cnx.channelActive(ctx);
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<Exception> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(1_000);
CompletableFuture<BinaryProtoLookupService.LookupDataResult> future =
cnx.newLookup(null, 123);
countDownLatch.countDown();
future.get();
} catch (Exception e) {
completableFuture.complete(e);
}
}).start();
countDownLatch.await();
cnx.channelInactive(ctx);
assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.ConnectException);
// wait for subsequent calls over
Awaitility.await().untilAsserted(() -> {
assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
});
eventLoop.shutdownGracefully();
}

@Test
public void testPendingWaitingLookupRequestSemaphore() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
ClientCnx cnx = new ClientCnx(conf, eventLoop);

ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.channel()).thenReturn(channel);
ChannelFuture listenerFuture = mock(ChannelFuture.class);
when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
cnx.channelActive(ctx);
for (int i = 0; i < 5001; i++) {
cnx.newLookup(null, i);
}
cnx.channelInactive(ctx);
// wait for subsequent calls over
Awaitility.await().untilAsserted(() -> {
assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
});
eventLoop.shutdownGracefully();
}

@Test
public void testReceiveErrorAtSendConnectFrameState() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
Expand Down

0 comments on commit cae167b

Please sign in to comment.