Skip to content

Commit

Permalink
[fix][client] fix incomingMessageSize and client memory usage is nega…
Browse files Browse the repository at this point in the history
…tive (apache#23624)

Co-authored-by: fanjianye <[email protected]>
  • Loading branch information
TakaHiR07 and fanjianye authored Nov 22, 2024
1 parent 024ff75 commit 708c5cc
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4252,6 +4252,62 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
});
}

@Test(timeOut = 100000)
public void testNegativeIncomingMessageSize() throws Exception {
final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
UUID.randomUUID().toString();
final String subName = "my-sub";

admin.topics().createPartitionedTopic(topicName, 3);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();

final int messages = 1000;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should greater that 0, current size is {}", size);
Assert.assertTrue(size > 0);
});


for (int i = 0; i < messages; i++) {
consumer.receive();
}


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
});


MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
List<ConsumerImpl<byte[]>> list = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> subConsumer : list) {
long size = subConsumer.getIncomingMessageSize();
log.info("Check the sub consumer incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
}
}

@Data
@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@


import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -257,4 +265,58 @@ public void testMultiConsumerImplBatchReceive() throws PulsarClientException, Pu
Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
}

@Test
public void testNegativeClientMemory() throws Exception {
final String topicName = "persistent://public/default/testMemory-" +
UUID.randomUUID().toString();
final String subName = "my-sub";

admin.topics().createPartitionedTopic(topicName, 3);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();

final int messages = 1000;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();


@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.autoScaledReceiverQueueSizeEnabled(true)
.subscribe();


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should greater that 0, current size is {}", size);
Assert.assertTrue(size > 0);
});


for (int i = 0; i < messages; i++) {
consumer.receive();
}

Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
});


MemoryLimitController controller = ((PulsarClientImpl)pulsarClient).getMemoryLimitController();
Assert.assertEquals(controller.currentUsage(), 0);
Assert.assertEquals(controller.currentUsagePercent(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,11 @@ protected void decreaseIncomingMessageSize(final Message<?> message) {
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}

protected void increaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(message.size()));
}

public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,8 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
return;
}

// increase incomingMessageSize here because the size would be decreased in messageProcessed() next step
increaseIncomingMessageSize(message);
// increase permits for available message-queue
messageProcessed(message);
// call interceptor and complete received callback
Expand Down

0 comments on commit 708c5cc

Please sign in to comment.