From 24c1008691b3adfdca7a24a1a761146f9d9586ba Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Sat, 29 Jan 2022 11:03:36 +0800 Subject: [PATCH] [broker]make validateTopicPolicyOperation method async in PulsarWebResource --- .../pulsar/broker/web/PulsarWebResource.java | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 4248fc6164819..e9af5658b8566 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1062,22 +1062,41 @@ protected void validateBrokerName(String broker) { } public void validateTopicPolicyOperation(TopicName topicName, PolicyName policy, PolicyOperation operation) { + try { + int timeout = pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(); + validateTopicPolicyOperationAsync(topicName, policy, operation).get(timeout, SECONDS); + } catch (InterruptedException | TimeoutException e) { + throw new RestException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException){ + throw (WebApplicationException) cause; + } else { + throw new RestException(cause); + } + } + } + + public CompletableFuture validateTopicPolicyOperationAsync(TopicName topicName, + PolicyName policy, PolicyOperation operation) { if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { - throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); - } - - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTopicPolicyOperation(topicName, policy, operation, originalPrincipal(), clientAppId(), - clientAuthData()); - - if (!isAuthorized) { - throw new RestException(Status.FORBIDDEN, String.format("Unauthorized to validateTopicPolicyOperation" - + " for operation [%s] on topic [%s] on policy [%s]", operation.toString(), - topicName, policy.toString())); + return FutureUtil.failedFuture( + new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request")); } + return pulsar().getBrokerService().getAuthorizationService() + .allowTopicPolicyOperationAsync(topicName, policy, operation, originalPrincipal(), clientAppId(), + clientAuthData()).thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.FORBIDDEN, + String.format("Unauthorized to validateTopicPolicyOperation" + + " for operation [%s] on topic [%s] on policy [%s]", operation.toString(), + topicName, policy.toString())); + } + }); } + return CompletableFuture.completedFuture(null); } public void validateTopicOperation(TopicName topicName, TopicOperation operation) {