From 6774d6ff39f7d71bb2962ef0ec93db6a0ab52366 Mon Sep 17 00:00:00 2001 From: rajan Date: Tue, 16 Mar 2021 10:53:48 -0700 Subject: [PATCH 1/5] [pulsar-broker] Namespace-resource use namespace name to fetch policy resource --- .../broker/resources/BaseResources.java | 24 ++++++++---- .../broker/resources/NamespaceResources.java | 8 +++- .../pulsar/broker/admin/AdminResource.java | 31 ++++++++-------- .../broker/admin/impl/NamespacesBase.java | 37 ++++++++----------- .../admin/impl/PersistentTopicsBase.java | 12 +++--- .../pulsar/broker/web/PulsarWebResource.java | 4 +- 6 files changed, 63 insertions(+), 53 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 8b8346a09c8cf..12ec709f75f9a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -56,6 +56,16 @@ public BaseResources(MetadataStoreExtended store, TypeReference typeRef, int this.operationTimeoutSec = operationTimeoutSec; } + /** + * Creates internal path based on resource location. + * + * @param path + * @return + */ + public String internalPath(String path) { + return path; + } + public List getChildren(String path) throws MetadataStoreException { try { return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS); @@ -68,7 +78,7 @@ public List getChildren(String path) throws MetadataStoreException { } public CompletableFuture> getChildrenAsync(String path) { - return cache.getChildren(path); + return cache.getChildren(internalPath(path)); } public Optional get(String path) throws MetadataStoreException { @@ -83,7 +93,7 @@ public Optional get(String path) throws MetadataStoreException { } public CompletableFuture> getAsync(String path) { - return cache.get(path); + return cache.get(internalPath(path)); } public void set(String path, Function modifyFunction) throws MetadataStoreException { @@ -98,7 +108,7 @@ public void set(String path, Function modifyFunction) throws MetadataStore } public CompletableFuture setAsync(String path, Function modifyFunction) { - return cache.readModifyUpdate(path, modifyFunction); + return cache.readModifyUpdate(internalPath(path), modifyFunction); } public void setWithCreate(String path, Function, T> createFunction) throws MetadataStoreException { @@ -113,7 +123,7 @@ public void setWithCreate(String path, Function, T> createFunction) } public CompletableFuture setWithCreateAsync(String path, Function, T> createFunction) { - return cache.readModifyUpdateOrCreate(path, createFunction); + return cache.readModifyUpdateOrCreate(internalPath(path), createFunction); } public void create(String path, T data) throws MetadataStoreException { @@ -128,7 +138,7 @@ public void create(String path, T data) throws MetadataStoreException { } public CompletableFuture createAsync(String path, T data) { - return cache.create(path, data); + return cache.create(internalPath(path), data); } public void delete(String path) throws MetadataStoreException { @@ -143,7 +153,7 @@ public void delete(String path) throws MetadataStoreException { } public CompletableFuture deleteAsync(String path) { - return cache.delete(path); + return cache.delete(internalPath(path)); } public boolean exists(String path) throws MetadataStoreException { @@ -158,6 +168,6 @@ public boolean exists(String path) throws MetadataStoreException { } public CompletableFuture existsAsync(String path) { - return cache.exists(path); + return cache.exists(internalPath(path)); } } \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 4cb08f1e28f4b..3301bbb5686e5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -18,15 +18,16 @@ */ package org.apache.pulsar.broker.resources; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import com.fasterxml.jackson.core.type.TypeReference; import java.util.Map; import java.util.Optional; import lombok.Getter; - import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.common.policies.path.PolicyPath; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -36,6 +37,11 @@ public class NamespaceResources extends BaseResources { private PartitionedTopicResources partitionedTopicResources; private MetadataStoreExtended configurationStore; + @Override + public String internalPath(String namespace) { + return PolicyPath.path(POLICIES, namespace); + } + public NamespaceResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) { super(configurationStore, Policies.class, operationTimeoutSec); this.configurationStore = configurationStore; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index c394bbacaa1da..132d18dc75275 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.util.Codec.decode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; @@ -330,8 +329,7 @@ protected void validateTopicName(String property, String cluster, String namespa protected Policies getNamespacePolicies(NamespaceName namespaceName) { try { final String namespace = namespaceName.toString(); - final String policyPath = AdminResource.path(POLICIES, namespace); - Policies policies = namespaceResources().get(policyPath) + Policies policies = namespaceResources().get(namespace) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); // fetch bundles from LocalZK-policies NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -340,7 +338,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) { policies.bundles = bundleData != null ? bundleData : policies.bundles; // hydrate the namespace polices - mergeNamespaceWithDefaults(policies, namespace, policyPath); + mergeNamespaceWithDefaults(policies, namespace); return policies; } catch (RestException re) { @@ -354,9 +352,8 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) { protected CompletableFuture getNamespacePoliciesAsync(NamespaceName namespaceName) { final String namespace = namespaceName.toString(); - final String policyPath = AdminResource.path(POLICIES, namespace); - return namespaceResources().getAsync(policyPath).thenCompose(policies -> { + return namespaceResources().getAsync(namespace).thenCompose(policies -> { if (policies.isPresent()) { return pulsar() .getNamespaceService() @@ -372,7 +369,7 @@ protected CompletableFuture getNamespacePoliciesAsync(NamespaceName na } policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; // hydrate the namespace polices - mergeNamespaceWithDefaults(policies.get(), namespace, policyPath); + mergeNamespaceWithDefaults(policies.get(), namespace); return CompletableFuture.completedFuture(policies.get()); }); } else { @@ -381,7 +378,7 @@ protected CompletableFuture getNamespacePoliciesAsync(NamespaceName na }); } - protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) { + protected void mergeNamespaceWithDefaults(Policies policies, String namespace) { final ServiceConfiguration config = pulsar().getConfiguration(); if (policies.max_consumers_per_subscription < 1) { @@ -585,7 +582,7 @@ protected void validateClusterExists(String cluster) { protected Policies getNamespacePolicies(String property, String cluster, String namespace) { try { - Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace)) + Policies policies = namespaceResources().get(NamespaceName.get(property, cluster, namespace).toString()) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); // fetch bundles from LocalZK-policies NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -607,7 +604,7 @@ protected boolean isNamespaceReplicated(NamespaceName namespaceName) { protected Set getNamespaceReplicatedClusters(NamespaceName namespaceName) { try { - final Policies policies = namespaceResources().get(ZkAdminPaths.namespacePoliciesPath(namespaceName)) + final Policies policies = namespaceResources().get(namespaceName.toString()) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); return policies.replication_clusters; } catch (RestException re) { @@ -624,16 +621,18 @@ protected List getPartitionedTopicList(TopicDomain topicDomain) { try { String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), topicDomain.value()); - List topics = namespaceResources().getChildren(partitionedTopicPath); + List topics = namespaceResources().getStore().getChildren(partitionedTopicPath).get(); partitionedTopics = topics.stream() .map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s))) .collect(Collectors.toList()); - } catch (NotFoundException e) { - // NoNode means there are no partitioned topics in this domain for this namespace } catch (Exception e) { - log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), - namespaceName.toString(), e); - throw new RestException(e); + if (e instanceof ExecutionException && e.getCause() instanceof NotFoundException) { + // NoNode means there are no partitioned topics in this domain for this namespace + } else { + log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), + namespaceName.toString(), e); + throw new RestException(e); + } } partitionedTopics.sort(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d212fb50f0ebb..698b746ab5b60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -143,7 +143,7 @@ protected void internalCreateNamespace(Policies policies) { "Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant()); } } - namespaceResources().create(path(POLICIES, namespaceName.toString()), policies); + namespaceResources().create(namespaceName.toString(), policies); log.info("[{}] Created namespace {}", clientAppId(), namespaceName); } catch (AlreadyExistsException e) { log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName); @@ -301,10 +301,10 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth try { // we have successfully removed all the ownership for the namespace, the policies znode can be deleted // now - final String globalZkPolicyPath = path(POLICIES, namespaceName.toString()); final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString()); - namespaceResources().delete(globalZkPolicyPath); + namespaceResources().delete(namespaceName.toString()); try { + //TODO getLocalPolicies().delete(lcaolZkPolicyPath); } catch (NotFoundException nne) { // If the z-node with the modified information is not there anymore, we're already good @@ -447,7 +447,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo // remove partitioned topics znode final String globalPartitionedPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString()); // check whether partitioned topics znode exist - if (namespaceResources().exists(globalPartitionedPath)) { + if (namespaceResources().getStore().exists(globalPartitionedPath).get()) { deleteRecursive(namespaceResources(), globalPartitionedPath); } @@ -464,7 +464,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } } catch (Exception e) { log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e); - asyncResponse.resume(new RestException(e)); + asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e)); return null; } @@ -800,7 +800,7 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, } } // Force to read the data s.t. the watch to the cache content is setup. - namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> { + namespaceResources().setAsync(namespaceName.toString(), (policies) -> { policies.autoTopicCreationOverride = autoTopicCreationOverride; return policies; }).thenApply(r -> { @@ -832,7 +832,7 @@ protected void internalSetAutoSubscriptionCreation( validatePoliciesReadOnlyAccess(); // Force to read the data s.t. the watch to the cache content is setup. - namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> { + namespaceResources().setAsync(namespaceName.toString(), (policies) -> { policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride; return policies; }).thenApply(r -> { @@ -1345,8 +1345,7 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType : BacklogQuotaType.destination_storage; try { - final String path = path(POLICIES, namespaceName.toString()); - Policies policies = namespaceResources().get(path) + Policies policies = namespaceResources().get(namespaceName.toString()) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist")); RetentionPolicies r = policies.retention_policies; if (r != null) { @@ -1363,7 +1362,7 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo } } policies.backlog_quota_map.put(quotaType, backlogQuota); - namespaceResources().set(path, p -> policies); + namespaceResources().set(namespaceName.toString(), p -> policies); log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName, jsonMapper().writeValueAsString(backlogQuota)); @@ -1400,8 +1399,7 @@ protected void internalSetRetention(RetentionPolicies retention) { validatePoliciesReadOnlyAccess(); try { - final String path = path(POLICIES, namespaceName.toString()); - Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND, + Policies policies = namespaceResources().get(namespaceName.toString()).orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist")); if (!checkQuotas(policies, retention)) { log.warn("[{}] Failed to update retention configuration" @@ -1411,7 +1409,7 @@ protected void internalSetRetention(RetentionPolicies retention) { "Retention Quota must exceed configured backlog quota for namespace."); } policies.retention_policies = retention; - namespaceResources().set(path, p -> policies); + namespaceResources().set(namespaceName.toString(), p -> policies); log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(), namespaceName, jsonMapper().writeValueAsString(retention)); } catch (RestException pfe) { @@ -1716,13 +1714,12 @@ protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic protected void internalSetPolicies(String fieldName, Object value) { try { - final String path = path(POLICIES, namespaceName.toString()); - Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND, + Policies policies = namespaceResources().get(namespaceName.toString()).orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist")); Field field = Policies.class.getDeclaredField(fieldName); field.setAccessible(true); field.set(policies, value); - namespaceResources().set(path, p -> policies); + namespaceResources().set(namespaceName.toString(), p -> policies); log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName, namespaceName, jsonMapper().writeValueAsString(value)); @@ -2435,8 +2432,7 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo validateOffloadPolicies(offloadPolicies); try { - final String path = path(POLICIES, namespaceName.toString()); - namespaceResources().setAsync(path, (policies) -> { + namespaceResources().setAsync(namespaceName.toString(), (policies) -> { if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) { offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms); @@ -2474,8 +2470,7 @@ protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) { validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); try { - final String path = path(POLICIES, namespaceName.toString()); - namespaceResources().setAsync(path, (policies) -> { + namespaceResources().setAsync(namespaceName.toString(), (policies) -> { policies.offload_policies = null; return policies; }).thenApply(r -> { @@ -2549,7 +2544,7 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { private void updatePolicies(String path, Function updateFunction) { try { // Force to read the data s.t. the watch to the cache content is setup. - namespaceResources().set(path(POLICIES, namespaceName.toString()), updateFunction); + namespaceResources().set(namespaceName.toString(), updateFunction); log.info("[{}] Successfully updated the on namespace {}", clientAppId(), path, namespaceName); } catch (NotFoundException e) { log.warn("[{}] Namespace {}: does not exist", clientAppId(), namespaceName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d1d5aa414a12a..8ff4fb58f4f90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -140,7 +140,7 @@ protected List internalGetList() { // Validate that namespace exists, throws 404 if it doesn't exist try { - if (!namespaceResources().exists(path(POLICIES, namespaceName.toString()))) { + if (!namespaceResources().exists(namespaceName.toString())) { throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); } } catch (RestException re) { @@ -174,7 +174,7 @@ protected List internalGetPartitionedTopicList() { validateAdminAccessForTenant(namespaceName.getTenant()); // Validate that namespace exists, throws 404 if it doesn't exist try { - if (!namespaceResources().exists(path(POLICIES, namespaceName.toString()))) { + if (!namespaceResources().exists(namespaceName.toString())) { log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(), namespaceName); throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); @@ -195,7 +195,7 @@ protected Map> internalGetPermissionsOnTopic() { String topicUri = topicName.toString(); try { - Policies policies = namespaceResources().get(path(POLICIES, namespaceName.toString())) + Policies policies = namespaceResources().get(namespaceName.toString()) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); Map> permissions = Maps.newHashMap(); @@ -321,7 +321,7 @@ private void validateAdminAccessForSubscriber(String subscriptionName) { private void grantPermissions(String topicUri, String role, Set actions) { try { - namespaceResources().set(path(POLICIES, namespaceName.toString()), (policies) -> { + namespaceResources().set(namespaceName.toString(), (policies) -> { if (!policies.auth_policies.destination_auth.containsKey(topicUri)) { policies.auth_policies.destination_auth.put(topicUri, new HashMap<>()); } @@ -374,7 +374,7 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele private void revokePermissions(String topicUri, String role) { Policies policies; try { - policies = namespaceResources().get(path(POLICIES, namespaceName.toString())) + policies = namespaceResources().get(namespaceName.toString()) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); } catch (Exception e) { log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e); @@ -2497,7 +2497,7 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) // Validate that namespace exists, throw 404 if it doesn't exist // note that we do not want to load the topic and hence skip validateAdminOperationOnTopic() try { - namespaceResources().get(path(POLICIES, namespaceName.toString())); + namespaceResources().get(namespaceName.toString()); } catch (org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) { log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 2263be0280c86..bc3baf92d383c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -943,7 +943,7 @@ protected CompletableFuture hasActiveNamespace(String tenant) { // created // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to // add it to the list - namespaceResources().getAsync(path(POLICIES, namespace)).thenApply(data -> { + namespaceResources().getAsync(namespace).thenApply(data -> { if (data.isPresent()) { checkNs.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, "Tenant has active namespace")); @@ -1058,7 +1058,7 @@ protected List getListOfNamespaces(String tenant) throws Exception { // if the length is 0 then this is probably a leftover cluster from namespace created // with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list try { - if (namespaceResources().get(path(POLICIES, namespace)).isPresent()) { + if (namespaceResources().get(namespace).isPresent()) { namespaces.add(namespace); } } catch (MetadataStoreException.ContentDeserializationException e) { From dc6fa5dbe4df1f84c90b933291a852b780ae61b5 Mon Sep 17 00:00:00 2001 From: rajan Date: Tue, 16 Mar 2021 15:04:29 -0700 Subject: [PATCH 2/5] fix test --- .../PulsarAuthorizationProvider.java | 17 +++++++++-------- .../org/apache/pulsar/broker/PulsarService.java | 3 +-- .../pulsar/broker/admin/AdminResource.java | 6 +++--- .../broker/admin/impl/NamespacesBase.java | 8 ++++---- .../functions/worker/PulsarWorkerService.java | 2 +- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 96757aebbd1c7..1f8d3fcb1df2c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -109,7 +109,7 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro AuthenticationDataSource authenticationData, String subscription) { CompletableFuture permissionFuture = new CompletableFuture<>(); try { - pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> { + pulsarResources.getNamespaceResources().getAsync(topicName.getNamespace()).thenAccept(policies -> { if (!policies.isPresent()) { if (log.isDebugEnabled()) { log.debug("Policies node couldn't be found for topic : {}", topicName); @@ -236,7 +236,7 @@ private CompletableFuture allowTheSpecifiedActionOpsAsync(NamespaceName AuthAction authAction) { CompletableFuture permissionFuture = new CompletableFuture<>(); try { - pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> { + pulsarResources.getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(policies -> { if (!policies.isPresent()) { if (log.isDebugEnabled()) { log.debug("Policies node couldn't be found for namespace : {}", namespaceName); @@ -293,7 +293,7 @@ public CompletableFuture grantPermissionAsync(NamespaceName namespaceName, final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString()); try { - pulsarResources.getNamespaceResources().set(policiesPath, (policies)->{ + pulsarResources.getNamespaceResources().set(namespaceName.toString(), (policies)->{ policies.auth_policies.namespace_auth.put(role, actions); return policies; }); @@ -341,7 +341,7 @@ private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString()); try { - Policies policies = pulsarResources.getNamespaceResources().get(policiesPath) + Policies policies = pulsarResources.getNamespaceResources().get(namespace.toString()) .orElseThrow(() -> new NotFoundException(policiesPath + " not found")); if (remove) { if (policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) { @@ -354,7 +354,7 @@ private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName } else { policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles); } - pulsarResources.getNamespaceResources().set(policiesPath, (data)->policies); + pulsarResources.getNamespaceResources().set(namespace.toString(), (data)->policies); log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles); result.complete(null); @@ -394,7 +394,7 @@ private boolean checkCluster(TopicName topicName) { public CompletableFuture checkPermission(TopicName topicName, String role, AuthAction action) { CompletableFuture permissionFuture = new CompletableFuture<>(); try { - pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> { + pulsarResources.getNamespaceResources().getAsync(topicName.getNamespace()).thenAccept(policies -> { if (!policies.isPresent()) { if (log.isDebugEnabled()) { log.debug("Policies node couldn't be found for topic : {}", topicName); @@ -484,9 +484,10 @@ private void validatePoliciesReadOnlyAccess() { boolean arePoliciesReadOnly = true; try { - arePoliciesReadOnly = pulsarResources.getNamespaceResources().exists(POLICIES_READONLY_FLAG_PATH); + arePoliciesReadOnly = pulsarResources.getNamespaceResources().getStore().exists(POLICIES_READONLY_FLAG_PATH) + .get(); } catch (Exception e) { - log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e); + log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e.getCause()); throw new IllegalStateException("Unable to fetch content from global zk"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 919434aaccac6..8c8b1f4ab8107 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -778,8 +778,7 @@ protected void acquireSLANamespace() { try { // Namespace not created hence no need to unload it String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config); - if (!this.pulsarResources.getNamespaceResources().exists( - AdminResource.path(POLICIES) + "/" + nsName)) { + if (!this.pulsarResources.getNamespaceResources().exists(nsName)) { LOG.info("SLA Namespace = {} doesn't exist.", nsName); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 132d18dc75275..570ac49607053 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -156,11 +156,11 @@ public void validatePoliciesReadOnlyAccess() { boolean arePoliciesReadOnly = true; try { - arePoliciesReadOnly = pulsar().getPulsarResources().getNamespaceResources() - .exists(POLICIES_READONLY_FLAG_PATH); + arePoliciesReadOnly = pulsar().getPulsarResources().getNamespaceResources().getStore() + .exists(POLICIES_READONLY_FLAG_PATH).get(); } catch (Exception e) { log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e); - throw new RestException(e); + throw new RestException(e instanceof ExecutionException ? e.getCause() : e); } if (arePoliciesReadOnly) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 698b746ab5b60..54c0a9bde6758 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -177,7 +177,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth // ensure the local cluster is the only cluster for the global namespace configuration try { - policies = namespaceResources().get(path(POLICIES, namespaceName.toString())).orElseThrow( + policies = namespaceResources().get(namespaceName.toString()).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist.")); if (namespaceName.isGlobal()) { if (policies.replication_clusters.size() > 1) { @@ -250,7 +250,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth // set the policies to deleted so that somebody else cannot acquire this namespace try { - namespaceResources().set(path(POLICIES, namespaceName.toString()), (old) -> { + namespaceResources().set(namespaceName.toString(), (old) -> { old.deleted = true; return old; }); @@ -340,7 +340,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo // ensure the local cluster is the only cluster for the global namespace configuration try { - policies = namespaceResources().get(path(POLICIES, namespaceName.toString())).orElseThrow( + policies = namespaceResources().get(namespaceName.toString()).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist.")); if (namespaceName.isGlobal()) { if (policies.replication_clusters.size() > 1) { @@ -391,7 +391,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo // set the policies to deleted so that somebody else cannot acquire this namespace try { - namespaceResources().set(path(POLICIES, namespaceName.toString()), (old) -> { + namespaceResources().set(namespaceName.toString(), (old) -> { old.deleted = true; return old; }); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java index af25ebb3d4a82..b06ed401ca563 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java @@ -345,7 +345,7 @@ public void initInBroker(ServiceConfiguration brokerConfig, policies.bundles = getBundles(defaultNumberOfBundles); configurationCacheService.policiesCache().invalidate(PolicyPath.path(POLICIES, namespace)); - pulsarResources.getNamespaceResources().create(PolicyPath.path(POLICIES, namespace), policies); + pulsarResources.getNamespaceResources().create(namespace, policies); LOG.info("Created namespace {} for function worker service", namespace); } catch (AlreadyExistsException e) { LOG.debug("Failed to create already existing namespace {} for function worker service", namespace); From 81e48abf88ec1f1bc37947282943ee217a41b5b2 Mon Sep 17 00:00:00 2001 From: rajan Date: Tue, 16 Mar 2021 15:08:38 -0700 Subject: [PATCH 3/5] check ExecutionException in RestException --- .../java/org/apache/pulsar/broker/admin/AdminResource.java | 2 +- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../main/java/org/apache/pulsar/broker/web/RestException.java | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 570ac49607053..7bd15a6a48ae4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -160,7 +160,7 @@ public void validatePoliciesReadOnlyAccess() { .exists(POLICIES_READONLY_FLAG_PATH).get(); } catch (Exception e) { log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e); - throw new RestException(e instanceof ExecutionException ? e.getCause() : e); + throw new RestException(e); } if (arePoliciesReadOnly) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 54c0a9bde6758..642cb594bddfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -464,7 +464,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } } catch (Exception e) { log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e); - asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e)); + asyncResponse.resume(new RestException(e)); return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java index 0cec819c46a53..df48043b76cb0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java @@ -20,6 +20,8 @@ import java.io.PrintWriter; import java.io.StringWriter; +import java.util.concurrent.ExecutionException; + import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -54,7 +56,7 @@ public RestException(int code, String message) { } public RestException(Throwable t) { - super(getResponse(t)); + super(getResponse(t instanceof ExecutionException ? t.getCause() : t)); } public RestException(Response.Status status, Throwable t) { From 6ab1aea9526e5750c540cc600c3dc2600b70b0c4 Mon Sep 17 00:00:00 2001 From: rajan Date: Tue, 16 Mar 2021 13:31:33 -0700 Subject: [PATCH 4/5] [pulsar-broker] cluster-resource use cluster name to fetch cluster-metadata --- .../cache/ConfigurationCacheService.java | 5 ++- .../broker/resources/ClusterResources.java | 10 ++++- .../pulsar/broker/admin/AdminResource.java | 2 +- .../broker/admin/impl/ClustersBase.java | 38 +++++++++---------- .../broker/admin/impl/NamespacesBase.java | 10 ++--- .../pulsar/broker/admin/v1/Namespaces.java | 16 +++++--- .../pulsar/broker/web/PulsarWebResource.java | 6 +-- 7 files changed, 51 insertions(+), 36 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java index 33b30f2e83dcc..0d9c83c5aeeff 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java @@ -65,10 +65,11 @@ public class ConfigurationCacheService { private PulsarResources pulsarResources; public static final String POLICIES = "policies"; + public static final String CLUSTERS = "clusters"; public static final String FAILURE_DOMAIN = "failureDomain"; public final String CLUSTER_FAILURE_DOMAIN_ROOT; - public static final String POLICIES_ROOT = "/admin/policies"; - private static final String CLUSTERS_ROOT = "/admin/clusters"; + public static final String POLICIES_ROOT = "/admin/" + POLICIES; + public static final String CLUSTERS_ROOT = "/admin/" + CLUSTERS; public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics"; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 938d667a62043..1977045a2f2d6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -18,18 +18,21 @@ */ package org.apache.pulsar.broker.resources; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS_ROOT; + import java.util.HashSet; import java.util.Set; import lombok.Getter; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.path.PolicyPath; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; public class ClusterResources extends BaseResources { - public static final String CLUSTERS_ROOT = "/admin/clusters"; @Getter private FailureDomainResources failureDomainResources; @@ -38,6 +41,11 @@ public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) { this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class, operationTimeoutSec); } + @Override + public String internalPath(String clusterName) { + return PolicyPath.path(CLUSTERS, clusterName); + } + public Set list() throws MetadataStoreException { return new HashSet<>(super.getChildren(CLUSTERS_ROOT)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 7bd15a6a48ae4..116395ff96866 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -572,7 +572,7 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo protected void validateClusterExists(String cluster) { try { - if (!clusterResources().get(path("clusters", cluster)).isPresent()) { + if (!clusterResources().get(cluster).isPresent()) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); } } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index ce0f2211918a8..6c8522c1bacfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.ws.rs.DELETE; @@ -114,7 +115,7 @@ public ClusterData getCluster( validateSuperUserAccess(); try { - return clusterResources().get(path("clusters", cluster)) + return clusterResources().get(cluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); } catch (Exception e) { log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); @@ -166,18 +167,18 @@ public void createCluster( try { NamedEntity.checkName(cluster); - if (clusterResources().get(path("clusters", cluster)).isPresent()) { + if (clusterResources().get(cluster).isPresent()) { log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster); throw new RestException(Status.CONFLICT, "Cluster already exists"); } - clusterResources().create(path("clusters", cluster), clusterData); + clusterResources().create(cluster, clusterData); log.info("[{}] Created cluster {}", clientAppId(), cluster); } catch (IllegalArgumentException e) { log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e); throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid"); } catch (Exception e) { log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e); - throw new RestException(e); + throw new RestException(e instanceof ExecutionException ? e.getCause() : e); } } @@ -218,7 +219,7 @@ public void updateCluster( validatePoliciesReadOnlyAccess(); try { - clusterResources().set(path("clusters", cluster), old -> { + clusterResources().set(cluster, old -> { old.update(clusterData); return old; }); @@ -277,7 +278,7 @@ public void setPeerClusterNames( throw new RestException(Status.PRECONDITION_FAILED, cluster + " itself can't be part of peer-list"); } - clusterResources().get(path("clusters", peerCluster)) + clusterResources().get(peerCluster) .orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED, "Peer cluster " + peerCluster + " does not exist")); } catch (RestException e) { @@ -293,7 +294,7 @@ public void setPeerClusterNames( } try { - clusterResources().set(path("clusters", cluster), old -> { + clusterResources().set(cluster, old -> { old.setPeerClusterNames(peerClusterNames); return old; }); @@ -329,7 +330,7 @@ public Set getPeerCluster( ) { validateSuperUserAccess(); try { - ClusterData clusterData = clusterResources().get(path("clusters", cluster)) + ClusterData clusterData = clusterResources().get(cluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); return clusterData.getPeerClusterNames(); } catch (Exception e) { @@ -365,11 +366,11 @@ public void deleteCluster( boolean isClusterUsed = false; try { for (String property : tenantResources().getChildren(path(POLICIES))) { - if (!clusterResources().exists(path(POLICIES, property, cluster))) { + if (!clusterResources().getStore().exists(path(POLICIES, property, cluster)).get()) { continue; } - if (!clusterResources().getChildren(path(POLICIES, property, cluster)).isEmpty()) { + if (!clusterResources().getStore().getChildren(path(POLICIES, property, cluster)).get().isEmpty()) { // We found a property that has at least a namespace in this cluster isClusterUsed = true; break; @@ -390,7 +391,7 @@ public void deleteCluster( } } catch (Exception e) { log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e); - throw new RestException(e); + throw new RestException(e instanceof ExecutionException ? e.getCause() : e); } if (isClusterUsed) { @@ -399,9 +400,8 @@ public void deleteCluster( } try { - String clusterPath = path("clusters", cluster); - deleteFailureDomain(clusterPath); - clusterResources().delete(clusterPath); + deleteFailureDomain(path("clusters", cluster)); + clusterResources().delete(cluster); log.info("[{}] Deleted cluster {}", clientAppId(), cluster); } catch (NotFoundException e) { log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster); @@ -415,17 +415,17 @@ public void deleteCluster( private void deleteFailureDomain(String clusterPath) { try { String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN); - if (!clusterResources().exists(failureDomain)) { + if (!clusterResources().getFailureDomainResources().exists(failureDomain)) { return; } - for (String domain : clusterResources().getChildren(failureDomain)) { + for (String domain : clusterResources().getFailureDomainResources().getChildren(failureDomain)) { String domainPath = joinPath(failureDomain, domain); - clusterResources().delete(domainPath); + clusterResources().getFailureDomainResources().delete(domainPath); } clusterResources().delete(failureDomain); } catch (Exception e) { log.warn("Failed to delete failure-domain under cluster {}", clusterPath); - throw new RestException(e); + throw new RestException(e instanceof ExecutionException ? e.getCause() : e); } } @@ -450,7 +450,7 @@ public Map getNamespaceIsolationPolicies( @PathParam("cluster") String cluster ) throws Exception { validateSuperUserAccess(); - if (!clusterResources().exists(path("clusters", cluster))) { + if (!clusterResources().exists(cluster)) { throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist."); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 642cb594bddfb..eac9adf614211 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -189,7 +189,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth && !policies.replication_clusters.contains(config().getClusterName())) { // the only replication cluster is other cluster, redirect String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) + ClusterData replClusterData = clusterResources().get(replCluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster " + replCluster + " does not exist")); URL replClusterUrl; @@ -352,7 +352,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo && !policies.replication_clusters.contains(config().getClusterName())) { // the only replication cluster is other cluster, redirect String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) + ClusterData replClusterData = clusterResources().get(replCluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster " + replCluster + " does not exist")); URL replClusterUrl; @@ -504,7 +504,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori && !policies.replication_clusters.contains(config().getClusterName())) { // the only replication cluster is other cluster, redirect String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) + ClusterData replClusterData = clusterResources().get(replCluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster " + replCluster + " does not exist")); URL replClusterUrl; @@ -577,7 +577,7 @@ protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boole && !policies.replication_clusters.contains(config().getClusterName())) { // the only replication cluster is other cluster, redirect String replCluster = Lists.newArrayList(policies.replication_clusters).get(0); - ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster)) + ClusterData replClusterData = clusterResources().get(replCluster) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster " + replCluster + " does not exist")); URL replClusterUrl; @@ -1918,7 +1918,7 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr */ private void validatePeerClusterConflict(String clusterName, Set replicationClusters) { try { - ClusterData clusterData = clusterResources().get(path("clusters", clusterName)).orElseThrow( + ClusterData clusterData = clusterResources().get(clusterName).orElseThrow( () -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName)); Set peerClusters = clusterData.getPeerClusterNames(); if (peerClusters != null && !peerClusters.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 783a37b08a4be..f8d1de386f12f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -20,6 +20,8 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.policies.data.Policies.getBundles; +import static org.hamcrest.CoreMatchers.instanceOf; + import com.google.common.collect.Lists; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -29,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; + import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -104,14 +108,16 @@ public List getNamespacesForCluster(@PathParam("property") String proper throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); } try { - for (String namespace : clusterResources().getChildren(path(POLICIES, property, cluster))) { + for (String namespace : clusterResources().getStore().getChildren(path(POLICIES, property, cluster)).get()) { namespaces.add(String.format("%s/%s/%s", property, cluster, namespace)); } - } catch (NotFoundException e) { - // NoNode means there are no namespaces for this property on the specified cluster, returning empty list } catch (Exception e) { - log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e); - throw new RestException(e); + if (e instanceof ExecutionException && e.getCause() instanceof NotFoundException) { + // NoNode means there are no namespaces for this property on the specified cluster, returning empty list + } else { + log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e); + throw new RestException(e); + } } namespaces.sort(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index bc3baf92d383c..7dbe2355562ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -905,13 +905,13 @@ public static ObjectMapper jsonMapper() { public void validatePoliciesReadOnlyAccess() { try { - if (clusterResources().existsAsync(AdminResource.POLICIES_READONLY_FLAG_PATH).get()) { + if (clusterResources().getStore().exists(AdminResource.POLICIES_READONLY_FLAG_PATH).get()) { log.debug("Policies are read-only. Broker cannot do read-write operations"); throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations"); } } catch (Exception e) { log.warn("Unable to fetch read-only policy config {}", POLICIES_READONLY_FLAG_PATH, e); - throw new RestException(e); + throw new RestException(e instanceof ExecutionException ? e.getCause() : e); } } @@ -978,7 +978,7 @@ protected CompletableFuture hasActiveNamespace(String tenant) { protected void validateClusterExists(String cluster) { try { - if (!clusterResources().get(path("clusters", cluster)).isPresent()) { + if (!clusterResources().get(cluster).isPresent()) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); } } catch (Exception e) { From cc6fe09feb972229fa538d578627fb7725c469d5 Mon Sep 17 00:00:00 2001 From: rajan Date: Tue, 16 Mar 2021 15:12:17 -0700 Subject: [PATCH 5/5] fix exception --- .../org/apache/pulsar/broker/admin/impl/ClustersBase.java | 6 +++--- .../org/apache/pulsar/broker/web/PulsarWebResource.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 6c8522c1bacfd..0ff645181d6b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -178,7 +178,7 @@ public void createCluster( throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid"); } catch (Exception e) { log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e); - throw new RestException(e instanceof ExecutionException ? e.getCause() : e); + throw new RestException(e); } } @@ -391,7 +391,7 @@ public void deleteCluster( } } catch (Exception e) { log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e); - throw new RestException(e instanceof ExecutionException ? e.getCause() : e); + throw new RestException(e); } if (isClusterUsed) { @@ -425,7 +425,7 @@ private void deleteFailureDomain(String clusterPath) { clusterResources().delete(failureDomain); } catch (Exception e) { log.warn("Failed to delete failure-domain under cluster {}", clusterPath); - throw new RestException(e instanceof ExecutionException ? e.getCause() : e); + throw new RestException(e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 7dbe2355562ba..09dc7e0a96568 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -911,7 +911,7 @@ public void validatePoliciesReadOnlyAccess() { } } catch (Exception e) { log.warn("Unable to fetch read-only policy config {}", POLICIES_READONLY_FLAG_PATH, e); - throw new RestException(e instanceof ExecutionException ? e.getCause() : e); + throw new RestException(e); } }