-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 13854][broker] Fix call sync method in async rest api for internalSetReplicatedSubscriptionStatus #13887
[Issue 13854][broker] Fix call sync method in async rest api for internalSetReplicatedSubscriptionStatus #13887
Conversation
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | ||
String subName,boolean authoritative,boolean enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | |
String subName,boolean authoritative,boolean enabled) { | |
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | |
String subName, boolean authoritative, boolean enabled) { |
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | ||
String subName,boolean authoritative,boolean enabled) { | ||
// Redirect the request to the appropriate broker if this broker is not the owner of the topic | ||
return validateTopicOwnershipAsync(topicName, authoritative).thenRun(() -> { | ||
Topic topic = getTopicReference(topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method can be async.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
// 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters | ||
CompletableFuture<Void> future = | ||
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName) | ||
.thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName)); | |
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); |
validateTopicOwnership(topicName, authoritative); | ||
|
||
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | ||
String subName,boolean authoritative,boolean enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String subName,boolean authoritative,boolean enabled) { | |
String subName, boolean authoritative, boolean enabled) { |
subName); | ||
asyncResponse.resume(Response.noContent().build()); | ||
} else { | ||
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, | ||
"Cannot enable/disable replicated subscriptions on non-persistent topics")); | ||
} | ||
} catch (Exception e) { | ||
}).exceptionally(e -> { | ||
resumeAsyncResponseExceptionally(asyncResponse, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Maybe we need
e.getCause
here. - We need log some information when get exception.
408b069
to
e4eecb2
Compare
enabled, topicName, subName, ex); | ||
resumeAsyncResponseExceptionally(asyncResponse, ex); | ||
return null; | ||
}); | ||
} else { | ||
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need future.thenCompose
here. This should be executed after previous validation is finished.
@@ -4565,8 +4563,8 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon | |||
return null; | |||
}); | |||
} else { | |||
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, | |||
enabled); | |||
internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(asyncResponse, subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add "Async", let's keep it consistent with other methods.
e4a9897
to
d5f4da8
Compare
d5f4da8
to
fbd70aa
Compare
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
…rnalSetReplicatedSubscriptionStatus (apache#13887)
Master Issue: #13854
Motivation
See #13854
Verifying this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
no-need-doc