diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f4702c469f8f3..30334abc53e1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -44,7 +44,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -233,8 +232,9 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId, final LogContext logContext, final StateDirectory stateDirectory, final Map storeToChangelogTopic, + final Set sourcePartitions, final boolean stateUpdaterEnabled) { - return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, new HashSet<>(0), stateUpdaterEnabled); + return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 1423fb934e737..04a62bad1bfef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -48,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -215,12 +217,17 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, // because it's possible that the topology has changed since that data was written, and is now stateless // this therefore prevents us from creating unnecessary Tasks just because of some left-over state if (subTopology.hasStateWithChangelogs()) { + final Set inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream() + .flatMap(Collection::stream) + .map(t -> new TopicPartition(t, id.partition())) + .collect(Collectors.toSet()); final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager( id, eosEnabled, logContext, this, subTopology.storeToChangelogTopic(), + inputPartitions, stateUpdaterEnabled ); @@ -234,7 +241,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, final Task task = new StandbyTask( id, - new HashSet<>(), + inputPartitions, subTopology, topologyMetadata.taskConfig(id), streamsMetrics, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 1ca98aeba5de6..8a6e27b4c9944 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -337,7 +337,7 @@ private Map> assignStartupTasks(final Map inputPartitions = entry.getValue(); task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); - task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(taskId)); + updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions); assignedTasks.put(task, inputPartitions); }