Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix excessive prefetching in partition pump manager #38572

Merged
merged 4 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed over-prefetching in `EventProcessorClient` caused by implicit prefetching in the partition pump's Reactor pipeline ([#38572](https://github.com/Azure/azure-sdk-for-java/issues/38572))

### Other Changes

## 5.18.0 (2024-01-19)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.CONNECTION_ID_KEY;
Expand Down Expand Up @@ -242,7 +241,6 @@ public class EventHubClientBuilder implements
private static final String UNKNOWN = "UNKNOWN";

private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING";
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");

private static final ClientLogger LOGGER = new ClientLogger(EventHubClientBuilder.class);
private final Object connectionLock = new Object();
Expand Down Expand Up @@ -718,10 +716,12 @@ public EventHubClientBuilder consumerGroup(String consumerGroup) {
}

/**
* Sets the count used by the receiver to control the number of events the Event Hub consumer will actively receive
* Sets the count used by the receiver to control the number of events per partition the Event Hub consumer will actively receive
* and queue locally without regard to whether a receive operation is currently active.
*
* @param prefetchCount The amount of events to queue locally.
* Defaults to 500 events per partition.
*
* @param prefetchCount The amount of events per partition to queue locally.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code prefetchCount} is less than {@link #MINIMUM_PREFETCH_COUNT 1} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
} else {
partitionEventFlux = receiver.window(options.getMaxBatchSize());
}

int prefetchWindows = Math.max(prefetch / options.getMaxBatchSize(), 1);
partitionEventFlux
.concatMap(Flux::collectList)
.publishOn(scheduler, false, prefetch)
.concatMap(Flux::collectList, 0)
.publishOn(scheduler, false, prefetchWindows)
.subscribe(partitionEventBatch -> {
processEvents(partitionContext, partitionProcessor, partitionPump,
partitionEventBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.test.publisher.TestPublisher;

Expand Down Expand Up @@ -84,16 +88,15 @@ public class PartitionPumpManagerTest {

private final Map<String, EventPosition> initialPartitionPositions = new HashMap<>();
private final TestPublisher<PartitionEvent> receivePublisher = TestPublisher.createCold();

private final Integer prefetch = 100;
private Checkpoint checkpoint;
private PartitionOwnership partitionOwnership;
private AutoCloseable autoCloseable;

@BeforeEach
public void beforeEach() throws InterruptedException {
public void beforeEach() {
this.autoCloseable = MockitoAnnotations.openMocks(this);

final Integer prefetch = 100;
when(builder.getPrefetchCount()).thenReturn(prefetch);
when(builder.buildAsyncClient()).thenReturn(asyncClient);

Expand Down Expand Up @@ -392,21 +395,9 @@ public void processesEventBatchWithLastEnqueued() throws InterruptedException {

// Mock events to add.
final Instant retrievalTime = Instant.now();
final Instant lastEnqueuedTime = retrievalTime.minusSeconds(60);
final LastEnqueuedEventProperties lastEnqueuedProperties1 =
new LastEnqueuedEventProperties(10L, 15L, retrievalTime, lastEnqueuedTime.plusSeconds(1));
final EventData eventData1 = new EventData("1");
final PartitionEvent partitionEvent1 = new PartitionEvent(PARTITION_CONTEXT, eventData1, lastEnqueuedProperties1);

final LastEnqueuedEventProperties lastEnqueuedProperties2 =
new LastEnqueuedEventProperties(20L, 25L, retrievalTime, lastEnqueuedTime.plusSeconds(2));
final EventData eventData2 = new EventData("2");
final PartitionEvent partitionEvent2 = new PartitionEvent(PARTITION_CONTEXT, eventData2, lastEnqueuedProperties2);

final LastEnqueuedEventProperties lastEnqueuedProperties3 =
new LastEnqueuedEventProperties(30L, 35L, retrievalTime, lastEnqueuedTime.plusSeconds(3));
final EventData eventData3 = new EventData("3");
final PartitionEvent partitionEvent3 = new PartitionEvent(PARTITION_CONTEXT, eventData3, lastEnqueuedProperties3);
final PartitionEvent partitionEvent1 = createEvent(retrievalTime, 1);
final PartitionEvent partitionEvent2 = createEvent(retrievalTime, 2);
final PartitionEvent partitionEvent3 = createEvent(retrievalTime, 3);

final AtomicInteger eventCounter = new AtomicInteger();

Expand Down Expand Up @@ -454,6 +445,149 @@ public void processesEventBatchWithLastEnqueued() throws InterruptedException {
}
}

/**
 * Checks that the number of prefetched events stays under the allowed maximum.
 *
 * <p>The source emits {@code maxBatchSize * batches + prefetch} events on demand and counts each one as it is
 * published. Every non-empty batch handed to the processor records how far publishing has run ahead of
 * processing; the largest observed gap must not exceed the number of whole batches that fit into
 * {@code prefetch} (at least one batch), multiplied by {@code maxBatchSize}.</p>
 *
 * @param maxBatchSize Maximum number of events per batch configured on the processor options.
 * @throws InterruptedException if the thread is interrupted while waiting for the batches to be processed.
 */
@ParameterizedTest
@ValueSource(ints = {1, 16, 64, 128})
@Execution(ExecutionMode.SAME_THREAD)
public void processBatchPrefetch(int maxBatchSize) throws InterruptedException {
    // Arrange
    final int batches = 5;
    final int totalEvents = maxBatchSize * batches + prefetch;
    final int maxExpectedPrefetched = Math.max(prefetch / maxBatchSize, 1) * maxBatchSize;

    final CountDownLatch receiveCounter = new CountDownLatch(batches);

    final EventProcessorClientOptions options = new EventProcessorClientOptions()
        .setConsumerGroup("test-consumer")
        .setMaxBatchSize(maxBatchSize)
        .setBatchReceiveMode(true);

    final PartitionPumpManager manager = new PartitionPumpManager(checkpointStore, () -> partitionProcessor, builder,
        DEFAULT_TRACER, options);

    final AtomicInteger publishedCounter = new AtomicInteger();
    final Instant retrievalTime = Instant.now();

    // The counter is bumped only when an event is actually emitted (and before it is handed downstream),
    // so the completing invocation does not inflate the number of "published" events.
    Flux<PartitionEvent> events = Flux.generate(s -> {
        if (publishedCounter.get() < totalEvents) {
            s.next(createEvent(retrievalTime, publishedCounter.getAndIncrement()));
        } else {
            s.complete();
        }
    });

    when(consumerAsyncClient.receiveFromPartition(eq(PARTITION_ID), any(EventPosition.class),
        any(ReceiveOptions.class))).thenReturn(events);

    final AtomicInteger maxPrefetched = new AtomicInteger();
    final AtomicInteger processedCounter = new AtomicInteger();
    doAnswer(invocation -> {
        final EventBatchContext batch = invocation.getArgument(0);
        if (!batch.getEvents().isEmpty()) {
            receiveCounter.countDown();

            // Read the published count first so the gap reflects what was emitted before this batch is
            // accounted as processed.
            int published = publishedCounter.get();
            int processed = processedCounter.addAndGet(batch.getEvents().size());

            // Atomic running maximum instead of a separate get() followed by set().
            maxPrefetched.accumulateAndGet(published - processed, Math::max);
        }
        return null;
    }).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

    try {
        // Act
        manager.startPartitionPump(partitionOwnership, checkpoint);

        // Assert
        assertTrue(receiveCounter.await(10, TimeUnit.SECONDS));
        verify(partitionProcessor, never()).processError(any(ErrorContext.class));
        assertTrue(maxPrefetched.get() <= maxExpectedPrefetched,
            String.format("Expected at most %s events to be prefetched, got %s", maxExpectedPrefetched, maxPrefetched.get()));

    } finally {
        manager.stopAllPartitionPumps();
    }
}

/**
 * Verifies that a partial batch is still delivered to the processor: two events are published while the
 * maximum batch size is 16 and the maximum wait time is 3 seconds, and a non-empty batch must arrive
 * within the 20 second timeout without any error being reported.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the batch.
 */
@Test
public void processBatchNotEnoughEventsAfterMaxTime() throws InterruptedException {
    // Arrange
    final int maxBatchSize = 16;
    final EventProcessorClientOptions processorOptions = new EventProcessorClientOptions()
        .setConsumerGroup("test-consumer")
        .setMaxBatchSize(maxBatchSize)
        .setMaxWaitTime(Duration.ofSeconds(3))
        .setBatchReceiveMode(true);

    final PartitionPumpManager pumpManager = new PartitionPumpManager(checkpointStore,
        () -> partitionProcessor, builder, DEFAULT_TRACER, processorOptions);

    final Instant now = Instant.now();
    final CountDownLatch nonEmptyBatchLatch = new CountDownLatch(1);

    // Only batches that actually carry events release the latch.
    doAnswer(invocation -> {
        final EventBatchContext context = invocation.getArgument(0);
        if (context.getEvents().isEmpty()) {
            return null;
        }
        nonEmptyBatchLatch.countDown();
        return null;
    }).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

    try {
        // Act
        pumpManager.startPartitionPump(partitionOwnership, checkpoint);

        final PartitionEvent firstEvent = createEvent(now, 0);
        final PartitionEvent secondEvent = createEvent(now, 1);
        receivePublisher.next(firstEvent, secondEvent);

        // Assert
        final boolean batchDelivered = nonEmptyBatchLatch.await(20, TimeUnit.SECONDS);
        assertTrue(batchDelivered);
        verify(partitionProcessor, never()).processError(any(ErrorContext.class));
    } finally {
        pumpManager.stopAllPartitionPumps();
    }
}

/**
 * Verifies that a partial batch is NOT delivered when no maximum wait time is configured: two events are
 * published while the maximum batch size is 16, and no non-empty batch may reach the processor within
 * the 10 second observation window.
 * TODO (limolkova): https://github.com/Azure/azure-sdk-for-java/issues/38586
 *
 * @throws InterruptedException if the thread is interrupted while waiting.
 */
@Test
public void processBatchNotEnoughEventsNever() throws InterruptedException {
    // Arrange
    final int maxBatchSize = 16;
    final EventProcessorClientOptions processorOptions = new EventProcessorClientOptions()
        .setConsumerGroup("test-consumer")
        .setMaxBatchSize(maxBatchSize)
        .setMaxWaitTime(null)
        .setBatchReceiveMode(true);

    final PartitionPumpManager pumpManager = new PartitionPumpManager(checkpointStore,
        () -> partitionProcessor, builder, DEFAULT_TRACER, processorOptions);

    final Instant now = Instant.now();
    final CountDownLatch nonEmptyBatchLatch = new CountDownLatch(1);

    // The latch is released only if a batch containing events is ever handed to the processor.
    doAnswer(invocation -> {
        final EventBatchContext context = invocation.getArgument(0);
        if (context.getEvents().isEmpty()) {
            return null;
        }
        nonEmptyBatchLatch.countDown();
        return null;
    }).when(partitionProcessor).processEventBatch(any(EventBatchContext.class));

    try {
        // Act
        pumpManager.startPartitionPump(partitionOwnership, checkpoint);

        final PartitionEvent firstEvent = createEvent(now, 0);
        final PartitionEvent secondEvent = createEvent(now, 1);
        receivePublisher.next(firstEvent, secondEvent);

        // Assert: the wait is expected to time out.
        final boolean batchDelivered = nonEmptyBatchLatch.await(10, TimeUnit.SECONDS);
        assertFalse(batchDelivered);
        verify(partitionProcessor, never()).processError(any(ErrorContext.class));
    } finally {
        pumpManager.stopAllPartitionPumps();
    }
}

/**
* If no checkpoint, no map position, no default position, will use {@link EventPosition#latest()}.
*/
Expand Down Expand Up @@ -910,4 +1044,11 @@ public void closeCleansUpPartitionOnException() throws InterruptedException {
manager.stopAllPartitionPumps();
}
}

/**
 * Builds a test {@link PartitionEvent} for {@code PARTITION_CONTEXT} whose body is the decimal string of
 * {@code index}.
 *
 * <p>The accompanying {@link LastEnqueuedEventProperties} receives {@code index} for both of its numeric
 * arguments, followed by {@code retrievalTime} and by {@code retrievalTime} minus 60 seconds plus
 * {@code index} seconds, in that positional order.</p>
 *
 * @param retrievalTime Base instant the event's timestamps are derived from.
 * @param index Value used for the event body, the numeric properties and the per-event time offset.
 * @return The newly created event.
 */
private PartitionEvent createEvent(Instant retrievalTime, int index) {
    final long position = index;
    final Instant shiftedTime = retrievalTime.minusSeconds(60).plusSeconds(index);

    final LastEnqueuedEventProperties properties =
        new LastEnqueuedEventProperties(position, position, retrievalTime, shiftedTime);
    final EventData payload = new EventData(String.valueOf(index));

    return new PartitionEvent(PARTITION_CONTEXT, payload, properties);
}
}
Loading