Skip to content

Commit

Permalink
KAFKA-16460: New consumer times out consuming records in multiple con…
Browse files Browse the repository at this point in the history
…sumer_test.py system tests (apache#17777)

Reviewers: Lianet Magrans <[email protected]>
  • Loading branch information
FrankYang0529 authored Nov 15, 2024
1 parent 283d56c commit 5725a51
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ void maybeReconcile() {
revokedPartitions
);

// Mark partitions as pending revocation to stop fetching from the partitions (no new
// fetches sent out, and no in-flight fetches responses processed).
markPendingRevocationToPauseFetching(revokedPartitions);

// Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
// be retried until it succeeds, fails with non-retriable error, or timer expires.
CompletableFuture<Void> commitResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -86,6 +87,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -1433,6 +1435,7 @@ public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
membershipManager.poll(time.milliseconds());

testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
}

@Test
Expand All @@ -1456,6 +1459,10 @@ public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks()

// Complete commit request
commitResult.complete(null);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand All @@ -1480,6 +1487,7 @@ public void testReconcilePartitionsRevokedWithFailedAutoCommitCompletesRevocatio
// Complete commit request
commitResult.completeExceptionally(new KafkaException("Commit request failed with " +
"non-retriable error"));
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand Down Expand Up @@ -1579,11 +1587,11 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());

// Member received assignment to reconcile;

receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1607,6 +1615,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -2551,7 +2560,6 @@ private void testRevocationCompleted(ConsumerMembershipManager membershipManager
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1123,6 +1124,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -1423,7 +1425,6 @@ private void testRevocationCompleted(ShareMembershipManager membershipManager,
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down

0 comments on commit 5725a51

Please sign in to comment.