Skip to content

Commit

Permalink
MINOR: Fix flakiness in state updater unit tests (#16562)
Browse files Browse the repository at this point in the history
Unit test shouldRestoreSingleActiveStatefulTask() in DefaultStateUpdaterTest is flaky.

The flakiness comes from the fact that the state updater thread could call the first time changelogReader.allChangelogsCompleted() before it calls the first time changelogReader.completedChangelogs(). That happens, if runOnce() is run before the state updater thread reads a task from the input queue.

This commit fixes the flakiness, by making stubbing changelogReader.allChangelogsCompleted() depend on stubbing changelogReader.completedChangelogs().

Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
cadonna authored Oct 16, 2024
1 parent ff5ef83 commit 3b619db
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class DefaultStateUpdaterTest {
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
private DefaultStateUpdater stateUpdater =
private final DefaultStateUpdater stateUpdater =
new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);

@AfterEach
Expand Down Expand Up @@ -324,14 +324,15 @@ private void shouldImmediatelyAddStatelessTasksToRestoredTasks(final StreamTask.
public void shouldRestoreSingleActiveStatefulTask() throws Exception {
final StreamTask task =
statefulTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final AtomicBoolean allChangelogCompleted = new AtomicBoolean(false);
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(Set.of(TOPIC_PARTITION_A_0))
.thenReturn(Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
.thenAnswer(invocation -> {
allChangelogCompleted.set(true);
return Set.of(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0);
});
when(changelogReader.allChangelogsCompleted()).thenAnswer(invocation -> allChangelogCompleted.get());
stateUpdater.start();

stateUpdater.add(task);
Expand Down Expand Up @@ -362,7 +363,7 @@ public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
allChangelogCompleted.set(true);
return Set.of(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0);
});
when(changelogReader.allChangelogsCompleted()).thenReturn(allChangelogCompleted.get());
when(changelogReader.allChangelogsCompleted()).thenAnswer(invocation -> allChangelogCompleted.get());
stateUpdater.start();

stateUpdater.add(task1);
Expand Down

0 comments on commit 3b619db

Please sign in to comment.