Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Phase 1 of PIP-370 support disable create topics on remote cluster through replication #23169

Merged
merged 5 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,16 @@ replicatorPrefix=pulsar.repl
# due to missing ZooKeeper watch (disable with value 0)
replicationPolicyCheckDurationSeconds=600

# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster.
# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote
# cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on
# the remote cluster.
# 3. Whether the internal Geo-Replicator in the local cluster will trigger non-persistent topic auto-creation for
# remote clusters.
# This is not a dynamic config. The default value is "true" to preserve backward-compatible behavior.
createTopicToRemoteClusterForReplication=true

# Default message retention time.
# 0 means retention is disabled. -1 means data is not removed by time quota.
defaultRetentionTimeInMinutes=0
Expand Down
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,16 @@ replicationProducerQueueSize=1000
# due to missing ZooKeeper watch (disable with value 0)
replicationPolicyCheckDurationSeconds=600

# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster.
# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote
# cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on
# the remote cluster.
# 3. Whether the internal Geo-Replicator in the local cluster will trigger non-persistent topic auto-creation for
# remote clusters.
# This is not a dynamic config. The default value is "true" to preserve backward-compatible behavior.
createTopicToRemoteClusterForReplication=true

# Default message retention time. 0 means retention is disabled. -1 means data is not removed by time quota
defaultRetentionTimeInMinutes=0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,11 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
+ "inconsistency due to missing ZooKeeper watch (disable with value 0)"
)
private int replicationPolicyCheckDurationSeconds = 600;
@FieldContext(
category = CATEGORY_REPLICATION,
doc = "Whether the internal replicator will trigger topic auto-creation on the remote cluster."
)
private boolean createTopicToRemoteClusterForReplication = true;
@Deprecated
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,15 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
if (!createLocalTopicOnly && topicName.isGlobal()) {
if (!createLocalTopicOnly && topicName.isGlobal()
&& pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
log.info("[{}] Successfully created partitions for topic {} for the remote clusters {}",
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
} else {
log.info("[{}] Skip creating partitions for topic {} for the remote clusters {}",
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
}
log.info("[{}] Successfully created partitions for topic {} in cluster {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3332,6 +3332,11 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
}
return FutureUtil.waitForAll(futures);
}).thenCompose(__ -> {
if (!pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
log.info("[{}] Skip creating partitions for topic {} for the remote clusters {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
return CompletableFuture.completedFuture(null);
}
// Sync to create partitioned topic on the remote cluster if needed.
TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -159,6 +160,10 @@ public String getRemoteCluster() {
return remoteCluster;
}

/**
 * Hook that {@code startProducer()} waits on before it creates the replication producer.
 *
 * <p>This default implementation has nothing to prepare and returns a future that is already
 * completed. Subclasses may override it to run an asynchronous pre-check; a failed future is
 * handled by the caller's error path instead of creating the producer.
 *
 * @return an already-completed future
 */
protected CompletableFuture<Void> prepareCreateProducer() {
    return CompletableFuture.<Void>completedFuture(null);
}

public void startProducer() {
// Guarantee only one task call "producerBuilder.createAsync()".
Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting);
Expand All @@ -185,12 +190,15 @@ public void startProducer() {
}

log.info("[{}] Starting replicator", replicatorId);

// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
// the remote cluster.
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
prepareCreateProducer().thenCompose(ignore -> {
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
return producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
});
}).exceptionally(ex -> {
Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected);
if (setDisconnectedRes.getLeft()) {
Expand All @@ -215,6 +223,7 @@ public void startProducer() {
}
return null;
});

}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class GeoPersistentReplicator extends PersistentReplicator {
Expand All @@ -50,6 +52,32 @@ protected String getProducerName() {
return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
}

/**
 * Pre-check executed before the replication producer is created.
 *
 * <p>When {@code createTopicToRemoteClusterForReplication} is enabled, no check is needed and an
 * already-completed future is returned. Otherwise the partitioned-topic metadata of the local
 * topic name is looked up through the replication client, and the replicator is only allowed to
 * start when the remote side reports {@code 0} partitions.
 *
 * @return a future that completes normally when the producer may be created; it completes
 *         exceptionally with {@link PulsarClientException.NotAllowedException} when the remote
 *         cluster reports a non-zero partition count, or with the unwrapped lookup error when the
 *         metadata request fails
 */
@Override
protected CompletableFuture<Void> prepareCreateProducer() {
    if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
        return CompletableFuture.completedFuture(null);
    }
    CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
    // NOTE(review): the two "false" flags presumably disable metadata auto-creation and the
    // fallback lookup on the remote cluster - confirm against the client API.
    replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)
            .whenComplete((metadata, ex) -> {
                if (ex != null) {
                    topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                    return;
                }
                if (metadata.partitions == 0) {
                    topicCheckFuture.complete(null);
                    return;
                }
                // String.format only understands "%s"; the previous "{}" placeholder was printed
                // literally and caused the partition count to be dropped from the message.
                String errorMsg = String.format("%s Can not create the replicator due to the partitions in the"
                                + " remote cluster is not 0, but is %s",
                        replicatorId, metadata.partitions);
                log.error(errorMsg);
                topicCheckFuture.completeExceptionally(
                        new PulsarClientException.NotAllowedException(errorMsg));
            });
    return topicCheckFuture;
}

@Override
protected boolean replicateEntries(List<Entry> entries) {
boolean atLeastOneMessageSentForReplication = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Replication tests that run with {@code createTopicToRemoteClusterForReplication} set to
 * {@code false}. Each test enables replication on the local cluster, checks that the topic was
 * not created on the remote cluster, and then creates the remote topic manually.
 */
@Slf4j
@Test(groups = "broker")
public class DisabledCreateTopicToRemoteClusterForReplicationTest extends OneWayReplicatorTestBase {

    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
        // Apply the same retention to both namespaces on both clusters.
        for (String namespace : Arrays.asList(replicatedNamespace, nonReplicatedNamespace)) {
            admin1.namespaces().setRetention(namespace, new RetentionPolicies(300, 1024));
            admin2.namespaces().setRetention(namespace, new RetentionPolicies(300, 1024));
        }
    }

    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @Override
    protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
                                     LocalBookkeeperEnsemble bookkeeperEnsemble,
                                     ZookeeperServerTest brokerConfigZk) {
        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
        // The feature under test: replication must not create topics on the remote cluster.
        config.setCreateTopicToRemoteClusterForReplication(false);
        config.setReplicationStartAt("earliest");
    }

    /** Publishes a single string message to {@code topic} on the local cluster, then closes the producer. */
    private void publishSingleMessage(String topic, String value) throws Exception {
        Producer<String> producer = client1.newProducer(Schema.STRING).topic(topic).create();
        producer.send(value);
        producer.close();
    }

    /**
     * Waits until the replicator collection of {@code topic} on the local broker is empty
     * ({@code expectEmpty == true}) or non-empty ({@code expectEmpty == false}).
     */
    private void awaitReplicators(String topic, boolean expectEmpty) {
        Awaitility.await().untilAsserted(() -> {
            PersistentTopic localTopic = (PersistentTopic) broker1.getTopic(topic, false).join().get();
            boolean noReplicators = localTopic.getReplicators().isEmpty();
            if (expectEmpty) {
                assertTrue(noReplicators);
            } else {
                assertFalse(noReplicators);
            }
        });
    }

    /** Asserts that a partitioned-metadata lookup for {@code topic} on the remote cluster yields "not found". */
    private void assertTopicAbsentOnRemoteCluster(String topic) throws PulsarAdminException {
        try {
            admin2.topics().getPartitionedTopicMetadata(topic);
            fail("Expected a not found ex");
        } catch (PulsarAdminException.NotFoundException ex) {
            // expected.
        }
    }

    @Test
    public void testCreatePartitionedTopicWithNsReplication() throws Exception {
        final String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
        admin1.namespaces().createNamespace(ns);
        admin2.namespaces().createNamespace(ns);
        admin1.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));
        admin2.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));

        // Create a single-partition topic locally, then enable namespace-level replication.
        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
        final String part1 = TopicName.get(tp).getPartition(0).toString();
        admin1.topics().createPartitionedTopic(tp, 1);
        admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));

        // Publish one message to load the partition, then wait until a replicator is registered.
        final String msgValue = "msg-1";
        publishSingleMessage(tp, msgValue);
        awaitReplicators(part1, false);

        // The remote cluster must not have received the topic.
        assertTopicAbsentOnRemoteCluster(tp);

        // Once the topic is created manually on the remote cluster, the message gets replicated.
        admin2.topics().createPartitionedTopic(tp, 1);
        Consumer<String> consumer2 = client2.newConsumer(Schema.STRING)
                .topic(tp)
                .isAckReceiptEnabled(true)
                .subscriptionName("s1")
                .subscribe();
        assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);
        consumer2.close();

        // Tear down: disable replication, wait for the replicator to go away, delete everything.
        admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
        awaitReplicators(part1, true);
        admin1.topics().deletePartitionedTopic(tp, false);
        admin2.topics().deletePartitionedTopic(tp, false);
        admin1.namespaces().deleteNamespace(ns);
        admin2.namespaces().deleteNamespace(ns);
    }

    @Test
    public void testEnableTopicReplication() throws Exception {
        final String ns = nonReplicatedNamespace;

        // Create a single-partition topic locally, then enable topic-level replication.
        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
        final String part1 = TopicName.get(tp).getPartition(0).toString();
        admin1.topics().createPartitionedTopic(tp, 1);
        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, cluster2));

        // Publish one message to load the partition, then wait until a replicator is registered.
        publishSingleMessage(tp, "msg-1");
        awaitReplicators(part1, false);

        // The remote cluster must not have received the topic.
        assertTopicAbsentOnRemoteCluster(tp);

        // Once the topic is created manually on the remote cluster, the replicator can start.
        admin2.topics().createPartitionedTopic(tp, 1);
        waitReplicatorStarted(part1);

        // Tear down: disable replication, wait for the replicator to go away, delete the topics.
        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
        awaitReplicators(part1, true);
        admin1.topics().deletePartitionedTopic(tp, false);
        admin2.topics().deletePartitionedTopic(tp, false);
    }

    @Test
    public void testNonPartitionedTopic() throws Exception {
        final String ns = nonReplicatedNamespace;

        // Create a non-partitioned topic locally, then enable topic-level replication.
        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
        admin1.topics().createNonPartitionedTopic(tp);
        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, cluster2));

        // Publish one message to load the topic, then wait until a replicator is registered.
        publishSingleMessage(tp, "msg-1");
        awaitReplicators(tp, false);

        // The remote cluster must not have received the topic.
        assertTopicAbsentOnRemoteCluster(tp);

        // Once the topic is created manually on the remote cluster, the replicator can start.
        admin2.topics().createNonPartitionedTopic(tp);
        waitReplicatorStarted(tp);

        // Tear down: disable replication, wait for the replicator to go away, delete the topics.
        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
        awaitReplicators(tp, true);
        admin1.topics().delete(tp, false);
        admin2.topics().delete(tp, false);
    }
}
Loading
Loading