Skip to content

Commit

Permalink
MINOR: prevent cleanup() from being called while Streams is still shutting down (#10666)
Browse files Browse the repository at this point in the history

Currently KafkaStreams#cleanUp only throws an IllegalStateException if the state is RUNNING or REBALANCING; however, the application could be in the process of shutting down, in which case StreamThreads may still be running. We should also throw if the state is PENDING_ERROR or PENDING_SHUTDOWN.

Reviewers: Walker Carlson <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
ableegoldman authored May 13, 2021
1 parent 6d1ae8b commit 4153e75
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
* @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunningOrRebalancing()) {
if (!(state == State.CREATED || state == State.NOT_RUNNING || state == State.ERROR)) {
throw new IllegalStateException("Cannot clean up while running.");
}
stateDirectory.clean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public void close() {

public synchronized void clean() {
try {
cleanRemovedTasksCalledByUser();
cleanStateAndTaskDirectoriesCalledByUser();
} catch (final Exception e) {
throw new StreamsException(e);
}
Expand Down Expand Up @@ -413,43 +413,31 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
}
}

private void cleanRemovedTasksCalledByUser() throws Exception {
private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
if (!lockedTasksToOwner.isEmpty()) {
log.warn("Found some still-locked task directories when user requested to cleaning up the state, "
+ "since Streams is not running any more these will be ignored to complete the cleanup");
}
final AtomicReference<Exception> firstException = new AtomicReference<>();
for (final File taskDir : listAllTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
if (!lockedTasksToOwner.containsKey(id)) {
try {
if (lock(id)) {
log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);
Utils.delete(taskDir);
} else {
log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);
}
} catch (final OverlappingFileLockException | IOException exception) {
log.error(
String.format("%s Failed to delete state directory %s for task %s with exception:",
logPrefix(), dirName, id),
exception
);
firstException.compareAndSet(null, exception);
} finally {
try {
unlock(id);
// for manual user call, stream threads are not running so it is safe to delete
// the whole directory
Utils.delete(taskDir);
} catch (final IOException exception) {
log.error(
String.format("%s Failed to release lock on state directory %s for task %s with exception:",
logPrefix(), dirName, id),
exception
);
firstException.compareAndSet(null, exception);
}
try {
log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
logPrefix(), dirName, id);

if (lockedTasksToOwner.containsKey(id)) {
log.warn("{} Task {} in state directory {} was still locked by {}",
logPrefix(), dirName, id, lockedTasksToOwner.get(id));
}
Utils.delete(taskDir);
} catch (final IOException exception) {
log.error(
String.format("%s Failed to delete state directory %s for task %s with exception:",
logPrefix(), dirName, id),
exception
);
firstException.compareAndSet(null, exception);
}
}
final Exception exception = firstException.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
Expand Down Expand Up @@ -96,6 +97,7 @@
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
Expand Down Expand Up @@ -235,8 +237,8 @@ private void prepareStreams() throws Exception {

EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
prepareStreamThread(streamThreadOne, 1, true);
prepareStreamThread(streamThreadTwo, 2, false);

Expand Down Expand Up @@ -298,7 +300,9 @@ private void prepareStreams() throws Exception {
);
}

private void prepareStreamThread(final StreamThread thread, final int threadId, final boolean terminable) throws Exception {
private void prepareStreamThread(final StreamThread thread,
final int threadId,
final boolean terminable) throws Exception {
final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();

Expand Down Expand Up @@ -351,19 +355,21 @@ private void prepareStreamThread(final StreamThread thread, final int threadId,
producer.close();
}
state.set(StreamThread.State.DEAD);

threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).anyTimes();
EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes();
thread.join();
if (terminable)
if (terminable) {
EasyMock.expectLastCall().anyTimes();
else
} else {
EasyMock.expectLastCall().andAnswer(() -> {
Thread.sleep(50L);
Thread.sleep(2000L);
return null;
}).anyTimes();
}

EasyMock.expect(thread.activeTasks()).andStubReturn(emptyList());
EasyMock.expect(thread.allTasks()).andStubReturn(Collections.emptyMap());
Expand All @@ -387,7 +393,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In

streams.start();

TestUtils.waitForCondition(
waitForCondition(
() -> streamsStateListener.numChanges == 2,
"Streams never started.");
Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());
Expand Down Expand Up @@ -439,7 +445,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In

streams.close();

TestUtils.waitForCondition(
waitForCondition(
() -> streamsStateListener.numChanges == 6,
"Streams never closed.");
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
Expand All @@ -453,7 +459,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
streams.close();

TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");

Expand All @@ -476,26 +482,26 @@ public void testStateThreadClose() throws Exception {
assertEquals(streams.state(), KafkaStreams.State.CREATED);

streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");

for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown();
TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
}
TestUtils.waitForCondition(
waitForCondition(
() -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
"Streams never stopped"
);
} finally {
streams.close();
}

TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");

Expand All @@ -511,25 +517,25 @@ public void testStateGlobalThreadClose() throws Exception {

try {
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");

final GlobalStreamThread globalStreamThread = streams.globalStreamThread;
globalStreamThread.shutdown();
TestUtils.waitForCondition(
waitForCondition(
() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
"Thread never stopped.");
globalStreamThread.join();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.PENDING_ERROR,
"Thread never stopped."
);
} finally {
streams.close();
}

TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.ERROR,
"Thread never stopped."
);
Expand Down Expand Up @@ -568,7 +574,7 @@ public void shouldAddThreadWhenRunning() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
assertThat(streams.threads.size(), equalTo(oldSize + 1));
}
Expand Down Expand Up @@ -616,7 +622,7 @@ public void shouldRemoveThread() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
final int oldSize = streams.threads.size();
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
"Kafka Streams client did not reach state RUNNING");
assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
assertThat(streams.threads.size(), equalTo(oldSize - 1));
Expand Down Expand Up @@ -707,7 +713,7 @@ public void shouldAllowCleanupBeforeStartAndAfterClose() {
public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");

Expand All @@ -719,6 +725,20 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
}
}

@Test
public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
    // Verifies that cleanUp() is rejected not only while RUNNING but also while the
    // client is still shutting down (PENDING_SHUTDOWN), since StreamThreads may still
    // be active and deleting state directories out from under them would be unsafe.
    final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
    streams.start();
    waitForCondition(
        () -> streams.state() == KafkaStreams.State.RUNNING,
        "Streams never started.");

    // Duration.ZERO makes close() return immediately without waiting for threads to
    // stop, leaving the client in PENDING_SHUTDOWN for the assertions below.
    streams.close(Duration.ZERO);
    // Assert on the state value directly (rather than `state == X` against `true`)
    // so a failure reports the actual state instead of "expected true but was false".
    assertThat(streams.state(), equalTo(State.PENDING_SHUTDOWN));
    assertThrows(IllegalStateException.class, streams::cleanUp);
    // cleanUp() must not have perturbed the shutdown: still PENDING_SHUTDOWN.
    assertThat(streams.state(), equalTo(State.PENDING_SHUTDOWN));
    // NOTE(review): streams is never fully closed afterwards; presumably the mocked
    // threads make that harmless here — confirm, or add a final streams.close().
}

@Test
public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
Expand Down Expand Up @@ -1027,13 +1047,13 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedExc
assertEquals(streams.state(), KafkaStreams.State.CREATED);

streams.start();
TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started, state is " + streams.state());

streams.close();

TestUtils.waitForCondition(
waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING,
"Streams never stopped.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public void shouldSurviveWithOneTaskAsStandby() throws Exception {
// Wait for the record to be processed
assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));

streamInstanceOne.close(Duration.ZERO);
streamInstanceTwo.close(Duration.ZERO);
streamInstanceOne.close();
streamInstanceTwo.close();

streamInstanceOne.cleanUp();
streamInstanceTwo.cleanUp();
Expand Down

0 comments on commit 4153e75

Please sign in to comment.