Skip to content

Commit

Permalink
[fix][broker] Execute the pending callbacks in order before ready for…
Browse files Browse the repository at this point in the history
… incoming requests (#23266)

(cherry picked from commit ca0fb44)
  • Loading branch information
BewareMyPower authored and lhotari committed Sep 9, 2024
1 parent 3935276 commit a828f55
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -944,7 +945,13 @@ public void start() throws PulsarServerException {
this.metricsGenerator = new MetricsGenerator(this);

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
readyForIncomingRequestsFuture.complete(null);
final List<Runnable> runnables;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
pendingTasksBeforeReadyForIncomingRequests.clear();
readyForIncomingRequestsFuture.complete(null);
}
runnables.forEach(Runnable::run);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down Expand Up @@ -1003,7 +1010,21 @@ public void start() throws PulsarServerException {
}

public void runWhenReadyForIncomingRequests(Runnable runnable) {
readyForIncomingRequestsFuture.thenRun(runnable);
// Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks,
// not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of
// when they were added.
final boolean addedToPendingTasks;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
if (readyForIncomingRequestsFuture.isDone()) {
addedToPendingTasks = false;
} else {
pendingTasksBeforeReadyForIncomingRequests.add(runnable);
addedToPendingTasks = true;
}
}
if (!addedToPendingTasks) {
runnable.run();
}
}

public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit a828f55

Please sign in to comment.