Skip to content

Commit

Permalink
[pulsar-broker] Namespace-resource use namespace name to fetch policy…
Browse files Browse the repository at this point in the history
… resource
  • Loading branch information
rdhabalia committed Mar 16, 2021
1 parent 960a79e commit 6774d6f
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef, int
this.operationTimeoutSec = operationTimeoutSec;
}

/**
* Creates internal path based on resource location.
*
* @param path
* @return
*/
public String internalPath(String path) {
return path;
}

public List<String> getChildren(String path) throws MetadataStoreException {
try {
return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
Expand All @@ -68,7 +78,7 @@ public List<String> getChildren(String path) throws MetadataStoreException {
}

public CompletableFuture<List<String>> getChildrenAsync(String path) {
return cache.getChildren(path);
return cache.getChildren(internalPath(path));
}

public Optional<T> get(String path) throws MetadataStoreException {
Expand All @@ -83,7 +93,7 @@ public Optional<T> get(String path) throws MetadataStoreException {
}

public CompletableFuture<Optional<T>> getAsync(String path) {
return cache.get(path);
return cache.get(internalPath(path));
}

public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
Expand All @@ -98,7 +108,7 @@ public void set(String path, Function<T, T> modifyFunction) throws MetadataStore
}

public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFunction) {
return cache.readModifyUpdate(path, modifyFunction);
return cache.readModifyUpdate(internalPath(path), modifyFunction);
}

public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
Expand All @@ -113,7 +123,7 @@ public void setWithCreate(String path, Function<Optional<T>, T> createFunction)
}

public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional<T>, T> createFunction) {
return cache.readModifyUpdateOrCreate(path, createFunction);
return cache.readModifyUpdateOrCreate(internalPath(path), createFunction);
}

public void create(String path, T data) throws MetadataStoreException {
Expand All @@ -128,7 +138,7 @@ public void create(String path, T data) throws MetadataStoreException {
}

public CompletableFuture<Void> createAsync(String path, T data) {
return cache.create(path, data);
return cache.create(internalPath(path), data);
}

public void delete(String path) throws MetadataStoreException {
Expand All @@ -143,7 +153,7 @@ public void delete(String path) throws MetadataStoreException {
}

public CompletableFuture<Void> deleteAsync(String path) {
return cache.delete(path);
return cache.delete(internalPath(path));
}

public boolean exists(String path) throws MetadataStoreException {
Expand All @@ -158,6 +168,6 @@ public boolean exists(String path) throws MetadataStoreException {
}

public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
return cache.exists(internalPath(path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;

import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

Expand All @@ -36,6 +37,11 @@ public class NamespaceResources extends BaseResources<Policies> {
private PartitionedTopicResources partitionedTopicResources;
private MetadataStoreExtended configurationStore;

@Override
public String internalPath(String namespace) {
return PolicyPath.path(POLICIES, namespace);
}

public NamespaceResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -330,8 +329,7 @@ protected void validateTopicName(String property, String cluster, String namespa
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
final String policyPath = AdminResource.path(POLICIES, namespace);
Policies policies = namespaceResources().get(policyPath)
Policies policies = namespaceResources().get(namespace)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand All @@ -340,7 +338,7 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
policies.bundles = bundleData != null ? bundleData : policies.bundles;

// hydrate the namespace polices
mergeNamespaceWithDefaults(policies, namespace, policyPath);
mergeNamespaceWithDefaults(policies, namespace);

return policies;
} catch (RestException re) {
Expand All @@ -354,9 +352,8 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {

protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
final String namespace = namespaceName.toString();
final String policyPath = AdminResource.path(POLICIES, namespace);

return namespaceResources().getAsync(policyPath).thenCompose(policies -> {
return namespaceResources().getAsync(namespace).thenCompose(policies -> {
if (policies.isPresent()) {
return pulsar()
.getNamespaceService()
Expand All @@ -372,7 +369,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
// hydrate the namespace polices
mergeNamespaceWithDefaults(policies.get(), namespace, policyPath);
mergeNamespaceWithDefaults(policies.get(), namespace);
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand All @@ -381,7 +378,7 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
});
}

protected void mergeNamespaceWithDefaults(Policies policies, String namespace, String namespacePath) {
protected void mergeNamespaceWithDefaults(Policies policies, String namespace) {
final ServiceConfiguration config = pulsar().getConfiguration();

if (policies.max_consumers_per_subscription < 1) {
Expand Down Expand Up @@ -585,7 +582,7 @@ protected void validateClusterExists(String cluster) {

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
try {
Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace))
Policies policies = namespaceResources().get(NamespaceName.get(property, cluster, namespace).toString())
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand All @@ -607,7 +604,7 @@ protected boolean isNamespaceReplicated(NamespaceName namespaceName) {

protected Set<String> getNamespaceReplicatedClusters(NamespaceName namespaceName) {
try {
final Policies policies = namespaceResources().get(ZkAdminPaths.namespacePoliciesPath(namespaceName))
final Policies policies = namespaceResources().get(namespaceName.toString())
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return policies.replication_clusters;
} catch (RestException re) {
Expand All @@ -624,16 +621,18 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = namespaceResources().getChildren(partitionedTopicPath);
List<String> topics = namespaceResources().getStore().getChildren(partitionedTopicPath).get();
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
if (e instanceof ExecutionException && e.getCause() instanceof NotFoundException) {
// NoNode means there are no partitioned topics in this domain for this namespace
} else {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
namespaceName.toString(), e);
throw new RestException(e);
}
}

partitionedTopics.sort(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected void internalCreateNamespace(Policies policies) {
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().create(path(POLICIES, namespaceName.toString()), policies);
namespaceResources().create(namespaceName.toString(), policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
Expand Down Expand Up @@ -301,10 +301,10 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
try {
// we have successfully removed all the ownership for the namespace, the policies znode can be deleted
// now
final String globalZkPolicyPath = path(POLICIES, namespaceName.toString());
final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString());
namespaceResources().delete(globalZkPolicyPath);
namespaceResources().delete(namespaceName.toString());
try {
//TODO
getLocalPolicies().delete(lcaolZkPolicyPath);
} catch (NotFoundException nne) {
// If the z-node with the modified information is not there anymore, we're already good
Expand Down Expand Up @@ -447,7 +447,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
// remove partitioned topics znode
final String globalPartitionedPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString());
// check whether partitioned topics znode exist
if (namespaceResources().exists(globalPartitionedPath)) {
if (namespaceResources().getStore().exists(globalPartitionedPath).get()) {
deleteRecursive(namespaceResources(), globalPartitionedPath);
}

Expand All @@ -464,7 +464,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
} catch (Exception e) {
log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
return null;
}

Expand Down Expand Up @@ -800,7 +800,7 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
}
}
// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> {
namespaceResources().setAsync(namespaceName.toString(), (policies) -> {
policies.autoTopicCreationOverride = autoTopicCreationOverride;
return policies;
}).thenApply(r -> {
Expand Down Expand Up @@ -832,7 +832,7 @@ protected void internalSetAutoSubscriptionCreation(
validatePoliciesReadOnlyAccess();

// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().setAsync(path(POLICIES, namespaceName.toString()), (policies) -> {
namespaceResources().setAsync(namespaceName.toString(), (policies) -> {
policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
return policies;
}).thenApply(r -> {
Expand Down Expand Up @@ -1345,8 +1345,7 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
try {
final String path = path(POLICIES, namespaceName.toString());
Policies policies = namespaceResources().get(path)
Policies policies = namespaceResources().get(namespaceName.toString())
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist"));
RetentionPolicies r = policies.retention_policies;
if (r != null) {
Expand All @@ -1363,7 +1362,7 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo
}
}
policies.backlog_quota_map.put(quotaType, backlogQuota);
namespaceResources().set(path, p -> policies);
namespaceResources().set(namespaceName.toString(), p -> policies);
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName,
jsonMapper().writeValueAsString(backlogQuota));

Expand Down Expand Up @@ -1400,8 +1399,7 @@ protected void internalSetRetention(RetentionPolicies retention) {
validatePoliciesReadOnlyAccess();

try {
final String path = path(POLICIES, namespaceName.toString());
Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND,
Policies policies = namespaceResources().get(namespaceName.toString()).orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace policies does not exist"));
if (!checkQuotas(policies, retention)) {
log.warn("[{}] Failed to update retention configuration"
Expand All @@ -1411,7 +1409,7 @@ protected void internalSetRetention(RetentionPolicies retention) {
"Retention Quota must exceed configured backlog quota for namespace.");
}
policies.retention_policies = retention;
namespaceResources().set(path, p -> policies);
namespaceResources().set(namespaceName.toString(), p -> policies);
log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(retention));
} catch (RestException pfe) {
Expand Down Expand Up @@ -1716,13 +1714,12 @@ protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic

protected void internalSetPolicies(String fieldName, Object value) {
try {
final String path = path(POLICIES, namespaceName.toString());
Policies policies = namespaceResources().get(path).orElseThrow(() -> new RestException(Status.NOT_FOUND,
Policies policies = namespaceResources().get(namespaceName.toString()).orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace policies does not exist"));
Field field = Policies.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(policies, value);
namespaceResources().set(path, p -> policies);
namespaceResources().set(namespaceName.toString(), p -> policies);
log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
namespaceName, jsonMapper().writeValueAsString(value));

Expand Down Expand Up @@ -2435,8 +2432,7 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
validateOffloadPolicies(offloadPolicies);

try {
final String path = path(POLICIES, namespaceName.toString());
namespaceResources().setAsync(path, (policies) -> {
namespaceResources().setAsync(namespaceName.toString(), (policies) -> {
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
Expand Down Expand Up @@ -2474,8 +2470,7 @@ protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
try {
final String path = path(POLICIES, namespaceName.toString());
namespaceResources().setAsync(path, (policies) -> {
namespaceResources().setAsync(namespaceName.toString(), (policies) -> {
policies.offload_policies = null;
return policies;
}).thenApply(r -> {
Expand Down Expand Up @@ -2549,7 +2544,7 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
private void updatePolicies(String path, Function<Policies, Policies> updateFunction) {
try {
// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().set(path(POLICIES, namespaceName.toString()), updateFunction);
namespaceResources().set(namespaceName.toString(), updateFunction);
log.info("[{}] Successfully updated the on namespace {}", clientAppId(), path, namespaceName);
} catch (NotFoundException e) {
log.warn("[{}] Namespace {}: does not exist", clientAppId(), namespaceName);
Expand Down
Loading

0 comments on commit 6774d6f

Please sign in to comment.