Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated BK version to 4.3.1.41-yahoo #39

Merged
merged 2 commits into from
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ flexible messaging model and an intuitive client API.</description>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<bookkeeper.version>4.3.1.32-yahoo</bookkeeper.version>
<bookkeeper.version>4.3.1.41-yahoo</bookkeeper.version>

<netty.version>4.0.40.Final</netty.version>
<storm.version>0.9.5</storm.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,49 +1073,28 @@ public void testPayloadCorruptionDetection() throws Exception {
assertEquals(new String(msg.getData()), "message-1");
}
}

/**
* Verify: Broker should not replay already acknowledged messages again and should clear them from messageReplay bucket
*
*
* 1. produce messages
* 2. consume messages and ack all except 1 msg
* 3. add already acked messages to PersistentDispatcherMultipleConsumers.messagesToReplay bucket
* 4. replay messages present into messagesToReplay bucket
* 5. Verification
* a. should replay only 1 unacked message
* b. messagesToReplay should not have previously added acked messages
*
*
*
* @throws Exception
* 3. Verification: should replay only 1 unacked message
*/
@Test
public void testMessageReplay() throws Exception {

@Test()
public void testMessageRedelivery() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";

Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);

Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();

assertNotNull(subRef);

// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
Expand All @@ -1130,41 +1109,25 @@ public void testMessageReplay() throws Exception {
unAckedMsg = msg;
} else {
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}
}

/**
* 1. Now: ManagedCursorImpl.individualDeletedMessages => (L:0,L9]
* 2. dispatcher.redeliverUnacknowledgedMessages replays => unackMsg (L:0) + position presents into messagesToReplay
* 3. ManagedCursorImpl.asyncReplayEntries will skip replays for already acked messages and those will be deleted
* from messagesToReplay
* 4. Verify:
* A. dispatcher redelivers msg [L:0] and deletes [L:0] from messagesToReplay bucket
* B. messagesToReplay must be empty
*/
assertEquals(replayIndex - 1, messagesToReplay.size());
replayMap.set(dispatcher, messagesToReplay);
//(4) redeliver : replay-msg + unAcked-msg
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));

// Verify: [4.A] msg [L:0] must be redelivered
consumer.redeliverUnacknowledgedMessages();

// Verify: msg [L:0] must be redelivered
try {
msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(msg.getData(), unAckedMsg.getData());
assertEquals(new String(msg.getData()), new String(unAckedMsg.getData()));
} catch (Exception e) {
fail("msg should be redelivered ", e);
}

// Verify: [4.B] messagesToReplay must be empty
assertEquals(messagesToReplay.size(), 0);
// Verify no other messages are redelivered
msg = consumer.receive(100, TimeUnit.MILLISECONDS);
assertNull(msg);

consumer.close();
producer.close();
}

}