Skip to content

Commit

Permalink
Fix excessive prefetching in partition pump manager (#38572)
Browse files Browse the repository at this point in the history
* Fix excessive prefetching in partition pump manager
  • Loading branch information
lmolkova authored Feb 2, 2024
1 parent 2a38dd4 commit ad477de
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 24 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed over-prefetching in EventProcessorClient caused by implicit prefetching in partition pump reactor pipeline ([#38572](https://github.com/Azure/azure-sdk-for-java/issues/38572))

### Other Changes

## 5.18.0 (2024-01-19)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;
Expand Down Expand Up @@ -242,7 +241,6 @@ public class EventHubClientBuilder implements
private static final String UNKNOWN = "UNKNOWN";

private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING";
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");

private static final ClientLogger LOGGER = new ClientLogger(EventHubClientBuilder.class);
private final Object connectionLock = new Object();
Expand Down Expand Up @@ -718,10 +716,10 @@ public EventHubClientBuilder consumerGroup(String consumerGroup) {
}

/**
* Sets the count used by the receiver to control the number of events the Event Hub consumer will actively receive
* Sets the count used by the receiver to control the number of events per partition the Event Hub consumer will actively receive
* and queue locally without regard to whether a receive operation is currently active.
*
* @param prefetchCount The amount of events to queue locally.
* @param prefetchCount The amount of events per partition to queue locally. Defaults to 500 events per partition.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code prefetchCount} is less than {@link #MINIMUM_PREFETCH_COUNT 1} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
} else {
partitionEventFlux = receiver.window(options.getMaxBatchSize());
}

int prefetchWindows = Math.max(prefetch / options.getMaxBatchSize(), 1);
partitionEventFlux
.concatMap(Flux::collectList)
.publishOn(scheduler, false, prefetch)
.concatMap(Flux::collectList, 0)
.publishOn(scheduler, false, prefetchWindows)
.subscribe(partitionEventBatch -> {
processEvents(partitionContext, partitionProcessor, partitionPump,
partitionEventBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.test.publisher.TestPublisher;

Expand Down Expand Up @@ -84,16 +86,15 @@ public class PartitionPumpManagerTest {

private final Map<String, EventPosition> initialPartitionPositions = new HashMap<>();
private final TestPublisher<PartitionEvent> receivePublisher = TestPublisher.createCold();

private final Integer prefetch = 100;
private Checkpoint checkpoint;
private PartitionOwnership partitionOwnership;
private AutoCloseable autoCloseable;

@BeforeEach
public void beforeEach() throws InterruptedException {
public void beforeEach() {
this.autoCloseable = MockitoAnnotations.openMocks(this);

final Integer prefetch = 100;
when(builder.getPrefetchCount()).thenReturn(prefetch);
when(builder.buildAsyncClient()).thenReturn(asyncClient);

Expand Down Expand Up @@ -392,21 +393,9 @@ public void processesEventBatchWithLastEnqueued() throws InterruptedException {

// Mock events to add.
final Instant retrievalTime = Instant.now();
final Instant lastEnqueuedTime = retrievalTime.minusSeconds(60);
final LastEnqueuedEventProperties lastEnqueuedProperties1 =
new LastEnqueuedEventProperties(10L, 15L, retrievalTime, lastEnqueuedTime.plusSeconds(1));
final EventData eventData1 = new EventData("1");
final PartitionEvent partitionEvent1 = new PartitionEvent(PARTITION_CONTEXT, eventData1, lastEnqueuedProperties1);

final LastEnqueuedEventProperties lastEnqueuedProperties2 =
new LastEnqueuedEventProperties(20L, 25L, retrievalTime, lastEnqueuedTime.plusSeconds(2));
final EventData eventData2 = new EventData("2");
final PartitionEvent partitionEvent2 = new PartitionEvent(PARTITION_CONTEXT, eventData2, lastEnqueuedProperties2);

final LastEnqueuedEventProperties lastEnqueuedProperties3 =
new LastEnqueuedEventProperties(30L, 35L, retrievalTime, lastEnqueuedTime.plusSeconds(3));
final EventData eventData3 = new EventData("3");
final PartitionEvent partitionEvent3 = new PartitionEvent(PARTITION_CONTEXT, eventData3, lastEnqueuedProperties3);
final PartitionEvent partitionEvent1 = createEvent(retrievalTime, 1);
final PartitionEvent partitionEvent2 = createEvent(retrievalTime, 2);
final PartitionEvent partitionEvent3 = createEvent(retrievalTime, 3);

final AtomicInteger eventCounter = new AtomicInteger();

Expand Down Expand Up @@ -454,6 +443,147 @@ public void processesEventBatchWithLastEnqueued() throws InterruptedException {
}
}

/**
* Checks that number of prefetched events stays under allowed maximum.
*/
@ParameterizedTest
@ValueSource(ints = {1, 16, 64, 128})
public void processBatchPrefetch(int maxBatchSize) throws InterruptedException {
// Arrange
final int batches = 5;
final int maxExpectedPrefetched = Math.max(prefetch / maxBatchSize, 1) * maxBatchSize;

final CountDownLatch receiveCounter = new CountDownLatch(batches);

final EventProcessorClientOptions options = new EventProcessorClientOptions()
.setConsumerGroup("test-consumer")
.setMaxBatchSize(maxBatchSize)
.setBatchReceiveMode(true);

final PartitionPumpManager manager = new PartitionPumpManager(checkpointStore, () -> partitionProcessor, builder,
DEFAULT_TRACER, options);

final AtomicInteger publishedCounter = new AtomicInteger();
final Instant retrievalTime = Instant.now();

Flux<PartitionEvent> events = Flux.generate(s -> {
int publishedIndex = publishedCounter.getAndIncrement();
if (publishedIndex <= maxBatchSize * batches + prefetch + 1000) {
s.next(createEvent(retrievalTime, publishedIndex));
} else {
s.complete();
}
});

when(consumerAsyncClient.receiveFromPartition(eq(PARTITION_ID), any(EventPosition.class),
any(ReceiveOptions.class))).thenReturn(events);

final AtomicInteger maxPrefetched = new AtomicInteger();
final AtomicInteger processedCounter = new AtomicInteger();
doAnswer(invocation -> {
final EventBatchContext batch = invocation.getArgument(0);
if (!batch.getEvents().isEmpty()) {
receiveCounter.countDown();

int published = publishedCounter.get();
int processed = processedCounter.addAndGet(batch.getEvents().size());
if (published - processed > maxPrefetched.get()) {
maxPrefetched.set(published - processed);
}
}
return null;
}).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

try {
manager.startPartitionPump(partitionOwnership, checkpoint);
assertTrue(receiveCounter.await(10, TimeUnit.SECONDS));
verify(partitionProcessor, never()).processError(any(ErrorContext.class));
assertTrue(maxPrefetched.get() <= maxExpectedPrefetched,
String.format("Expected at most %s events to be prefetched, got %s", maxExpectedPrefetched, maxPrefetched.get()));
} finally {
manager.stopAllPartitionPumps();
}
}

/**
* Checks that events are processed if batch size is higher than number of available events after max wait time is reached
*/
@Test
public void processBatchNotEnoughEventsAfterMaxTime() throws InterruptedException {
// Arrange
final CountDownLatch receiveCounter = new CountDownLatch(1);

final int maxBatchSize = 16;
final EventProcessorClientOptions options = new EventProcessorClientOptions()
.setConsumerGroup("test-consumer")
.setMaxBatchSize(maxBatchSize)
.setMaxWaitTime(Duration.ofSeconds(3))
.setBatchReceiveMode(true);

final PartitionPumpManager manager = new PartitionPumpManager(checkpointStore, () -> partitionProcessor, builder,
DEFAULT_TRACER, options);

final Instant retrievalTime = Instant.now();

doAnswer(invocation -> {
final EventBatchContext batch = invocation.getArgument(0);
if (!batch.getEvents().isEmpty()) {
receiveCounter.countDown();
}
return null;
}).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

try {
manager.startPartitionPump(partitionOwnership, checkpoint);

receivePublisher.next(createEvent(retrievalTime, 0), createEvent(retrievalTime, 1));
assertTrue(receiveCounter.await(20, TimeUnit.SECONDS));
verify(partitionProcessor, never()).processError(any(ErrorContext.class));
} finally {
manager.stopAllPartitionPumps();
}
}

/**
* Checks that events are NOT processed if batch size is higher than number of available events if max time is not set
* TODO (limolkova): https://github.com/Azure/azure-sdk-for-java/issues/38586
*/
@Test
public void processBatchNotEnoughEventsNever() throws InterruptedException {
// Arrange
final CountDownLatch receiveCounter = new CountDownLatch(1);

final int maxBatchSize = 16;
final EventProcessorClientOptions options = new EventProcessorClientOptions()
.setConsumerGroup("test-consumer")
.setMaxBatchSize(maxBatchSize)
.setMaxWaitTime(null)
.setBatchReceiveMode(true);

final PartitionPumpManager manager = new PartitionPumpManager(checkpointStore, () -> partitionProcessor, builder,
DEFAULT_TRACER, options);

final Instant retrievalTime = Instant.now();

doAnswer(invocation -> {
final EventBatchContext batch = invocation.getArgument(0);
if (!batch.getEvents().isEmpty()) {
receiveCounter.countDown();
}
return null;
}).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

try {
manager.startPartitionPump(partitionOwnership, checkpoint);

receivePublisher.next(createEvent(retrievalTime, 0), createEvent(retrievalTime, 1));
assertFalse(receiveCounter.await(10, TimeUnit.SECONDS));
verify(partitionProcessor, never()).processError(any(ErrorContext.class));
} finally {
manager.stopAllPartitionPumps();
}
}

/**
* If no checkpoint, no map position, no default position, will use {@link EventPosition#latest()}.
*/
Expand Down Expand Up @@ -910,4 +1040,11 @@ public void closeCleansUpPartitionOnException() throws InterruptedException {
manager.stopAllPartitionPumps();
}
}

private PartitionEvent createEvent(Instant retrievalTime, int index) {
Instant lastEnqueuedTime = retrievalTime.minusSeconds(60);
LastEnqueuedEventProperties lastEnqueuedProperties =
new LastEnqueuedEventProperties((long) index, (long) index, retrievalTime, lastEnqueuedTime.plusSeconds(index));
return new PartitionEvent(PARTITION_CONTEXT, new EventData(String.valueOf(index)), lastEnqueuedProperties);
}
}

0 comments on commit ad477de

Please sign in to comment.