From 0d056c3034e2cd852d8bf96d8082c29b7e3f0d81 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 22 Dec 2016 14:00:26 -0800 Subject: [PATCH] Safely stop(close replication-producer) and remove replicator --- .../persistent/PersistentReplicator.java | 3 +- .../service/persistent/PersistentTopic.java | 22 ++++-- .../pulsar/broker/service/ReplicatorTest.java | 68 +++++++++++++++++++ 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index afd3610c36014..6e28148c48195 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -605,7 +605,8 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } - if (producer != null && state.compareAndSet(State.Started, State.Stopping)) { + if (producer != null && (state.compareAndSet(State.Starting, State.Stopping) + || state.compareAndSet(State.Started, State.Stopping))) { log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog()); return closeProducerAsync(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index a22a601f43c79..cbf010ea32e27 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -118,6 +118,8 @@ public class PersistentTopic implements Topic, AddEntryCallback { public final String replicatorPrefix; private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; + + private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); @@ -662,15 +664,13 @@ public void deleteCursorComplete(Object ctx) { @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}] Failed to delete cursor {}", topic, name); - // Connect the producers back - replicators.get(remoteCluster).startProducer(); + log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception); future.completeExceptionally(new PersistenceException(exception)); } }, null); }).exceptionally(e -> { - log.error("[{}] Failed to close replication producer {}", topic, name); + log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e); future.completeExceptionally(e); return null; }); @@ -1071,7 +1071,19 @@ public CompletableFuture onPoliciesUpdate(Policies data) { producers.forEach(Producer::checkPermissions); subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions)); checkMessageExpiry(); - return checkReplication(); + CompletableFuture result = new CompletableFuture(); + checkReplication().thenAccept(res -> { + log.info("Policies updated successfully {}", data); + result.complete(null); + }).exceptionally(th -> { + log.error("Policies update failed {} {}, scheduled retry in {} seconds", data, th.getMessage(), + POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th); + brokerService.executor().schedule(this::checkReplication, POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, + TimeUnit.SECONDS); + result.completeExceptionally(th); + return null; + }); + return result; } /** diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java index b0cd6028d3a9e..a29e7565ed95f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ReplicatorTest.java @@ -18,19 +18,24 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -48,6 +53,7 @@ import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import com.yahoo.pulsar.client.api.MessageBuilder; import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.impl.ProducerImpl; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; import com.yahoo.pulsar.common.naming.NamespaceName; @@ -563,6 +569,68 @@ public Void call() throws Exception { } } } + + /** + * It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and + * it should have cleaned up from the list + * + * @throws Exception + */ + @Test + public void testDeleteReplicatorFailure2() throws Exception { + log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); + final String topicName = "persistent://pulsar/global/ns/repltopicbatch"; + final DestinationName dest = DestinationName.get(topicName); + MessageProducer producer1 = new MessageProducer(url1, dest); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName); + final String replicatorClusterName = topic.getReplicators().keys().get(0); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); + CountDownLatch latch = new CountDownLatch(1); + // delete cursor already : so next time if topic.removeReplicator will get exception but then it should + // remove-replicator from the list even with failure + ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() { + @Override + public void deleteCursorComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); + latch.await(); + + Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class); + removeReplicator.setAccessible(true); + // invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from + // list without restarting it + CompletableFuture result = (CompletableFuture) removeReplicator.invoke(topic, + replicatorClusterName); + result.thenApply((v) -> { + assertNull(topic.getPersistentReplicator(replicatorClusterName)); + return null; + }); + } + + @Test + public void testReplicatorProducerClosing() throws Exception { + log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); + final String topicName = "persistent://pulsar/global/ns/repltopicbatch"; + final DestinationName dest = DestinationName.get(topicName); + MessageProducer producer1 = new MessageProducer(url1, dest); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName); + final String replicatorClusterName = topic.getReplicators().keys().get(0); + PersistentReplicator replicator = topic.getPersistentReplicator(replicatorClusterName); + pulsar2.close(); + pulsar3.close(); + replicator.disconnect(false); + Thread.sleep(100); + Field field = PersistentReplicator.class.getDeclaredField("producer"); + field.setAccessible(true); + ProducerImpl producer = (ProducerImpl) field.get(replicator); + assertNull(producer); + } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);