Skip to content

Commit

Permalink
Fixed logic of test PersistentTopicE2ETest.testMessageRedelivery
Browse files Browse the repository at this point in the history
Fixes #41
  • Loading branch information
merlimat committed Sep 27, 2016
1 parent 2203316 commit 577596e
Showing 1 changed file with 13 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1073,49 +1073,28 @@ public void testPayloadCorruptionDetection() throws Exception {
assertEquals(new String(msg.getData()), "message-1");
}
}

/**
* Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay bucket
*
*
* 1. produce messages
* 2. consume messages and ack all except 1 msg
* 3. add already acked messages to PersistentDispatcherMultipleConsumers.messagesToReplay bucket
* 4. replay messages present into messagesToReplay bucket
* 5. Verification
* a. should replay only 1 unacked message
* b. messagesToReplay should not have previously added acked messages
*
*
*
* @throws Exception
* 3. Verification: should replay only 1 unacked message
*/
@Test
public void testMessageReplay() throws Exception {

@Test()
public void testMessageRedelivery() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";

Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);

Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();

assertNotNull(subRef);

// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
Expand All @@ -1130,41 +1109,25 @@ public void testMessageReplay() throws Exception {
unAckedMsg = msg;
} else {
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}
}

/**
* 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9]
* 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + position presents into messagesToReplay
* 3. ManagedCursorImpl.asyncReplayEntries will skip replays for already acked messages and those will be deleted
* from messagesToReplay
* 4. Verify:
* A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket
* B. messagesToReplay must be empty
*/
assertEquals(replayIndex - 1, messagesToReplay.size());
replayMap.set(dispatcher, messagesToReplay);
//(4) redeliver : replay-msg + unAcked-msg
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));

// Verify: [4.A] msg [L:0] must be redelivered
consumer.redeliverUnacknowledgedMessages();

// Verify: msg [L:0] must be redelivered
try {
msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(msg.getData(), unAckedMsg.getData());
assertEquals(new String(msg.getData()), new String(unAckedMsg.getData()));
} catch (Exception e) {
fail("msg should be redelivered ", e);
}

// Verify: [4.B] messagesToReplay must be empty
assertEquals(messagesToReplay.size(), 0);
// Verify no other messages are redelivered
msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);

consumer.close();
producer.close();
}

}

0 comments on commit 577596e

Please sign in to comment.