diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index d0e72deb87fc2..2b897760b6f00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -136,6 +136,46 @@ public void testRetryTopic() throws Exception { checkConsumer.close(); } + /** + * Retry topic feature relies on the delay queue feature when consumer produce a delayed message + * to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type. + * As a result, the subscription type of the retry topic should be shared or key-shared. + * @throws Exception + */ + @Test + public void testRetryTopicWithExclusiveMode() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic-exclusive"; + final int maxRedeliveryCount = 2; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Exclusive) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + producer.send("Hello Pulsar".getBytes()); + producer.close(); + + // receive message and set delay to 5 seconds + Message message = consumer.receive(); + long timestamp = System.currentTimeMillis(); + consumer.reconsumeLater(message, 4, TimeUnit.SECONDS); + + // receive message and check the delay is at least 4 seconds + consumer.receive(); + long delay = System.currentTimeMillis() - timestamp; + assertTrue(delay >= 2000); + consumer.close(); + } + @Data public static class Foo { @Nullable diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 973a9b0dcf57c..f0b54c808eea1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; @@ -787,6 +788,13 @@ private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { } protected SubType getSubType() { + // For retry topic, we always use Shared subscription + // Because we will produce delayed messages to retry topic. + DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); + if (deadLetterPolicy != null && topic.equals(deadLetterPolicy.getRetryLetterTopic())) { + return SubType.Shared; + } + SubscriptionType type = conf.getSubscriptionType(); switch (type) { case Exclusive: