Skip to content

Commit

Permalink
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (
Browse files Browse the repository at this point in the history
apache#18219)

### Motivation
https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774
The `pendingLookupRequestSemaphore` will leak when handleError. There are `LookUpRequestSemaphore` not released when removing it from `pendingRequests`

related PR: apache#17856

### Modifications
We can't easily release the semaphore in `handleError`, because there are not only `LookUpRequest`. So release the semaphore when LookupException

### Verifying this change
Add unit test case to cover this change

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc-required`
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed`
bug fixs, no need doc

- [ ] `doc`
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)

(cherry picked from commit fad3ccc)
(cherry picked from commit 3996f3d)
  • Loading branch information
hezhangjian authored and nicoloboschi committed Jan 11, 2023
1 parent 8bab173 commit d03e746
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,8 @@ public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long reque

if (pendingLookupRequestSemaphore.tryAcquire()) {
future.whenComplete((lookupDataResult, throwable) -> {
if (throwable instanceof ConnectException) {
if (throwable instanceof ConnectException
|| throwable instanceof PulsarClientException.LookupException) {
pendingLookupRequestSemaphore.release();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,49 @@ public void testPendingLookupRequestSemaphore() throws Exception {
eventLoop.shutdownGracefully();
}

@Test
public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
ClientCnx cnx = new ClientCnx(conf, eventLoop);

ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(ctx.channel()).thenReturn(channel);
ChannelFuture listenerFuture = mock(ChannelFuture.class);
when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
cnx.channelActive(ctx);
cnx.state = ClientCnx.State.Ready;
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture<Exception> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(1_000);
CompletableFuture<BinaryProtoLookupService.LookupDataResult> future =
cnx.newLookup(null, 123);
countDownLatch.countDown();
future.get();
} catch (Exception e) {
completableFuture.complete(e);
}
}).start();
countDownLatch.await();
CommandError commandError = new CommandError();
commandError.setRequestId(123L);
commandError.setError(ServerError.ServiceNotReady);
commandError.setMessage("Service not ready");
cnx.handleError(commandError);
assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.LookupException);
// wait for subsequent calls over
Awaitility.await().untilAsserted(() -> {
assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
});
eventLoop.shutdownGracefully();
}

@Test
public void testPendingWaitingLookupRequestSemaphore() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
Expand Down

0 comments on commit d03e746

Please sign in to comment.