From a696b4d6f4917ccd942a7ba21ef660f351e47b01 Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Tue, 12 Nov 2024 16:18:42 +0000 Subject: [PATCH] KAFKA-17978: Fix invalid topology on Task assignment (#17778) When we introduced "startup tasks" in #16922, we initialized them with no input partitions, because they aren't known until assignment. However, when we update them during assignment, it's possible that we update the topology with the incorrect source topics for some internal topics, due to a difference in the way internal topics are handled for StandbyTasks. To resolve this, we now initialize startup tasks with the correct input partitions, by calculating them from the Topology. When we assign our startup tasks, we now conditionally update their input partitions only if they've actually changed, just as we do for regular StandbyTasks. With this, the E2E tests now pass, as expected. Reviewer: Bruno Cadonna --- .../processor/internals/ProcessorStateManager.java | 4 ++-- .../streams/processor/internals/StateDirectory.java | 9 ++++++++- .../kafka/streams/processor/internals/TaskManager.java | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f4702c469f8f3..30334abc53e1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -44,7 +44,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -233,8 +232,9 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId, final LogContext logContext, final StateDirectory stateDirectory, final Map storeToChangelogTopic, + final Set sourcePartitions, final boolean stateUpdaterEnabled) { - return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, new HashSet<>(0), stateUpdaterEnabled); + return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 1423fb934e737..04a62bad1bfef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -48,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -215,12 +217,17 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, // because it's possible that the topology has changed since that data was written, and is now stateless // this therefore prevents us from creating unnecessary Tasks just because of some left-over state if (subTopology.hasStateWithChangelogs()) { + final Set inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream() + .flatMap(Collection::stream) + .map(t -> new TopicPartition(t, id.partition())) + .collect(Collectors.toSet()); final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager( id, eosEnabled, logContext, this, subTopology.storeToChangelogTopic(), + inputPartitions, stateUpdaterEnabled ); @@ -234,7 +241,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, final Task task = new StandbyTask( id, - new HashSet<>(), + inputPartitions, subTopology, topologyMetadata.taskConfig(id), streamsMetrics, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 1ca98aeba5de6..8a6e27b4c9944 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -337,7 +337,7 @@ private Map> assignStartupTasks(final Map inputPartitions = entry.getValue(); task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); - task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(taskId)); + updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions); assignedTasks.put(task, inputPartitions); }