Skip to content

Commit

Permalink
[fix][client] fix retry topic with exclusive mode. (#23859)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5a59ab7)
  • Loading branch information
thetumbled authored and lhotari committed Feb 19, 2025
1 parent bcbbbd3 commit d8e2dab
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,46 @@ public void testRetryTopic() throws Exception {
checkConsumer.close();
}

/**
* Retry topic feature relies on the delay queue feature when consumer produce a delayed message
* to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.
* As a result, the subscription type of the retry topic should be shared or key-shared.
* @throws Exception
*/
@Test
public void testRetryTopicWithExclusiveMode() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic-exclusive";
final int maxRedeliveryCount = 2;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

producer.send("Hello Pulsar".getBytes());
producer.close();

// receive message and set delay to 5 seconds
Message<byte[]> message = consumer.receive();
long timestamp = System.currentTimeMillis();
consumer.reconsumeLater(message, 4, TimeUnit.SECONDS);

// receive message and check the delay is at least 4 seconds
consumer.receive();
long delay = System.currentTimeMillis() - timestamp;
assertTrue(delay >= 2000);
consumer.close();
}

@Data
public static class Foo {
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
Expand Down Expand Up @@ -787,6 +788,13 @@ private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
}

protected SubType getSubType() {
// For retry topic, we always use Shared subscription
// Because we will produce delayed messages to retry topic.
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy != null && topic.equals(deadLetterPolicy.getRetryLetterTopic())) {
return SubType.Shared;
}

SubscriptionType type = conf.getSubscriptionType();
switch (type) {
case Exclusive:
Expand Down

0 comments on commit d8e2dab

Please sign in to comment.