Skip to content

Commit

Permalink
KAFKA-16133 - Reconciliation auto-commit fix (apache#15194)
Browse files Browse the repository at this point in the history
This fixes an issue with the time boundaries used for the auto-commit performed when partitions are revoked.

Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
lianetm authored Jan 15, 2024
1 parent 3041151 commit b16df3b
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -275,6 +276,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
*/
private final BackgroundEventHandler backgroundEventHandler;

private final Time time;

public MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
Expand All @@ -284,7 +287,8 @@ public MembershipManagerImpl(String groupId,
ConsumerMetadata metadata,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler) {
BackgroundEventHandler backgroundEventHandler,
Time time) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
Expand All @@ -301,6 +305,7 @@ public MembershipManagerImpl(String groupId,
this.clientTelemetryReporter = clientTelemetryReporter;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.backgroundEventHandler = backgroundEventHandler;
this.time = time;
}

/**
Expand Down Expand Up @@ -833,7 +838,7 @@ boolean reconcile() {
// the current reconciliation is in process. Note this is using the rebalance timeout as
// it is the limit enforced by the broker to complete the reconciliation process.
commitResult = commitRequestManager.maybeAutoCommitAllConsumedNow(
Optional.of((long) rebalanceTimeoutMs),
Optional.of(getExpirationTimeForTimeout(rebalanceTimeoutMs)),
true);

// Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
Expand All @@ -854,6 +859,14 @@ boolean reconcile() {
return true;
}

long getExpirationTimeForTimeout(final long timeoutMs) {
long expiration = time.milliseconds() + timeoutMs;
if (expiration < 0) {
return Long.MAX_VALUE;
}
return expiration;
}

/**
* Trigger onPartitionsRevoked callbacks if any partitions where revoked. If it succeeds,
* proceed to trigger the onPartitionsAssigned (even if no new partitions were added), and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ protected RequestManagers create() {
metadata,
logContext,
clientTelemetryReporter,
backgroundEventHandler);
backgroundEventHandler,
time);
membershipManager.registerStateListener(commit);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public ConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableA
metadata,
logContext,
Optional.empty(),
backgroundEventHandler
backgroundEventHandler,
time
)
);
HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class MembershipManagerImplTest {
private ConsumerTestBuilder testBuilder;
private BlockingQueue<BackgroundEvent> backgroundEventQueue;
private BackgroundEventHandler backgroundEventHandler;
private Time time;

@BeforeEach
public void setup() {
Expand All @@ -98,6 +100,7 @@ public void setup() {
commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
backgroundEventQueue = testBuilder.backgroundEventQueue;
backgroundEventHandler = testBuilder.backgroundEventHandler;
time = testBuilder.time;
}

@AfterEach
Expand All @@ -111,7 +114,7 @@ private MembershipManagerImpl createMembershipManagerJoiningGroup() {
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
backgroundEventHandler));
backgroundEventHandler, time));
manager.transitionToJoining();
return manager;
}
Expand All @@ -120,7 +123,7 @@ private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupIn
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
backgroundEventHandler));
backgroundEventHandler, time));
manager.transitionToJoining();
return manager;
}
Expand All @@ -130,7 +133,7 @@ private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupIn
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
metadata, logContext, Optional.empty(), backgroundEventHandler);
metadata, logContext, Optional.empty(), backgroundEventHandler, time);
manager.transitionToJoining();
return manager;
}
Expand All @@ -156,7 +159,7 @@ public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin()
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
backgroundEventHandler);
backgroundEventHandler, time);
manager.transitionToJoining();
verify(metadata).addClusterUpdateListener(manager);
clearInvocations(metadata);
Expand Down Expand Up @@ -235,7 +238,7 @@ public void testTransitionToFailedWhenTryingToJoin() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
backgroundEventHandler);
backgroundEventHandler, time);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1553,9 +1553,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}

// TODO: enable this test for the consumer group protocol when auto-commit support is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAutoCommitOnRebalance(quorum: String, groupProtocol: String): Unit = {
val topic2 = "topic2"
createTopic(topic2, 2, brokerCount)
Expand Down

0 comments on commit b16df3b

Please sign in to comment.