Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java Reader Client] Start reader inside batch result in read first message in batch. #6345

Merged
merged 8 commits into from
Feb 24, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -69,8 +71,10 @@
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand Down Expand Up @@ -113,15 +117,15 @@ protected void setup() throws Exception {
public static Object[][] variationsForExpectedPos() {
return new Object[][] {
// batching / start-inclusive / num-of-messages
{true, true, 10 },
{true, false, 10 },
{false, true, 10 },
{false, false, 10 },

{true, true, 100 },
{true, false, 100 },
{false, true, 100 },
{false, false, 100 },
{true, true, 10, "1"},
{true, true, 100, "2"},
{true, false, 10, "3"},
{true, false, 100, "4"},

{false, true, 10, "5"},
{false, true, 100, "6"},
{false, false, 10, "7"},
{false, false, 100, "8"},
};
}

Expand Down Expand Up @@ -1419,7 +1423,6 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -1736,7 +1739,6 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -3219,9 +3221,9 @@ public void testPartitionedTopicWithOnePartition() throws Exception {
}

@Test(dataProvider = "variationsForExpectedPos")
public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix)
throws Exception {
final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos";
final String topicName = "persistent://my-property/my-ns/ConsumerStartMessageIdAtExpectedPos" + suffix;
final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset

Expand All @@ -3230,14 +3232,30 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

MessageId resetPos = null;
CountDownLatch latch = new CountDownLatch(numOfMessages);

final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
if (resetIndex == i) {
resetPos = msgId;
}

final int j = i;

producer.sendAsync(String.format("msg num %d", i).getBytes())
.thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
.whenComplete((p, e) -> {
if (e != null) {
fail("send msg failed due to " + e.getMessage());
} else {
if (p.getLeft() == resetIndex) {
resetPos.set(p.getRight());
}
}
latch.countDown();
});
}

latch.await();

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic(topicName);

Expand All @@ -3246,7 +3264,7 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
}

Consumer<byte[]> consumer = consumerBuilder.subscriptionName("my-subscriber-name").subscribe();
consumer.seek(resetPos);
consumer.seek(resetPos.get());
Set<String> messageSet = Sets.newHashSet();
for (int i = firstMessage; i < numOfMessages; i++) {
Message<byte[]> message = consumer.receive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Expand All @@ -30,12 +31,16 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,15 +70,15 @@ protected void cleanup() throws Exception {
public static Object[][] variationsForExpectedPos() {
return new Object[][] {
// batching / start-inclusive / num-of-messages
{true, true, 10 },
{true, false, 10 },
{false, true, 10 },
{false, false, 10 },

{true, true, 100 },
{true, false, 100 },
{false, true, 100 },
{false, false, 100 },
{true, true, 10, "1"},
{true, true, 100, "2"},
{true, false, 10, "3"},
{true, false, 100, "4"},

{false, true, 10, "5"},
{false, true, 100, "6"},
{false, false, 10, "7"},
{false, false, 100, "8"},
};
}

Expand Down Expand Up @@ -688,9 +693,9 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
}

@Test(dataProvider = "variationsForExpectedPos")
public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages)
public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean startInclusive, int numOfMessages, String suffix)
throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
final String topicName = "persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos" + suffix;
final int resetIndex = new Random().nextInt(numOfMessages); // Choose some random index to reset
final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; // First message of reset

Expand All @@ -699,17 +704,33 @@ public void testReaderStartMessageIdAtExpectedPos(boolean batching, boolean star
.enableBatching(batching)
.create();

MessageId resetPos = null;
CountDownLatch latch = new CountDownLatch(numOfMessages);

final AtomicReference<MessageId> resetPos = new AtomicReference<>();

for (int i = 0; i < numOfMessages; i++) {
MessageId msgId = producer.send(String.format("msg num %d", i).getBytes());
if (resetIndex == i) {
resetPos = msgId;
}

final int j = i;

producer.sendAsync(String.format("msg num %d", i).getBytes())
.thenCompose(messageId -> FutureUtils.value(Pair.of(j, messageId)))
.whenComplete((p, e) -> {
if (e != null) {
fail("send msg failed due to " + e.getMessage());
} else {
if (p.getLeft() == resetIndex) {
resetPos.set(p.getRight());
}
}
latch.countDown();
});
}

latch.await();

ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader()
.topic(topicName)
.startMessageId(resetPos);
.startMessageId(resetPos.get());

if (startInclusive) {
readerBuilder.startMessageIdInclusive();
Expand Down Expand Up @@ -761,4 +782,43 @@ public void testReaderBuilderConcurrentCreate() throws Exception {
producers.get(i).close();
}
}

@Test
public void testReaderStartInMiddleOfBatch() throws Exception {
final String topicName = "persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
final int numOfMessage = 100;

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(true)
.batchingMaxMessages(10)
.create();

CountDownLatch latch = new CountDownLatch(100);

List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());

for (int i = 0; i < numOfMessage; i++) {
producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
if (e != null) {
fail();
} else {
allIds.add(mid);
}
latch.countDown();
});
}

latch.await();

for (MessageId id : allIds) {
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(id).startMessageIdInclusive().create();
MessageId idGot = reader.readNext().getMessageId();
assertEquals(idGot, id);
reader.close();
}

producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BatchReceivePolicy {
/**
* Default batch receive policy.
*
* <p>Max number of messages: 100
* <p>Max number of messages: no limit
* Max number of bytes: 10MB
* Timeout: 100ms<p/>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
}

protected boolean hasEnoughMessagesForBatchReceive() {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumMessages() <= 0) {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {

if (isResetIncludedAndSameEntryLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
Expand Down Expand Up @@ -1018,7 +1018,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
singleMessageMetadataBuilder, i, batchSize);

if (isResetIncludedAndSameEntryLedger(messageId) && isPriorBatchIndex(i)) {
if (isNonDurableAndSameEntryAndLedger(messageId) && isPriorBatchIndex(i)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1091,8 +1091,8 @@ private boolean isPriorBatchIndex(long idx) {
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}

private boolean isResetIncludedAndSameEntryLedger(MessageIdData messageId) {
return !resetIncludeHead && startMessageId != null
private boolean isNonDurableAndSameEntryAndLedger(MessageIdData messageId) {
return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
}
Expand Down