Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17100: GlobalStreamThread#start should not busy-wait #16914

Merged
merged 5 commits into from
Aug 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand All @@ -45,9 +44,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
Expand All @@ -72,6 +71,7 @@ public class GlobalStreamThread extends Thread {
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* The states that the global stream thread can be in
Expand Down Expand Up @@ -194,12 +194,6 @@ public boolean inErrorState() {
}
}

/**
 * Reports whether this thread's state is still {@code CREATED}.
 *
 * The state is read while holding {@code stateLock}.
 *
 * @return {@code true} if the current state equals {@code CREATED}, {@code false} otherwise
 */
public boolean stillInitializing() {
    final boolean inCreatedState;
    synchronized (stateLock) {
        inCreatedState = state.equals(CREATED);
    }
    return inCreatedState;
}

public GlobalStreamThread(final ProcessorTopology topology,
final StreamsConfig config,
final Consumer<byte[], byte[]> globalConsumer,
Expand Down Expand Up @@ -436,6 +430,8 @@ private StateConsumer initialize() {
} catch (final Exception fatalException) {
closeStateConsumer(stateConsumer, false);
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
} finally {
initializationLatch.countDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep the existing semantics, I think another initializationLatch.countDown() call is needed in shutdown().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need it right now: start() blocks, so shutdown() cannot be triggered until initialize() has finished (cf. https://issues.apache.org/jira/browse/KAFKA-7380).

But maybe good to add to be future proof...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting... 💭 I have added the countDown() call to shutdown().

}
return null;
}
Expand All @@ -453,11 +449,15 @@ private void closeStateConsumer(final StateConsumer stateConsumer, final boolean
@Override
public synchronized void start() {
super.start();
while (stillInitializing()) {
Utils.sleep(1);
if (startupException != null) {
throw startupException;
}
try {
initializationLatch.await();
} catch (final InterruptedException e) {
currentThread().interrupt();
throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e);
}

if (startupException != null) {
throw startupException;
}

if (inErrorState()) {
Expand All @@ -469,6 +469,7 @@ public void shutdown() {
// one could call shutdown() multiple times, so ignore subsequent calls
// if already shutting down or dead
setState(PENDING_SHUTDOWN);
initializationLatch.countDown();
}

public Map<MetricName, Metric> consumerMetrics() {
Expand Down
Loading