From 06f1d8dd09df1211ad748d8419f439c4ee2859c8 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 11 Oct 2022 18:04:56 +0800 Subject: [PATCH 1/2] fix close order --- .../pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 11d663322be52..20d437b64a2ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -279,10 +279,10 @@ public void run(Timeout timeout) throws Exception { @Override public void close() { - priorityQueue.close(); if (timeout != null) { timeout.cancel(); } + priorityQueue.close(); } @Override From bbba3bf2a64d5c9369fa44ada869e177aa0857db Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 12 Oct 2022 16:51:07 +0800 Subject: [PATCH 2/2] add test & set timeout=null --- .../InMemoryDelayedDeliveryTracker.java | 7 +-- .../delayed/InMemoryDeliveryTrackerTest.java | 49 +++++++++++++++++-- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 20d437b64a2ee..ba8a931181776 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -33,7 +33,7 @@ @Slf4j public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { - private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); + protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); private final PersistentDispatcherMultipleConsumers dispatcher; @@ -41,7 +41,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T private final Timer timer; // Current timeout or null if not set - private Timeout timeout; + protected Timeout timeout; // Timestamp at which the timeout is currently set private long currentTimeoutTarget; @@ -265,7 +265,7 @@ public void run(Timeout timeout) throws Exception { if (log.isDebugEnabled()) { log.debug("[{}] Timer triggered", dispatcher.getName()); } - if (timeout.isCancelled()) { + if (timeout == null || timeout.isCancelled()) { return; } @@ -281,6 +281,7 @@ public void run(Timeout timeout) throws Exception { public void close() { if (timeout != null) { timeout.cancel(); + timeout = null; } priorityQueue.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 1ff47a4ca5065..11b681d80a640 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -28,13 +28,13 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; - +import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Clock; import java.util.Collections; import java.util.NavigableMap; @@ -42,10 +42,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import io.netty.util.concurrent.DefaultThreadFactory; import lombok.Cleanup; - import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.awaitility.Awaitility; @@ -433,4 +430,46 @@ public void testWithNoDelays() throws Exception { assertFalse(tracker.shouldPauseAllDeliveries()); } + @Test + public void testClose() throws Exception { + Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), + 1, TimeUnit.MILLISECONDS); + + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + final Exception[] exceptions = new Exception[1]; + + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true, 0) { + @Override + public void run(Timeout timeout) throws Exception { + super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS); + if (timeout == null || timeout.isCancelled()) { + return; + } + try { + this.priorityQueue.peekN1(); + } catch (Exception e) { + e.printStackTrace(); + exceptions[0] = e; + } + } + }; + + tracker.addMessage(1, 1, 10); + clockTime.set(10); + + Thread.sleep(300); + + tracker.close(); + + assertNull(exceptions[0]); + + timer.stop(); + } + }