Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17100: GlobalStreamThread#start should not busy-wait #16914

Merged
merged 5 commits into from
Aug 25, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand All @@ -45,9 +44,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
Expand All @@ -72,6 +71,7 @@ public class GlobalStreamThread extends Thread {
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* The states that the global stream thread can be in
Expand Down Expand Up @@ -175,6 +175,12 @@ private void setState(final State newState) {
}

state = newState;

// Question: Is this a good idea? Or should we spread the latch countdown to finally blocks
// Count down the latch if transitioning from CREATED to any other state
if (oldState == State.CREATED && newState != State.CREATED) {
initializationLatch.countDown();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gharris1727 This works (all the tests pass), but I want to double-check it with you. The other alternative would be to count down in finally blocks across the class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other alternative would be to count down in finally blocks across the class.

It could make sense to have a finally block in the initialize method and call countDown in shutdown, but the current implementation appears to have reasonable semantics.

I have a weak feeling that counting down this latch is too much for the setState method to be doing, but at the same time pushing the latch management out into the calling code risks missing a (possibly future) call-site which transitions away from CREATED.

I think i would leave this to see what the Streams folks think about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @gharris1727 -- this is not the right place for this code. I am not 100% sure right now what the best place would be, but both run() or initialize() look like good candidates (maybe initialize() is better).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the countDown logic to the finally block of initialize. It sounds reasonable to me.

}

if (stateListener != null) {
Expand All @@ -194,12 +200,6 @@ public boolean inErrorState() {
}
}

public boolean stillInitializing() {
synchronized (stateLock) {
return state.equals(CREATED);
}
}

public GlobalStreamThread(final ProcessorTopology topology,
final StreamsConfig config,
final Consumer<byte[], byte[]> globalConsumer,
Expand Down Expand Up @@ -453,11 +453,15 @@ private void closeStateConsumer(final StateConsumer stateConsumer, final boolean
@Override
public synchronized void start() {
super.start();
while (stillInitializing()) {
Utils.sleep(1);
if (startupException != null) {
throw startupException;
}
try {
initializationLatch.await();
} catch (final InterruptedException e) {
currentThread().interrupt();
throw new IllegalStateException("Thread was interrupted during initialization", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a "new" exception, as the previous code would have just spun calling Utils.sleep and continuously throwing InterruptedExceptions. It's more informative than the other IllegalStateException thrown here, so this seems reasonable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new IllegalStateException("Thread was interrupted during initialization", e);
throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e);

}

if (startupException != null) {
throw startupException;
}

if (inErrorState()) {
Expand Down