Skip to content

Commit

Permalink
Offset based realtime consumption status checker (apache#7267)
Browse files Browse the repository at this point in the history
* Add offset based realtime consumption status checker

* Applied PR suggestions

* One log line when consumption catches up

* Return numConsumingSegmentsNotCaughtUp

* Also add num of outstanding segments to timeout scenario
  • Loading branch information
sajjad-moradi authored and kriti-sc committed Dec 12, 2021
1 parent 792ff25 commit 1d2e6ed
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
Expand Down Expand Up @@ -204,28 +205,33 @@ public String getStatusDescription() {

/**
* Service status callback that checks whether realtime consumption has caught up
* TODO: In this initial version, we are simply adding a configurable static wait time
* This can be made smarter:
* 1) Keep track of average consumption rate for table in server stats
* 2) Monitor consumption rate during startup, report GOOD when it stabilizes to average rate
* 3) Monitor consumption rate during startup, report GOOD if it is idle
* An offset based consumption status checker is being added in two phases. First phase adds the new status checker,
* but it doesn't apply its output. Instead it only logs its behavior. When the behavior is analysed and approved
* for different tables with different consumption rates, we can safely use the new status checker.
* (Another approach would be to define a new config and disable it by default. Since this feature is not urgent,
* we decided to not define yet another config and go with this two phase approach)
*/
public static class RealtimeConsumptionCatchupServiceStatusCallback implements ServiceStatusCallback {

private final long _endWaitTime;
private final Status _serviceStatus = Status.STARTING;
private final Supplier<Integer> _getNumConsumingSegmentsNotReachedTheirLatestOffset;
String _statusDescription = STATUS_DESCRIPTION_INIT;

private boolean _consumptionNotYetCaughtUp = true;

/**
* Realtime consumption catchup service which adds a static wait time for consuming segments to catchup
*/
public RealtimeConsumptionCatchupServiceStatusCallback(HelixManager helixManager, String clusterName,
String instanceName, long realtimeConsumptionCatchupWaitMs) {
String instanceName, long realtimeConsumptionCatchupWaitMs,
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset) {

// A consuming segment will actually be ready to serve queries after (time of creation of partition consumer) +
// (configured max time to catchup)
// We are approximating it to (time of server startup) + (configured max time to catch up)
_endWaitTime = System.currentTimeMillis() + realtimeConsumptionCatchupWaitMs;
_getNumConsumingSegmentsNotReachedTheirLatestOffset = getNumConsumingSegmentsNotReachedTheirLatestOffset;
LOGGER.info("Monitoring realtime consumption catchup. Will allow {} ms before marking status GOOD",
realtimeConsumptionCatchupWaitMs);
}
Expand All @@ -236,13 +242,27 @@ public synchronized Status getServiceStatus() {
return _serviceStatus;
}
long now = System.currentTimeMillis();
if (now < _endWaitTime) {
_statusDescription =
String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
return Status.STARTING;
int numConsumingSegmentsNotCaughtUp = _getNumConsumingSegmentsNotReachedTheirLatestOffset.get();
if (now >= _endWaitTime) {
_statusDescription = String.format("Consuming segments status GOOD since %dms "
+ "(numConsumingSegmentsNotCaughtUp=%d)", _endWaitTime, numConsumingSegmentsNotCaughtUp);
return Status.GOOD;
}
_statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime);
return Status.GOOD;
// Phase one of the offset based checker: only report (once) the moment every consuming segment has reached its
// latest offset; the returned status is still driven by the static wait time below.
// The count must be zero here - a positive count means some segments are still behind.
if (_consumptionNotYetCaughtUp && numConsumingSegmentsNotCaughtUp == 0) {
  // TODO: Once the performance of offset based consumption checker is validated:
  //      - remove the log line
  //      - uncomment the status & statusDescription lines
  //      - remove variable _consumptionNotYetCaughtUp
  // Flip the flag so that the catch-up message is logged a single time.
  _consumptionNotYetCaughtUp = false;
  LOGGER.info("All consuming segments have reached their latest offsets! "
      + "Finished {} msec earlier than time threshold.", _endWaitTime - now);
  // _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
  // return Status.GOOD;
}
_statusDescription =
String.format("Waiting for consuming segments to catchup: numConsumingSegmentsNotCaughtUp=%d, "
+ "timeRemaining=%dms", numConsumingSegmentsNotCaughtUp, _endWaitTime - now);
return Status.STARTING;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
Expand Down Expand Up @@ -287,6 +288,8 @@ public void deleteSegmentFile() {
private final boolean _nullHandlingEnabled;
private final SegmentCommitterFactory _segmentCommitterFactory;

private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime = null;

// TODO each time this method is called, we print reason for stop. Good to print only once.
private boolean endCriteriaReached() {
Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
Expand Down Expand Up @@ -763,11 +766,14 @@ public long getLastConsumedTimestamp() {
return _lastLogTime;
}

@VisibleForTesting
protected StreamPartitionMsgOffset getCurrentOffset() {
public StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
}

/**
 * Returns the value of {@code _latestStreamOffsetAtStartupTime}.
 * The field starts out as {@code null} and is assigned in the constructor from a stream metadata fetch using
 * {@code OffsetCriteria.LARGEST_OFFSET_CRITERIA}; when that fetch throws, the field is left untouched, so callers
 * must be prepared for a {@code null} result.
 */
public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
return _latestStreamOffsetAtStartupTime;
}

@VisibleForTesting
protected SegmentBuildDescriptor getSegmentBuildDescriptor() {
return _segmentBuildDescriptor;
Expand Down Expand Up @@ -1364,6 +1370,16 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
}
_state = State.INITIAL_CONSUMING;

// Fetch the latest offset currently available in the stream partition (waiting at most 5000 ms).
// This is best effort: on failure _latestStreamOffsetAtStartupTime keeps its previous value and the failure is
// only logged, so segment creation is not aborted.
try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
    .createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
  _latestStreamOffsetAtStartupTime = metadataProvider
      .fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
} catch (Exception e) {
  // Pass the exception as the trailing argument so the cause and stack trace are logged, not silently dropped.
  _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
      _partitionGroupId, e);
}

long now = now();
_consumeStartTime = now;
long maxConsumeTimeMillis = _partitionLevelStreamConfig.getFlushThresholdTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ private void registerServiceStatusHandler() {

// collect all resources which have this instance in the ideal state
List<String> resourcesToMonitor = new ArrayList<>();
// if even 1 resource has this instance in ideal state with state CONSUMING, set this to true
boolean foundConsuming = false;

Set<String> consumingSegments = new HashSet<>();
boolean checkRealtime = realtimeConsumptionCatchupWaitMs > 0;

for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
Expand All @@ -235,12 +235,11 @@ private void registerServiceStatusHandler() {
break;
}
}
if (checkRealtime && !foundConsuming && TableNameBuilder.isRealtimeTableResource(resourceName)) {
if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) {
for (String partitionName : idealState.getPartitionSet()) {
if (StateModel.SegmentStateModel.CONSUMING
.equals(idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
foundConsuming = true;
break;
consumingSegments.add(partitionName);
}
}
}
Expand All @@ -255,10 +254,14 @@ private void registerServiceStatusHandler() {
serviceStatusCallbackListBuilder.add(
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, resourcesToMonitor, minResourcePercentForStartup));
boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
serviceStatusCallbackListBuilder.add(
new ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager, _helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs));
_instanceId, realtimeConsumptionCatchupWaitMs,
consumptionStatusChecker::getNumConsumingSegmentsNotReachedTheirLatestOffset));
}
LOGGER.info("Registering service status handler");
ServiceStatus.setServiceStatusCallback(_instanceId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.server.starter.helix;

import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Startup-time helper that reports how many consuming segments have not yet ingested up to the latest stream offset
 * recorded by their segment data manager.
 * Each call to {@link #getNumConsumingSegmentsNotReachedTheirLatestOffset} re-examines only the segments that were
 * not found caught up by an earlier call; a segment that has once been marked caught up is never checked again.
 */
public class OffsetBasedConsumptionStatusChecker {
  private static final Logger LOGGER = LoggerFactory.getLogger(OffsetBasedConsumptionStatusChecker.class);

  // Supplied by the caller
  private final InstanceDataManager _instanceDataManager;
  private final Set<String> _consumingSegments;

  // Names of the segments already found to be caught up (or already committed)
  private final Set<String> _caughtUpSegments = new HashSet<>();

  /**
   * @param instanceDataManager used to look up the table data manager of every consuming segment
   * @param consumingSegments names of the consuming segments to track
   */
  public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
    _instanceDataManager = instanceDataManager;
    _consumingSegments = consumingSegments;
  }

  /**
   * Re-checks every tracked segment that is not yet marked as caught up.
   *
   * @return number of tracked segments minus the number of segments marked as caught up so far
   */
  public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
    for (String trackedSegment : _consumingSegments) {
      if (!_caughtUpSegments.contains(trackedSegment)) {
        markIfCaughtUp(trackedSegment);
      }
    }
    return _consumingSegments.size() - _caughtUpSegments.size();
  }

  /**
   * Examines a single segment and records it in the caught-up set when it is either no longer an LLC consuming
   * segment or its current offset is not smaller than the latest stream offset captured at startup.
   * Any segment data manager acquired here is released before returning.
   */
  private void markIfCaughtUp(String segName) {
    TableDataManager tableDataManager = getTableDataManager(segName);
    if (tableDataManager == null) {
      LOGGER.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName);
      return;
    }
    SegmentDataManager acquired = tableDataManager.acquireSegment(segName);
    if (acquired == null) {
      LOGGER.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later", segName);
      return;
    }
    try {
      if (!(acquired instanceof LLRealtimeSegmentDataManager)) {
        // A consuming segment may have been converted to a committed segment in the meantime, in which case its
        // data manager is no longer of the LLRealtime type.
        LOGGER.info("Segment {} is already committed and is considered caught up.", segName);
        _caughtUpSegments.add(segName);
        return;
      }
      LLRealtimeSegmentDataManager consumingManager = (LLRealtimeSegmentDataManager) acquired;
      StreamPartitionMsgOffset ingestedOffset = consumingManager.getCurrentOffset();
      StreamPartitionMsgOffset streamOffset = consumingManager.getLatestStreamOffsetAtStartupTime();
      if (ingestedOffset == null || streamOffset == null) {
        LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. "
            + "Will check consumption status later", segName, streamOffset, ingestedOffset);
        return;
      }
      if (ingestedOffset.compareTo(streamOffset) >= 0) {
        LOGGER.info("Segment {} with latest ingested offset {} has caught up to the latest stream offset {}", segName,
            ingestedOffset, streamOffset);
        _caughtUpSegments.add(segName);
      } else {
        LOGGER.info("Latest ingested offset {} in segment {} is smaller than stream latest available offset {} ",
            ingestedOffset, segName, streamOffset);
      }
    } finally {
      tableDataManager.releaseSegment(acquired);
    }
  }

  /**
   * Resolves the realtime table data manager for the table encoded in the given LLC segment name.
   */
  private TableDataManager getTableDataManager(String segmentName) {
    String rawTableName = new LLCSegmentName(segmentName).getTableName();
    return _instanceDataManager
        .getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
  }
}
Loading

0 comments on commit 1d2e6ed

Please sign in to comment.