From 577596e49e0a9650f7a09f523cffad8742375ab2 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 27 Sep 2016 10:38:10 -0700 Subject: [PATCH] Fixed logic of test PersistentTopicE2ETest.testMessageRedelivery Fixes #41 --- .../service/PersistentTopicE2ETest.java | 63 ++++--------------- 1 file changed, 13 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java index a3a904ff691f0..9032b359d0f0b 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1073,31 +1073,21 @@ public void testPayloadCorruptionDetection() throws Exception { assertEquals(new String(msg.getData()), "message-1"); } } - + /** * Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay bucket - * + * * 1. produce messages * 2. consume messages and ack all except 1 msg - * 3. add already acked messages to PersistentDispatcherMultipleConsumers.messagesToReplay bucket - * 4. replay messages present into messagesToReplay bucket - * 5. Verification - * a. should replay only 1 unacked message - * b. messagesToReplay should not have previously added acked messages - * - * - * - * @throws Exception + * 3. Verification: should replay only 1 unacked message */ - @Test - public void testMessageReplay() throws Exception { - + @Test() + public void testMessageRedelivery() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic2"; final String subName = "sub2"; Message msg; int totalMessages = 10; - int replayIndex = totalMessages / 2; ConsumerConfiguration conf = new ConsumerConfiguration(); conf.setSubscriptionType(SubscriptionType.Shared); @@ -1105,17 +1095,6 @@ public void testMessageReplay() throws Exception { Consumer consumer = pulsarClient.subscribe(topicName, subName, conf); Producer producer = pulsarClient.createProducer(topicName); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); - assertNotNull(topicRef); - PersistentSubscription subRef = topicRef.getPersistentSubscription(subName); - PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef - .getDispatcher(); - Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay"); - replayMap.setAccessible(true); - TreeSet messagesToReplay = Sets.newTreeSet(); - - assertNotNull(subRef); - // (1) Produce messages for (int i = 0; i < totalMessages; i++) { String message = "my-message-" + i; @@ -1130,41 +1109,25 @@ public void testMessageReplay() throws Exception { unAckedMsg = msg; } else { consumer.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - if (i < replayIndex) { - // (3) accumulate acked messages for replay - messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId())); - } } } - /** - * 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9] - * 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + position presents into messagesToReplay - * 3. ManagedCursorImpl.asyncReplayEntries will skip replays for already acked messages and those will be deleted - * from messagesToReplay - * 4. Verify: - * A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket - * B. messagesToReplay must be empty - */ - assertEquals(replayIndex - 1, messagesToReplay.size()); - replayMap.set(dispatcher, messagesToReplay); - //(4) redeliver : replay-msg + unAcked-msg - dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); - - // Verify: [4.A] msg [L:0] must be redelivered + consumer.redeliverUnacknowledgedMessages(); + + // Verify: msg [L:0] must be redelivered try { msg = consumer.receive(1, TimeUnit.SECONDS); - assertEquals(msg.getData(), unAckedMsg.getData()); + assertEquals(new String(msg.getData()), new String(unAckedMsg.getData())); } catch (Exception e) { fail("msg should be redelivered ", e); } - // Verify: [4.B] messagesToReplay must be empty - assertEquals(messagesToReplay.size(), 0); + // Verify no other messages are redelivered + msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNull(msg); consumer.close(); producer.close(); } - + }