diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 18de9c4f8c..5be323db01 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -579,11 +579,17 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa }; CreditNotification creditNotification = - (subscriptionId, responseCode) -> - LOGGER.debug( - "Received credit notification for subscription {}: {}", - subscriptionId & 0xFF, - Utils.formatConstant(responseCode)); + (subscriptionId, responseCode) -> { + SubscriptionTracker subscriptionTracker = + subscriptionTrackers.get(subscriptionId & 0xFF); + String stream = subscriptionTracker == null ? "?" : subscriptionTracker.stream; + LOGGER.debug( + "Received credit notification for subscription {} (stream '{}'): {}", + subscriptionId & 0xFF, + stream, + Utils.formatConstant(responseCode)); + }; + MessageListener messageListener = (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { SubscriptionTracker subscriptionTracker = diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java index 31e33269a0..d24e5260f6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java @@ -35,14 +35,13 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) @@ -300,4 +299,65 @@ void autoOffsetTrackingShouldStoreOffsetZero() throws Exception { }); })); } + + @Test + @Disabled + void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance() + throws Exception { + declareSuperStreamTopology(connection, superStream, partitionCount); + Client client = cf.get(); + List partitions = client.partitions(superStream); + int messageCount = 10_000; + publishToPartitions(cf, partitions, messageCount); + String consumerName = "my-app"; + Set receivedPartitions = ConcurrentHashMap.newKeySet(partitionCount); + Runnable processing = + () -> { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // OK + } + }; + Consumer consumer1 = + environment + .consumerBuilder() + .superStream(superStream) + .singleActiveConsumer() + .offset(OffsetSpecification.first()) + .name(consumerName) + .autoTrackingStrategy() + .messageCountBeforeStorage(messageCount / partitionCount / 50) + .builder() + .messageHandler( + (context, message) -> { + receivedPartitions.add(context.stream()); + processing.run(); + }) + .build(); + waitAtMost(() -> receivedPartitions.size() == partitions.size()); + + AtomicReference partition = new AtomicReference<>(); + Consumer consumer2 = + environment + .consumerBuilder() + .superStream(superStream) + .singleActiveConsumer() + .offset(OffsetSpecification.first()) + .name(consumerName) + .autoTrackingStrategy() + .messageCountBeforeStorage(messageCount / partitionCount / 50) + .builder() + .messageHandler( + (context, message) -> { + partition.set(context.stream()); + processing.run(); + }) + .build(); + waitAtMost(() -> partition.get() != null); + consumer2.close(); + receivedPartitions.clear(); + waitAtMost(() -> receivedPartitions.size() == partitions.size()); + consumer1.close(); + } }