Review note: the temporary OffsetCriteria override on the shared StreamConfig is now restored in a finally block,
so a failure while fetching partition group metadata cannot leave the config stuck on the smallest-offset criteria.
The post-image blob ids on the index lines below predate this revision.

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c8d49d370f2a..5c7edba4cfac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -317,7 +317,7 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
+              numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -875,8 +875,17 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevel
         if (idealState.isEnabled()) {
           List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
               getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-          List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-              getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+          // Read the smallest offset when a new partition is detected
+          OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+          streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+          List<PartitionGroupMetadata> newPartitionGroupMetadataList;
+          try {
+            newPartitionGroupMetadataList =
+                getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+          } finally {
+            // Always restore the configured criteria, even if the metadata fetch throws
+            streamConfig.setOffsetCriteria(originalOffsetCriteria);
+          }
           return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
         } else {
           LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1196,7 +1205,7 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-                numPartitions, numReplicas, newPartitionGroupMetadataList, true);
+                numPartitions, numReplicas, newPartitionGroupMetadataList);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1206,14 +1215,16 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
   }
 
   private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
-    Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
-    streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
-            .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
-        OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
-    StreamConfig smallestOffsetCriteriaStreamConfig =
-        new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
-    List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
-        getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
+    // Temporarily override the offset criteria on the shared config; restore it even if the fetch throws
+    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+    List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata;
+    try {
+      smallestOffsetCriteriaPartitionGroupMetadata =
+          getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+    } finally {
+      streamConfig.setOffsetCriteria(originalOffsetCriteria);
+    }
     StreamPartitionMsgOffset partitionStartOffset = null;
     for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
       if (info.getPartitionGroupId() == partitionGroupId) {
@@ -1235,16 +1246,10 @@ private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName,
    */
   private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
-      boolean isLiveTable) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
-    StreamPartitionMsgOffset startOffset;
-    if (isLiveTable) {
-      startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
-    } else {
-      startOffset = partitionGroupMetadata.getStartOffset();
-    }
+    String startOffset = partitionGroupMetadata.getStartOffset().toString();
     LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
@@ -1252,8 +1257,7 @@ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStr
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
-        startOffset.toString(), 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
         partitionGroupMetadataList);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 34e3a625abc7..73fb3c620dee 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -63,7 +63,6 @@ public enum ConsumerType {
   private final String _tableNameWithType;
   private final List<ConsumerType> _consumerTypes = new ArrayList<>();
   private final String _consumerFactoryClassName;
-  private final OffsetCriteria _offsetCriteria;
   private final String _decoderClass;
   private final Map<String, String> _decoderProperties = new HashMap<>();
 
@@ -79,6 +78,9 @@ public enum ConsumerType {
 
   private final Map<String, String> _streamConfigMap = new HashMap<>();
 
+  // Allow overriding it to use different offset criteria
+  private OffsetCriteria _offsetCriteria;
+
   /**
    * Initializes a StreamConfig using the map of stream configs from the table config
    */
@@ -284,6 +286,14 @@ public OffsetCriteria getOffsetCriteria() {
     return _offsetCriteria;
   }
 
+  /**
+   * Overrides the offset criteria of this config. Callers that override it temporarily should restore the previous
+   * value in a {@code finally} block, because the instance may be reused afterwards.
+   */
+  public void setOffsetCriteria(OffsetCriteria offsetCriteria) {
+    _offsetCriteria = offsetCriteria;
+  }
+
   public String getDecoderClass() {
     return _decoderClass;
   }