diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f3f8e8ea64536..8446c35240964 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1444,7 +1444,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument * @throws StreamsException if cleanup failed */ public void cleanUp() { - if (isRunningOrRebalancing()) { + if (!(state == State.CREATED || state == State.NOT_RUNNING || state == State.ERROR)) { throw new IllegalStateException("Cannot clean up while running."); } stateDirectory.clean(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 7f01264bbdf38..5f83b6a00546f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -351,7 +351,7 @@ public void close() { public synchronized void clean() { try { - cleanRemovedTasksCalledByUser(); + cleanStateAndTaskDirectoriesCalledByUser(); } catch (final Exception e) { throw new StreamsException(e); } @@ -413,43 +413,31 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { } } - private void cleanRemovedTasksCalledByUser() throws Exception { + private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception { + if (!lockedTasksToOwner.isEmpty()) { + log.warn("Found some still-locked task directories when user requested to cleaning up the state, " + + "since Streams is not running any more these will be ignored to complete the cleanup"); + } final AtomicReference firstException = new AtomicReference<>(); for (final File taskDir : listAllTaskDirectories()) { final String dirName = 
taskDir.getName(); final TaskId id = TaskId.parseTaskDirectoryName(dirName, null); - if (!lockedTasksToOwner.containsKey(id)) { - try { - if (lock(id)) { - log.info("{} Deleting state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - Utils.delete(taskDir); - } else { - log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.", - logPrefix(), dirName, id); - } - } catch (final OverlappingFileLockException | IOException exception) { - log.error( - String.format("%s Failed to delete state directory %s for task %s with exception:", - logPrefix(), dirName, id), - exception - ); - firstException.compareAndSet(null, exception); - } finally { - try { - unlock(id); - // for manual user call, stream threads are not running so it is safe to delete - // the whole directory - Utils.delete(taskDir); - } catch (final IOException exception) { - log.error( - String.format("%s Failed to release lock on state directory %s for task %s with exception:", - logPrefix(), dirName, id), - exception - ); - firstException.compareAndSet(null, exception); - } + try { + log.info("{} Deleting state directory {} for task {} as user calling cleanup.", + logPrefix(), dirName, id); + + if (lockedTasksToOwner.containsKey(id)) { + log.warn("{} Task {} in state directory {} was still locked by {}", + logPrefix(), id, dirName, lockedTasksToOwner.get(id)); } + Utils.delete(taskDir); + } catch (final IOException exception) { + log.error( + String.format("%s Failed to delete state directory %s for task %s with exception:", + logPrefix(), dirName, id), + exception + ); + firstException.compareAndSet(null, exception); } } final Exception exception = firstException.get(); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 63fa5e7e0c378..492a145d33ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; @@ -96,6 +97,7 @@ import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; @@ -235,8 +237,8 @@ private void prepareStreams() throws Exception { EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes(); EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes(); - EasyMock.expect(streamThreadOne.getId()).andReturn(0L).anyTimes(); - EasyMock.expect(streamThreadTwo.getId()).andReturn(1L).anyTimes(); + EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes(); + EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes(); prepareStreamThread(streamThreadOne, 1, true); prepareStreamThread(streamThreadTwo, 2, false); @@ -298,7 +300,9 @@ private void prepareStreams() throws Exception { ); } - private void prepareStreamThread(final StreamThread thread, final int threadId, final boolean terminable) throws Exception { + private void prepareStreamThread(final StreamThread thread, + final int threadId, + final boolean terminable) throws 
Exception { final AtomicReference state = new AtomicReference<>(StreamThread.State.CREATED); EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes(); @@ -351,19 +355,21 @@ private void prepareStreamThread(final StreamThread thread, final int threadId, producer.close(); } state.set(StreamThread.State.DEAD); + threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING); threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); return null; }).anyTimes(); EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes(); thread.join(); - if (terminable) + if (terminable) { EasyMock.expectLastCall().anyTimes(); - else + } else { EasyMock.expectLastCall().andAnswer(() -> { - Thread.sleep(50L); + Thread.sleep(2000L); return null; }).anyTimes(); + } EasyMock.expect(thread.activeTasks()).andStubReturn(emptyList()); EasyMock.expect(thread.allTasks()).andStubReturn(Collections.emptyMap()); @@ -387,7 +393,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streamsStateListener.numChanges == 2, "Streams never started."); Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state()); @@ -439,7 +445,7 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streamsStateListener.numChanges == 6, "Streams never closed."); Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); @@ -453,7 +459,7 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time); streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never 
stopped."); @@ -476,18 +482,18 @@ public void testStateThreadClose() throws Exception { assertEquals(streams.state(), KafkaStreams.State.CREATED); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); for (int i = 0; i < NUM_THREADS; i++) { final StreamThread tmpThread = streams.threads.get(i); tmpThread.shutdown(); - TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, + waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, "Thread never stopped."); streams.threads.get(i).join(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")), "Streams never stopped" ); @@ -495,7 +501,7 @@ public void testStateThreadClose() throws Exception { streams.close(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped."); @@ -511,17 +517,17 @@ public void testStateGlobalThreadClose() throws Exception { try { streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); final GlobalStreamThread globalStreamThread = streams.globalStreamThread; globalStreamThread.shutdown(); - TestUtils.waitForCondition( + waitForCondition( () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.PENDING_ERROR, "Thread never stopped." ); @@ -529,7 +535,7 @@ public void testStateGlobalThreadClose() throws Exception { streams.close(); } - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.ERROR, "Thread never stopped." 
); @@ -568,7 +574,7 @@ public void shouldAddThreadWhenRunning() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); + waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); assertThat(streams.threads.size(), equalTo(oldSize + 1)); } @@ -616,7 +622,7 @@ public void shouldRemoveThread() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, + waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "Kafka Streams client did not reach state RUNNING"); assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1))); assertThat(streams.threads.size(), equalTo(oldSize - 1)); @@ -707,7 +713,7 @@ public void shouldAllowCleanupBeforeStartAndAfterClose() { public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started."); @@ -719,6 +725,20 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { } } + @Test + public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + streams.start(); + waitForCondition( + () -> 
streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); + + streams.close(Duration.ZERO); + assertThat(streams.state(), equalTo(State.PENDING_SHUTDOWN)); + assertThrows(IllegalStateException.class, streams::cleanUp); + assertThat(streams.state(), equalTo(State.PENDING_SHUTDOWN)); + } + @Test public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { @@ -1027,13 +1047,13 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedExc assertEquals(streams.state(), KafkaStreams.State.CREATED); streams.start(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.RUNNING, "Streams never started, state is " + streams.state()); streams.close(); - TestUtils.waitForCondition( + waitForCondition( () -> streams.state() == KafkaStreams.State.NOT_RUNNING, "Streams never stopped."); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 5ba607528faca..d762370489c51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -170,8 +170,8 @@ public void shouldSurviveWithOneTaskAsStandby() throws Exception { // Wait for the record to be processed assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - streamInstanceOne.close(Duration.ZERO); - streamInstanceTwo.close(Duration.ZERO); + streamInstanceOne.close(); + streamInstanceTwo.close(); streamInstanceOne.cleanUp(); streamInstanceTwo.cleanUp();