Skip to content

Commit

Permalink
Refactored PulsarResources to avoid building metadata paths everywhere (
Browse files Browse the repository at this point in the history
#11693)

* Refactored PulsarResources to avoid building metadata paths everywhere

* Fixed usages of ZkAdminPaths

* Removed unused import

* Fixed proxy

* Fixed ResourceGroupServiceTest

* Added MetadataStore deleteRecursive operation

* Fixed merge issues

* Use constant

* Removed unused import

* Fixes for ResourceGroupConfigListenerTest

* Fixed cluster resources cleanup

* Fixed topic list rest call

* Fixed NPE in PersistentDispatcherFailoverConsumerTest setup

* Fixed flaky test ReplicatorTest.testRemoveClusterFromNamespace

* Fixed handling notification for V1 path namespaces

* Fixed LocalPoliciesResources create

* Fixed deadlock when making blocking calls from MetadataStore callback thread

* Fixed usage of NamespaceName as key

* Fixed missing mock in BacklogQuotaManagerConfigurationTest

* Fixed RGUsageMTAggrWaitForAllMesgsTest

* fixed ServerCnxTest

* Fixed mock in PersistentSubscriptionTest

* Fixed PersistentTopicTest

* Fixed getting partitions list from NamespaceService

* Fixed tenant not found expectation

* Fixed ProxyConnection to check for existence of auth_data field
  • Loading branch information
merlimat authored Sep 16, 2021
1 parent 8042512 commit 89841aa
Show file tree
Hide file tree
Showing 60 changed files with 1,412 additions and 1,208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {

public ServiceConfiguration conf;
private PulsarResources pulsarResources;
private static final String POLICY_ROOT = "/admin/policies/";
public static final String POLICIES = "policies";
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";


public PulsarAuthorizationProvider() {
}
Expand Down Expand Up @@ -109,7 +107,8 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand Down Expand Up @@ -229,7 +228,7 @@ private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName
AuthAction authAction) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
Expand Down Expand Up @@ -285,9 +284,8 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
result.completeExceptionally(e);
}

final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString());
try {
pulsarResources.getNamespaceResources().set(policiesPath, (policies)->{
pulsarResources.getNamespaceResources().setPolicies(namespaceName, policies -> {
policies.auth_policies.getNamespaceAuthentication().put(role, actions);
return policies;
});
Expand All @@ -300,7 +298,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification", role, namespaceName);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
"Concurrent modification on metadata: " + namespaceName + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for namespace {}", role, namespaceName, e);
result.completeExceptionally(
Expand Down Expand Up @@ -332,11 +330,9 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
result.completeExceptionally(e);
}

final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString());

try {
Policies policies = pulsarResources.getNamespaceResources().get(policiesPath)
.orElseThrow(() -> new NotFoundException(policiesPath + " not found"));
Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace)
.orElseThrow(() -> new NotFoundException(namespace + " not found"));
if (remove) {
if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
Expand All @@ -348,7 +344,7 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().set(policiesPath, (data)->policies);
pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies);

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
Expand All @@ -358,7 +354,7 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
"Concurrent modification on metadata path: " + namespace + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e);
result.completeExceptionally(
Expand Down Expand Up @@ -388,7 +384,8 @@ private boolean checkCluster(TopicName topicName) {
public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand Down Expand Up @@ -497,10 +494,10 @@ private void validatePoliciesReadOnlyAccess() {
boolean arePoliciesReadOnly = true;

try {
arePoliciesReadOnly = pulsarResources.getNamespaceResources().exists(POLICIES_READONLY_FLAG_PATH);
arePoliciesReadOnly = pulsarResources.getNamespaceResources().getPoliciesReadOnly();
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
throw new IllegalStateException("Unable to fetch content from global zk");
log.warn("Unable to check if policies are read-only", e);
throw new IllegalStateException("Unable to fetch content from configuration metadata store");
}

if (arePoliciesReadOnly) {
Expand Down Expand Up @@ -632,7 +629,7 @@ public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
} else {
try {
TenantInfo tenantInfo = pulsarResources.getTenantResources()
.get(path(POLICIES, tenantName))
.getTenant(tenantName)
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
return isTenantAdmin(tenantName, role, tenantInfo, authData);
} catch (NotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,35 @@
package org.apache.pulsar.broker.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.common.PathUtils;

/**
* Base class for all configuration resources to access configurations from metadata-store.
*
* @param <T>
* type of configuration-resources.
*/
@Slf4j
public class BaseResources<T> {

protected static final String BASE_POLICIES_PATH = "/admin/policies";
protected static final String BASE_CLUSTERS_PATH = "/admin/clusters";

@Getter
private final MetadataStore store;
@Getter
Expand All @@ -56,7 +66,7 @@ public BaseResources(MetadataStore store, TypeReference<T> typeRef, int operatio
this.operationTimeoutSec = operationTimeoutSec;
}

public List<String> getChildren(String path) throws MetadataStoreException {
protected List<String> getChildren(String path) throws MetadataStoreException {
try {
return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -67,11 +77,11 @@ public List<String> getChildren(String path) throws MetadataStoreException {
}
}

public CompletableFuture<List<String>> getChildrenAsync(String path) {
protected CompletableFuture<List<String>> getChildrenAsync(String path) {
return cache.getChildren(path);
}

public Optional<T> get(String path) throws MetadataStoreException {
protected Optional<T> get(String path) throws MetadataStoreException {
try {
return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -82,11 +92,11 @@ public Optional<T> get(String path) throws MetadataStoreException {
}
}

public CompletableFuture<Optional<T>> getAsync(String path) {
protected CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
}

public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
protected void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -97,11 +107,11 @@ public void set(String path, Function<T, T> modifyFunction) throws MetadataStore
}
}

public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFunction) {
protected CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFunction) {
return cache.readModifyUpdate(path, modifyFunction).thenApply(__ -> null);
}

public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
protected void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
try {
setWithCreateAsync(path, createFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -112,11 +122,11 @@ public void setWithCreate(String path, Function<Optional<T>, T> createFunction)
}
}

public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
protected CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
return cache.readModifyUpdateOrCreate(path, createFunction).thenApply(__ -> null);
}

public void create(String path, T data) throws MetadataStoreException {
protected void create(String path, T data) throws MetadataStoreException {
try {
createAsync(path, data).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -127,11 +137,11 @@ public void create(String path, T data) throws MetadataStoreException {
}
}

public CompletableFuture<Void> createAsync(String path, T data) {
protected CompletableFuture<Void> createAsync(String path, T data) {
return cache.create(path, data);
}

public void delete(String path) throws MetadataStoreException {
protected void delete(String path) throws MetadataStoreException {
try {
deleteAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Expand All @@ -142,13 +152,13 @@ public void delete(String path) throws MetadataStoreException {
}
}

public CompletableFuture<Void> deleteAsync(String path) {
protected CompletableFuture<Void> deleteAsync(String path) {
return cache.delete(path);
}

public boolean exists(String path) throws MetadataStoreException {
protected boolean exists(String path) throws MetadataStoreException {
try {
return existsAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
return cache.exists(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -161,7 +171,43 @@ public int getOperationTimeoutSec() {
return operationTimeoutSec;
}

public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
protected static String joinPath(String... parts) {
StringBuilder sb = new StringBuilder();
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}



protected static void deleteRecursive(BaseResources resources, final String pathRoot) throws MetadataStoreException {
PathUtils.validatePath(pathRoot);
List<String> tree = listSubTreeBFS(resources, pathRoot);
log.debug("Deleting {} with size {}", tree, tree.size());
log.debug("Deleting " + tree.size() + " subnodes ");
for (int i = tree.size() - 1; i >= 0; --i) {
// Delete the leaves first and eventually get rid of the root
resources.delete(tree.get(i));
}
}

protected static List<String> listSubTreeBFS(BaseResources resources, final String pathRoot)
throws MetadataStoreException {
Deque<String> queue = new LinkedList<>();
List<String> tree = new ArrayList<>();
queue.add(pathRoot);
tree.add(pathRoot);
while (true) {
String node = queue.pollFirst();
if (node == null) {
break;
}
List<String> children = resources.getChildren(node);
for (final String child : children) {
final String childPath = node + "/" + child;
queue.add(childPath);
tree.add(childPath);
}
}
return tree;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import lombok.Getter;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;

public class BrokerResources extends BaseResources<ClusterData> {

public BrokerResources(MetadataStore store, int operationTimeoutSec) {
super(store, ClusterData.class, operationTimeoutSec);
}

public Set<String> listActiveBrokers() throws MetadataStoreException {
return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH));
}
}
Loading

0 comments on commit 89841aa

Please sign in to comment.