Skip to content

Commit

Permalink
[broker]make validateTopicPolicyOperation method async in PulsarWebRe…
Browse files Browse the repository at this point in the history
…source
  • Loading branch information
HQebupt committed Jan 29, 2022
1 parent da9e806 commit 24c1008
Showing 1 changed file with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1062,22 +1062,41 @@ protected void validateBrokerName(String broker) {
}

public void validateTopicPolicyOperation(TopicName topicName, PolicyName policy, PolicyOperation operation) {
try {
int timeout = pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
validateTopicPolicyOperationAsync(topicName, policy, operation).get(timeout, SECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException){
throw (WebApplicationException) cause;
} else {
throw new RestException(cause);
}
}
}

public CompletableFuture<Void> validateTopicPolicyOperationAsync(TopicName topicName,
PolicyName policy, PolicyOperation operation) {
if (pulsar().getConfiguration().isAuthenticationEnabled()
&& pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}

Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
.allowTopicPolicyOperation(topicName, policy, operation, originalPrincipal(), clientAppId(),
clientAuthData());

if (!isAuthorized) {
throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateTopicPolicyOperation"
+ " for operation [%s] on topic [%s] on policy [%s]", operation.toString(),
topicName, policy.toString()));
return FutureUtil.failedFuture(
new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"));
}
return pulsar().getBrokerService().getAuthorizationService()
.allowTopicPolicyOperationAsync(topicName, policy, operation, originalPrincipal(), clientAppId(),
clientAuthData()).thenAccept(isAuthorized -> {
if (!isAuthorized) {
throw new RestException(Status.FORBIDDEN,
String.format("Unauthorized to validateTopicPolicyOperation"
+ " for operation [%s] on topic [%s] on policy [%s]", operation.toString(),
topicName, policy.toString()));
}
});
}
return CompletableFuture.completedFuture(null);
}

public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
Expand Down

0 comments on commit 24c1008

Please sign in to comment.