diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index a21c272f91b1d..43efead606105 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -418,6 +418,8 @@ The Apache Software License, Version 2.0 - avro-protobuf-1.11.4.jar * RE2j -- re2j-1.7.jar * Spotify completable-futures -- completable-futures-0.3.6.jar + * RoaringBitmap -- RoaringBitmap-1.2.0.jar + * Fastutil -- fastutil-8.5.14.jar BSD 3-clause "New" or "Revised" License * JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt diff --git a/pom.xml b/pom.xml index 54744a253bb2b..68069aeacb6c1 100644 --- a/pom.xml +++ b/pom.xml @@ -2586,6 +2586,7 @@ flexible messaging model and an intuitive client API. pulsar-metadata jetcd-core-shaded jclouds-shaded + pulsar-client-dependencies-minimized pulsar-package-management @@ -2651,6 +2652,7 @@ flexible messaging model and an intuitive client API. distribution pulsar-metadata jetcd-core-shaded + pulsar-client-dependencies-minimized pulsar-package-management diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index b372ecabc5de4..f8bc30f09667c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.util.HashSet; @@ -311,7 +312,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { // negative topic message id consumer.negativeAcknowledge(topicMessageId); NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker(); - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); + assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L); assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id @@ -319,11 +320,56 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); - assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L); + assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L); assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); } + /** + * If we nack multiple messages in the same batch with different redelivery delays, the messages should be redelivered + * with the correct delay. However, all messages are redelivered at the same time. + * @throws Exception + */ + @Test + public void testNegativeAcksWithBatch() throws Exception { + cleanup(); + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + setup(); + String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch"); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .batchingMaxMessages(2) + .create(); + // send two messages in the same batch + producer.sendAsync("test-0"); + producer.sendAsync("test-1"); + producer.flush(); + + // negative ack the first message + consumer.negativeAcknowledge(consumer.receive()); + // wait for 2s, negative ack the second message + Thread.sleep(2000); + consumer.negativeAcknowledge(consumer.receive()); + + // now 2s has passed, the first message should be redelivered 1s later. + Message msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + } + @Test public void testNegativeAcksWithBatchAckEnabled() throws Exception { cleanup(); diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index 74ced063fbfd4..2d1bd3f731fce 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -34,6 +34,17 @@ ${project.groupId} pulsar-client-admin-original ${project.version} + + + it.unimi.dsi + fastutil + + + + + ${project.groupId} + pulsar-client-dependencies-minimized + ${project.version} ${project.groupId} @@ -150,6 +161,8 @@ org.objenesis:* org.reactivestreams:reactive-streams org.yaml:snakeyaml + org.apache.pulsar:pulsar-client-dependencies-minimized + org.roaringbitmap:RoaringBitmap com.fasterxml.jackson.core:jackson-annotations @@ -269,6 +282,10 @@ io.swagger org.apache.pulsar.shade.io.swagger + + it.unimi.dsi.fastutil + org.apache.pulsar.shade.it.unimi.dsi.fastutil + javassist org.apache.pulsar.shade.javassist @@ -313,6 +330,11 @@ META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/ true + + META-INF/versions/(\d+)/org/roaringbitmap/ + META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/ + true + META-INF/versions/(\d+)/org/yaml/ META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ @@ -374,6 +396,10 @@ org.reactivestreams org.apache.pulsar.shade.org.reactivestreams + + org.roaringbitmap + org.apache.pulsar.shade.org.roaringbitmap + org.yaml org.apache.pulsar.shade.org.yaml diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index 74007745c70ee..cd48c00cfa15d 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -39,6 +39,17 @@ ${project.groupId} pulsar-client-original ${project.version} + + + it.unimi.dsi + fastutil + + + + + ${project.groupId} + pulsar-client-dependencies-minimized + ${project.version} ${project.groupId} @@ -200,6 +211,8 @@ org.reactivestreams:reactive-streams org.tukaani:xz org.yaml:snakeyaml + org.apache.pulsar:pulsar-client-dependencies-minimized + org.roaringbitmap:RoaringBitmap com.fasterxml.jackson.core:jackson-annotations @@ -317,6 +330,10 @@ io.swagger org.apache.pulsar.shade.io.swagger + + it.unimi.dsi.fastutil + org.apache.pulsar.shade.it.unimi.dsi.fastutil + javassist org.apache.pulsar.shade.javassist @@ -361,6 +378,11 @@ META-INF/versions/$1/org/apache/pulsar/shade/org/glassfish/ true + + META-INF/versions/(\d+)/org/roaringbitmap/ + META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/ + true + META-INF/versions/(\d+)/org/yaml/ META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ @@ -439,6 +461,10 @@ org.reactivestreams org.apache.pulsar.shade.org.reactivestreams + + org.roaringbitmap + org.apache.pulsar.shade.org.roaringbitmap + org.tukaani org.apache.pulsar.shade.org.tukaani diff --git a/pulsar-client-dependencies-minimized/pom.xml b/pulsar-client-dependencies-minimized/pom.xml new file mode 100644 index 0000000000000..e838fedfddca5 --- /dev/null +++ b/pulsar-client-dependencies-minimized/pom.xml @@ -0,0 +1,100 @@ + + + + 4.0.0 + + org.apache.pulsar + pulsar + 4.1.0-SNAPSHOT + + + pulsar-client-dependencies-minimized + Apache Pulsar :: Client :: Dependencies minimized + This module is used in `pulsar-client-all`, `pulsar-client-shaded`, and `pulsar-client-admin-shaded` + to minimize the number of classes included in the shaded jars for specific dependencies. + Currently, it is used to minimize the classes included from `fastutil`. + + + + ${project.groupId} + pulsar-client-original + ${project.version} + + + + ${project.artifactId}-${project.version} + + + org.apache.maven.plugins + maven-deploy-plugin + + + true + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + true + false + + true + + + + org.apache.pulsar:pulsar-client-original + + it.unimi.dsi:fastutil + + + + + + org.apache.pulsar:pulsar-client-original + + ** + + + ** + + + + + + + + + + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 1093b405731ea..0c5c1b3d6316b 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -39,6 +39,17 @@ ${project.groupId} pulsar-client-original ${project.version} + + + it.unimi.dsi + fastutil + + + + + ${project.groupId} + pulsar-client-dependencies-minimized + ${project.version} ${project.groupId} @@ -164,6 +175,8 @@ org.reactivestreams:reactive-streams org.tukaani:xz org.yaml:snakeyaml + org.apache.pulsar:pulsar-client-dependencies-minimized + org.roaringbitmap:RoaringBitmap com.fasterxml.jackson.core:jackson-annotations @@ -263,6 +276,10 @@ io.swagger org.apache.pulsar.shade.io.swagger + + it.unimi.dsi.fastutil + org.apache.pulsar.shade.it.unimi.dsi.fastutil + javax.activation org.apache.pulsar.shade.javax.activation @@ -281,6 +298,11 @@ true + + META-INF/versions/(\d+)/org/roaringbitmap/ + META-INF/versions/$1/org/apache/pulsar/shade/org/roaringbitmap/ + true + META-INF/versions/(\d+)/org/yaml/ META-INF/versions/$1/org/apache/pulsar/shade/org/yaml/ @@ -343,6 +365,10 @@ org.reactivestreams org.apache.pulsar.shade.org.reactivestreams + + org.roaringbitmap + org.apache.pulsar.shade.org.roaringbitmap + org.tukaani org.apache.pulsar.shade.org.tukaani diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 49bb3c6490ae9..e1a70ed074833 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -207,6 +207,16 @@ test + + org.roaringbitmap + RoaringBitmap + + + + it.unimi.dsi + fastutil + + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d2753856264fc..a7eb89bda157f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2745,7 +2745,7 @@ private int removeExpiredMessagesFromQueue(Set messageIds) { int messagesFromQueue = 0; Message peek = incomingMessages.peek(); if (peek != null) { - MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId()); + MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId()); if (!messageIds.contains(messageId)) { // first message is not expired, then no message is expired in queue. return 0; @@ -2756,7 +2756,7 @@ private int removeExpiredMessagesFromQueue(Set messageIds) { while (message != null) { decreaseIncomingMessageSize(message); messagesFromQueue++; - MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId()); + MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId()); if (!messageIds.contains(id)) { messageIds.add(id); break; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 5256ebf04f43c..273880569c307 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -22,9 +22,13 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timeout; import io.netty.util.Timer; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import java.io.Closeable; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; @@ -32,40 +36,37 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; +import org.roaringbitmap.longlong.Roaring64Bitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class NegativeAcksTracker implements Closeable { private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class); - private ConcurrentLongLongPairHashMap nackedMessages = null; + // timestamp -> ledgerId -> entryId, no need to batch index, if different messages have + // different timestamp, there will be multiple entries in the map + // RB Tree -> LongOpenHashMap -> Roaring64Bitmap + private Long2ObjectSortedMap> nackedMessages = null; private final ConsumerBase consumer; private final Timer timer; - private final long nackDelayNanos; - private final long timerIntervalNanos; + private final long nackDelayMs; private final RedeliveryBackoff negativeAckRedeliveryBackoff; + private final int negativeAckPrecisionBitCnt; private Timeout timeout; // Set a min delay to allow for grouping nacks within a single batch - private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100); - private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE; + private static final long MIN_NACK_DELAY_MS = 100; + private static final int DUMMY_PARTITION_INDEX = -2; public NegativeAcksTracker(ConsumerBase consumer, ConsumerConfigurationData conf) { this.consumer = consumer; this.timer = consumer.getClient().timer(); - this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()), - MIN_NACK_DELAY_NANOS); + this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()), + MIN_NACK_DELAY_MS); this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff(); - if (negativeAckRedeliveryBackoff != null) { - this.timerIntervalNanos = Math.max( - TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)), - MIN_NACK_DELAY_NANOS) / 3; - } else { - this.timerIntervalNanos = nackDelayNanos / 3; - } + this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt(); } private void triggerRedelivery(Timeout t) { @@ -76,21 +77,48 @@ private void triggerRedelivery(Timeout t) { return; } - long now = System.nanoTime(); - nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> { - if (timestamp < now) { - MessageId msgId = new MessageIdImpl(ledgerId, entryId, - // need to covert non-partitioned topic partition index to -1 - (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex)); - addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); - messagesToRedeliver.add(msgId); + long currentTimestamp = System.currentTimeMillis(); + for (long timestamp : nackedMessages.keySet()) { + if (timestamp > currentTimestamp) { + // We are done with all the messages that need to be redelivered + break; + } + + Long2ObjectMap ledgerMap = nackedMessages.get(timestamp); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entrySet = ledgerEntry.getValue(); + entrySet.forEach(entryId -> { + MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX); + addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); + messagesToRedeliver.add(msgId); + }); + } + } + + // remove entries from the nackedMessages map + LongBidirectionalIterator iterator = nackedMessages.keySet().iterator(); + while (iterator.hasNext()) { + long timestamp = iterator.nextLong(); + if (timestamp <= currentTimestamp) { + iterator.remove(); + } else { + break; } - }); - for (MessageId messageId : messagesToRedeliver) { - nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(), - ((MessageIdImpl) messageId).getEntryId()); } - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); + + // Schedule the next redelivery if there are still messages to redeliver + if (!nackedMessages.isEmpty()) { + long nextTriggerTimestamp = nackedMessages.firstLongKey(); + long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0); + if (delayMs > 0) { + this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS); + } else { + this.timeout = timer.newTimeout(this::triggerRedelivery, 0, TimeUnit.MILLISECONDS); + } + } else { + this.timeout = null; + } } // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages, @@ -110,39 +138,56 @@ public synchronized void add(Message message) { add(message.getMessageId(), message.getRedeliveryCount()); } + static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); + } + private synchronized void add(MessageId messageId, int redeliveryCount) { if (nackedMessages == null) { - nackedMessages = ConcurrentLongLongPairHashMap.newBuilder() - .autoShrink(true) - .concurrencyLevel(1) - .build(); + nackedMessages = new Long2ObjectAVLTreeMap<>(); } - long backoffNs; + long backoffMs; if (negativeAckRedeliveryBackoff != null) { - backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + backoffMs = TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount)); } else { - backoffNs = nackDelayNanos; + backoffMs = nackDelayMs; } - MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId); - // ConcurrentLongLongPairHashMap requires the key and value >=0. - // partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use - // partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to - // avoid exception from ConcurrentLongLongPairHashMap. - nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(), - messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() : - NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs); + MessageIdAdv messageIdAdv = (MessageIdAdv) messageId; + long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt); + nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) + .computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap()) + .add(messageIdAdv.getEntryId()); if (this.timeout == null) { // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for // nack immediately following the current one will be batched into the same redeliver request. - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); + this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS); } } + /** + * Discard the batch index and partition index from the message id. + * + * @param messageId + * @return + */ + public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) { + if (messageId instanceof ChunkMessageIdImpl) { + return (MessageIdAdv) messageId; + } + MessageIdAdv msgId = (MessageIdAdv) messageId; + return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), DUMMY_PARTITION_INDEX); + } + @VisibleForTesting - Optional getNackedMessagesCount() { - return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size); + synchronized long getNackedMessagesCount() { + if (nackedMessages == null) { + return 0; + } + return nackedMessages.values().stream().mapToLong( + ledgerMap -> ledgerMap.values().stream().mapToLong( + Roaring64Bitmap::getLongCardinality).sum()).sum(); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index f9ff5913f62da..f430371d37c75 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -155,6 +155,16 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { ) private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1); + @ApiModelProperty( + name = "negativeAckPrecisionBitCnt", + value = "The redelivery time precision bit count. The lower bits of the redelivery time will be" + + "trimmed to reduce the memory occupation.\nThe default value is 8, which means the" + + "redelivery time will be bucketed by 256ms, the redelivery time could be earlier(no later)" + + "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be" + + "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms." + ) + private int negativeAckPrecisionBitCnt = 8; + @ApiModelProperty( name = "maxTotalReceiverQueueSizeAcrossPartitions", value = "The max total receiver queue size across partitions.\n"