From 4153e754f1a4ebbd9a3d10be8bf75a7057c82f1d Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 13 May 2021 16:16:35 -0700 Subject: [PATCH] MINOR: prevent cleanup() from being called while Streams is still shutting down (#10666) Currently KafkaStreams#cleanUp only throws an IllegalStateException if the state is RUNNING or REBALANCING; however, the application could be in the process of shutting down, in which case StreamThreads may still be running. We should also throw if the state is PENDING_ERROR or PENDING_SHUTDOWN. Reviewers: Walker Carlson, Guozhang Wang --- .../apache/kafka/streams/KafkaStreams.java | 2 +- .../processor/internals/StateDirectory.java | 54 ++++++---------- .../kafka/streams/KafkaStreamsTest.java | 64 ++++++++++++------- .../StandbyTaskEOSIntegrationTest.java | 4 +- 4 files changed, 66 insertions(+), 58 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f3f8e8ea64536..8446c35240964 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1444,7 +1444,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument * @throws StreamsException if cleanup failed */ public void cleanUp() { - if (isRunningOrRebalancing()) { + if (!(state == State.CREATED || state == State.NOT_RUNNING || state == State.ERROR)) { throw new IllegalStateException("Cannot clean up while running."); } stateDirectory.clean(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 7f01264bbdf38..5f83b6a00546f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java 
@@ -351,7 +351,7 @@ public void close() { public synchronized void clean() { try { - cleanRemovedTasksCalledByUser(); + cleanStateAndTaskDirectoriesCalledByUser(); } catch (final Exception e) { throw new StreamsException(e); } @@ -413,43 +413,31 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { } } - private void cleanRemovedTasksCalledByUser() throws Exception { + private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception { + if (!lockedTasksToOwner.isEmpty()) { + log.warn("Found some still-locked task directories when user requested to cleaning up the state, " + + "since Streams is not running any more these will be ignored to complete the cleanup"); + } final AtomicReference firstException = new AtomicReference<>(); for (final File taskDir : listAllTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parseTaskDirectoryName(dirName, null); - if (!lockedTasksToOwner.containsKey(id)) { - try { - if (lock(id)) { - log.info("{} Deleting state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - Utils.delete(taskDir); - } else { - log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - } - } catch (final OverlappingFileLockException | IOException exception) { - log.error( - String.format("%s Failed to delete state directory %s for task %s with exception:", - logPrefix(), dirName, id), - exception - ); - firstException.compareAndSet(null, exception); - } finally { - try { - unlock(id); - // for manual user call, stream threads are not running so it is safe to delete - // the whole directory - Utils.delete(taskDir); - } catch (final IOException exception) { - log.error( - String.format("%s Failed to release lock on state directory %s for task %s with exception:", - logPrefix(), dirName, id), - exception - ); - firstException.compareAndSet(null, exception); - } + try { + log.info("{} Deleting 
state directory {} for task {} as user calling cleanup.", + logPrefix(), dirName, id); + + if (lockedTasksToOwner.containsKey(id)) { + log.warn("{} Task {} in state directory {} was still locked by {}", + logPrefix(), dirName, id, lockedTasksToOwner.get(id)); } + Utils.delete(taskDir); + } catch (final IOException exception) { + log.error( + String.format("%s Failed to delete state directory %s for task %s with exception:", + logPrefix(), dirName, id), + exception + ); + firstException.compareAndSet(null, exception); } } final Exception exception = firstException.get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 63fa5e7e0c378..492a145d33ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; @@ -96,6 +97,7 @@ import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; @@ -235,8 +237,8 @@ private void prepareStreams() throws Exception { 
EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes(); - EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes(); - EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes(); + EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes(); + EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes(); prepareStreamThread(streamThreadOne, 1, true); prepareStreamThread(streamThreadTwo, 2, false); @@ -298,7 +300,9 @@ private void prepareStreams() throws Exception { ); } - private void prepareStreamThread(final StreamThread thread, final int threadId, final boolean terminable) throws Exception { + private void prepareStreamThread(final StreamThread thread, + final int threadId, + final boolean terminable) throws Exception { final AtomicReference state = new AtomicReference<>(StreamThread.State.CREATED); EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes(); @@ -351,19 +355,21 @@ private void prepareStreamThread(final StreamThread thread, final int threadId, producer.close(); } state.set(StreamThread.State.DEAD); + threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING); threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); return null; }).anyTimes(); EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes(); thread.join(); - if (terminable) + if (terminable) { EasyMock.expectLastCall().anyTimes(); - else + } else { EasyMock.expectLastCall().andAnswer(() -> { - Thread.sleep(50L); + Thread.sleep(2000L); return null; }).anyTimes(); + } EasyMock.expect(thread.activeTasks()).andStubReturn(emptyList()); EasyMock.expect(thread.allTasks()).andStubReturn(Collections.emptyMap()); @@ 
-387,7 +393,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streamsStateListener.numChanges == 2, "Streams never started."); Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state()); @@ -439,7 +445,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streamsStateListener.numChanges == 6, "Streams never closed."); Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); @@ -453,7 +459,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time); streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped."); @@ -476,18 +482,18 @@ public void testStateThreadClose() throws Exception { assertEquals(streams.state(), KafkaStreams.State.CREATED); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); for (int i = 0; i < NUM_THREADS; i++) { final StreamThread tmpThread = streams.threads.get(i); tmpThread.shutdown(); - TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, + waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, "Thread never stopped."); streams.threads.get(i).join(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), "Streams never stopped" ); @@ -495,7 +501,7 @@ public void testStateThreadClose() throws Exception { streams.close(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped."); @@ -511,17 +517,17 @@ public void 
testStateGlobalThreadClose() throws Exception { try { streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); final GlobalStreamThread globalStreamThread = streams.globalStreamThread; globalStreamThread.shutdown(); - TestUtils.waitForCondition( + waitForCondition( () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.PENDING_ERROR, "Thread never stopped." ); @@ -529,7 +535,7 @@ public void testStateGlobalThreadClose() throws Exception { streams.close(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.ERROR, "Thread never stopped." ); @@ -568,7 +574,7 @@ public void shouldAddThreadWhenRunning() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); + waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); assertThat(streams.threads.size(), equalTo(oldSize + 1)); } @@ -616,7 +622,7 @@ public void shouldRemoveThread() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, + waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "Kafka Streams client did not reach state RUNNING"); assertThat(streams.removeStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 1))); assertThat(streams.threads.size(), equalTo(oldSize - 1)); @@ -707,7 +713,7 @@ public void shouldAllowCleanupBeforeStartAndAfterClose() { public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); @@ -719,6 +725,20 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { } } + @Test + public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + streams.start(); + waitForCondition( + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); + + streams.close(Duration.ZERO); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); + } + @Test public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { @@ -1027,13 +1047,13 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedExc assertEquals(streams.state(), KafkaStreams.State.CREATED); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started, state is " + streams.state()); streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped."); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 5ba607528faca..d762370489c51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -170,8 +170,8 @@ public void shouldSurviveWithOneTaskAsStandby() throws Exception { // Wait for the record to be processed assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - streamInstanceOne.close(Duration.ZERO); - streamInstanceTwo.close(Duration.ZERO); + streamInstanceOne.close(); + streamInstanceTwo.close(); streamInstanceOne.cleanUp(); streamInstanceTwo.cleanUp();