Skip to content

Commit

Permalink
KAFKA-17978: Fix invalid topology on Task assignment (#17778)
Browse files Browse the repository at this point in the history
When we introduced "startup tasks" in #16922, we initialized them with
no input partitions, because they aren't known until assignment.

However, when we update them during assignment, we may update the
topology with incorrect source topics for some internal topics,
because internal topics are handled differently for StandbyTasks.

To resolve this, we now initialize startup tasks with the correct input
partitions, by calculating them from the Topology.

When we assign our startup tasks, we now update their input partitions
only if they have actually changed, just as we do for regular
StandbyTasks.

With this, the E2E tests now pass, as expected.

Reviewer: Bruno Cadonna (email address redacted)
  • Loading branch information
nicktelford authored Nov 12, 2024
1 parent 8563955 commit a696b4d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -233,8 +232,9 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final Set<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, new HashSet<>(0), stateUpdaterEnabled);
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -215,12 +217,17 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
// because it's possible that the topology has changed since that data was written, and is now stateless
// this therefore prevents us from creating unnecessary Tasks just because of some left-over state
if (subTopology.hasStateWithChangelogs()) {
final Set<TopicPartition> inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream()
.flatMap(Collection::stream)
.map(t -> new TopicPartition(t, id.partition()))
.collect(Collectors.toSet());
final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
id,
eosEnabled,
logContext,
this,
subTopology.storeToChangelogTopic(),
inputPartitions,
stateUpdaterEnabled
);

Expand All @@ -234,7 +241,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,

final Task task = new StandbyTask(
id,
new HashSet<>(),
inputPartitions,
subTopology,
topologyMetadata.taskConfig(id),
streamsMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<
// replace our dummy values with the real ones, now we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(taskId));
updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions);

assignedTasks.put(task, inputPartitions);
}
Expand Down

0 comments on commit a696b4d

Please sign in to comment.