Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] fix how ns-isolation-policy API works for replicated namespaces #23094

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
Expand All @@ -57,6 +60,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -706,7 +710,9 @@ public void setNamespaceIsolationPolicy(
@ApiParam(value = "The namespace isolation policy name", required = true)
@PathParam("policyName") String policyName,
@ApiParam(value = "The namespace isolation policy data", required = true)
NamespaceIsolationDataImpl policyData
NamespaceIsolationDataImpl policyData,
@DefaultValue("false")
@QueryParam("unloadBundles") boolean unload
) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
Expand All @@ -721,10 +727,16 @@ public void setNamespaceIsolationPolicy(
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
.thenApply(__ -> new NamespaceIsolationPolicies()))
).thenCompose(nsIsolationPolicies -> {
NamespaceIsolationPolicy currentIsolationPolicy = nsIsolationPolicies.getPolicyByName(policyName);
List<String> oldNamespaceRegex = currentIsolationPolicy == null ? new ArrayList<>() :
currentIsolationPolicy.getNamespaces();
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies())
.thenApply(policy -> oldNamespaceRegex);
}).thenCompose(oldNSRegex -> unload ? filterAndUnloadMatchedNamespaceAsync(cluster, policyData,
oldNSRegex) :
CompletableFuture.completedFuture(null))
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
Expand Down Expand Up @@ -759,21 +771,33 @@ public void setNamespaceIsolationPolicy(
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
NamespaceIsolationDataImpl policyData) {
NamespaceIsolationDataImpl policyData,
List<String> oldNSRegex) {
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
List<String> currentRegex = policyData.getNamespaces();
// (new ∩ old), namespaces to remove matching this regex
List<String> commonRegex = currentRegex.stream().distinct().filter(oldNSRegex::contains).toList();
// compile regex patterns once
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
List<Pattern> excludeNamespacePatterns = commonRegex.stream().map(Pattern::compile).toList();
// (new ⋃ old), namespaces to keep matching this regex
List<Pattern> namespacePatterns =
Stream.concat(currentRegex.stream(), oldNSRegex.stream()).map(Pattern::compile).toList();
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
// filter namespaces using ns regex from current policy
.filter(namespaceName -> namespacePatterns.stream()
.anyMatch(pattern -> pattern.matcher(namespaceName).matches()))
// remove namespaces using old ns regex
// only unloads namespaces matching delta of policy, (new ⋃ old) - (new ∩ old)
.filter(((Predicate<String>) (namespaceName -> excludeNamespacePatterns.stream()
.anyMatch(pattern -> pattern.matcher(namespaceName).matches()))).negate())
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster)
? namespaceName : null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -48,6 +49,11 @@ public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseT
PulsarAdmin localAdmin;
PulsarAdmin remoteAdmin;

@Override
protected int numberOfAdditionalBrokers() {
return 4;
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
Expand All @@ -69,23 +75,39 @@ public void setupClusters() throws Exception {
.createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build());
remoteAdmin.clusters()
.createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build());
setupForTenant("A");
setupForTenant("B");
}

private void setupForTenant(String prefix) throws PulsarAdminException {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2"));
localAdmin.tenants().createTenant("prop-ig", tenantInfo);
localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1"));
localAdmin.tenants().createTenant(prefix+"prop-ig", tenantInfo);
localAdmin.namespaces().createNamespace(prefix+"prop-ig/ns1", Set.of("test", "cluster-1"));
localAdmin.namespaces().createNamespace(prefix+"prop-ig/n1", Set.of("test", "cluster-1"));
localAdmin.topics().createNonPartitionedTopic(prefix+"prop-ig/ns1/t1");
}

public void testNamespaceIsolationPolicyForReplNSWithUnload() throws Exception {
testNamespaceIsolationPolicyForReplNS("A", "policy-1", true);
}

public void testNamespaceIsolationPolicyForReplNSWithoutUnload() throws Exception {
testNamespaceIsolationPolicyForReplNS("B", "policy-2", false);
}

public void testNamespaceIsolationPolicyForReplNS() throws Exception {
private void testNamespaceIsolationPolicyForReplNS(String prefix, String policyName1, boolean unload) throws Exception {

// Verify that namespace is not present in cluster-2.
Set<String> replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters;
// Verify that namespaces are not present in cluster-2.
Set<String> replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/ns1").replication_clusters;
Assert.assertFalse(replicationClusters.contains("cluster-2"));
replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/n1").replication_clusters;
Assert.assertFalse(replicationClusters.contains("cluster-2"));

// setup ns-isolation-policy in both the clusters.
String policyName1 = "policy-1";
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
List<String> nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*"));
List<String> nsRegexList = new ArrayList<>(Arrays.asList(prefix+"prop-ig/ns1.*"));

NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
Expand All @@ -98,17 +120,33 @@ public void testNamespaceIsolationPolicyForReplNS() throws Exception {
.build())
.build();

localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// 1. Create policy should work in local cluster
localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload);
// verify policy is present in local cluster
Map<String, ? extends NamespaceIsolationData> policiesMap =
localAdmin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1);
// 2. Create policy should work in remote cluster
remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1, unload);
// verify policy is present in remote cluster
policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

// 3. Update (add) policy should work in local cluster
nsPolicyData1.getNamespaces().add(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces
localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload);
// verify policy is present in local cluster
policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

// 4. Update (remove) policy should work in local cluster
nsPolicyData1.getNamespaces().remove(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces
localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload);
// verify policy is present in local cluster
policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public void clusters() throws Exception {
.build())
.build();
asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx,
"use", "policy1", policyData));
"use", "policy1", policyData, true));
asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
* Admin interface for clusters management.
*/
public interface Clusters {

/**
* Defaults for all the flags.
*/
boolean UNLOAD_BUNDLE_DEFAULT = false;

/**
* Get the list of clusters.
* <p/>
Expand Down Expand Up @@ -418,9 +424,15 @@ Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster
* Unexpected error
*/
void createNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles)
throws PulsarAdminException;

default void createNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException {
createNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Create a namespace isolation policy for a cluster asynchronously.
* <p/>
Expand All @@ -437,7 +449,13 @@ void createNamespaceIsolationPolicy(
* @return
*/
CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles);

default CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return createNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}


/**
* Returns list of active brokers with namespace-isolation policies attached to it.
Expand Down Expand Up @@ -506,9 +524,15 @@ CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationP
* Unexpected error
*/
void updateNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles)
throws PulsarAdminException;

default void updateNamespaceIsolationPolicy(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData)
throws PulsarAdminException {
updateNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Update a namespace isolation policy for a cluster asynchronously.
* <p/>
Expand All @@ -526,7 +550,12 @@ void updateNamespaceIsolationPolicy(
*
*/
CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles);

default CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return updateNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT);
}

/**
* Delete a namespace isolation policy for a cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,26 +202,26 @@ public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIso

@Override
public void createNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public void updateNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
public CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles);
}

@Override
Expand All @@ -236,13 +236,14 @@ public CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluste
}

private void setNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData));
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException {
sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles));
}

private CompletableFuture<Void> setNamespaceIsolationPolicyAsync(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName)
.queryParam("unloadBundles", unloadBundles);
return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ private class SetPolicy extends CliCommand {
required = true, split = ",")
private Map<String, String> autoFailoverPolicyParams;

@Option(names = "--unloadBundles", description = "Unload namespace bundles after applying policy")
private boolean unloadBundles;

void run() throws PulsarAdminException {
// validate and create the POJO
NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary,
autoFailoverPolicyTypeName, autoFailoverPolicyParams);

getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData);
getAdmin().clusters()
.createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData, unloadBundles);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ public interface NamespaceIsolationPolicy {
*/
List<String> getSecondaryBrokers();

/**
* Get the list of namespace regex used in policy.
*
* @return
*/
List<String> getNamespaces();

/**
* Get the list of primary brokers for the namespace according to the policy.
*
Expand Down
Loading