From da2b99dd3399376dd2eba25d4d0dd5d82879afbe Mon Sep 17 00:00:00 2001 From: richardstartin Date: Sat, 18 Dec 2021 12:29:46 +0000 Subject: [PATCH 1/7] inject tombstones to break RT ingestion --- .../integration/tests/BaseClusterIntegrationTest.java | 6 +++++- .../integration/tests/ClusterIntegrationTestUtils.java | 10 +++++++++- .../tests/LLCRealtimeClusterIntegrationTest.java | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index ab5e0284f78f..b98c7c45589b 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -473,7 +473,11 @@ protected void pushAvroIntoKafka(List avroFiles) ClusterIntegrationTestUtils .pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(), getMaxNumKafkaMessagesPerBatch(), - getKafkaMessageHeader(), getPartitionColumn()); + getKafkaMessageHeader(), getPartitionColumn(), injectTombstones()); + } + + protected boolean injectTombstones() { + return false; } protected List getAllAvroFiles() diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 2b6231cf0156..fc78bf4b2147 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -317,7 +317,8 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig, * @throws Exception */ public static void pushAvroIntoKafka(List avroFiles, String kafkaBroker, String kafkaTopic, - int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn) + int maxNumKafkaMessagesPerBatch, @Nullable byte[] header, @Nullable String partitionColumn, + boolean injectTombstones) throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", kafkaBroker); @@ -329,6 +330,13 @@ public static void pushAvroIntoKafka(List avroFiles, String kafkaBroker, S StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) { + if (injectTombstones) { + // publish lots of tombstones to livelock the consumer if it can't handle this properly + for (int i = 0; i < 1000; i++) { + // publish a tombstone first + producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null); + } + } for (File avroFile : avroFiles) { try (DataFileStream reader = AvroUtils.getAvroReader(avroFile)) { BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 3e8dc6a626d6..3c9f8ac08646 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -63,6 +63,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); private final long _startTime = System.currentTimeMillis(); + @Override + protected boolean injectTombstones() { + return true; + } + @Override protected boolean useLlc() { return true; From 2d089e9ef7d2203a24093a892edf95c1ac28c530 Mon Sep 17 00:00:00 2001 From: richardstartin Date: Sat, 18 Dec 2021 12:30:43 +0000 Subject: [PATCH 2/7] prevent ingestion from breaking when an entire batch of messages is filtered out --- .../LLRealtimeSegmentDataManager.java | 39 +++++++++------ .../stream/kafka20/KafkaMessageBatch.java | 25 +++++++--- .../kafka20/KafkaPartitionLevelConsumer.java | 50 ++++++++----------- .../kinesis/KinesisPartitionGroupOffset.java | 2 +- .../stream/pulsar/MessageIdStreamOffset.java | 2 +- .../pinot/spi/stream/LongMsgOffset.java | 2 +- .../apache/pinot/spi/stream/MessageBatch.java | 17 ++++++- .../spi/stream/StreamPartitionMsgOffset.java | 2 +- 8 files changed, 81 insertions(+), 58 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index c68efecc0410..5e6df10b566f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -35,7 +35,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.Utils; @@ -230,6 +232,8 @@ public void deleteSegmentFile() { private final MutableSegmentImpl _realtimeSegment; private volatile StreamPartitionMsgOffset _currentOffset; private volatile State _state; + private static final AtomicIntegerFieldUpdater NUM_ROWS_CONSUMED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(LLRealtimeSegmentDataManager.class, "_numRowsConsumed"); private volatile int _numRowsConsumed = 0; private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled.
private volatile int _numRowsErrored = 0; @@ -243,6 +247,8 @@ public void deleteSegmentFile() { private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; // Segment end criteria + private static final AtomicLongFieldUpdater CONSUME_END_TIME_UPDATER = + AtomicLongFieldUpdater.newUpdater(LLRealtimeSegmentDataManager.class, "_consumeEndTime"); private volatile long _consumeEndTime = 0; private volatile boolean _endOfPartitionGroup = false; private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one @@ -294,15 +300,17 @@ public void deleteSegmentFile() { private boolean endCriteriaReached() { Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state); long now = now(); + long consumeEndTime = _consumeEndTime; switch (_state) { case INITIAL_CONSUMING: // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet. // We need to consume as much data as available, until we have either reached the max number of rows or // the max time we are allowed to consume. - if (now >= _consumeEndTime) { + if (now >= consumeEndTime) { if (_realtimeSegment.getNumDocsIndexed() == 0) { _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); - _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); + CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime + + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)); return false; } _segmentLogger @@ -347,7 +355,7 @@ private boolean endCriteriaReached() { if (_currentOffset.compareTo(_finalOffset) == 0) { _segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString()); return true; - } else if (now >= _consumeEndTime) { + } else if (now >= consumeEndTime) { _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString()); return true; } @@ -401,16 +409,12 @@ protected boolean consumeLoop() .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup(); _consecutiveErrorCount = 0; - } catch (TimeoutException e) { - handleTransientStreamErrors(e); - continue; - } catch (TransientConsumerException e) { - handleTransientStreamErrors(e); - continue; } catch (PermanentConsumerException e) { _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e); throw e; } catch (Exception e) { + // all exceptions but PermanentConsumerException are handled the same way + // can be a TimeoutException or TransientConsumerException routinely // Unknown exception from stream. Treat as a transient exception. // One such exception seen so far is java.net.SocketTimeoutException handleTransientStreamErrors(e); @@ -423,12 +427,17 @@ protected boolean consumeLoop() consecutiveIdleCount = 0; // We consumed something. Update the highest stream offset as well as partition-consuming metric. // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value. 
-// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, -// _currentOffset.getOffset()); -// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, -// _currentOffset.getOffset()); + //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, + //_currentOffset.getOffset()); + //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, + //_currentOffset.getOffset()); _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1); lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); + } else if (messageBatch.getUnfilteredMessageCount() > 0) { + // we consumed something from the stream but filtered all the content out, + // so we need to advance the offsets to avoid getting stuck + _currentOffset = messageBatch.getLastOffset(); + lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); } else { // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long // time. @@ -552,14 +561,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS _currentOffset = messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index); _numRowsIndexed = _realtimeSegment.getNumDocsIndexed(); - _numRowsConsumed++; streamMessageCount++; + NUM_ROWS_CONSUMED_UPDATER.incrementAndGet(this); } updateCurrentDocumentCountMetrics(); if (streamMessageCount != 0) { _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount, streamMessageCount, _currentOffset); - } else { + } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) { // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index db91da788487..05b7f2652aeb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -18,10 +18,7 @@ */ package org.apache.pinot.plugin.stream.kafka20; -import java.util.ArrayList; import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.utils.Bytes; import org.apache.pinot.plugin.stream.kafka.MessageAndOffset; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; @@ -30,12 +27,14 @@ public class KafkaMessageBatch implements MessageBatch { - private List _messageList = new ArrayList<>(); + private final List _messageList; + private final int _unfilteredMessageCount; + private final long _lastOffset; - public KafkaMessageBatch(Iterable> iterable) { - for (ConsumerRecord record : iterable) { - _messageList.add(new MessageAndOffset(record.value().get(), record.offset())); - } + public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List batch) { + _messageList = batch; + _lastOffset = lastOffset; + _unfilteredMessageCount = 
unfilteredMessageCount; } @Override @@ -43,6 +42,11 @@ public int getMessageCount() { return _messageList.size(); } + @Override + public int getUnfilteredMessageCount() { + return _unfilteredMessageCount; + } + @Override public byte[] getMessageAtIndex(int index) { return _messageList.get(index).getMessage().array(); @@ -67,4 +71,9 @@ public long getNextStreamMessageOffsetAtIndex(int index) { public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) { return new LongMsgOffset(_messageList.get(index).getNextOffset()); } + + @Override + public StreamPartitionMsgOffset getLastOffset() { + return new LongMsgOffset(_lastOffset); + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index f816b81dc9d6..19331df8d1bc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -18,61 +18,53 @@ */ package org.apache.pinot.plugin.stream.kafka20; -import com.google.common.collect.Iterables; -import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.utils.Bytes; +import org.apache.pinot.plugin.stream.kafka.MessageAndOffset; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler implements PartitionLevelConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class); public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig, partition); } @Override - public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, - int timeoutMillis) - throws TimeoutException { + public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, + int timeoutMillis) { final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); final long endOffset = endMsgOffset == null ? 
Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); return fetchMessages(startOffset, endOffset, timeoutMillis); } - public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) - throws TimeoutException { + public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) { _consumer.seek(_topicPartition, startOffset); ConsumerRecords consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); - final Iterable> messageAndOffsetIterable = - buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset); - return new KafkaMessageBatch(messageAndOffsetIterable); - } - - private Iterable> buildOffsetFilteringIterable( - final List> messageAndOffsets, final long startOffset, final long endOffset) { - return Iterables.filter(messageAndOffsets, input -> { - // Filter messages that are either null or have an offset ∉ [startOffset, endOffset] - return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset() - || endOffset == -1); - }); - } - - @Override - public void close() - throws IOException { - super.close(); + List> messageAndOffsets = consumerRecords.records(_topicPartition); + List filtered = new ArrayList<>(messageAndOffsets.size()); + long lastOffset = startOffset; + for (ConsumerRecord messageAndOffset : messageAndOffsets) { + if (messageAndOffset != null) { + Bytes message = messageAndOffset.value(); + long offset = messageAndOffset.offset(); + if (offset >= startOffset & (endOffset > offset | endOffset == -1)) { + if (message != null) { + filtered.add(new MessageAndOffset(message.get(), offset)); + } + lastOffset = offset; + } + } + } + return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java index c2b06d7f72f5..4ee247f36528 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java @@ -77,7 +77,7 @@ public KinesisPartitionGroupOffset fromString(String kinesisCheckpointStr) { } @Override - public int compareTo(Object o) { + public int compareTo(StreamPartitionMsgOffset o) { Preconditions.checkNotNull(o); KinesisPartitionGroupOffset other = (KinesisPartitionGroupOffset) o; Preconditions.checkNotNull(other._shardToStartSequenceMap); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java index 45cfca030809..cf23580bc62d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/MessageIdStreamOffset.java @@ -60,7 +60,7 @@ public StreamPartitionMsgOffset fromString(String streamPartitionMsgOffsetStr) { } @Override - public int compareTo(Object other) { + public int compareTo(StreamPartitionMsgOffset other) { 
MessageIdStreamOffset messageIdStreamOffset = (MessageIdStreamOffset) other; return _messageId.compareTo(messageIdStreamOffset.getMessageId()); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java index 5ee78ec83dcb..eb1d4f5d166f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java @@ -42,7 +42,7 @@ public LongMsgOffset(StreamPartitionMsgOffset other) { } @Override - public int compareTo(Object other) { + public int compareTo(StreamPartitionMsgOffset other) { return Long.compare(_offset, ((LongMsgOffset) other)._offset); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 02c721f4d602..e066ea7336fb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -31,11 +31,17 @@ @InterfaceStability.Stable public interface MessageBatch { /** - * - * @return number of messages returned from the stream + * @return number of available messages */ int getMessageCount(); + /** + * @return number of messages returned from the stream + */ + default int getUnfilteredMessageCount() { + return getMessageCount(); + } + /** * Returns the message at a particular index inside a set of messages returned from the stream. * @param index @@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index)); } + /** + * @return last offset in the batch + */ + default StreamPartitionMsgOffset getLastOffset() { + return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1); + } + /** * Returns true if end of the consumer detects that no more records can be read from this partition group for good */ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java index 7a5018c107ad..6315debb3ce7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java @@ -40,7 +40,7 @@ * versions of the stream implementation */ @InterfaceStability.Evolving -public interface StreamPartitionMsgOffset extends Comparable { +public interface StreamPartitionMsgOffset extends Comparable { /** * A serialized representation of the offset object as a String. 
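The core of patch 2 is a single extra branch in the consume loop: a fetch that returned records, all of which were then filtered out (for example a run of tombstones), must still advance the consumer's offset, otherwise the same empty batch is fetched forever and ingestion livelocks. The following is a minimal, self-contained sketch of that decision and is not part of the patch: ToyConsumeLoop, ToyBatch and the plain long offsets are invented for illustration, whereas the real code works against the MessageBatch and StreamPartitionMsgOffset SPI shown above.

import java.util.Collections;
import java.util.List;

final class ToyConsumeLoop {

  /** Toy stand-in for a fetched batch: what survived filtering, what was actually read, where to resume. */
  static final class ToyBatch {
    final List<byte[]> messages;      // records that survived filtering
    final int unfilteredCount;        // records the stream actually returned
    final long nextFetchOffset;       // offset the next fetch should start from

    ToyBatch(List<byte[]> messages, int unfilteredCount, long nextFetchOffset) {
      this.messages = messages;
      this.unfilteredCount = unfilteredCount;
      this.nextFetchOffset = nextFetchOffset;
    }
  }

  private long currentOffset;

  ToyConsumeLoop(long startOffset) {
    currentOffset = startOffset;
  }

  /** Returns true if the caller should back off before the next fetch. */
  boolean onBatch(ToyBatch batch) {
    if (!batch.messages.isEmpty() || batch.unfilteredCount > 0) {
      // either rows were indexed, or everything was filtered out (e.g. tombstones);
      // in both cases the offset must move forward or consumption gets stuck
      currentOffset = batch.nextFetchOffset;
      return false;
    }
    // genuinely idle: keep the offset and let the caller sleep briefly
    return true;
  }

  long currentOffset() {
    return currentOffset;
  }

  public static void main(String[] args) {
    ToyConsumeLoop loop = new ToyConsumeLoop(0);
    // 1000 tombstones fetched, none survive filtering: the offset still advances
    boolean idle = loop.onBatch(new ToyBatch(Collections.emptyList(), 1000, 1000));
    System.out.println(idle + " @ " + loop.currentOffset()); // prints: false @ 1000
  }
}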
From 97e5f153b88885e50a9cd21e7bfe310e7544f20d Mon Sep 17 00:00:00 2001 From: richardstartin Date: Sat, 18 Dec 2021 12:58:45 +0000 Subject: [PATCH 3/7] linter appeasement --- .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 2 -- .../plugin/stream/kafka20/KafkaPartitionLevelConsumer.java | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 5e6df10b566f..516aefe428c0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -33,7 +33,6 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -87,7 +86,6 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; -import org.apache.pinot.spi.stream.TransientConsumerException; import org.apache.pinot.spi.utils.CommonConstants.ConsumerState; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode; import org.apache.pinot.spi.utils.IngestionConfigUtils; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index 19331df8d1bc..5e68b5effb53 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -40,8 +40,8 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i } @Override - public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, - int timeoutMillis) { + public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, + StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); final long endOffset = endMsgOffset == null ? 
Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); return fetchMessages(startOffset, endOffset, timeoutMillis); From 9c73b6ddc01d9626be812b9b07970e982979808a Mon Sep 17 00:00:00 2001 From: richardstartin Date: Sat, 18 Dec 2021 13:27:31 +0000 Subject: [PATCH 4/7] fix off by one bug for bad batches --- .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 2 +- .../apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java | 4 ++-- .../main/java/org/apache/pinot/spi/stream/MessageBatch.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 516aefe428c0..d9f8b20b7bab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -434,7 +434,7 @@ protected boolean consumeLoop() } else if (messageBatch.getUnfilteredMessageCount() > 0) { // we consumed something from the stream but filtered all the content out, // so we need to advance the offsets to avoid getting stuck - _currentOffset = messageBatch.getLastOffset(); + _currentOffset = messageBatch.getOffsetOfNextBatch(); lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); } else { // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index 05b7f2652aeb..3265893fab2d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -73,7 +73,7 @@ public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) } @Override - public StreamPartitionMsgOffset getLastOffset() { - return new LongMsgOffset(_lastOffset); + public StreamPartitionMsgOffset getOffsetOfNextBatch() { + return new LongMsgOffset(_lastOffset + 1); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index e066ea7336fb..dfcf369d0687 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -91,7 +91,7 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index /** * @return last offset in the batch */ - default StreamPartitionMsgOffset getLastOffset() { + default StreamPartitionMsgOffset getOffsetOfNextBatch() { return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1); } From 08394b2dc5f424ebf644437baab0c514002db64a Mon Sep 17 00:00:00 2001 From: richardstartin Date: Mon, 20 Dec 2021 19:35:53 +0000 Subject: [PATCH 5/7] ignore concurrency warnings --- .../realtime/LLRealtimeSegmentDataManager.java | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index d9f8b20b7bab..62c39a6c0c47 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -34,9 +34,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.Utils; @@ -230,8 +228,6 @@ public void deleteSegmentFile() { private final MutableSegmentImpl _realtimeSegment; private volatile StreamPartitionMsgOffset _currentOffset; private volatile State _state; - private static final AtomicIntegerFieldUpdater NUM_ROWS_CONSUMED_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(LLRealtimeSegmentDataManager.class, "_numRowsConsumed"); private volatile int _numRowsConsumed = 0; private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled. private volatile int _numRowsErrored = 0; @@ -245,8 +241,6 @@ public void deleteSegmentFile() { private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; // Segment end criteria - private static final AtomicLongFieldUpdater CONSUME_END_TIME_UPDATER = - AtomicLongFieldUpdater.newUpdater(LLRealtimeSegmentDataManager.class, "_consumeEndTime"); private volatile long _consumeEndTime = 0; private volatile boolean _endOfPartitionGroup = false; private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one @@ -298,17 +292,15 @@ public void deleteSegmentFile() { private boolean endCriteriaReached() { Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state); long now = now(); - long consumeEndTime = _consumeEndTime; switch (_state) { case INITIAL_CONSUMING: // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet. // We need to consume as much data as available, until we have either reached the max number of rows or // the max time we are allowed to consume. 
- if (now >= consumeEndTime) { + if (now >= _consumeEndTime) { if (_realtimeSegment.getNumDocsIndexed() == 0) { _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); - CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime - + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)); + _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS); return false; } _segmentLogger @@ -353,7 +345,7 @@ private boolean endCriteriaReached() { if (_currentOffset.compareTo(_finalOffset) == 0) { _segmentLogger.info("Caught up to offset={}, state={}", _finalOffset, _state.toString()); return true; - } else if (now >= consumeEndTime) { + } else if (now >= _consumeEndTime) { _segmentLogger.info("Past max time budget: offset={}, state={}", _currentOffset, _state.toString()); return true; } @@ -559,8 +551,8 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS _currentOffset = messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index); _numRowsIndexed = _realtimeSegment.getNumDocsIndexed(); + _numRowsConsumed++; streamMessageCount++; - NUM_ROWS_CONSUMED_UPDATER.incrementAndGet(this); } updateCurrentDocumentCountMetrics(); if (streamMessageCount != 0) { From 29e5d20c381bc40076071603d98c068f8c8420ce Mon Sep 17 00:00:00 2001 From: richardstartin Date: Tue, 21 Dec 2021 05:55:08 +0000 Subject: [PATCH 6/7] javadoc for KafkaMessageBatch --- .../pinot/plugin/stream/kafka20/KafkaMessageBatch.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index 3265893fab2d..081334d91442 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -31,6 +31,11 @@ public class KafkaMessageBatch implements MessageBatch { private final int _unfilteredMessageCount; private final long _lastOffset; + /** + * @param unfilteredMessageCount how many messages were received from the topic before being filtered + * @param lastOffset the offset of the last message in the batch + * @param batch the messages, which may be smaller than {@see unfilteredMessageCount} + */ public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List batch) { _messageList = batch; _lastOffset = lastOffset; From 452bbbe2589efd0c5c7b97878c9d9984b49ecc48 Mon Sep 17 00:00:00 2001 From: richardstartin Date: Tue, 21 Dec 2021 10:38:48 +0000 Subject: [PATCH 7/7] remove redundant null check --- .../kafka20/KafkaPartitionLevelConsumer.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index 5e68b5effb53..68bbc9e49f3b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -54,15 +54,13 @@ public MessageBatch fetchMessages(long startOffset, long endOffset, int List filtered = new ArrayList<>(messageAndOffsets.size()); long lastOffset = startOffset; for (ConsumerRecord messageAndOffset : messageAndOffsets) { - if (messageAndOffset != null) { - Bytes message = messageAndOffset.value(); - long offset = messageAndOffset.offset(); - if (offset >= startOffset & (endOffset > offset | endOffset == -1)) { - if (message != null) { - filtered.add(new MessageAndOffset(message.get(), offset)); - } - lastOffset = offset; + Bytes message = messageAndOffset.value(); + long offset = messageAndOffset.offset(); + if (offset >= startOffset & (endOffset > offset | endOffset == -1)) { + if (message != null) { + filtered.add(new MessageAndOffset(message.get(), offset)); } + lastOffset = offset; } } return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
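End to end, the new batch semantics are easy to pin down with a plain unit test. The sketch below is not part of this series and the test class is invented; it assumes a TestNG-style test (an assumption), but it only calls constructors and methods that appear in the diffs above: a batch that was entirely filtered out reports zero consumable messages, a non-zero unfiltered count, and an offset-of-next-batch one past the last raw offset seen.

import java.util.Collections;
import org.apache.pinot.plugin.stream.kafka20.KafkaMessageBatch;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.testng.Assert;
import org.testng.annotations.Test;

public class KafkaMessageBatchSemanticsTest {

  @Test
  public void fullyFilteredBatchStillAdvancesTheOffset() {
    // five records were read from the topic but all of them were tombstones,
    // so nothing survived filtering; the last raw offset observed was 42
    KafkaMessageBatch batch = new KafkaMessageBatch(5, 42L, Collections.emptyList());

    Assert.assertEquals(batch.getMessageCount(), 0);           // nothing to index
    Assert.assertEquals(batch.getUnfilteredMessageCount(), 5); // but the stream was not idle
    // the next fetch must resume past the tombstones, otherwise consumption livelocks
    Assert.assertEquals(((LongMsgOffset) batch.getOffsetOfNextBatch()).getOffset(), 43L);
  }
}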