diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java index 06ee990e9e8c..d50e3993fb6b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -204,28 +205,33 @@ public String getStatusDescription() { /** * Service status callback that checks whether realtime consumption has caught up - * TODO: In this initial version, we are simply adding a configurable static wait time - * This can be made smarter: - * 1) Keep track of average consumption rate for table in server stats - * 2) Monitor consumption rate during startup, report GOOD when it stabilizes to average rate - * 3) Monitor consumption rate during startup, report GOOD if it is idle + * An offset based consumption status checker is being added in two phases. First phase adds the new status checker, + * but it doesn't apply its output. Instead it only logs its behavior. When the behavior is analysed and approved + * for different tables with different consumption rates, we can safely use the new status checker. + * (Another approach would be to define a new config and disable it by default. 
Since this feature is not urgent, + we decided to not define yet another config and go with this two phase approach) */ public static class RealtimeConsumptionCatchupServiceStatusCallback implements ServiceStatusCallback { private final long _endWaitTime; private final Status _serviceStatus = Status.STARTING; + private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset; String _statusDescription = STATUS_DESCRIPTION_INIT; + private boolean _consumptionNotYetCaughtUp = true; + /** * Realtime consumption catchup service which adds a static wait time for consuming segments to catchup */ public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager helixManager, String clusterName, - String instanceName, long realtimeConsumptionCatchupWaitMs) { + String instanceName, long realtimeConsumptionCatchupWaitMs, + Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset) { // A consuming segment will actually be ready to serve queries after (time of creation of partition consumer) + // (configured max time to catchup) // We are approximating it to (time of server startup) + (configured max time to catch up) _endWaitTime = System.currentTimeMillis() + realtimeConsumptionCatchupWaitMs; + _getNumConsumingSegmentsNotReachedTheirLatestOffset = getNumConsumingSegmentsNotReachedTheirLatestOffset; LOGGER.info("Monitoring realtime consumption catchup. 
Will allow {} ms before marking status GOOD", realtimeConsumptionCatchupWaitMs); } @@ -236,13 +242,27 @@ public synchronized Status getServiceStatus() { return _serviceStatus; } long now = System.currentTimeMillis(); - if (now < _endWaitTime) { - _statusDescription = - String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now); - return Status.STARTING; + int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get(); + if (now >= _endWaitTime) { + _statusDescription = String.format("Consuming segments status GOOD since %dms " + + "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp); + return Status.GOOD; } - _statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime); - return Status.GOOD; + if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp <= 0) { + // TODO: Once the performance of offset based consumption checker is validated: + // - remove the log line + // - uncomment the status & statusDescription lines + // - remove variable _consumptionNotYetCaughtUp + _consumptionNotYetCaughtUp = false; + LOGGER.info("All consuming segments have reached their latest offsets! 
" + + "Finished {} msec earlier than time threshold.", _endWaitTime - now); +// _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset"; +// return Status.GOOD; + } + _statusDescription = + String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, " + + "timeRemaining=%dms", numConsumingSegmentsNotCaughtUp, _endWaitTime - now); + return Status.STARTING; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 17ab449bf1f5..de5268f9a05d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -72,6 +72,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.metrics.PinotMeter; import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; @@ -287,6 +288,8 @@ public void deleteSegmentFile() { private final boolean _nullHandlingEnabled; private final SegmentCommitterFactory _segmentCommitterFactory; + private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = null; + // TODO each time this method is called, we print reason for stop. Good to print only once. 
private boolean endCriteriaReached() { Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state); @@ -762,11 +765,14 @@ public long getLastConsumedTimestamp() { return _lastLogTime; } - @VisibleForTesting - protected StreamPartitionMsgOffset getCurrentOffset() { + public StreamPartitionMsgOffset getCurrentOffset() { return _currentOffset; } + public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() { + return _latestStreamOffsetAtStartupTime; + } + @VisibleForTesting protected SegmentBuildDescriptor getSegmentBuildDescriptor() { return _segmentBuildDescriptor; } @@ -1363,6 +1369,16 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo } _state = State.INITIAL_CONSUMING; + // fetch latest stream offset + try (StreamMetadataProvider metadataProvider = _streamConsumerFactory + .createPartitionMetadataProvider(_clientId, _partitionGroupId)) { + _latestStreamOffsetAtStartupTime = metadataProvider + .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000); + } catch (Exception e) { + _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId, + _partitionGroupId, e); + } + long now = now(); _consumeStartTime = now; long maxConsumeTimeMillis = _partitionLevelStreamConfig.getFlushThresholdTimeMillis(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index e58d020cd521..9f964d5e668a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -215,8 +215,8 @@ private void registerServiceStatusHandler() { // collect all resources which have this instance in the ideal state List<String> resourcesToMonitor = new ArrayList<>(); - // if even 1 resource has this instance in 
ideal state with state CONSUMING, set this to true - boolean foundConsuming = false; + + Set<String> consumingSegments = new HashSet<>(); boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0; for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) { @@ -235,12 +235,11 @@ private void registerServiceStatusHandler() { break; } } - if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) { + if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) { for (String partitionName : idealState.getPartitionSet()) { if (StateModel.SegmentStateModel.CONSUMING .equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) { - foundConsuming = true; - break; + consumingSegments.add(partitionName); } } } @@ -255,10 +254,14 @@ private void registerServiceStatusHandler() { serviceStatusCallbackListBuilder.add( new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _helixClusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup)); + boolean foundConsuming = !consumingSegments.isEmpty(); if (checkRealtime && foundConsuming) { + OffsetBasedConsumptionStatusChecker consumptionStatusChecker = + new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments); serviceStatusCallbackListBuilder.add( new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName, - _instanceId, realtimeConsumptionCatchupWaitMs)); + _instanceId, realtimeConsumptionCatchupWaitMs, + consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset)); } LOGGER.info("Registering service status handler"); ServiceStatus.setServiceStatusCallback(_instanceId, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java new file mode 
100644 index 000000000000..e67eacdb09ad --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.server.starter.helix; + +import java.util.HashSet; +import java.util.Set; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is used at startup time to have a more accurate estimate of the catchup period in which no query execution + * happens and consumers try to catch up to the latest messages available in streams. 
+ * To achieve this, every time status check is called - {@link #getNumConsumingSegmentsNotReachedTheirLatestOffset} - + * for each consuming segment, we check if segment's latest ingested offset has reached the latest stream offset that's + * fetched once at startup time. + */ +public class OffsetBasedConsumptionStatusChecker { + private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class); + + // constructor parameters + private final InstanceDataManager _instanceDataManager; + private final Set<String> _consumingSegments; + + // helper variable + private final Set<String> _caughtUpSegments = new HashSet<>(); + + public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) { + _instanceDataManager = instanceDataManager; + _consumingSegments = consumingSegments; + } + + public int getNumConsumingSegmentsNotReachedTheirLatestOffset() { + for (String segName : _consumingSegments) { + if (_caughtUpSegments.contains(segName)) { + continue; + } + TableDataManager tableDataManager = getTableDataManager(segName); + if (tableDataManager == null) { + LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName); + continue; + } + SegmentDataManager segmentDataManager = null; + try { + segmentDataManager = tableDataManager.acquireSegment(segName); + if (segmentDataManager == null) { + LOGGER + .info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", segName); + continue; + } + if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) { + // There's a possibility that a consuming segment has converted to a committed segment. If that's the case, + // segment data manager will not be of type LLRealtime. 
+ LOGGER.info("Segment {} is already committed and is considered caught up.", segName); + _caughtUpSegments.add(segName); + continue; + } + LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager; + StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset(); + StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime(); + if (latestStreamOffset == null || latestIngestedOffset == null) { + LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. " + + "Will check consumption status later", segName, latestStreamOffset, latestIngestedOffset); + continue; + } + if (latestIngestedOffset.compareTo(latestStreamOffset) < 0) { + LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ", + latestIngestedOffset, segName, latestStreamOffset); + continue; + } + LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segName, + latestIngestedOffset, latestStreamOffset); + _caughtUpSegments.add(segName); + } finally { + if (segmentDataManager != null) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } + return _consumingSegments.size() - _caughtUpSegments.size(); + } + + private TableDataManager getTableDataManager(String segmentName) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + String tableName = llcSegmentName.getTableName(); + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + return _instanceDataManager.getTableDataManager(tableNameWithType); + } +} diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java new file mode 100644 index 
000000000000..be03eefcb137 --- /dev/null +++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.server.starter.helix; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.stream.LongMsgOffset; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.*; + + +public class OffsetBasedConsumptionStatusCheckerTest { + + @Test + public void regularCase() { + + String segA0 = "tableA__0__0__123Z"; + String segA1 = "tableA__1__0__123Z"; + String segB0 = "tableB__0__0__123Z"; + Set consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + 
OffsetBasedConsumptionStatusChecker statusChecker = + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + + // setup TableDataMangers + TableDataManager tableDataManagerA = mock(TableDataManager.class); + TableDataManager tableDataManagerB = mock(TableDataManager.class); + when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA); + when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); + + // setup SegmentDataManagers + LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); + when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); + when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); + when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15)); + when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150)); + when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500)); + + // latest ingested offset latest stream offset + // segA0 10 15 + // segA1 100 150 + // segB0 1000 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3); + + // latest ingested offset latest stream offset + // segA0 20 15 + // segA1 200 150 + // segB0 2000 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.getCurrentOffset()).thenReturn(new 
LongMsgOffset(2000)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0); + } + + @Test + public void dataMangersBeingSetup() { + + String segA0 = "tableA__0__0__123Z"; + String segA1 = "tableA__1__0__123Z"; + String segB0 = "tableB__0__0__123Z"; + Set consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + + OffsetBasedConsumptionStatusChecker statusChecker = + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + + // TableDataManager is not set up yet + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3); + + // setup TableDataMangers + TableDataManager tableDataManagerA = mock(TableDataManager.class); + TableDataManager tableDataManagerB = mock(TableDataManager.class); + when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA); + when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); + + // setup some SegmentDataManagers + LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); + when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); + when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); + + // latest ingested offset latest stream offset + // segA0 10 15 + // segA1 100 150 + // segB0 not setup yet 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100)); + when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15)); + when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3); + + // setup the remaining SegmentDataManager + 
LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); + + // latest ingested offset latest stream offset + // segA0 20 15 + // segA1 200 150 + // segB0 1000 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000)); + when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1); + + // latest ingested offset latest stream offset + // segA0 30 15 + // segA1 300 150 + // segB0 2000 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0); + } + + @Test + public void segmentsBeingCommitted() { + + String segA0 = "tableA__0__0__123Z"; + String segA1 = "tableA__1__0__123Z"; + String segB0 = "tableB__0__0__123Z"; + Set consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + OffsetBasedConsumptionStatusChecker statusChecker = + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + + // setup TableDataMangers + TableDataManager tableDataManagerA = mock(TableDataManager.class); + TableDataManager tableDataManagerB = mock(TableDataManager.class); + when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA); + when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); + + // setup SegmentDataManagers + LLRealtimeSegmentDataManager segMngrA0 = 
mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); + when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); + when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); + + // latest ingested offset latest stream offset + // segA0 10 15 + // segA1 100 150 + // segB0 1000 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000)); + when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15)); + when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150)); + when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(1500)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3); + + // segB0 is now committed; ImmutableSegmentDataManager is returned by table data manager + ImmutableSegmentDataManager immSegMngrB0 = mock(ImmutableSegmentDataManager.class); + when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0); + + // latest ingested offset latest stream offset + // segA0 20 15 + // segA1 200 150 + // segB0 committed at 1200 1500 + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 0); + } + + @Test + public void cannotGetLatestStreamOffset() { + + String segA0 = "tableA__0__0__123Z"; + String segA1 = "tableA__1__0__123Z"; + String segB0 = "tableB__0__0__123Z"; + Set consumingSegments = ImmutableSet.of(segA0, segA1, segB0); + InstanceDataManager 
instanceDataManager = mock(InstanceDataManager.class); + OffsetBasedConsumptionStatusChecker statusChecker = + new OffsetBasedConsumptionStatusChecker(instanceDataManager, consumingSegments); + + // setup TableDataMangers + TableDataManager tableDataManagerA = mock(TableDataManager.class); + TableDataManager tableDataManagerB = mock(TableDataManager.class); + when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA); + when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB); + + // setup SegmentDataManagers + LLRealtimeSegmentDataManager segMngrA0 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrA1 = mock(LLRealtimeSegmentDataManager.class); + LLRealtimeSegmentDataManager segMngrB0 = mock(LLRealtimeSegmentDataManager.class); + when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0); + when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1); + when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0); + + // latest ingested offset latest stream offset + // segA0 10 15 + // segA1 100 150 + // segB0 1000 null - could not get the latest offset from stream at startup + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000)); + when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(15)); + when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new LongMsgOffset(150)); + when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(null); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 3); + + // latest ingested offset latest stream offset + // segA0 20 15 + // segA1 200 150 + // segB0 2000 null - could not get the latest offset from stream at startup + when(segMngrA0.getCurrentOffset()).thenReturn(new 
LongMsgOffset(20)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(200)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1); + + // latest ingested offset latest stream offset + // segA0 30 15 + // segA1 300 150 + // segB0 3000 null - could not get the latest offset from stream at startup + when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(30)); + when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300)); + when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(3000)); + assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1); + } +} \ No newline at end of file diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java index e0d71661af3b..14cb6a45da0a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/OffsetCriteria.java @@ -31,6 +31,9 @@ public class OffsetCriteria { public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(); + public static final OffsetCriteria LARGEST_OFFSET_CRITERIA = + new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(); + /** * Enumerates the supported offset types */