diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java index ea212c55be50..6699224b71d4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; /** * Processes AMQP receive links into a stream of AMQP messages. @@ -41,6 +42,7 @@ public class AmqpReceiveLinkProcessor extends FluxProcessor messageQueue = new ConcurrentLinkedDeque<>(); + private final AtomicBoolean linkHasNoCredits = new AtomicBoolean(); private final Object creditsAdded = new Object(); private final AtomicReference> downstream = new AtomicReference<>(); @@ -163,28 +165,48 @@ public void onNext(AmqpReceiveLink next) { currentLink = next; currentLinkName = next.getLinkName(); - // For a new link, add the prefetch as credits. - next.setEmptyCreditListener(this::getCreditsToAdd); + // Empty credit listener is invoked when there are no credits left on the underlying link. + next.setEmptyCreditListener(() -> { + final int credits; + synchronized (creditsAdded) { + credits = getCreditsToAdd(); + + // This means that considering the downstream request and current size of the message queue, we + // have enough messages to satisfy them. + // Thus, there are no credits on the link AND we are not going to add anymore. + // We'll wait until the next time downstream calls request(long) to get more events. + if (credits < 1) { + linkHasNoCredits.compareAndSet(false, true); + } else { + logger.info("linkName[{}] entityPath[{}] credits[{}] Link is empty. Adding more credits.", + linkName, entityPath, credits); + } + } + + return credits; + }); currentLinkSubscriptions = Disposables.composite( + // For a new link, add the prefetch as credits. next.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next() .flatMap(state -> { // If there was already a subscriber downstream who made a request, see if that is more than // the prefetch. If it is, then add the difference. (ie. if they requested 500, but our // prefetch is 100, we'll add 500 credits rather than 100. - final int creditsToAdd = getCreditsToAdd(); - final int total = Math.max(prefetch, creditsToAdd); - - logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.", - linkName, prefetch, creditsToAdd); + final Mono operation; + synchronized (creditsAdded) { + final int creditsToAdd = getCreditsToAdd(); + final int total = Math.max(prefetch, creditsToAdd); + + logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.", + linkName, prefetch, creditsToAdd); + operation = next.addCredits(total); + } - return next.addCredits(total); - }) - .onErrorResume(IllegalStateException.class, error -> { - logger.info("linkName[{}] was already closed. Could not add credits.", linkName); - return Mono.empty(); + return operation; }) - .subscribe(), + .subscribe(noop -> { + }, error -> logger.info("linkName[{}] was already closed. Could not add credits.", linkName)), next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe( state -> { // Connection was successfully opened, we can reset the retry interval. @@ -356,26 +378,7 @@ public void request(long request) { Operators.addCap(REQUESTED, this, request); - synchronized (creditsAdded) { - final AmqpReceiveLink link = currentLink; - final int credits = getCreditsToAdd(); - - logger.verbose("linkName[{}] entityPath[{}] request[{}] credits[{}] Backpressure request from downstream.", - currentLinkName, entityPath, request, credits); - - if (link != null) { - link.addCredits(credits) - .onErrorResume(IllegalStateException.class, error -> { - logger.info("linkName[{}] was already closed. Could not add credits.", link.getLinkName()); - return Mono.empty(); - }) - .subscribe(); - } else { - logger.verbose("entityPath[{}] credits[{}] totalRequest[{}] totalSent[{}] totalCredits[{}] " - + "There is no link to add credits to, yet.", entityPath, credits); - } - } - + addCreditsToLink("Backpressure request from downstream. Request: " + request); drain(); } @@ -482,7 +485,8 @@ private void drainQueue() { try { subscriber.onNext(message); } catch (Exception e) { - logger.error("Exception occurred while handling downstream onNext operation.", e); + logger.error("linkName[{}] entityPath[{}] Exception occurred while handling downstream onNext " + + "operation.", currentLinkName, entityPath, e); throw logger.logExceptionAsError(Exceptions.propagate( Operators.onOperatorError(upstream, e, message, subscriber.currentContext()))); } @@ -496,6 +500,10 @@ private void drainQueue() { numberRequested = REQUESTED.addAndGet(this, -numberEmitted); } } + + if (numberRequested > 0L && isEmpty) { + addCreditsToLink("Adding more credits in drain loop."); + } } private boolean checkAndSetTerminated() { @@ -517,22 +525,72 @@ private boolean checkAndSetTerminated() { return true; } - private int getCreditsToAdd() { + /** + * Consolidates all credits calculation when checking to see if more should be added. This is invoked in + * {@link #drainQueue()} and {@link #request(long)}. + * + * Calculates if there are enough credits to satisfy the downstream subscriber. If there is not AND the link has no + * more credits, we will add them onto the link. + * + * In the case that the link has some credits, but _not_ enough to satisfy the request, when the link is empty, it + * will call {@link AmqpReceiveLink#setEmptyCreditListener(Supplier)} to get how much is remaining. + * + * @param message Additional message for context. + */ + private void addCreditsToLink(String message) { synchronized (creditsAdded) { - final CoreSubscriber subscriber = downstream.get(); - final long request = REQUESTED.get(this); - - final int credits; - if (subscriber == null || request == 0) { - credits = 0; - } else if (request == Long.MAX_VALUE) { - credits = 1; - } else { - credits = Long.valueOf(request).intValue(); + final AmqpReceiveLink link = currentLink; + final int credits = getCreditsToAdd(); + + if (link == null) { + logger.verbose("entityPath[{}] creditsToAdd[{}] There is no link to add credits to.", + entityPath, credits); + return; + } + + final String linkName = link.getLinkName(); + + if (credits < 1) { + logger.verbose("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no additional credits to add.", + linkName, entityPath, credits); + return; + } + + if (linkHasNoCredits.compareAndSet(true, false)) { + logger.info("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no more credits on link." + + " Adding more. {}", linkName, entityPath, credits, message); + + link.addCredits(credits).subscribe(noop -> { + }, error -> { + logger.info("linkName[{}] entityPath[{}] was already closed. Could not add credits.", + linkName, entityPath); + linkHasNoCredits.compareAndSet(false, true); + }); } + } + } - return credits; + /** + * Gets the number of credits to add based on {@link #requested} and how many messages are still in queue. + * If {@link #requested} is {@link Long#MAX_VALUE}, then we add credits 1 by 1. Similar to Track 1's behaviour. + * + * @return The number of credits to add. + */ + private int getCreditsToAdd() { + final CoreSubscriber subscriber = downstream.get(); + final long request = REQUESTED.get(this); + + final int credits; + if (subscriber == null || request == 0) { + credits = 0; + } else if (request == Long.MAX_VALUE) { + credits = 1; + } else { + final int remaining = Long.valueOf(request).intValue() - messageQueue.size(); + credits = Math.max(remaining, 0); } + + return credits; } private void disposeReceiver(AmqpReceiveLink link) { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java index f9d6f43a645a..0e70a40bc827 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java @@ -7,7 +7,6 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.models.EventPosition; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -75,7 +74,6 @@ void receiveMessage(AmqpTransportType transportType) { */ @ParameterizedTest @EnumSource(value = AmqpTransportType.class) - @Disabled("Works part of the time: https://github.com/Azure/azure-sdk-for-java/issues/9659") void parallelEventHubClients(AmqpTransportType transportType) throws InterruptedException { // Arrange final int numberOfClients = 3; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java index c2fdb9c01500..f64e24ef1601 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientIntegrationTest.java @@ -596,6 +596,50 @@ void canReceiveWithBackpressure() { } } + /** + * Verify that when we specify a small prefetch, it continues to fetch items. + */ + @Test + void receivesWithSmallPrefetch() { + // Arrange + final String secondPartitionId = "2"; + final AtomicBoolean isActive = new AtomicBoolean(true); + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); + final Disposable producerEvents = getEvents(isActive) + .flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId))) + .subscribe( + sent -> { + }, + error -> logger.error("Error sending event", error), + () -> logger.info("Event sent.")); + + final int prefetch = 5; + final int backpressure = 3; + final int batchSize = 10; + final EventHubConsumerAsyncClient consumer = builder + .prefetchCount(prefetch) + .buildAsyncConsumerClient(); + + // Act & Assert + try { + StepVerifier.create(consumer.receiveFromPartition(secondPartitionId, EventPosition.latest()), prefetch) + .expectNextCount(prefetch) + .thenRequest(backpressure) + .expectNextCount(backpressure) + .thenRequest(batchSize) + .expectNextCount(batchSize) + .thenRequest(batchSize) + .expectNextCount(batchSize) + .thenAwait(Duration.ofSeconds(1)) + .thenCancel() + .verify(TIMEOUT); + } finally { + isActive.set(false); + producerEvents.dispose(); + dispose(producer, consumer); + } + } + private static void assertPartitionEvent(PartitionEvent event, String eventHubName, Set allPartitions, Set expectedPartitions) { final PartitionContext context = event.getPartitionContext(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java index 32a3926122d1..fc97a760fd3e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java @@ -7,7 +7,6 @@ import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.EventPosition; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; @@ -23,8 +22,6 @@ /** * Verifies we can use various prefetch options with {@link EventHubConsumerAsyncClient}. */ -@Disabled("Set prefetch tests do not work because they try to send very large number of events at once." - + "https://github.com/Azure/azure-sdk-for-java/issues/9659") @Tag(TestUtils.INTEGRATION) class SetPrefetchCountTest extends IntegrationTestBase { private static final String PARTITION_ID = "3"; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java index 701c64ecc1ad..9598baa3d791 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java @@ -24,7 +24,6 @@ import org.mockito.MockitoAnnotations; import org.reactivestreams.Subscription; import reactor.core.Disposable; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -33,12 +32,18 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -65,10 +70,10 @@ class AmqpReceiveLinkProcessorTest { @Captor private ArgumentCaptor> creditSupplierCaptor; - private final DirectProcessor endpointProcessor = DirectProcessor.create(); - private final DirectProcessor messageProcessor = DirectProcessor.create(); - private final FluxSink messageProcessorSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); + private final TestPublisher endpointProcessor = TestPublisher.createCold(); + private final TestPublisher messageProcessor = TestPublisher.createCold(); private AmqpReceiveLinkProcessor linkProcessor; + private AutoCloseable mockCloseable; @BeforeAll static void beforeAll() { @@ -82,19 +87,23 @@ static void afterAll() { @BeforeEach void setup() { - MockitoAnnotations.initMocks(this); + mockCloseable = MockitoAnnotations.openMocks(this); when(retryPolicy.getRetryOptions()).thenReturn(new AmqpRetryOptions()); linkProcessor = new AmqpReceiveLinkProcessor("entity-path", PREFETCH, parentConnection); - when(link1.getEndpointStates()).thenReturn(endpointProcessor); - when(link1.receive()).thenReturn(messageProcessor); + when(link1.getEndpointStates()).thenReturn(endpointProcessor.flux()); + when(link1.receive()).thenReturn(messageProcessor.flux()); when(link1.addCredits(anyInt())).thenReturn(Mono.empty()); } @AfterEach - void teardown() { + void teardown() throws Exception { + if (mockCloseable != null) { + mockCloseable.close(); + } + Mockito.framework().clearInlineMocks(); } @@ -125,17 +134,17 @@ void createNewLink() { StepVerifier.create(processor) .then(() -> { endpoints.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); - messageProcessorSink.next(message2); + messageProcessor.next(message1); + messageProcessor.next(message2); }) .expectNext(message1) .expectNext(message2) .thenCancel() .verify(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertFalse(processor.hasError()); - Assertions.assertNull(processor.getError()); + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); verify(link1).addCredits(eq(PREFETCH)); verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); @@ -162,7 +171,7 @@ void respectsBackpressureInRange() { // Act & Assert StepVerifier.create(processor, backpressure) - .then(() -> messageProcessorSink.next(message1)) + .then(() -> messageProcessor.next(message1)) .expectNext(message1) .thenCancel() .verify(); @@ -236,7 +245,6 @@ void newLinkOnClose() { final Message message4 = mock(Message.class); final AmqpReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor); - final FluxSink endpointSink = endpointProcessor.sink(); final TestPublisher connection2Endpoints = TestPublisher.createCold(); when(link2.getEndpointStates()).thenReturn(connection2Endpoints.flux()); @@ -256,11 +264,11 @@ void newLinkOnClose() { // Act & Assert StepVerifier.create(processor) - .then(() -> messageProcessorSink.next(message1)) + .then(() -> messageProcessor.next(message1)) .expectNext(message1) .then(() -> { // Close that first link. - endpointSink.complete(); + endpointProcessor.complete(); }) .expectNext(message2) .then(() -> { @@ -274,9 +282,9 @@ void newLinkOnClose() { }) .verifyComplete(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertFalse(processor.hasError()); - Assertions.assertNull(processor.getError()); + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); } /** @@ -288,7 +296,6 @@ void nonRetryableError() { final AmqpReceiveLink[] connections = new AmqpReceiveLink[]{link1, link2}; final AmqpReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor); - final FluxSink endpointSink = endpointProcessor.sink(); final Message message3 = mock(Message.class); when(link2.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); @@ -304,25 +311,25 @@ void nonRetryableError() { // Verify that we get the first connection. StepVerifier.create(processor) .then(() -> { - endpointSink.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); + endpointProcessor.next(AmqpEndpointState.ACTIVE); + messageProcessor.next(message1); }) .expectNext(message1) .then(() -> { - endpointSink.error(amqpException); + endpointProcessor.error(amqpException); }) .expectErrorSatisfies(error -> { - Assertions.assertTrue(error instanceof AmqpException); + assertTrue(error instanceof AmqpException); AmqpException exception = (AmqpException) error; - Assertions.assertFalse(exception.isTransient()); + assertFalse(exception.isTransient()); Assertions.assertEquals(amqpException.getErrorCondition(), exception.getErrorCondition()); Assertions.assertEquals(amqpException.getMessage(), exception.getMessage()); }) .verify(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertTrue(processor.hasError()); + assertTrue(processor.isTerminated()); + assertTrue(processor.hasError()); Assertions.assertSame(amqpException, processor.getError()); } @@ -358,7 +365,6 @@ void noSubscribersWhenTerminated() { verifyNoInteractions(subscription); } - /** * Does not request another link when parent connection is closed. */ @@ -368,28 +374,27 @@ void doNotRetryWhenParentConnectionIsClosed() { final AmqpReceiveLink[] connections = new AmqpReceiveLink[]{link1, link2}; final AmqpReceiveLinkProcessor processor = createSink(connections).subscribeWith(linkProcessor); - final FluxSink endpointSink = endpointProcessor.sink(); - final DirectProcessor link2StateProcessor = DirectProcessor.create(); + final TestPublisher link2StateProcessor = TestPublisher.createCold(); when(parentConnection.isDisposed()).thenReturn(true); - when(link2.getEndpointStates()).thenReturn(link2StateProcessor); + when(link2.getEndpointStates()).thenReturn(link2StateProcessor.flux()); when(link2.receive()).thenReturn(Flux.never()); when(link2.addCredits(anyInt())).thenReturn(Mono.empty()); // Act & Assert StepVerifier.create(processor) .then(() -> { - endpointSink.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); + endpointProcessor.next(AmqpEndpointState.ACTIVE); + messageProcessor.next(message1); }) .expectNext(message1) - .then(() -> endpointSink.complete()) + .then(() -> endpointProcessor.complete()) .thenCancel() .verify(); - Assertions.assertTrue(processor.isTerminated()); + assertTrue(processor.isTerminated()); } @Test @@ -417,7 +422,7 @@ void stopsEmittingAfterBackPressure() { StepVerifier.create(processor, backpressure) .then(() -> { for (int i = 0; i < backpressure + 2; i++) { - messageProcessorSink.next(message2); + messageProcessor.next(message2); } }) .expectNextCount(backpressure) @@ -430,26 +435,25 @@ void stopsEmittingAfterBackPressure() { void receivesUntilFirstLinkClosed() { // Arrange AmqpReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); - FluxSink sink = endpointProcessor.sink(); when(link1.getCredits()).thenReturn(1); // Act & Assert StepVerifier.create(processor) .then(() -> { - sink.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); - messageProcessorSink.next(message2); + endpointProcessor.next(AmqpEndpointState.ACTIVE); + messageProcessor.next(message1); + messageProcessor.next(message2); }) .expectNext(message1) .expectNext(message2) - .then(() -> sink.complete()) + .then(() -> endpointProcessor.complete()) .expectComplete() .verify(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertFalse(processor.hasError()); - Assertions.assertNull(processor.getError()); + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); verify(link1).addCredits(eq(PREFETCH)); verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); @@ -466,25 +470,24 @@ void receivesUntilFirstLinkClosed() { void receivesFromFirstLink() { // Arrange AmqpReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); - FluxSink sink = endpointProcessor.sink(); when(link1.getCredits()).thenReturn(1); // Act & Assert StepVerifier.create(processor) .then(() -> { - sink.next(AmqpEndpointState.ACTIVE); - messageProcessorSink.next(message1); - messageProcessorSink.next(message2); + endpointProcessor.next(AmqpEndpointState.ACTIVE); + messageProcessor.next(message1); + messageProcessor.next(message2); }) .expectNext(message1) .expectNext(message2) .thenCancel() .verify(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertFalse(processor.hasError()); - Assertions.assertNull(processor.getError()); + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); verify(link1).addCredits(eq(PREFETCH)); verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); @@ -498,40 +501,93 @@ void receivesFromFirstLink() { } /** - * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only - * that number is consumed. + * Verifies that when we request back pressure amounts, if it only requests a certain number of events, only that + * number is consumed. */ @Test void backpressureRequestOnlyEmitsThatAmount() { // Arrange final int backpressure = 10; AmqpReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); - FluxSink sink = endpointProcessor.sink(); when(link1.getCredits()).thenReturn(1); // Act & Assert StepVerifier.create(processor, backpressure) .then(() -> { - sink.next(AmqpEndpointState.ACTIVE); + endpointProcessor.next(AmqpEndpointState.ACTIVE); final int emitted = backpressure + 5; for (int i = 0; i < emitted; i++) { - messageProcessorSink.next(mock(Message.class)); + messageProcessor.next(mock(Message.class)); } }) .expectNextCount(backpressure) .thenCancel() .verify(); - Assertions.assertTrue(processor.isTerminated()); - Assertions.assertFalse(processor.hasError()); - Assertions.assertNull(processor.getError()); + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); // Once when the user initially makes a backpressure request and a second time when the link is active. verify(link1, atLeastOnce()).addCredits(eq(backpressure)); verify(link1).setEmptyCreditListener(any()); } + @Test + void onlyRequestsWhenNoCredits() { + // Arrange + final AtomicReference> creditListener = new AtomicReference<>(); + + when(link1.getCredits()).thenReturn(1); + + doAnswer(invocationOnMock -> { + assertTrue(creditListener.compareAndSet(null, invocationOnMock.getArgument(0))); + return null; + }).when(link1).setEmptyCreditListener(any()); + + final int backpressure = 10; + final AmqpReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); + final int extra = 4; + final int nextRequest = 11; + + // Act & Assert + StepVerifier.create(processor, backpressure) + .then(() -> { + endpointProcessor.next(AmqpEndpointState.ACTIVE); + for (int i = 0; i < backpressure; i++) { + messageProcessor.next(mock(Message.class)); + } + }) + .expectNextCount(backpressure) + .then(() -> { + final Supplier integerSupplier = creditListener.get(); + assertNotNull(integerSupplier); + // Invoking this once. Should return a value and notify that there are no credits left on the link. + final int messages = integerSupplier.get(); + System.out.println("Messages: " + messages); + }) + .expectNoEvent(Duration.ofSeconds(1)) + .thenRequest(nextRequest) + .then(() -> { + for (int i = 0; i < nextRequest; i++) { + messageProcessor.next(mock(Message.class)); + } + }) + .expectNextCount(nextRequest) + .thenCancel() + .verify(); + + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); + + // Once when the user initially makes a backpressure request and a second time when the link is active. + verify(link1).addCredits(eq(backpressure)); + verify(link1).addCredits(eq(nextRequest)); + verify(link1).setEmptyCreditListener(any()); + } + private static Flux createSink(AmqpReceiveLink[] links) { return Flux.create(emitter -> { final AtomicInteger counter = new AtomicInteger();