Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] cluster-resource use cluster name to fetch cluster-metadata #9928

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(topicName.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand Down Expand Up @@ -236,7 +236,7 @@ private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName
AuthAction authAction) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
Expand Down Expand Up @@ -293,7 +293,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,

final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString());
try {
pulsarResources.getNamespaceResources().set(policiesPath, (policies)->{
pulsarResources.getNamespaceResources().set(namespaceName.toString(), (policies)->{
policies.auth_policies.namespace_auth.put(role, actions);
return policies;
});
Expand Down Expand Up @@ -341,7 +341,7 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString());

try {
Policies policies = pulsarResources.getNamespaceResources().get(policiesPath)
Policies policies = pulsarResources.getNamespaceResources().get(namespace.toString())
.orElseThrow(() -> new NotFoundException(policiesPath + " not found"));
if (remove) {
if (policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
Expand All @@ -354,7 +354,7 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
} else {
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().set(policiesPath, (data)->policies);
pulsarResources.getNamespaceResources().set(namespace.toString(), (data)->policies);

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
Expand Down Expand Up @@ -394,7 +394,7 @@ private boolean checkCluster(TopicName topicName) {
public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(topicName.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand Down Expand Up @@ -484,9 +484,10 @@ private void validatePoliciesReadOnlyAccess() {
boolean arePoliciesReadOnly = true;

try {
arePoliciesReadOnly = pulsarResources.getNamespaceResources().exists(POLICIES_READONLY_FLAG_PATH);
arePoliciesReadOnly = pulsarResources.getNamespaceResources().getStore().exists(POLICIES_READONLY_FLAG_PATH)
.get();
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e.getCause());
throw new IllegalStateException("Unable to fetch content from global zk");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ public class ConfigurationCacheService {
private PulsarResources pulsarResources;

public static final String POLICIES = "policies";
public static final String CLUSTERS = "clusters";
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
public static final String POLICIES_ROOT = "/admin/" + POLICIES;
public static final String CLUSTERS_ROOT = "/admin/" + CLUSTERS;

public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef, int
this.operationTimeoutSec = operationTimeoutSec;
}

/**
* Creates internal path based on resource location.
*
* @param path
* @return
*/
public String internalPath(String path) {
return path;
}

public List<String> getChildren(String path) throws MetadataStoreException {
try {
return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
Expand All @@ -68,7 +78,7 @@ public List<String> getChildren(String path) throws MetadataStoreException {
}

public CompletableFuture<List<String>> getChildrenAsync(String path) {
return cache.getChildren(path);
return cache.getChildren(internalPath(path));
}

public Optional<T> get(String path) throws MetadataStoreException {
Expand All @@ -83,7 +93,7 @@ public Optional<T> get(String path) throws MetadataStoreException {
}

public CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
return cache.get(internalPath(path));
}

public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
Expand All @@ -98,7 +108,7 @@ public void set(String path, Function<T, T> modifyFunction) throws MetadataStore
}

public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFunction) {
return cache.readModifyUpdate(path, modifyFunction);
return cache.readModifyUpdate(internalPath(path), modifyFunction);
}

public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
Expand All @@ -113,7 +123,7 @@ public void setWithCreate(String path, Function<Optional<T>, T> createFunction)
}

public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
return cache.readModifyUpdateOrCreate(path, createFunction);
return cache.readModifyUpdateOrCreate(internalPath(path), createFunction);
}

public void create(String path, T data) throws MetadataStoreException {
Expand All @@ -128,7 +138,7 @@ public void create(String path, T data) throws MetadataStoreException {
}

public CompletableFuture<Void> createAsync(String path, T data) {
return cache.create(path, data);
return cache.create(internalPath(path), data);
}

public void delete(String path) throws MetadataStoreException {
Expand All @@ -143,7 +153,7 @@ public void delete(String path) throws MetadataStoreException {
}

public CompletableFuture<Void> deleteAsync(String path) {
return cache.delete(path);
return cache.delete(internalPath(path));
}

public boolean exists(String path) throws MetadataStoreException {
Expand All @@ -158,6 +168,6 @@ public boolean exists(String path) throws MetadataStoreException {
}

public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
return cache.exists(internalPath(path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS_ROOT;

import java.util.HashSet;
import java.util.Set;
import lombok.Getter;

import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class ClusterResources extends BaseResources<ClusterData> {

public static final String CLUSTERS_ROOT = "/admin/clusters";
@Getter
private FailureDomainResources failureDomainResources;

Expand All @@ -38,6 +41,11 @@ public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class, operationTimeoutSec);
}

@Override
public String internalPath(String clusterName) {
return PolicyPath.path(CLUSTERS, clusterName);
}

public Set<String> list() throws MetadataStoreException {
return new HashSet<>(super.getChildren(CLUSTERS_ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;

import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

Expand All @@ -36,6 +37,11 @@ public class NamespaceResources extends BaseResources<Policies> {
private PartitionedTopicResources partitionedTopicResources;
private MetadataStoreExtended configurationStore;

@Override
public String internalPath(String namespace) {
return PolicyPath.path(POLICIES, namespace);
}

public NamespaceResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,7 @@ protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
if (!this.pulsarResources.getNamespaceResources().exists(
AdminResource.path(POLICIES) + "/" + nsName)) {
if (!this.pulsarResources.getNamespaceResources().exists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -157,8 +156,8 @@ public void validatePoliciesReadOnlyAccess() {
boolean arePoliciesReadOnly = true;

try {
arePoliciesReadOnly = pulsar().getPulsarResources().getNamespaceResources()
.exists(POLICIES_READONLY_FLAG_PATH);
arePoliciesReadOnly = pulsar().getPulsarResources().getNamespaceResources().getStore()
.exists(POLICIES_READONLY_FLAG_PATH).get();
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
throw new RestException(e);
Expand Down Expand Up @@ -330,8 +329,7 @@ protected void validateTopicName(String property, String cluster, String namespa
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
final String policyPath = AdminResource.path(POLICIES, namespace);
Policies policies = namespaceResources().get(policyPath)
Policies policies = namespaceResources().get(namespace)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand All @@ -340,7 +338,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
policies.bundles = bundleData != null ? bundleData : policies.bundles;

// hydrate the namespace polices
mergeNamespaceWithDefaults(policies, namespace, policyPath);
mergeNamespaceWithDefaults(policies, namespace);

return policies;
} catch (RestException re) {
Expand All @@ -354,9 +352,8 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {

protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
final String namespace = namespaceName.toString();
final String policyPath = AdminResource.path(POLICIES, namespace);

return namespaceResources().getAsync(policyPath).thenCompose(policies -> {
return namespaceResources().getAsync(namespace).thenCompose(policies -> {
if (policies.isPresent()) {
return pulsar()
.getNamespaceService()
Expand All @@ -372,7 +369,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
// hydrate the namespace polices
mergeNamespaceWithDefaults(policies.get(), namespace, policyPath);
mergeNamespaceWithDefaults(policies.get(), namespace);
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand All @@ -381,7 +378,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
});
}

protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) {
protected void mergeNamespaceWithDefaults(Policies policies, String namespace) {
final ServiceConfiguration config = pulsar().getConfiguration();

if (policies.max_consumers_per_subscription < 1) {
Expand Down Expand Up @@ -575,7 +572,7 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo

protected void validateClusterExists(String cluster) {
try {
if (!clusterResources().get(path("clusters", cluster)).isPresent()) {
if (!clusterResources().get(cluster).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
}
} catch (Exception e) {
Expand All @@ -585,7 +582,7 @@ protected void validateClusterExists(String cluster) {

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
try {
Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace))
Policies policies = namespaceResources().get(NamespaceName.get(property, cluster, namespace).toString())
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand All @@ -607,7 +604,7 @@ protected boolean isNamespaceReplicated(NamespaceName namespaceName) {

protected Set<String> getNamespaceReplicatedClusters(NamespaceName namespaceName) {
try {
final Policies policies = namespaceResources().get(ZkAdminPaths.namespacePoliciesPath(namespaceName))
final Policies policies = namespaceResources().get(namespaceName.toString())
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return policies.replication_clusters;
} catch (RestException re) {
Expand All @@ -624,16 +621,18 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = namespaceResources().getChildren(partitionedTopicPath);
List<String> topics = namespaceResources().getStore().getChildren(partitionedTopicPath).get();
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
if (e instanceof ExecutionException && e.getCause() instanceof NotFoundException) {
// NoNode means there are no partitioned topics in this domain for this namespace
} else {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better, just a suggestion
throw new RestException(e instanceof ExecutionException ? e.getCause() : e)

}
}

partitionedTopics.sort(null);
Expand Down
Loading