Fix realtime ingestion when an entire batch of messages is filtered out #7927

Merged 7 commits on Dec 21, 2021
Changes from all commits
@@ -33,7 +33,6 @@
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -85,7 +84,6 @@
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -401,16 +399,12 @@ protected boolean consumeLoop()
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
_endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
_consecutiveErrorCount = 0;
} catch (TimeoutException e) {
handleTransientStreamErrors(e);
continue;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
continue;
} catch (PermanentConsumerException e) {
_segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
throw e;
} catch (Exception e) {
// All exceptions other than PermanentConsumerException are handled the same way;
// routinely this can be a TimeoutException or a TransientConsumerException
Member Author: Collapsed identical catch blocks for the sake of the person reading the code.

// Unknown exception from stream. Treat as a transient exception.
// One such exception seen so far is java.net.SocketTimeoutException
handleTransientStreamErrors(e);
@@ -423,12 +417,17 @@
consecutiveIdleCount = 0;
// We consumed something. Update the highest stream offset as well as partition-consuming metric.
// TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
// _currentOffset.getOffset());
// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
// _currentOffset.getOffset());
//_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
//_currentOffset.getOffset());
//_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
//_currentOffset.getOffset());
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
// we consumed something from the stream but filtered all the content out,
// so we need to advance the offsets to avoid getting stuck
_currentOffset = messageBatch.getOffsetOfNextBatch();
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long
// time.
@@ -559,7 +558,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
if (streamMessageCount != 0) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
streamMessageCount, _currentOffset);
} else {
} else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
Member Author: Prevents unnecessary latency when there has been a bad batch; there is probably data waiting to be consumed.

// If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
}
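To make the shape of the fix easier to follow, here is a condensed, hypothetical sketch of the decision consumeLoop now makes after each fetch. The Batch interface below is a stand-in for Pinot's MessageBatch, reduced to the three methods the fix relies on; names and the sleep interval are simplified and this is not the real implementation.

import java.util.concurrent.TimeUnit;

// Stand-in for org.apache.pinot.spi.stream.MessageBatch, trimmed to what the fix needs.
interface Batch {
  int getMessageCount();            // messages that survived filtering
  int getUnfilteredMessageCount();  // messages actually returned by the stream
  long getOffsetOfNextBatch();      // offset the next fetch should start from
}

public class ConsumeLoopSketch {
  private long _currentOffset;

  // Returns true if the offset advanced; the real code updates it message-by-message while indexing.
  boolean handleBatch(Batch batch) throws InterruptedException {
    if (batch.getMessageCount() > 0) {
      // Normal case: rows were indexed, so record where the next fetch should start.
      _currentOffset = batch.getOffsetOfNextBatch();
      return true;
    } else if (batch.getUnfilteredMessageCount() > 0) {
      // The stream returned records but every one was filtered out (e.g. tombstones).
      // Before this PR the offset stayed put and the same records were fetched forever;
      // advancing it here is what breaks the livelock.
      _currentOffset = batch.getOffsetOfNextBatch();
      return true;
    } else {
      // Truly idle: nothing came back from the stream, so back off briefly before polling again.
      TimeUnit.MILLISECONDS.sleep(100);
      return false;
    }
  }
}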
@@ -473,7 +473,11 @@ protected void pushAvroIntoKafka(List<File> avroFiles)

ClusterIntegrationTestUtils
.pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(), getMaxNumKafkaMessagesPerBatch(),
getKafkaMessageHeader(), getPartitionColumn());
getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
}

protected boolean injectTombstones() {
return false;
}

protected List<File> getAllAvroFiles()
@@ -317,7 +317,8 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
* @throws Exception
*/
public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, String kafkaTopic,
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn)
int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn,
boolean injectTombstones)
throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", kafkaBroker);
@@ -329,6 +330,13 @@ public static void pushAvroIntoKafka(List<File> avroFiles, String kafkaBroker, S
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);

try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
if (injectTombstones) {
// publish lots of tombstones to livelock the consumer if it can't handle this properly
for (int i = 0; i < 1000; i++) {
// a tombstone is a record with a key but a null value
producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
}
}
for (File avroFile : avroFiles) {
try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
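For context, a "tombstone" here is simply a Kafka record that has a key but a null value. The test utility above publishes them through Pinot's StreamDataProducer; a rough equivalent using the plain Kafka producer API would look like the sketch below, where the broker address and topic name are placeholders.

import java.util.Properties;
import com.google.common.primitives.Longs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TombstoneProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 1000; i++) {
        // Key present, value null: Kafka stores the record, but there is nothing to decode,
        // so the Pinot consumer filters it out of the MessageBatch.
        producer.send(new ProducerRecord<>("myTopic", Longs.toByteArray(System.currentTimeMillis()), null));
      }
    }
  }
}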
@@ -63,6 +63,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
private final long _startTime = System.currentTimeMillis();

@Override
protected boolean injectTombstones() {
return true;
Member Author: Set this to false to make the test pass at eb6800da96a44e6f5125097cc99a368c2f8f8847.

}

@Override
protected boolean useLlc() {
return true;
@@ -18,10 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka20;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -30,19 +27,31 @@

public class KafkaMessageBatch implements MessageBatch<byte[]> {

private List<MessageAndOffset> _messageList = new ArrayList<>();
private final List<MessageAndOffset> _messageList;
private final int _unfilteredMessageCount;
private final long _lastOffset;

public KafkaMessageBatch(Iterable<ConsumerRecord<String, Bytes>> iterable) {
for (ConsumerRecord<String, Bytes> record : iterable) {
_messageList.add(new MessageAndOffset(record.value().get(), record.offset()));
}
/**
* @param unfilteredMessageCount how many messages were received from the topic before being filtered
* @param lastOffset the offset of the last message in the batch
* @param batch the messages that survived filtering; may contain fewer entries than {@code unfilteredMessageCount}
*/
public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffset> batch) {
_messageList = batch;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
}

@Override
public int getMessageCount() {
return _messageList.size();
}

@Override
public int getUnfilteredMessageCount() {
return _unfilteredMessageCount;
}

@Override
public byte[] getMessageAtIndex(int index) {
return _messageList.get(index).getMessage().array();
@@ -67,4 +76,9 @@ public long getNextStreamMessageOffsetAtIndex(int index) {
public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
return new LongMsgOffset(_messageList.get(index).getNextOffset());
}

@Override
public StreamPartitionMsgOffset getOffsetOfNextBatch() {
return new LongMsgOffset(_lastOffset + 1);
}
}
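A short illustrative snippet (not part of the PR, offsets made up) showing what the new accounting looks like when an entire poll is filtered out; it assumes the kafka-2.0 plugin and pinot-spi modules are on the classpath.

import java.util.Collections;
import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
import org.apache.pinot.plugin.stream.kafka20.KafkaMessageBatch;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class KafkaMessageBatchExample {
  public static void main(String[] args) {
    // Suppose a poll returned 5 records at offsets 100..104 and every one was a tombstone.
    KafkaMessageBatch batch = new KafkaMessageBatch(5, 104L, Collections.<MessageAndOffset>emptyList());

    System.out.println(batch.getMessageCount());            // 0 -> nothing to index
    System.out.println(batch.getUnfilteredMessageCount());  // 5 -> but the stream did move
    StreamPartitionMsgOffset next = batch.getOffsetOfNextBatch();
    System.out.println(next.compareTo(new LongMsgOffset(105)) == 0);  // true: resume at 105, not 100
  }
}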
@@ -18,61 +18,51 @@
*/
package org.apache.pinot.plugin.stream.kafka20;

import com.google.common.collect.Iterables;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
implements PartitionLevelConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
}

@Override
public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
int timeoutMillis)
throws TimeoutException {
public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
return fetchMessages(startOffset, endOffset, timeoutMillis);
}

public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
throws TimeoutException {
public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
return new KafkaMessageBatch(messageAndOffsetIterable);
}

private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
return Iterables.filter(messageAndOffsets, input -> {
// Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
|| endOffset == -1);
});
}

@Override
public void close()
throws IOException {
super.close();
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
Member Author: Note that a list was being materialised in KafkaMessageBatch anyway; it's just easier to do it here because we can also capture the last offset. This is likely more efficient than using Iterables.filter.

long lastOffset = startOffset;
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
Bytes message = messageAndOffset.value();
long offset = messageAndOffset.offset();
if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
if (message != null) {
filtered.add(new MessageAndOffset(message.get(), offset));
}
lastOffset = offset;
}
}
return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
}
}
@@ -77,7 +77,7 @@ public KinesisPartitionGroupOffset fromString(String kinesisCheckpointStr) {
}

@Override
public int compareTo(Object o) {
public int compareTo(StreamPartitionMsgOffset o) {
Preconditions.checkNotNull(o);
KinesisPartitionGroupOffset other = (KinesisPartitionGroupOffset) o;
Preconditions.checkNotNull(other._shardToStartSequenceMap);
@@ -60,7 +60,7 @@ public StreamPartitionMsgOffset fromString(String streamPartitionMsgOffsetStr) {
}

@Override
public int compareTo(Object other) {
public int compareTo(StreamPartitionMsgOffset other) {
MessageIdStreamOffset messageIdStreamOffset = (MessageIdStreamOffset) other;
return _messageId.compareTo(messageIdStreamOffset.getMessageId());
}
@@ -42,7 +42,7 @@ public LongMsgOffset(StreamPartitionMsgOffset other) {
}

@Override
public int compareTo(Object other) {
public int compareTo(StreamPartitionMsgOffset other) {
return Long.compare(_offset, ((LongMsgOffset) other)._offset);
}

@@ -31,11 +31,17 @@
@InterfaceStability.Stable
public interface MessageBatch<T> {
/**
*
* @return number of messages returned from the stream
* @return number of available messages
*/
int getMessageCount();

/**
* @return number of messages returned from the stream
*/
default int getUnfilteredMessageCount() {
return getMessageCount();
Member Author: Implemented for Kafka 2.0 only.

}

/**
* Returns the message at a particular index inside a set of messages returned from the stream.
* @param index
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
}

/**
* @return the offset at which the next batch should start, i.e. one past the last message in this batch
*/
default StreamPartitionMsgOffset getOffsetOfNextBatch() {
return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);
}

/**
* Returns true if the consumer detects that no more records can be read from this partition group for good
*/
@@ -40,7 +40,7 @@
* versions of the stream implementation
*/
@InterfaceStability.Evolving
public interface StreamPartitionMsgOffset extends Comparable {
public interface StreamPartitionMsgOffset extends Comparable<StreamPartitionMsgOffset> {
Member Author: The raw Comparable caused a lot of warnings which made the code harder to read; parameterising it helps the compiler prevent heap-pollution bugs.


/**
* A serialized representation of the offset object as a String.
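The author's note about the typed Comparable is easier to see with a toy example. The classes below are hypothetical and not from Pinot: with the raw Comparable a mismatched argument compiles and only fails at runtime, while the parameterised version rejects it at compile time and removes the unchecked cast inside compareTo.

// Raw Comparable: compareTo takes Object, so any argument compiles and the cast is unchecked.
final class RawOffset implements Comparable {
  private final long _offset;
  RawOffset(long offset) { _offset = offset; }
  @Override
  public int compareTo(Object other) {
    return Long.compare(_offset, ((RawOffset) other)._offset);  // may throw ClassCastException
  }
}

// Parameterised Comparable: compareTo is typed, no self-cast, and misuse does not compile.
final class TypedOffset implements Comparable<TypedOffset> {
  private final long _offset;
  TypedOffset(long offset) { _offset = offset; }
  @Override
  public int compareTo(TypedOffset other) {
    return Long.compare(_offset, other._offset);
  }
}

public class ComparableOffsetDemo {
  public static void main(String[] args) {
    try {
      new RawOffset(1).compareTo("not an offset");  // compiles, fails at runtime
    } catch (ClassCastException e) {
      System.out.println("raw Comparable only failed at runtime: " + e.getMessage());
    }
    // new TypedOffset(1).compareTo("not an offset");  // does not compile: String is not a TypedOffset
    System.out.println(new TypedOffset(1).compareTo(new TypedOffset(2)));  // -1
  }
}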