-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version #5599
[pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version #5599
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1153,9 +1153,10 @@ public void checkTopicNsOwnership(final String topic) throws RuntimeException { | |
* Unload all the topic served by the broker service under the given service unit | ||
* | ||
* @param serviceUnit | ||
* @param force don't wait for clients to disconnect and forcefully close managed-ledger | ||
* @return | ||
*/ | ||
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) { | ||
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean force) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, the |
||
CompletableFuture<Integer> result = new CompletableFuture<Integer>(); | ||
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); | ||
topics.forEach((name, topicFuture) -> { | ||
|
@@ -1164,7 +1165,7 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) | |
// Topic needs to be unloaded | ||
log.info("[{}] Unloading topic", topicName); | ||
closeFutures.add(topicFuture | ||
.thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null))); | ||
.thenCompose(t -> t.isPresent() ? t.get().close(force) : CompletableFuture.completedFuture(null))); | ||
} | ||
}); | ||
CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,7 +110,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init | |
|
||
CompletableFuture<Void> checkReplication(); | ||
|
||
CompletableFuture<Void> close(); | ||
CompletableFuture<Void> close(boolean force); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apart from the bool vs enum discussion, why do we need 2 different behaviors? Can't we just consider "force" the only approach? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, because first we want to close topic gracefully by closing all clients first and then close managed-ledger. If things don't get closed gracefully then close topic forcefully by closing managed-ledger before giving up ownership of the bundle. So, we need both behavior. |
||
|
||
void checkGC(int gcInterval); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather have an enum here because it's not clear what false means in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
umm.. I think closing forcefully can be represent with boolean flag as we do similar thing at multiple places:
PersistentTopic::delete(boolean... flags)
Also, I was also trying to think about how to accommodate enum here instead of flag. One thing I can think of is to add below enum under
Topic
instead of flag.But I feel enum is not helping much. Instead we can rename the flag to give more meaning
closeWithoutWaitingClientDisconnect
.So, for
PersistentTopic
if flag is enabled then broker skips waiting on client-disconnect and immediately closes managed-ledger before giving up bundle ownership.And for
NonPersistentTopic
just completes the close if flag is enabled.Can you please let me know if I am missing anything while renaming flag instead making enum.? any thoughts?