diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java index 84dc40b8d6f10..0fb4f06f205a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/ApplicationState.java @@ -18,7 +18,6 @@ import java.util.Map; import java.util.Set; -import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.errors.TaskAssignmentException; /** @@ -49,17 +48,5 @@ public interface ApplicationState { /** * @return the set of all tasks in this topology which must be assigned */ - Set allTasks(); - - /** - * - * @return the set of stateful and changelogged tasks in this topology - */ - Set statefulTasks(); - - /** - * - * @return the set of stateless or changelog-less tasks in this topology - */ - Set statelessTasks(); + Set allTasks(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java index 2dd2896755615..bd2497e261d89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsAssignment.java @@ -33,6 +33,37 @@ public class KafkaStreamsAssignment { private final Set assignment; private final Optional followupRebalanceDeadline; + /** + * Construct an instance of KafkaStreamsAssignment with this processId and the given set of + * assigned tasks. If you want this KafkaStreams client to request a followup rebalance, you + * can set the followupRebalanceDeadline via the {@link #withFollowupRebalance(Instant)} API. + * + * @param processId the processId for the KafkaStreams client that should receive this assignment + * @param assignment the set of tasks to be assigned to this KafkaStreams client + * + * @return a new KafkaStreamsAssignment object with the given processId and assignment + */ + public static KafkaStreamsAssignment of(final ProcessId processId, final Set assignment) { + return new KafkaStreamsAssignment(processId, assignment, Optional.empty()); + } + + /** + * This API can be used to request that a followup rebalance be triggered by the KafkaStreams client + * receiving this assignment. The followup rebalance will be initiated after the provided deadline + * has passed, although it will always wait until it has finished the current rebalance before + * triggering a new one. This request will last until the new rebalance, and will be erased if a + * new rebalance begins before the scheduled followup rebalance deadline has elapsed. The next + * assignment must request the followup rebalance again if it still wants to schedule one for + * the given instant, otherwise no additional rebalance will be triggered after that. + * + * @param rebalanceDeadline the instant after which this KafkaStreams client will trigger a followup rebalance + * + * @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline + */ + public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) { + return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline)); + } + private KafkaStreamsAssignment(final ProcessId processId, final Set assignment, final Optional followupRebalanceDeadline) { @@ -65,14 +96,6 @@ public Optional followupRebalanceDeadline() { return followupRebalanceDeadline; } - public static KafkaStreamsAssignment of(final ProcessId processId, final Set assignment) { - return new KafkaStreamsAssignment(processId, assignment, Optional.empty()); - } - - public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) { - return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline)); - } - public static class AssignedTask { private final TaskId id; private final Type taskType; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java index 002ca368ada02..938bb12fedc48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/KafkaStreamsState.java @@ -100,4 +100,9 @@ public interface KafkaStreamsState { * @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig} */ Map clientTags(); + + /** + * @return the rackId for this KafkaStreams client, or {@link Optional#empty()} if none was configured + */ + Optional rackId(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java new file mode 100644 index 0000000000000..2e61c2a5b561a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment; + + +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +/** + * A simple container class corresponding to a given {@link TaskId}. + * Includes metadata such as whether it's stateful and the names of all state stores + * belonging to this task, the set of input topic partitions and changelog topic partitions + * for all logged state stores, and the rack ids of all replicas of each topic partition + * in the task. + */ +public interface TaskInfo { + + /** + * + * @return The {@code TaskId} of the underlying task. + */ + TaskId id(); + + /** + * + * @return true if the underlying task is stateful, and false otherwise. + */ + boolean isStateful(); + + /** + * + * @return the set of state store names that this task makes use of. In the case of stateless tasks, + * this set will be empty as no state stores are used. + */ + Set stateStoreNames(); + + /** + * + * @return the set of source topic partitions. This set will include both changelog and non-changelog + * topic partitions. + */ + Set sourceTopicPartitions(); + + /** + * + * @return the set of changelog topic partitions. This set will include both source and non-source + * topic partitions. + */ + Set changelogTopicPartitions(); + + /** + * + * @return the mapping of {@code TopicPartition} to set of rack ids that this partition resides on. + */ + Map> partitionToRackIds(); +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 184f7f96f2b7c..144adca49413a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1974,6 +1974,15 @@ public Set nonSourceChangelogTopics() { return topicConfigs; } + /** + * + * @return the set of changelog topics, which includes both source changelog topics and non + * source changelog topics. + */ + public Set changelogTopics() { + return Collections.unmodifiableSet(stateChangelogTopics.keySet()); + } + /** * Returns the topic names for any optimized source changelogs */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 4312445bae580..82a10615293e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment; import org.apache.kafka.streams.processor.internals.assignment.ApplicationStateImpl; @@ -52,10 +53,12 @@ import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer; import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.RackUtils; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; @@ -82,6 +85,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.unmodifiableSet; import static java.util.Map.Entry.comparingByKey; import static java.util.UUID.randomUUID; import static org.apache.kafka.common.utils.Utils.filterMap; @@ -133,8 +137,9 @@ public static class ClientMetadata { private final HostInfo hostInfo; private final ClientState state; private final SortedSet consumers; + private final Optional rackId; - ClientMetadata(final UUID processId, final String endPoint, final Map clientTags) { + ClientMetadata(final UUID processId, final String endPoint, final Map clientTags, final Optional rackId) { // get the host info, or null if no endpoint is configured (ie endPoint == null) hostInfo = HostInfo.buildFromEndpoint(endPoint); @@ -144,6 +149,8 @@ public static class ClientMetadata { // initialize the client state with client tags state = new ClientState(processId, clientTags); + + this.rackId = rackId; } void addConsumer(final String consumerMemberId, final List ownedPartitions) { @@ -164,6 +171,10 @@ public HostInfo hostInfo() { return hostInfo; } + public Optional rackId() { + return rackId; + } + @Override public String toString() { return "ClientMetadata{" + @@ -355,7 +366,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr futureMetadataVersion = usedVersion; processId = FUTURE_ID; if (!clientMetadataMap.containsKey(FUTURE_ID)) { - clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap())); + clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap(), subscription.rackId())); } } else { processId = info.processId(); @@ -367,7 +378,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // create the new client metadata if necessary if (clientMetadata == null) { - clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags()); + clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags(), subscription.rackId()); clientMetadataMap.put(info.processId(), clientMetadata); } @@ -474,23 +485,84 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr * * @param clientMetadataMap the map of process id to client metadata used to build an immutable * {@code ApplicationState} - * @param statefulTasks the set of {@code TaskId} that correspond to all the stateful - * tasks that need to be reassigned. * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task * assignments. */ - private ApplicationState buildApplicationState(final Map clientMetadataMap, - final Set statefulTasks) { - final Set statelessTasks = new HashSet<>(); - for (final Map.Entry clientEntry : clientMetadataMap.entrySet()) { - final ClientState clientState = clientEntry.getValue().state; - statelessTasks.addAll(clientState.statelessActiveTasks()); + private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata, + final Map clientMetadataMap, + final Map topicGroups, + final Cluster cluster) { + final Map> sourceTopicsByGroup = new HashMap<>(); + final Map> changelogTopicsByGroup = new HashMap<>(); + for (final Map.Entry entry : topicGroups.entrySet()) { + final Set sourceTopics = entry.getValue().sourceTopics; + final Set changelogTopics = entry.getValue().changelogTopics(); + sourceTopicsByGroup.put(entry.getKey(), sourceTopics); + changelogTopicsByGroup.put(entry.getKey(), changelogTopics); + } + + final Map> sourcePartitionsForTask = + partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster); + final Map> changelogPartitionsForTask = + partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster); + + if (!sourcePartitionsForTask.keySet().equals(changelogPartitionsForTask.keySet())) { + log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics", + sourcePartitionsForTask.size(), changelogPartitionsForTask.size()); + throw new TaskAssignmentException("Partition grouper returned conflicting information about the " + + "tasks for source topics vs changelog topics."); } + final Set sourceTopicPartitions = new HashSet<>(); + final Set nonSourceChangelogTopicPartitions = new HashSet<>(); + for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) { + final TaskId taskId = entry.getKey(); + final Set taskSourcePartitions = entry.getValue(); + final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId); + final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions); + taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions); + + sourceTopicPartitions.addAll(taskSourcePartitions); + nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions); + } + + final Map> racksForSourcePartitions = RackUtils.getRacksForTopicPartition( + cluster, internalTopicManager, sourceTopicPartitions, false); + final Map> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition( + cluster, internalTopicManager, nonSourceChangelogTopicPartitions, true); + + final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); + final Set logicalTasks = logicalTaskIds.stream().map(taskId -> { + final Set stateStoreNames = topologyMetadata + .stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) + .keySet(); + final Set sourcePartitions = sourcePartitionsForTask.get(taskId); + final Set changelogPartitions = changelogPartitionsForTask.get(taskId); + final Map> racksForTaskPartition = new HashMap<>(); + sourcePartitions.forEach(topicPartition -> { + racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition)); + }); + changelogPartitions.forEach(topicPartition -> { + if (racksForSourcePartitions.containsKey(topicPartition)) { + racksForTaskPartition.put(topicPartition, racksForSourcePartitions.get(topicPartition)); + } else { + racksForTaskPartition.put(topicPartition, racksForChangelogPartitions.get(topicPartition)); + } + }); + + return new DefaultTaskInfo( + taskId, + !stateStoreNames.isEmpty(), + racksForTaskPartition, + stateStoreNames, + sourcePartitions, + changelogPartitions + ); + }).collect(Collectors.toSet()); + return new ApplicationStateImpl( assignmentConfigs.toPublicAssignmentConfigs(), - statefulTasks, - statelessTasks, + logicalTasks, clientMetadataMap ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java index dbc25f79854fb..41479f756e4e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java @@ -19,11 +19,11 @@ import static java.util.Collections.unmodifiableSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.ClientMetadata; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; @@ -35,19 +35,14 @@ public class ApplicationStateImpl implements ApplicationState { private final AssignmentConfigs assignmentConfigs; - private final Set statelessTasks; - private final Set statefulTasks; - private final Set allTasks; + private final Set tasks; private final Map clientStates; public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs, - final Set statefulTasks, - final Set statelessTasks, + final Set tasks, final Map clientStates) { this.assignmentConfigs = assignmentConfigs; - this.statefulTasks = unmodifiableSet(statefulTasks); - this.statelessTasks = unmodifiableSet(statelessTasks); - this.allTasks = unmodifiableSet(computeAllTasks(statelessTasks, statefulTasks)); + this.tasks = unmodifiableSet(tasks); this.clientStates = clientStates; } @@ -67,7 +62,8 @@ public Map kafkaStreamsStates(final boolean comput clientState.previousStandbyTasks(), clientState.taskIdsByPreviousConsumer(), Optional.ofNullable(metadata.hostInfo()), - Optional.ofNullable(taskLagTotals) + Optional.ofNullable(taskLagTotals), + metadata.rackId() ); kafkaStreamsStates.put(processId, kafkaStreamsState); } @@ -81,23 +77,7 @@ public AssignmentConfigs assignmentConfigs() { } @Override - public Set allTasks() { - return allTasks; - } - - @Override - public Set statefulTasks() { - return statefulTasks; - } - - @Override - public Set statelessTasks() { - return statelessTasks; - } - - private static Set computeAllTasks(final Set statelessTasks, final Set statefulTasks) { - final Set union = new HashSet<>(statefulTasks); - union.addAll(statelessTasks); - return union; + public Set allTasks() { + return tasks; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java new file mode 100644 index 0000000000000..c0212db862af2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + + private final TaskId id; + private final boolean isStateful; + private final Map> partitionToRackIds; + private final Set stateStoreNames; + private final Set sourceTopicPartitions; + private final Set changelogTopicPartitions; + + public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map> partitionToRackIds, + final Set stateStoreNames, + final Set sourceTopicPartitions, + final Set changelogTopicPartitions) { + this.id = id; + this.partitionToRackIds = unmodifiableMap(partitionToRackIds); + this.isStateful = isStateful; + this.stateStoreNames = unmodifiableSet(stateStoreNames); + this.sourceTopicPartitions = unmodifiableSet(sourceTopicPartitions); + this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); + } + + @Override + public TaskId id() { + return id; + } + + @Override + public boolean isStateful() { + return isStateful; + } + + @Override + public Set stateStoreNames() { + return stateStoreNames; + } + + @Override + public Set sourceTopicPartitions() { + return sourceTopicPartitions; + } + + @Override + public Set changelogTopicPartitions() { + return changelogTopicPartitions; + } + + @Override + public Map> partitionToRackIds() { + return partitionToRackIds; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java index e94313e37e0d1..441a445863570 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java @@ -48,6 +48,7 @@ public class KafkaStreamsStateImpl implements KafkaStreamsState { private final SortedMap> taskIdsByConsumer; private final Optional hostInfo; private final Optional> taskLagTotals; // contains lag for all stateful tasks in the app topology + private final Optional rackId; public KafkaStreamsStateImpl(final ProcessId processId, final int numProcessingThreads, @@ -56,7 +57,8 @@ public KafkaStreamsStateImpl(final ProcessId processId, final SortedSet previousStandbyTasks, final SortedMap> taskIdsByConsumer, final Optional hostInfo, - final Optional> taskLagTotals) { + final Optional> taskLagTotals, + final Optional rackId) { this.processId = processId; this.numProcessingThreads = numProcessingThreads; this.clientTags = unmodifiableMap(clientTags); @@ -65,6 +67,7 @@ public KafkaStreamsStateImpl(final ProcessId processId, this.taskIdsByConsumer = unmodifiableSortedMap(taskIdsByConsumer); this.hostInfo = hostInfo; this.taskLagTotals = taskLagTotals; + this.rackId = rackId; } @Override @@ -153,4 +156,9 @@ public Optional hostInfo() { public Map clientTags() { return clientTags; } + + @Override + public Optional rackId() { + return rackId; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index 0ccc4bb6a9ded..b10b51f7f15b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -85,7 +85,7 @@ int getCost(final TaskId taskId, private final Map> tasksForTopicGroup; private final AssignmentConfigs assignmentConfigs; private final Map> racksForPartition; - private final Map racksForProcess; + private final Map rackForProcess; private final InternalTopicManager internalTopicManager; private final boolean validClientRack; private final Time time; @@ -106,9 +106,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata, this.internalTopicManager = internalTopicManager; this.assignmentConfigs = assignmentConfigs; this.racksForPartition = new HashMap<>(); - this.racksForProcess = new HashMap<>(); + this.rackForProcess = new HashMap<>(); this.time = Objects.requireNonNull(time, "Time was not specified"); - validClientRack = validateClientRack(racksForProcessConsumer); + validClientRack = validateClientRack(racksForProcessConsumer, assignmentConfigs, + rackForProcess + ); } public boolean validClientRack() { @@ -132,7 +134,17 @@ public synchronized boolean canEnableRackAwareAssignor() { } // Visible for testing. This method also checks if all TopicPartitions exist in cluster - public boolean populateTopicsToDescribe(final Set topicsToDescribe, final boolean changelog) { + public boolean populateTopicsToDescribe(final Set topicsToDescribe, + final boolean changelog) { + return populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelog, topicsToDescribe, racksForPartition); + } + + public static boolean populateTopicsToDescribe(final Cluster fullMetadata, + final Map> changelogPartitionsForTask, + final Map> partitionsForTask, + final boolean changelog, + final Set topicsToDescribe, + final Map> racksForPartition) { if (changelog) { // Changelog topics are not in metadata, we need to describe them changelogPartitionsForTask.values().stream().flatMap(Collection::stream).forEach(tp -> topicsToDescribe.add(tp.topic())); @@ -166,9 +178,25 @@ public boolean populateTopicsToDescribe(final Set topicsToDescribe, fina } private boolean validateTopicPartitionRack(final boolean changelogTopics) { + return validateTopicPartitionRack(fullMetadata, internalTopicManager, changelogPartitionsForTask, partitionsForTask, changelogTopics, racksForPartition); + } + + /** + * This function populates the {@param racksForPartition} parameter passed into the function by using both + * the {@code Cluster} metadata as well as the {@param internalTopicManager} for topics that have stale + * information. + * + * @return whether the operation successfully completed and the rack information is valid. + */ + public static boolean validateTopicPartitionRack(final Cluster fullMetadata, + final InternalTopicManager internalTopicManager, + final Map> changelogPartitionsForTask, + final Map> partitionsForTask, + final boolean changelogTopics, + final Map> racksForPartition) { // Make sure rackId exist for all TopicPartitions needed final Set topicsToDescribe = new HashSet<>(); - if (!populateTopicsToDescribe(topicsToDescribe, changelogTopics)) { + if (!populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelogTopics, topicsToDescribe, racksForPartition)) { return false; } @@ -208,7 +236,16 @@ private boolean validateTopicPartitionRack(final boolean changelogTopics) { return true; } - private boolean validateClientRack(final Map>> racksForProcessConsumer) { + /** + * Verifies that within the {@param racksForProcessConsumer}, a single process is only running on a single + * rack. This function mutates the parameter {@param racksForProcess} to contain the resulting process + * to rack mapping. + * + * @return true if the validation was successful, and false otherwise. + */ + public static boolean validateClientRack(final Map>> racksForProcessConsumer, + final AssignmentConfigs assignmentConfigs, + final Map rackForProcess) { if (racksForProcessConsumer == null) { return false; } @@ -250,14 +287,14 @@ private boolean validateClientRack(final Map> } return false; } - racksForProcess.put(entry.getKey(), previousRackInfo.value); + rackForProcess.put(entry.getKey(), previousRackInfo.value); } return true; } public Map racksForProcess() { - return Collections.unmodifiableMap(racksForProcess); + return Collections.unmodifiableMap(rackForProcess); } public Map> racksForPartition() { @@ -265,7 +302,7 @@ public Map> racksForPartition() { } private int getCost(final TaskId taskId, final UUID processId, final boolean inCurrentAssignment, final int trafficCost, final int nonOverlapCost, final boolean isStandby) { - final String clientRack = racksForProcess.get(processId); + final String clientRack = rackForProcess.get(processId); if (clientRack == null) { throw new IllegalStateException("Client " + processId + " doesn't have rack configured. Maybe forgot to call canEnableRackAwareAssignor first"); } @@ -428,8 +465,8 @@ public long optimizeStandbyTasks(final SortedMap clientStates for (int j = i + 1; j < clientList.size(); j++) { final ClientState clientState2 = clientStates.get(clientList.get(j)); - final String rack1 = racksForProcess.get(clientState1.processId()); - final String rack2 = racksForProcess.get(clientState2.processId()); + final String rack1 = rackForProcess.get(clientState1.processId()); + final String rack2 = rackForProcess.get(clientState2.processId()); // Cross rack traffic can not be reduced if racks are the same if (rack1.equals(rack2)) { continue; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java new file mode 100644 index 0000000000000..b3554c36b03b1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RackUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class); + + private RackUtils() { } + + public static Map> getRacksForTopicPartition(final Cluster cluster, + final InternalTopicManager internalTopicManager, + final Set topicPartitions, + final boolean isChangelog) { + final Set topicsToDescribe = new HashSet<>(); + if (isChangelog) { + topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect( + Collectors.toSet())); + } else { + topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions)); + } + + final Set topicsWithUpToDateMetadata = topicPartitions.stream() + .filter(partition -> !topicsToDescribe.contains(partition.topic())) + .collect(Collectors.toSet()); + final Map> racksForTopicPartition = knownRacksForPartition( + cluster, topicsWithUpToDateMetadata); + + final Map> freshTopicPartitionInfo = + describeTopics(internalTopicManager, topicsToDescribe); + freshTopicPartitionInfo.forEach((topic, partitionInfos) -> { + for (final TopicPartitionInfo partitionInfo : partitionInfos) { + final int partition = partitionInfo.partition(); + final TopicPartition topicPartition = new TopicPartition(topic, partition); + final List replicas = partitionInfo.replicas(); + if (replicas == null || replicas.isEmpty()) { + LOG.error("No replicas found for topic partition {}: {}", topic, partition); + continue; + } + + final Set racks = replicas.stream().filter(Node::hasRack).map(Node::rack).collect( + Collectors.toSet()); + racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()); + racksForTopicPartition.get(topicPartition).addAll(racks); + } + }); + + return racksForTopicPartition; + } + + public static Set topicsWithMissingMetadata(final Cluster cluster, final Set topicPartitions) { + final Set topicsWithStaleMetadata = new HashSet<>(); + for (final TopicPartition topicPartition : topicPartitions) { + final PartitionInfo partitionInfo = cluster.partition(topicPartition); + if (partitionInfo == null) { + LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition); + continue; + } + final Node[] replica = partitionInfo.replicas(); + if (replica == null || replica.length == 0) { + topicsWithStaleMetadata.add(topicPartition.topic()); + } + } + return topicsWithStaleMetadata; + } + + public static Map> knownRacksForPartition(final Cluster cluster, final Set topicPartitions) { + final Map> racksForPartition = new HashMap<>(); + for (final TopicPartition topicPartition : topicPartitions) { + final PartitionInfo partitionInfo = cluster.partition(topicPartition); + if (partitionInfo == null) { + LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition); + continue; + } + final Node[] replicas = partitionInfo.replicas(); + if (replicas == null || replicas.length == 0) { + continue; + } + + Arrays.stream(replicas).filter(node -> !node.hasRack()).forEach(node -> { + LOG.warn("Node {} for topic partition {} doesn't have rack", node, topicPartition); + }); + final Set racks = Arrays.stream(replicas).filter(Node::hasRack) + .map(Node::rack).collect(Collectors.toSet()); + racksForPartition.put(topicPartition, racks); + } + return racksForPartition; + } + + private static Map> describeTopics(final InternalTopicManager internalTopicManager, + final Set topicsToDescribe) { + if (topicsToDescribe.isEmpty()) { + return new HashMap<>(); + } + + try { + final Map> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe); + if (topicsToDescribe.size() > topicPartitionInfo.size()) { + topicsToDescribe.removeAll(topicPartitionInfo.keySet()); + LOG.error("Failed to describe topic for {}", topicsToDescribe); + } + return topicPartitionInfo; + } catch (final Exception e) { + LOG.error("Failed to describe topics {}", topicsToDescribe, e); + return new HashMap<>(); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java index 317feccdcfcba..5a6c5a4b9cb68 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateTest.java @@ -54,7 +54,8 @@ public void shouldCorrectlyReturnTasksByLag() { mkEntry(NAMED_TASK_T0_0_0, 2000L), mkEntry(NAMED_TASK_T0_0_1, 1000L) ) - ) + ), + Optional.empty() ); assertThrows(IllegalStateException.class, () -> state.lagFor(NAMED_TASK_T0_1_0)); @@ -79,6 +80,7 @@ public void shouldThrowExceptionOnLagOperationsIfLagsWereNotComputed() { mkEntry("c1", mkSet(NAMED_TASK_T0_0_0, NAMED_TASK_T0_0_1)) )), Optional.empty(), + Optional.empty(), Optional.empty() );