diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0b994c640a9f5..b2e67bf4883dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -298,6 +298,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final DefaultMonotonicSnapshotClock monotonicSnapshotClock; private String brokerId; private final CompletableFuture readyForIncomingRequestsFuture = new CompletableFuture<>(); + private final List pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>(); public enum State { Init, Started, Closing, Closed @@ -1023,7 +1024,13 @@ public void start() throws PulsarServerException { this.metricsGenerator = new MetricsGenerator(this); // the broker is ready to accept incoming requests by Pulsar binary protocol and http/https - readyForIncomingRequestsFuture.complete(null); + final List runnables; + synchronized (pendingTasksBeforeReadyForIncomingRequests) { + runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests); + pendingTasksBeforeReadyForIncomingRequests.clear(); + readyForIncomingRequestsFuture.complete(null); + } + runnables.forEach(Runnable::run); // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, @@ -1082,7 +1089,21 @@ public void start() throws PulsarServerException { } public void runWhenReadyForIncomingRequests(Runnable runnable) { - readyForIncomingRequestsFuture.thenRun(runnable); + // Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks, + // not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of + // when they were added. + final boolean addedToPendingTasks; + synchronized (pendingTasksBeforeReadyForIncomingRequests) { + if (readyForIncomingRequestsFuture.isDone()) { + addedToPendingTasks = false; + } else { + pendingTasksBeforeReadyForIncomingRequests.add(runnable); + addedToPendingTasks = true; + } + } + if (!addedToPendingTasks) { + runnable.run(); + } } public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {