Skip to content


Fixes OOM when receiving from Event Hubs (#20832)
Browse files Browse the repository at this point in the history
* Add check for messageQueue.size()

* Fixing message format.

* Adding more credits if there are none left on the link.

* Enable disabled parallel test.

* Consolidating add credits logic.

* Adding test that we add credits correctly on new requests.

* Add documentation to link processor.
  • Loading branch information
conniey authored and benbp committed Apr 28, 2021
1 parent 13e02c6 commit c26927b
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;

* Processes AMQP receive links into a stream of AMQP messages.
Expand All @@ -41,6 +42,7 @@ public class AmqpReceiveLinkProcessor extends FluxProcessor<AmqpReceiveLink, Mes
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<>();
private final AtomicBoolean linkHasNoCredits = new AtomicBoolean();
private final Object creditsAdded = new Object();

private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference<>();
Expand Down Expand Up @@ -163,28 +165,48 @@ public void onNext(AmqpReceiveLink next) {
currentLink = next;
currentLinkName = next.getLinkName();

// For a new link, add the prefetch as credits.
// Empty credit listener is invoked when there are no credits left on the underlying link.
next.setEmptyCreditListener(() -> {
final int credits;
synchronized (creditsAdded) {
credits = getCreditsToAdd();

// This means that considering the downstream request and current size of the message queue, we
// have enough messages to satisfy them.
// Thus, there are no credits on the link AND we are not going to add anymore.
// We'll wait until the next time downstream calls request(long) to get more events.
if (credits < 1) {
linkHasNoCredits.compareAndSet(false, true);
} else {"linkName[{}] entityPath[{}] credits[{}] Link is empty. Adding more credits.",
linkName, entityPath, credits);

return credits;

currentLinkSubscriptions = Disposables.composite(
// For a new link, add the prefetch as credits.
next.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next()
.flatMap(state -> {
// If there was already a subscriber downstream who made a request, see if that is more than
// the prefetch. If it is, then add the difference. (ie. if they requested 500, but our
// prefetch is 100, we'll add 500 credits rather than 100.
final int creditsToAdd = getCreditsToAdd();
final int total = Math.max(prefetch, creditsToAdd);

logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.",
linkName, prefetch, creditsToAdd);
final Mono<Void> operation;
synchronized (creditsAdded) {
final int creditsToAdd = getCreditsToAdd();
final int total = Math.max(prefetch, creditsToAdd);

logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.",
linkName, prefetch, creditsToAdd);
operation = next.addCredits(total);

return next.addCredits(total);
.onErrorResume(IllegalStateException.class, error -> {"linkName[{}] was already closed. Could not add credits.", linkName);
return Mono.empty();
return operation;
.subscribe(noop -> {
}, error ->"linkName[{}] was already closed. Could not add credits.", linkName)),
state -> {
// Connection was successfully opened, we can reset the retry interval.
Expand Down Expand Up @@ -356,26 +378,7 @@ public void request(long request) {

Operators.addCap(REQUESTED, this, request);

synchronized (creditsAdded) {
final AmqpReceiveLink link = currentLink;
final int credits = getCreditsToAdd();

logger.verbose("linkName[{}] entityPath[{}] request[{}] credits[{}] Backpressure request from downstream.",
currentLinkName, entityPath, request, credits);

if (link != null) {
.onErrorResume(IllegalStateException.class, error -> {"linkName[{}] was already closed. Could not add credits.", link.getLinkName());
return Mono.empty();
} else {
logger.verbose("entityPath[{}] credits[{}] totalRequest[{}] totalSent[{}] totalCredits[{}] "
+ "There is no link to add credits to, yet.", entityPath, credits);

addCreditsToLink("Backpressure request from downstream. Request: " + request);

Expand Down Expand Up @@ -482,7 +485,8 @@ private void drainQueue() {
try {
} catch (Exception e) {
logger.error("Exception occurred while handling downstream onNext operation.", e);
logger.error("linkName[{}] entityPath[{}] Exception occurred while handling downstream onNext "
+ "operation.", currentLinkName, entityPath, e);
throw logger.logExceptionAsError(Exceptions.propagate(
Operators.onOperatorError(upstream, e, message, subscriber.currentContext())));
Expand All @@ -496,6 +500,10 @@ private void drainQueue() {
numberRequested = REQUESTED.addAndGet(this, -numberEmitted);

if (numberRequested > 0L && isEmpty) {
addCreditsToLink("Adding more credits in drain loop.");

private boolean checkAndSetTerminated() {
Expand All @@ -517,22 +525,72 @@ private boolean checkAndSetTerminated() {
return true;

private int getCreditsToAdd() {
* Consolidates all credits calculation when checking to see if more should be added. This is invoked in
* {@link #drainQueue()} and {@link #request(long)}.
* Calculates if there are enough credits to satisfy the downstream subscriber. If there is not AND the link has no
* more credits, we will add them onto the link.
* In the case that the link has some credits, but _not_ enough to satisfy the request, when the link is empty, it
* will call {@link AmqpReceiveLink#setEmptyCreditListener(Supplier)} to get how much is remaining.
* @param message Additional message for context.
private void addCreditsToLink(String message) {
synchronized (creditsAdded) {
final CoreSubscriber<? super Message> subscriber = downstream.get();
final long request = REQUESTED.get(this);

final int credits;
if (subscriber == null || request == 0) {
credits = 0;
} else if (request == Long.MAX_VALUE) {
credits = 1;
} else {
credits = Long.valueOf(request).intValue();
final AmqpReceiveLink link = currentLink;
final int credits = getCreditsToAdd();

if (link == null) {
logger.verbose("entityPath[{}] creditsToAdd[{}] There is no link to add credits to.",
entityPath, credits);

final String linkName = link.getLinkName();

if (credits < 1) {
logger.verbose("linkName[{}] entityPath[{}] creditsToAdd[{}] There are no additional credits to add.",
linkName, entityPath, credits);

if (linkHasNoCredits.compareAndSet(true, false)) {"linkName[{}] entityPath[{}] creditsToAdd[{}] There are no more credits on link."
+ " Adding more. {}", linkName, entityPath, credits, message);

link.addCredits(credits).subscribe(noop -> {
}, error -> {"linkName[{}] entityPath[{}] was already closed. Could not add credits.",
linkName, entityPath);
linkHasNoCredits.compareAndSet(false, true);

return credits;
* Gets the number of credits to add based on {@link #requested} and how many messages are still in queue.
* If {@link #requested} is {@link Long#MAX_VALUE}, then we add credits 1 by 1. Similar to Track 1's behaviour.
* @return The number of credits to add.
private int getCreditsToAdd() {
final CoreSubscriber<? super Message> subscriber = downstream.get();
final long request = REQUESTED.get(this);

final int credits;
if (subscriber == null || request == 0) {
credits = 0;
} else if (request == Long.MAX_VALUE) {
credits = 1;
} else {
final int remaining = Long.valueOf(request).intValue() - messageQueue.size();
credits = Math.max(remaining, 0);

return credits;

private void disposeReceiver(AmqpReceiveLink link) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -75,7 +74,6 @@ void receiveMessage(AmqpTransportType transportType) {
@EnumSource(value = AmqpTransportType.class)
@Disabled("Works part of the time:")
void parallelEventHubClients(AmqpTransportType transportType) throws InterruptedException {
// Arrange
final int numberOfClients = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,50 @@ void canReceiveWithBackpressure() {

* Verify that when we specify a small prefetch, it continues to fetch items.
void receivesWithSmallPrefetch() {
// Arrange
final String secondPartitionId = "2";
final AtomicBoolean isActive = new AtomicBoolean(true);
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
final Disposable producerEvents = getEvents(isActive)
.flatMap(event -> producer.send(event, new SendOptions().setPartitionId(secondPartitionId)))
sent -> {
error -> logger.error("Error sending event", error),
() ->"Event sent."));

final int prefetch = 5;
final int backpressure = 3;
final int batchSize = 10;
final EventHubConsumerAsyncClient consumer = builder

// Act & Assert
try {
StepVerifier.create(consumer.receiveFromPartition(secondPartitionId, EventPosition.latest()), prefetch)
} finally {
dispose(producer, consumer);

private static void assertPartitionEvent(PartitionEvent event, String eventHubName, Set<Integer> allPartitions,
Set<Integer> expectedPartitions) {
final PartitionContext context = event.getPartitionContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;
Expand All @@ -23,8 +22,6 @@
* Verifies we can use various prefetch options with {@link EventHubConsumerAsyncClient}.
@Disabled("Set prefetch tests do not work because they try to send very large number of events at once."
+ "")
class SetPrefetchCountTest extends IntegrationTestBase {
private static final String PARTITION_ID = "3";
Expand Down

0 comments on commit c26927b

Please sign in to comment.