Skip to content

Commit

Permalink
KAFKA-16460: New consumer times out consuming records in multiple con…
Browse files Browse the repository at this point in the history
…sumer_test.py system tests

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Nov 12, 2024
1 parent 939831f commit e608c5b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
private final String groupId;
private final Optional<String> groupInstanceId;
private final long retryBackoffMaxMs;
private final FetchBuffer fetchBuffer;
// For testing only
private final OptionalDouble jitter;
private final boolean throwOnFetchStableOffsetUnsupported;
Expand Down Expand Up @@ -112,7 +113,8 @@ public CommitRequestManager(
final String groupId,
final Optional<String> groupInstanceId,
final Metrics metrics,
final ConsumerMetadata metadata) {
final ConsumerMetadata metadata,
final FetchBuffer fetchBuffer) {
this(time,
logContext,
subscriptions,
Expand All @@ -125,10 +127,12 @@ public CommitRequestManager(
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG),
OptionalDouble.empty(),
metrics,
metadata);
metadata,
fetchBuffer);
}

// Visible for testing
@SuppressWarnings("ParameterNumber")
CommitRequestManager(
final Time time,
final LogContext logContext,
Expand All @@ -142,7 +146,8 @@ public CommitRequestManager(
final long retryBackoffMaxMs,
final OptionalDouble jitter,
final Metrics metrics,
final ConsumerMetadata metadata) {
final ConsumerMetadata metadata,
final FetchBuffer fetchBuffer) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
this.time = time;
this.logContext = logContext;
Expand All @@ -168,6 +173,7 @@ public CommitRequestManager(
this.metricsManager = new OffsetCommitMetricsManager(metrics);
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.lastEpochSentOnCommit = Optional.empty();
this.fetchBuffer = fetchBuffer;
}

/**
Expand Down Expand Up @@ -320,6 +326,14 @@ public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final long de
return CompletableFuture.completedFuture(null);
}

Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs);
do {
if (fetchBuffer.isEmpty()) {
break;
}
} while (timer.notExpired());

log.info("Member {} will auto-commit latest offsets {} before revocation", memberInfo.memberId, subscriptions.allConsumed());
CompletableFuture<Void> result = new CompletableFuture<>();
OffsetCommitRequestState requestState =
createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager

private final NetworkClientDelegate networkClientDelegate;
private CompletableFuture<Void> pendingFetchRequestFuture;
private Optional<ConsumerMembershipManager> consumerMembershipManager;

FetchRequestManager(final LogContext logContext,
final Time time,
Expand All @@ -54,9 +55,11 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
final FetchBuffer fetchBuffer,
final FetchMetricsManager metricsManager,
final NetworkClientDelegate networkClientDelegate,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final Optional<ConsumerMembershipManager> consumerMembershipManager) {
super(logContext, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, time, apiVersions);
this.networkClientDelegate = networkClientDelegate;
this.consumerMembershipManager = consumerMembershipManager;
}

@Override
Expand Down Expand Up @@ -101,6 +104,9 @@ public CompletableFuture<Void> createFetchRequests() {
*/
@Override
public PollResult poll(long currentTimeMs) {
if (consumerMembershipManager.isPresent() && consumerMembershipManager.get().reconciliationInProgress()) {
return PollResult.EMPTY;
}
return pollInternal(
this::prepareFetchRequests,
this::handleFetchSuccess,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,6 @@ protected RequestManagers create() {
final int requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
final int defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

final FetchRequestManager fetch = new FetchRequestManager(logContext,
time,
metadata,
subscriptions,
fetchConfig,
fetchBuffer,
fetchMetricsManager,
networkClientDelegate,
apiVersions);
final TopicMetadataRequestManager topic = new TopicMetadataRequestManager(
logContext,
time,
Expand Down Expand Up @@ -206,7 +197,8 @@ protected RequestManagers create() {
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
metrics,
metadata);
metadata,
fetchBuffer);
membershipManager = new ConsumerMembershipManager(
groupRebalanceConfig.groupId,
groupRebalanceConfig.groupInstanceId,
Expand Down Expand Up @@ -241,6 +233,17 @@ protected RequestManagers create() {
metrics);
}

final FetchRequestManager fetch = new FetchRequestManager(logContext,
time,
metadata,
subscriptions,
fetchConfig,
fetchBuffer,
fetchMetricsManager,
networkClientDelegate,
apiVersions,
Optional.ofNullable(membershipManager));

final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions,
metadata,
fetchConfig.isolationLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ public synchronized boolean hasAutoAssignedPartitions() {
}

public synchronized void position(TopicPartition tp, FetchPosition position) {
log.info("Set offset for partition {} to position {}", tp, position);
assignedState(tp).position(position);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class CommitRequestManagerTest {
private MockTime time;
private CoordinatorRequestManager coordinatorRequestManager;
private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private FetchBuffer fetchBuffer;
private final Metrics metrics = new Metrics();
private Properties props;

Expand All @@ -123,6 +124,7 @@ public void setup() {
this.metadata = mock(ConsumerMetadata.class);
this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
this.fetchBuffer = mock(FetchBuffer.class);
this.props = new Properties();
this.props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
this.props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand All @@ -146,7 +148,8 @@ public void testOffsetFetchRequestStateToStringBase() {
retryBackoffMaxMs,
OptionalDouble.of(0),
metrics,
metadata);
metadata,
fetchBuffer);

commitRequestManager.onMemberEpochUpdated(Optional.of(1), Uuid.randomUuid().toString());
Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));
Expand Down Expand Up @@ -608,6 +611,7 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
@Test
public void testAutoCommitBeforeRevocationNotBlockedByAutoCommitOnIntervalInflightRequest() {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
when(fetchBuffer.isEmpty()).thenReturn(true);
TopicPartition t1p = new TopicPartition("topic1", 0);
subscriptionState.assignFromUser(singleton(t1p));
subscriptionState.seek(t1p, 100);
Expand Down Expand Up @@ -1205,6 +1209,7 @@ public void testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Er
// interval and just test the auto-commits triggered before revocation
CommitRequestManager commitRequestManager = create(true, Integer.MAX_VALUE);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
when(fetchBuffer.isEmpty()).thenReturn(true);

TopicPartition tp = new TopicPartition("topic", 1);
subscriptionState.assignFromUser(singleton(tp));
Expand Down Expand Up @@ -1258,6 +1263,7 @@ public void testLastEpochSentOnCommit() {
// interval and just test the auto-commits triggered before revocation
CommitRequestManager commitRequestManager = create(true, Integer.MAX_VALUE);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
when(fetchBuffer.isEmpty()).thenReturn(true);

TopicPartition tp = new TopicPartition("topic", 1);
subscriptionState.assignFromUser(singleton(tp));
Expand Down Expand Up @@ -1576,7 +1582,8 @@ private CommitRequestManager create(final boolean autoCommitEnabled, final long
retryBackoffMaxMs,
OptionalDouble.of(0),
metrics,
metadata));
metadata,
fetchBuffer));
}

private ClientResponse buildOffsetFetchClientResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3724,7 +3724,7 @@ public TestableFetchRequestManager(LogContext logContext,
NetworkClientDelegate networkClientDelegate,
FetchCollector<K, V> fetchCollector,
ApiVersions apiVersions) {
super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, apiVersions);
super(logContext, time, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, networkClientDelegate, apiVersions, Optional.empty());
this.fetchCollector = fetchCollector;
}

Expand Down

0 comments on commit e608c5b

Please sign in to comment.