Skip to content

Commit

Permalink
[pulsar-broker] cluster-resource use cluster name to fetch cluster-me…
Browse files Browse the repository at this point in the history
…tadata
  • Loading branch information
rdhabalia committed Mar 16, 2021
1 parent 6774d6f commit c608317
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ public class ConfigurationCacheService {
private PulsarResources pulsarResources;

public static final String POLICIES = "policies";
public static final String CLUSTERS = "clusters";
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
public static final String POLICIES_ROOT = "/admin/" + POLICIES;
public static final String CLUSTERS_ROOT = "/admin/" + CLUSTERS;

public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.CLUSTERS_ROOT;

import java.util.HashSet;
import java.util.Set;
import lombok.Getter;

import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class ClusterResources extends BaseResources<ClusterData> {

public static final String CLUSTERS_ROOT = "/admin/clusters";
@Getter
private FailureDomainResources failureDomainResources;

Expand All @@ -38,6 +41,11 @@ public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class, operationTimeoutSec);
}

@Override
public String internalPath(String clusterName) {
return PolicyPath.path(CLUSTERS, clusterName);
}

public Set<String> list() throws MetadataStoreException {
return new HashSet<>(super.getChildren(CLUSTERS_ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo

protected void validateClusterExists(String cluster) {
try {
if (!clusterResources().get(path("clusters", cluster)).isPresent()) {
if (!clusterResources().get(cluster).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -114,7 +115,7 @@ public ClusterData getCluster(
validateSuperUserAccess();

try {
return clusterResources().get(path("clusters", cluster))
return clusterResources().get(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
Expand Down Expand Up @@ -166,18 +167,18 @@ public void createCluster(

try {
NamedEntity.checkName(cluster);
if (clusterResources().get(path("clusters", cluster)).isPresent()) {
if (clusterResources().get(cluster).isPresent()) {
log.warn("[{}] Failed to create already existing cluster {}", clientAppId(), cluster);
throw new RestException(Status.CONFLICT, "Cluster already exists");
}
clusterResources().create(path("clusters", cluster), clusterData);
clusterResources().create(cluster, clusterData);
log.info("[{}] Created cluster {}", clientAppId(), cluster);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create cluster with invalid name {}", clientAppId(), cluster, e);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster name is not valid");
} catch (Exception e) {
log.error("[{}] Failed to create cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
}
}

Expand Down Expand Up @@ -218,7 +219,7 @@ public void updateCluster(
validatePoliciesReadOnlyAccess();

try {
clusterResources().set(path("clusters", cluster), old -> {
clusterResources().set(cluster, old -> {
old.update(clusterData);
return old;
});
Expand Down Expand Up @@ -277,7 +278,7 @@ public void setPeerClusterNames(
throw new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list");
}
clusterResources().get(path("clusters", peerCluster))
clusterResources().get(peerCluster)
.orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist"));
} catch (RestException e) {
Expand All @@ -293,7 +294,7 @@ public void setPeerClusterNames(
}

try {
clusterResources().set(path("clusters", cluster), old -> {
clusterResources().set(cluster, old -> {
old.setPeerClusterNames(peerClusterNames);
return old;
});
Expand Down Expand Up @@ -329,7 +330,7 @@ public Set<String> getPeerCluster(
) {
validateSuperUserAccess();
try {
ClusterData clusterData = clusterResources().get(path("clusters", cluster))
ClusterData clusterData = clusterResources().get(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return clusterData.getPeerClusterNames();
} catch (Exception e) {
Expand Down Expand Up @@ -365,11 +366,11 @@ public void deleteCluster(
boolean isClusterUsed = false;
try {
for (String property : tenantResources().getChildren(path(POLICIES))) {
if (!clusterResources().exists(path(POLICIES, property, cluster))) {
if (!clusterResources().getStore().exists(path(POLICIES, property, cluster)).get()) {
continue;
}

if (!clusterResources().getChildren(path(POLICIES, property, cluster)).isEmpty()) {
if (!clusterResources().getStore().getChildren(path(POLICIES, property, cluster)).get().isEmpty()) {
// We found a property that has at least a namespace in this cluster
isClusterUsed = true;
break;
Expand All @@ -390,7 +391,7 @@ public void deleteCluster(
}
} catch (Exception e) {
log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
throw new RestException(e);
throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
}

if (isClusterUsed) {
Expand All @@ -399,9 +400,8 @@ public void deleteCluster(
}

try {
String clusterPath = path("clusters", cluster);
deleteFailureDomain(clusterPath);
clusterResources().delete(clusterPath);
deleteFailureDomain(path("clusters", cluster));
clusterResources().delete(cluster);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
Expand All @@ -415,17 +415,17 @@ public void deleteCluster(
private void deleteFailureDomain(String clusterPath) {
try {
String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
if (!clusterResources().exists(failureDomain)) {
if (!clusterResources().getFailureDomainResources().exists(failureDomain)) {
return;
}
for (String domain : clusterResources().getChildren(failureDomain)) {
for (String domain : clusterResources().getFailureDomainResources().getChildren(failureDomain)) {
String domainPath = joinPath(failureDomain, domain);
clusterResources().delete(domainPath);
clusterResources().getFailureDomainResources().delete(domainPath);
}
clusterResources().delete(failureDomain);
} catch (Exception e) {
log.warn("Failed to delete failure-domain under cluster {}", clusterPath);
throw new RestException(e);
throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
}
}

Expand All @@ -450,7 +450,7 @@ public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(
@PathParam("cluster") String cluster
) throws Exception {
validateSuperUserAccess();
if (!clusterResources().exists(path("clusters", cluster))) {
if (!clusterResources().exists(cluster)) {
throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster))
ClusterData replClusterData = clusterResources().get(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
Expand Down Expand Up @@ -352,7 +352,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster))
ClusterData replClusterData = clusterResources().get(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
Expand Down Expand Up @@ -504,7 +504,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster))
ClusterData replClusterData = clusterResources().get(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
Expand Down Expand Up @@ -577,7 +577,7 @@ protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boole
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clusterResources().get(AdminResource.path("clusters", replCluster))
ClusterData replClusterData = clusterResources().get(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
Expand Down Expand Up @@ -1918,7 +1918,7 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr
*/
private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
try {
ClusterData clusterData = clusterResources().get(path("clusters", clusterName)).orElseThrow(
ClusterData clusterData = clusterResources().get(clusterName).orElseThrow(
() -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
Set<String> peerClusters = clusterData.getPeerClusterNames();
if (peerClusters != null && !peerClusters.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.policies.data.Policies.getBundles;
import static org.hamcrest.CoreMatchers.instanceOf;

import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand All @@ -29,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -104,14 +108,16 @@ public List<String> getNamespacesForCluster(@PathParam("property") String proper
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
}
try {
for (String namespace : clusterResources().getChildren(path(POLICIES, property, cluster))) {
for (String namespace : clusterResources().getStore().getChildren(path(POLICIES, property, cluster)).get()) {
namespaces.add(String.format("%s/%s/%s", property, cluster, namespace));
}
} catch (NotFoundException e) {
// NoNode means there are no namespaces for this property on the specified cluster, returning empty list
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
if (e instanceof ExecutionException && e.getCause() instanceof NotFoundException) {
// NoNode means there are no namespaces for this property on the specified cluster, returning empty list
} else {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
}
}

namespaces.sort(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,13 @@ public static ObjectMapper jsonMapper() {

public void validatePoliciesReadOnlyAccess() {
try {
if (clusterResources().existsAsync(AdminResource.POLICIES_READONLY_FLAG_PATH).get()) {
if (clusterResources().getStore().exists(AdminResource.POLICIES_READONLY_FLAG_PATH).get()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
throw new RestException(Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
}
} catch (Exception e) {
log.warn("Unable to fetch read-only policy config {}", POLICIES_READONLY_FLAG_PATH, e);
throw new RestException(e);
throw new RestException(e instanceof ExecutionException ? e.getCause() : e);
}
}

Expand Down Expand Up @@ -978,7 +978,7 @@ protected CompletableFuture<Void> hasActiveNamespace(String tenant) {

protected void validateClusterExists(String cluster) {
try {
if (!clusterResources().get(path("clusters", cluster)).isPresent()) {
if (!clusterResources().get(cluster).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
}
} catch (Exception e) {
Expand Down

0 comments on commit c608317

Please sign in to comment.