From 8dfd619efe558d2c6571dd67456aadf71b38e633 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 14 Aug 2024 16:39:55 +0800 Subject: [PATCH] [improve] [broker] Avoid subscription fenced error with consumer.seek whenever possible (#23163) (cherry picked from commit d5ce1cee35363ba2372375c2e8740be6d87488d8) --- .../persistent/PersistentSubscription.java | 32 ++++++--- .../broker/service/SubscriptionSeekTest.java | 65 +++++++++++++++++++ 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 596008305b9b7..a4f6fdc54271a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -134,6 +134,7 @@ public class PersistentSubscription extends AbstractSubscription { private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; private volatile CompletableFuture fenceFuture; + private volatile CompletableFuture inProgressResetCursorFuture; static Map getBaseCursorProperties(boolean isReplicated) { return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; @@ -222,6 +223,16 @@ public boolean setReplicated(boolean replicated) { @Override public CompletableFuture addConsumer(Consumer consumer) { + CompletableFuture inProgressResetCursorFuture = this.inProgressResetCursorFuture; + if (inProgressResetCursorFuture != null) { + return inProgressResetCursorFuture.handle((ignore, ignoreEx) -> null) + .thenCompose(ignore -> addConsumerInternal(consumer)); + } else { + return addConsumerInternal(consumer); + } + } + + private CompletableFuture addConsumerInternal(Consumer consumer) { return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> { synchronized (PersistentSubscription.this) { cursor.updateLastActive(); @@ -776,7 +787,8 @@ public void findEntryComplete(Position position, Object ctx) { } else { finalPosition = position.getNext(); } - resetCursor(finalPosition, future); + CompletableFuture resetCursorFuture = resetCursor(finalPosition); + FutureUtil.completeAfter(future, resetCursorFuture); } @Override @@ -795,18 +807,13 @@ public void findEntryFailed(ManagedLedgerException exception, } @Override - public CompletableFuture resetCursor(Position position) { - CompletableFuture future = new CompletableFuture<>(); - resetCursor(position, future); - return future; - } - - private void resetCursor(Position finalPosition, CompletableFuture future) { + public CompletableFuture resetCursor(Position finalPosition) { if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { - future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); - return; + return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); } + final CompletableFuture future = new CompletableFuture<>(); + inProgressResetCursorFuture = future; final CompletableFuture disconnectFuture; // Lock the Subscription object before locking the Dispatcher object to avoid deadlocks @@ -826,6 +833,7 @@ private void resetCursor(Position finalPosition, CompletableFuture future) if (throwable != null) { log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally( new SubscriptionBusyException("Failed to disconnect consumers from subscription")); return; @@ -865,6 +873,7 @@ public void resetComplete(Object ctx) { dispatcher.afterAckMessages(null, finalPosition); } IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.complete(null); } @@ -873,6 +882,7 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName, finalPosition, exception); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; // todo - retry on InvalidCursorPositionException // or should we just ask user to retry one more time? if (exception instanceof InvalidCursorPositionException) { @@ -887,10 +897,12 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) { }).exceptionally((e) -> { log.error("[{}][{}] Error while resetting cursor", topicName, subName, e); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally(new BrokerServiceException(e)); return null; }); }); + return future; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index fd08f284bbf99..3fc795a8c3e2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -34,12 +34,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -50,8 +52,13 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -781,6 +788,64 @@ public void testSeekByFunctionAndMultiTopic() throws Exception { assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2); } + @Test + public void testSeekWillNotEncounteredFencedError() throws Exception { + String topicName = "persistent://prop/ns-abc/my-topic2"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0)); + // Create a pulsar client with a subscription fenced counter. + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); + PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> + new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + protected void handleError(CommandError error) { + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { + receivedFencedErrorCounter.incrementAndGet(); + } + super.handleError(error); + } + }); + + // publish some messages. + org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("s1") + .subscribe(); + Producer producer = client.newProducer(Schema.STRING) + .topic(topicName).create(); + MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0"); + for (int i = 1; i < 11; i++) { + admin.topics().unload(topicName); + producer.send(i + ""); + } + + // Inject a delay for reset-cursor. + mockZooKeeper.delay(3000, (op, path) -> { + if (path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) { + return op.toString().equalsIgnoreCase("SET"); + } + return false; + }); + + // Verify: consumer will not receive "subscription fenced" error after a seek. + for (int i = 1; i < 11; i++) { + Message msg = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + consumer.seek(msgId1); + Awaitility.await().untilAsserted(() -> { + assertTrue(consumer.isConnected()); + }); + assertEquals(receivedFencedErrorCounter.get(), 0); + + // cleanup. + producer.close(); + consumer.close(); + client.close(); + admin.topics().delete(topicName); + } + @Test public void testExceptionBySeekFunction() throws Exception { final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();