Skip to content

Commit

Permalink
KAFKA-16985: Ensure consumer attempts to send leave request on close …
Browse files Browse the repository at this point in the history
…even if interrupted (apache#16686)

Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>, Philip Nee <[email protected]>
  • Loading branch information
kirktrue authored Nov 13, 2024
1 parent 48ff6a6 commit b6b2c9e
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -130,7 +129,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* requests in cases where a currently assigned topic is in the target assignment (new
* partition assigned, or revoked), but it is not present the Metadata cache at that moment.
* The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the
* member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}).
* member fails ({@link #transitionToFatal()} or leaves the group
* ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
*/
private final Map<Uuid, String> assignedTopicNamesCache;

Expand All @@ -157,9 +157,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
private boolean rejoinedWhileReconciliationInProgress;

/**
* If the member is currently leaving the group after a call to {@link #leaveGroup()}}, this
* will have a future that will complete when the ongoing leave operation completes
* (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
* If the member is currently leaving the group after a call to {@link #leaveGroup()} or
* {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
* completes (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
* member is not leaving.
*/
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
Expand Down Expand Up @@ -481,6 +481,7 @@ public void onConsumerPoll() {
private void clearAssignment() {
if (subscriptions.hasAutoAssignedPartitions()) {
subscriptions.assignFromSubscribed(Collections.emptySet());
notifyAssignmentChange(Collections.emptySet());
}
currentAssignment = LocalAssignment.NONE;
clearPendingAssignmentsAndLocalNamesCache();
Expand All @@ -496,8 +497,9 @@ private void clearAssignment() {
*/
private void updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition> assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
Collection<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
Set<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions);
notifyAssignmentChange(assignedTopicPartitions);
}

/**
Expand All @@ -523,18 +525,45 @@ public void transitionToJoining() {
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the unsubscribe API.
* This is expected to be invoked when the user calls the {@link Consumer#close()} API.
*
* @return Future that will complete when the heartbeat to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroupOnClose() {
return leaveGroup(false);
}

/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the {@link Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
return leaveGroup(true);
}

/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
* This is expected to be invoked when the user calls the unsubscribe API or is closing the consumer.
*
* @param runCallbacks {@code true} to insert the step to execute the {@link ConsumerRebalanceListener} callback,
* {@code false} to skip
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
if (isNotInGroup()) {
if (state == MemberState.FENCED) {
clearAssignment();
transitionTo(MemberState.UNSUBSCRIBED);
}
subscriptions.unsubscribe();
notifyAssignmentChange(Collections.emptySet());
return CompletableFuture.completedFuture(null);
}

Expand All @@ -549,31 +578,39 @@ public CompletableFuture<Void> leaveGroup() {
CompletableFuture<Void> leaveResult = new CompletableFuture<>();
leaveGroupInProgress = Optional.of(leaveResult);

CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId, error);
} else {
log.info("Member {} completed callback to release assignment. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId);
}

// Clear the subscription, no matter if the callback execution failed or succeeded.
subscriptions.unsubscribe();
clearAssignment();
if (runCallbacks) {
CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId, error);
} else {
log.info("Member {} completed callback to release assignment. It will proceed " +
"to clear its assignment and send a leave group heartbeat", memberId);
}

// Transition to ensure that a heartbeat request is sent out to effectively leave the
// group (even in the case where the member had no assignment to release or when the
// callback execution failed.)
transitionToSendingLeaveGroup(false);
});
// Clear the assignment, no matter if the callback execution failed or succeeded.
clearAssignmentAndLeaveGroup();
});
} else {
clearAssignmentAndLeaveGroup();
}

// Return future to indicate that the leave group is done when the callbacks
// complete, and the transition to send the heartbeat has been made.
return leaveResult;
}

private void clearAssignmentAndLeaveGroup() {
subscriptions.unsubscribe();
clearAssignment();

// Transition to ensure that a heartbeat request is sent out to effectively leave the
// group (even in the case where the member had no assignment to release or when the
// callback execution failed.)
transitionToSendingLeaveGroup(false);
}

/**
* Reset member epoch to the value required for the leave the group heartbeat request, and
* transition to the {@link MemberState#LEAVING} state so that a heartbeat request is sent
Expand Down Expand Up @@ -616,6 +653,15 @@ void notifyEpochChange(Optional<Integer> epoch) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
}

/**
* Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} callback for each listener when the
* set of assigned partitions changes. This includes on assignment changes, unsubscribe, and when leaving
* the group.
*/
void notifyAssignmentChange(Set<TopicPartition> partitions) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
}

/**
* @return True if the member should send heartbeat to the coordinator without waiting for
* the interval.
Expand Down
Loading

0 comments on commit b6b2c9e

Please sign in to comment.