diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 4fe8a01e679da..5eeebac4db96b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -33,9 +33,12 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -57,6 +60,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -706,7 +710,9 @@ public void setNamespaceIsolationPolicy( @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String policyName, @ApiParam(value = "The namespace isolation policy data", required = true) - NamespaceIsolationDataImpl policyData + NamespaceIsolationDataImpl policyData, + @DefaultValue("false") + @QueryParam("unloadBundles") boolean unload ) { validateSuperUserAccessAsync() .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) @@ -721,10 +727,16 @@ public void setNamespaceIsolationPolicy( .setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap()) .thenApply(__ -> new NamespaceIsolationPolicies())) ).thenCompose(nsIsolationPolicies -> { + NamespaceIsolationPolicy currentIsolationPolicy = nsIsolationPolicies.getPolicyByName(policyName); + List oldNamespaceRegex = currentIsolationPolicy == null ? new ArrayList<>() : + currentIsolationPolicy.getNamespaces(); nsIsolationPolicies.setPolicy(policyName, policyData); return namespaceIsolationPolicies() - .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()); - }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData)) + .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies()) + .thenApply(policy -> oldNamespaceRegex); + }).thenCompose(oldNSRegex -> unload ? filterAndUnloadMatchedNamespaceAsync(cluster, policyData, + oldNSRegex) : + CompletableFuture.completedFuture(null)) .thenAccept(__ -> { log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.", clientAppId(), cluster, policyName); @@ -759,21 +771,33 @@ public void setNamespaceIsolationPolicy( * Get matched namespaces; call unload for each namespaces. */ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster, - NamespaceIsolationDataImpl policyData) { + NamespaceIsolationDataImpl policyData, + List oldNSRegex) { PulsarAdmin adminClient; try { adminClient = pulsar().getAdminClient(); } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } + List currentRegex = policyData.getNamespaces(); + // (new ∩ old), namespaces to remove matching this regex + List commonRegex = currentRegex.stream().distinct().filter(oldNSRegex::contains).toList(); // compile regex patterns once - List namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); + List excludeNamespacePatterns = commonRegex.stream().map(Pattern::compile).toList(); + // (new ⋃ old), namespaces to keep matching this regex + List namespacePatterns = + Stream.concat(currentRegex.stream(), oldNSRegex.stream()).map(Pattern::compile).toList(); return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { List>> filteredNamespacesForEachTenant = tenants.stream() .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { List> namespaceNamesInCluster = namespaces.stream() + // filter namespaces using ns regex from current policy .filter(namespaceName -> namespacePatterns.stream() .anyMatch(pattern -> pattern.matcher(namespaceName).matches())) + // remove namespaces using old ns regex + // only unloads namespaces matching delta of policy, (new ⋃ old) - (new ∩ old) + .filter(((Predicate) (namespaceName -> excludeNamespacePatterns.stream() + .anyMatch(pattern -> pattern.matcher(namespaceName).matches()))).negate()) .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) .thenApply(policies -> policies.replication_clusters.contains(cluster) ? namespaceName : null)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java index da7d95d677af8..6828ac9dcf026 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.MultiBrokerBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.ClusterData; @@ -48,6 +49,11 @@ public class AdminApiNamespaceIsolationMultiBrokersTest extends MultiBrokerBaseT PulsarAdmin localAdmin; PulsarAdmin remoteAdmin; + @Override + protected int numberOfAdditionalBrokers() { + return 4; + } + @Override protected void doInitConf() throws Exception { super.doInitConf(); @@ -69,23 +75,39 @@ public void setupClusters() throws Exception { .createCluster("cluster-1", ClusterData.builder().serviceUrl(localBrokerWebService).build()); remoteAdmin.clusters() .createCluster("cluster-2", ClusterData.builder().serviceUrl(remoteBrokerWebService).build()); + setupForTenant("A"); + setupForTenant("B"); + } + + private void setupForTenant(String prefix) throws PulsarAdminException { TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of(""), Set.of("test", "cluster-1", "cluster-2")); - localAdmin.tenants().createTenant("prop-ig", tenantInfo); - localAdmin.namespaces().createNamespace("prop-ig/ns1", Set.of("test", "cluster-1")); + localAdmin.tenants().createTenant(prefix+"prop-ig", tenantInfo); + localAdmin.namespaces().createNamespace(prefix+"prop-ig/ns1", Set.of("test", "cluster-1")); + localAdmin.namespaces().createNamespace(prefix+"prop-ig/n1", Set.of("test", "cluster-1")); + localAdmin.topics().createNonPartitionedTopic(prefix+"prop-ig/ns1/t1"); + } + + public void testNamespaceIsolationPolicyForReplNSWithUnload() throws Exception { + testNamespaceIsolationPolicyForReplNS("A", "policy-1", true); + } + + public void testNamespaceIsolationPolicyForReplNSWithoutUnload() throws Exception { + testNamespaceIsolationPolicyForReplNS("B", "policy-2", false); } - public void testNamespaceIsolationPolicyForReplNS() throws Exception { + private void testNamespaceIsolationPolicyForReplNS(String prefix, String policyName1, boolean unload) throws Exception { - // Verify that namespace is not present in cluster-2. - Set replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters; + // Verify that namespaces are not present in cluster-2. + Set replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/ns1").replication_clusters; + Assert.assertFalse(replicationClusters.contains("cluster-2")); + replicationClusters = localAdmin.namespaces().getPolicies(prefix+"prop-ig/n1").replication_clusters; Assert.assertFalse(replicationClusters.contains("cluster-2")); // setup ns-isolation-policy in both the clusters. - String policyName1 = "policy-1"; Map parameters1 = new HashMap<>(); parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - List nsRegexList = new ArrayList<>(Arrays.asList("prop-ig/.*")); + List nsRegexList = new ArrayList<>(Arrays.asList(prefix+"prop-ig/ns1.*")); NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() // "prop-ig/ns1" is present in test cluster, policy set on test2 should work @@ -98,17 +120,33 @@ public void testNamespaceIsolationPolicyForReplNS() throws Exception { .build()) .build(); - localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1); + // 1. Create policy should work in local cluster + localAdmin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); // verify policy is present in local cluster Map policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); assertEquals(policiesMap.get(policyName1), nsPolicyData1); - remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1); + // 2. Create policy should work in remote cluster + remoteAdmin.clusters().createNamespaceIsolationPolicy("cluster-2", policyName1, nsPolicyData1, unload); // verify policy is present in remote cluster policiesMap = remoteAdmin.clusters().getNamespaceIsolationPolicies("cluster-2"); assertEquals(policiesMap.get(policyName1), nsPolicyData1); + // 3. Update (add) policy should work in local cluster + nsPolicyData1.getNamespaces().add(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces + localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); + // verify policy is present in local cluster + policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + + // 4. Update (remove) policy should work in local cluster + nsPolicyData1.getNamespaces().remove(prefix+"prop-ig/n1.*"); // this will add public/.* namespaces + localAdmin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1, unload); + // verify policy is present in local cluster + policiesMap = localAdmin.clusters().getNamespaceIsolationPolicies("test"); + assertEquals(policiesMap.get(policyName1), nsPolicyData1); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 2894903c0d0c1..9d216014e7738 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -309,7 +309,7 @@ public void clusters() throws Exception { .build()) .build(); asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx, - "use", "policy1", policyData)); + "use", "policy1", policyData, true)); asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use")); try { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 53e6680946566..3cf014862cc02 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -38,6 +38,12 @@ * Admin interface for clusters management. */ public interface Clusters { + + /** + * Defaults for all the flags. + */ + boolean UNLOAD_BUNDLE_DEFAULT = false; + /** * Get the list of clusters. *

@@ -418,9 +424,15 @@ Map getNamespaceIsolationPolicies(String cluster * Unexpected error */ void createNamespaceIsolationPolicy( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException; + default void createNamespaceIsolationPolicy( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + throws PulsarAdminException { + createNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Create a namespace isolation policy for a cluster asynchronously. *

@@ -437,7 +449,13 @@ void createNamespaceIsolationPolicy( * @return */ CompletableFuture createNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles); + + default CompletableFuture createNamespaceIsolationPolicyAsync( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { + return createNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Returns list of active brokers with namespace-isolation policies attached to it. @@ -506,9 +524,15 @@ CompletableFuture getBrokerWithNamespaceIsolationP * Unexpected error */ void updateNamespaceIsolationPolicy( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException; + default void updateNamespaceIsolationPolicy( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) + throws PulsarAdminException { + updateNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } + /** * Update a namespace isolation policy for a cluster asynchronously. *

@@ -526,7 +550,12 @@ void updateNamespaceIsolationPolicy( * */ CompletableFuture updateNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles); + + default CompletableFuture updateNamespaceIsolationPolicyAsync( + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { + return updateNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, UNLOAD_BUNDLE_DEFAULT); + } /** * Delete a namespace isolation policy for a cluster. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 231d4506d6173..02385ccc55c49 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -202,26 +202,26 @@ public CompletableFuture getBrokerWithNamespaceIso @Override public void createNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public CompletableFuture createNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { - return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public void updateNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override public CompletableFuture updateNamespaceIsolationPolicyAsync( - String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) { - return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData); + String cluster, String policyName, NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles); } @Override @@ -236,13 +236,14 @@ public CompletableFuture deleteNamespaceIsolationPolicyAsync(String cluste } private void setNamespaceIsolationPolicy(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { - sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData)); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) throws PulsarAdminException { + sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData, unloadBundles)); } private CompletableFuture setNamespaceIsolationPolicyAsync(String cluster, String policyName, - NamespaceIsolationData namespaceIsolationData) { - WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName); + NamespaceIsolationData namespaceIsolationData, boolean unloadBundles) { + WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName) + .queryParam("unloadBundles", unloadBundles); return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index e9896decd8c96..8ede00294491d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -73,12 +73,16 @@ private class SetPolicy extends CliCommand { required = true, split = ",") private Map autoFailoverPolicyParams; + @Option(names = "--unloadBundles", description = "Unload namespace bundles after applying policy") + private boolean unloadBundles; + void run() throws PulsarAdminException { // validate and create the POJO NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary, autoFailoverPolicyTypeName, autoFailoverPolicyParams); - getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData); + getAdmin().clusters() + .createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData, unloadBundles); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java index bd28d30d4cee9..1742ae25e8d67 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java @@ -43,6 +43,13 @@ public interface NamespaceIsolationPolicy { */ List getSecondaryBrokers(); + /** + * Get the list of namespace regex used in policy. + * + * @return + */ + List getNamespaces(); + /** * Get the list of primary brokers for the namespace according to the policy. * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java index af3663869fa02..9e451e9e67e84 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java @@ -76,6 +76,11 @@ public List getSecondaryBrokers() { return this.secondary; } + @Override + public List getNamespaces() { + return this.namespaces; + } + @Override public List findPrimaryBrokers(List availableBrokers, NamespaceName namespace) { if (!this.matchNamespaces(namespace.toString())) {