Skip to content

Commit

Permalink
KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState (
Browse files Browse the repository at this point in the history
…apache#15972)

This rack information is required to compute rack-aware assignments, which many of the current assigners do.

The internal ClientMetadata class was also edited to pass around this rack information.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
apourchet authored and gongxuanzhang committed Jun 12, 2024
1 parent d9da8ca commit dd275ce
Show file tree
Hide file tree
Showing 12 changed files with 489 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.errors.TaskAssignmentException;

/**
Expand Down Expand Up @@ -49,17 +48,5 @@ public interface ApplicationState {
/**
* @return the set of all tasks in this topology which must be assigned
*/
Set<TaskId> allTasks();

/**
*
* @return the set of stateful and changelogged tasks in this topology
*/
Set<TaskId> statefulTasks();

/**
*
* @return the set of stateless or changelog-less tasks in this topology
*/
Set<TaskId> statelessTasks();
Set<TaskInfo> allTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,37 @@ public class KafkaStreamsAssignment {
private final Set<AssignedTask> assignment;
private final Optional<Instant> followupRebalanceDeadline;

/**
 * Creates a KafkaStreamsAssignment for the given processId that holds the given set of assigned
 * tasks and requests no followup rebalance. To have the receiving KafkaStreams client schedule
 * a followup rebalance, derive a new instance via {@link #withFollowupRebalance(Instant)}.
 *
 * @param processId  the processId of the KafkaStreams client that should receive this assignment
 * @param assignment the set of tasks to assign to that KafkaStreams client
 *
 * @return a new KafkaStreamsAssignment with the given processId and assignment and an empty
 *         followup rebalance deadline
 */
public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment) {
    final Optional<Instant> noFollowupRebalance = Optional.empty();
    return new KafkaStreamsAssignment(processId, assignment, noFollowupRebalance);
}

/**
 * Requests that the KafkaStreams client receiving this assignment trigger a followup rebalance.
 * The followup rebalance is initiated once the provided deadline has passed, but never before
 * the current rebalance has finished. The request only lasts until the next rebalance: if a new
 * rebalance begins before the deadline has elapsed, the request is erased, and the next
 * assignment has to request the followup rebalance again if one is still wanted. Otherwise no
 * additional rebalance will be triggered.
 *
 * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance
 *
 * @return a new KafkaStreamsAssignment with the same processId and assignment as this one, but
 *         carrying the given rebalanceDeadline
 */
public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
    // Read through the accessors, in the same order as the constructor arguments.
    final ProcessId currentProcessId = processId();
    final Set<AssignedTask> currentAssignment = assignment();
    final Optional<Instant> requestedDeadline = Optional.of(rebalanceDeadline);
    return new KafkaStreamsAssignment(currentProcessId, currentAssignment, requestedDeadline);
}

private KafkaStreamsAssignment(final ProcessId processId,
final Set<AssignedTask> assignment,
final Optional<Instant> followupRebalanceDeadline) {
Expand Down Expand Up @@ -65,14 +96,6 @@ public Optional<Instant> followupRebalanceDeadline() {
return followupRebalanceDeadline;
}

/**
 * Creates a KafkaStreamsAssignment for the given processId and set of assigned tasks, with an
 * empty followup rebalance deadline.
 */
public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment) {
    final Optional<Instant> noFollowupRebalance = Optional.empty();
    return new KafkaStreamsAssignment(processId, assignment, noFollowupRebalance);
}

/**
 * Returns a copy of this assignment (same processId and assigned tasks) whose followup
 * rebalance deadline is set to the given instant.
 */
public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
    // Read through the accessors, in the same order as the constructor arguments.
    final ProcessId currentProcessId = processId();
    final Set<AssignedTask> currentAssignment = assignment();
    final Optional<Instant> requestedDeadline = Optional.of(rebalanceDeadline);
    return new KafkaStreamsAssignment(currentProcessId, currentAssignment, requestedDeadline);
}

public static class AssignedTask {
private final TaskId id;
private final Type taskType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public interface KafkaStreamsState {
* @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
*/
Map<String, String> clientTags();

/**
* Returns the rack id of this KafkaStreams client, if one was configured.
*
* @return the rackId for this KafkaStreams client, or {@link Optional#empty()} if none was configured
*/
Optional<String> rackId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;


import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

/**
* A read-only description of a single task, identified by its {@link TaskId}.
* Exposes whether the task is stateful, the names of all state stores belonging to the task,
* its source (input) topic partitions, the changelog topic partitions of its logged state
* stores, and the rack ids of all replicas of each topic partition in the task.
*/
public interface TaskInfo {

/**
* Identifies the task this info describes.
*
* @return The {@code TaskId} of the underlying task.
*/
TaskId id();

/**
* Tells whether the task is stateful.
*
* @return true if the underlying task is stateful, and false otherwise.
*/
boolean isStateful();

/**
* Lists the state stores used by the task.
*
* @return the set of state store names that this task makes use of. In the case of stateless tasks,
* this set will be empty as no state stores are used.
*/
Set<String> stateStoreNames();

/**
* Lists the source topic partitions of the task.
*
* @return the set of source topic partitions. A source topic partition may also serve as a
* changelog, so this set includes both such changelog partitions and non-changelog
* partitions and may overlap with {@link #changelogTopicPartitions()}.
*/
Set<TopicPartition> sourceTopicPartitions();

/**
* Lists the changelog topic partitions of the task.
*
* @return the set of changelog topic partitions. A changelog may be a source topic partition,
* so this set includes both source and non-source topic partitions and may overlap
* with {@link #sourceTopicPartitions()}.
*/
Set<TopicPartition> changelogTopicPartitions();

/**
* Maps the task's topic partitions to the racks they reside on.
*
* @return the mapping of {@code TopicPartition} to set of rack ids that this partition resides on.
*/
Map<TopicPartition, Set<String>> partitionToRackIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,15 @@ public Set<InternalTopicConfig> nonSourceChangelogTopics() {
return topicConfigs;
}

/**
 * Returns the names of all changelog topics as a read-only set.
 *
 * @return an unmodifiable view of the set of changelog topics, which includes both source
 *         changelog topics and non source changelog topics.
 */
public Set<String> changelogTopics() {
    final Set<String> changelogTopicNames = stateChangelogTopics.keySet();
    return Collections.unmodifiableSet(changelogTopicNames);
}

/**
* Returns the topic names for any optimized source changelogs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl;
Expand All @@ -52,10 +53,12 @@
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

Expand All @@ -82,6 +85,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableSet;
import static java.util.Map.Entry.comparingByKey;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.common.utils.Utils.filterMap;
Expand Down Expand Up @@ -133,8 +137,9 @@ public static class ClientMetadata {
private final HostInfo hostInfo;
private final ClientState state;
private final SortedSet<String> consumers;
private final Optional<String> rackId;

ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags) {
ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags, final Optional<String> rackId) {

// get the host info, or null if no endpoint is configured (ie endPoint == null)
hostInfo = HostInfo.buildFromEndpoint(endPoint);
Expand All @@ -144,6 +149,8 @@ public static class ClientMetadata {

// initialize the client state with client tags
state = new ClientState(processId, clientTags);

this.rackId = rackId;
}

void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
Expand All @@ -164,6 +171,10 @@ public HostInfo hostInfo() {
return hostInfo;
}

/**
 * @return the rackId this client metadata was constructed with
 */
public Optional<String> rackId() {
    return this.rackId;
}

@Override
public String toString() {
return "ClientMetadata{" +
Expand Down Expand Up @@ -355,7 +366,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
futureMetadataVersion = usedVersion;
processId = FUTURE_ID;
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap()));
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap(), subscription.rackId()));
}
} else {
processId = info.processId();
Expand All @@ -367,7 +378,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

// create the new client metadata if necessary
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags());
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags(), subscription.rackId());
clientMetadataMap.put(info.processId(), clientMetadata);
}

Expand Down Expand Up @@ -474,23 +485,84 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
*
* @param clientMetadataMap the map of process id to client metadata used to build an immutable
* {@code ApplicationState}
* @param statefulTasks the set of {@code TaskId} that correspond to all the stateful
* tasks that need to be reassigned.
* @return The {@code ApplicationState} needed by the TaskAssignor to compute new task
* assignments.
*/
private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
final Set<TaskId> statefulTasks) {
final Set<TaskId> statelessTasks = new HashSet<>();
for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
final ClientState clientState = clientEntry.getValue().state;
statelessTasks.addAll(clientState.statelessActiveTasks());
private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<Subtopology, TopicsInfo> topicGroups,
final Cluster cluster) {
// Split every subtopology's TopicsInfo into its source topics and its changelog topics,
// keeping both keyed by subtopology so they can be grouped into tasks separately.
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
final Set<String> sourceTopics = entry.getValue().sourceTopics;
final Set<String> changelogTopics = entry.getValue().changelogTopics();
sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
}

// Group the partitions of each topic set into per-task partition sets.
final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);

// Both groupings must produce exactly the same set of task ids; the lookups by taskId on
// changelogPartitionsForTask further down rely on this.
// NOTE(review): a subtopology without changelog topics contributes an empty topic set to
// changelogTopicsByGroup. Whether partitionGrouper still emits task ids for such a
// subtopology is not visible here; if it does not, stateless tasks would fail this check.
// Confirm against the partition grouper implementation.
if (!sourcePartitionsForTask.keySet().equals(changelogPartitionsForTask.keySet())) {
log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
throw new TaskAssignmentException("Partition grouper returned conflicting information about the "
+ "tasks for source topics vs changelog topics.");
}

// Collect, across all tasks, every source partition and every changelog partition that is
// not also a source partition of the same task.
final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : sourcePartitionsForTask.entrySet()) {
final TaskId taskId = entry.getKey();
final Set<TopicPartition> taskSourcePartitions = entry.getValue();
final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
// Copy before removeAll so the grouper's set for this task is left untouched.
final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);

sourceTopicPartitions.addAll(taskSourcePartitions);
nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
}

// Resolve rack ids separately for the two partition sets.
// NOTE(review): the meaning of the trailing boolean (false for source partitions, true for
// non-source changelog partitions) is defined by RackUtils and is not visible here.
final Map<TopicPartition, Set<String>> racksForSourcePartitions = RackUtils.getRacksForTopicPartition(
cluster, internalTopicManager, sourceTopicPartitions, false);
final Map<TopicPartition, Set<String>> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition(
cluster, internalTopicManager, nonSourceChangelogTopicPartitions, true);

// Build one TaskInfo per task id produced by the source-topic grouping.
final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
final Set<TaskInfo> logicalTasks = logicalTaskIds.stream().map(taskId -> {
// NOTE(review): this lookup depends only on taskId.topologyName(), so every task of the
// same topology receives the same state store names, and therefore the same isStateful
// value below, regardless of its subtopology. Confirm that this is intended.
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
.keySet();
final Set<TopicPartition> sourcePartitions = sourcePartitionsForTask.get(taskId);
final Set<TopicPartition> changelogPartitions = changelogPartitionsForTask.get(taskId);
final Map<TopicPartition, Set<String>> racksForTaskPartition = new HashMap<>();
// Source partitions take their racks from the source lookup.
// NOTE(review): presumably a null value is stored when the lookup has no entry for a
// partition; confirm whether RackUtils always returns an entry for every partition.
sourcePartitions.forEach(topicPartition -> {
racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition));
});
// A changelog partition that is also present in the source lookup reuses that entry;
// all other changelog partitions use the changelog lookup.
changelogPartitions.forEach(topicPartition -> {
if (racksForSourcePartitions.containsKey(topicPartition)) {
racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition));
} else {
racksForTaskPartition.put(topicPartition, racksForChangelogPartitions.get(topicPartition));
}
});

// The task is flagged as stateful exactly when the state store name set is non-empty.
return new DefaultTaskInfo(
taskId,
!stateStoreNames.isEmpty(),
racksForTaskPartition,
stateStoreNames,
sourcePartitions,
changelogPartitions
);
}).collect(Collectors.toSet());

return new ApplicationStateImpl(
assignmentConfigs.toPublicAssignmentConfigs(),
// NOTE(review): statefulTasks and statelessTasks are not declared anywhere in this method as
// shown. These two argument lines look like removed-side residue of the rendered diff;
// confirm against the actual file before relying on them.
statefulTasks,
statelessTasks,
logicalTasks,
clientMetadataMap
);
}
Expand Down
Loading

0 comments on commit dd275ce

Please sign in to comment.