
Offset based realtime consumption status checker #7267

Conversation

sajjad-moradi
Contributor

Description

This PR adds a new realtime consumption status checker that is used at startup time to get a more accurate estimate of the catch-up period, during which no queries are executed and consumers try to catch up to the latest messages available in the streams.
To achieve this, every time the status check method, haveAllConsumingSegmentsReachedStreamLatestOffset, is called, the list of consuming segments is gathered, and for each segment we check whether the segment's latest ingested offset has reached the latest stream offset. To prevent chasing a moving target, once the latest stream offset has been fetched, it is not fetched again; subsequent status check calls compare the latest ingested offset against the already fetched stream offset, as sketched below.
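For readers skimming the PR, the per-segment check boils down to an offset comparison along the lines of this simplified sketch (the helper name is illustrative, not the exact code in this PR):

```java
// Simplified sketch of the per-segment check. The stream offset is captured once at
// startup (to avoid chasing a moving target); only the ingested offset is re-read on
// every status check.
private boolean isCaughtUp(LLRealtimeSegmentDataManager segmentDataManager) {
  StreamPartitionMsgOffset latestIngestedOffset = segmentDataManager.getCurrentOffset();
  StreamPartitionMsgOffset latestStreamOffset = segmentDataManager.getLatestStreamOffsetAtStartupTime();
  if (latestIngestedOffset == null || latestStreamOffset == null) {
    return false;  // offsets not available yet; report "not caught up" and retry next round
  }
  return latestIngestedOffset.compareTo(latestStreamOffset) >= 0;
}
```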

The new status checker is being added in two phases. The first phase adds the new status checker but does not apply its output; it only logs its behavior. Once the behavior has been analyzed and approved for different tables with different consumption rates, we can safely wire in the new status checker and use its output.
Another approach would be to define a new config and disable it by default. Since this feature is not urgent, @mcvsubbu suggested not defining yet another config and going with this two-phase approach.

Testing Done

  • unit tests
  • modified the SegmentPartitionLLCRealtimeClusterIntegrationTest integration test locally and verified the expected behavior. I didn't create a new IT because they're time-consuming and the different edge cases are already covered by the unit tests.

@codecov-commenter

codecov-commenter commented Aug 9, 2021

Codecov Report

Merging #7267 (9f58f6c) into master (69a91ac) will decrease coverage by 32.41%.
The diff coverage is 13.69%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master    #7267       +/-   ##
=============================================
- Coverage     71.93%   39.51%   -32.42%     
+ Complexity     3346      178     -3168     
=============================================
  Files          1516     1517        +1     
  Lines         75109    75171       +62     
  Branches      10945    10954        +9     
=============================================
- Hits          54026    29707    -24319     
- Misses        17454    43217    +25763     
+ Partials       3629     2247     -1382     
Flag           Coverage Δ
integration1   30.68% <13.69%> (+0.30%) ⬆️
integration2   29.08% <9.58%> (+<0.01%) ⬆️
unittests1     ?
unittests2     14.49% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...a/org/apache/pinot/common/utils/ServiceStatus.java 41.32% <0.00%> (-26.06%) ⬇️
...ter/helix/OffsetBasedConsumptionStatusChecker.java 0.00% <0.00%> (ø)
...va/org/apache/pinot/spi/stream/OffsetCriteria.java 0.00% <0.00%> (-89.66%) ⬇️
.../pinot/server/starter/helix/BaseServerStarter.java 55.33% <14.28%> (-0.49%) ⬇️
...manager/realtime/LLRealtimeSegmentDataManager.java 60.60% <90.00%> (-11.47%) ⬇️
...c/main/java/org/apache/pinot/common/tier/Tier.java 0.00% <0.00%> (-100.00%) ⬇️
.../java/org/apache/pinot/spi/utils/BooleanUtils.java 0.00% <0.00%> (-100.00%) ⬇️
...ava/org/apache/pinot/spi/data/MetricFieldSpec.java 0.00% <0.00%> (-100.00%) ⬇️
...va/org/apache/pinot/spi/utils/BigDecimalUtils.java 0.00% <0.00%> (-100.00%) ⬇️
...java/org/apache/pinot/common/tier/TierFactory.java 0.00% <0.00%> (-100.00%) ⬇️
... and 814 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 69a91ac...9f58f6c.

@kishoreg
Member

kishoreg commented Aug 9, 2021

What's the scenario where this feature will be useful?

Any drawbacks? It looks like this will make the entire server inaccessible during catch-up instead of just one segment. Is my understanding correct?

@mcvsubbu
Contributor

mcvsubbu commented Aug 9, 2021

What's the scenario where this feature will be useful?

Any drawbacks? It looks like this will make the entire server inaccessible during catch-up instead of just one segment. Is my understanding correct?

This is useful in scenarios where there is a very high ingestion rate and low latency requirements for queries (as well as high query throughput). We need to do some catching up during server restarts before we declare the server ready to serve queries. Right now, there is a (configurable) hard time limit that the server waits for before declaring itself ready to serve queries. We would like the server to automatically detect when it is ready to serve queries.

The default behavior (of allowing queries right away) will remain. Once we analyze a few use cases, we will add appropriate configs so that the feature can be turned on for tables like those described above. We were thinking of a per-server config, but we could make it a table config, so that if it is not configured, the segments declare themselves ready right away. The config part will be a separate PR, and is up for discussion.

@sajjad-moradi
Contributor Author

What's the scenario where this feature will be useful?

Any drawbacks? It looks like this will make the entire server inaccessible during catch-up instead of just one segment. Is my understanding correct?

Here's the existing behavior: if the value of the config pinot.server.starter.realtimeConsumptionCatchupWaitMs is greater than zero (zero is the default), the pinot-server simply waits for the period specified in the config to allow ALL consumers to catch up. This requires some knowledge of how much startup consumption time is appropriate for different use cases. Keep in mind that the consumption rate may change over time, and then the config value needs to be adjusted.
The new status checker still uses that startup consumption timeout as the upper bound, but at each status check interval (10s) it checks whether all consuming segments have reached their latest stream offset, as sketched below.
Just FYI, we use a 2-minute timeout (10 minutes for the high-rate case) in our prod use cases.
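To make the interplay concrete, here is a rough sketch of how the hard timeout and the periodic offset check could be combined at startup. It is a simplification of the actual ServiceStatus-based wiring; the method name is assumed, while the 10s interval and the checker method come from this discussion:

```java
// Rough sketch only: the configured catch-up wait stays a hard upper bound, and the
// offset-based checker just lets the server declare itself ready earlier.
void waitForConsumingSegmentsToCatchUp(long realtimeConsumptionCatchupWaitMs,
    OffsetBasedConsumptionStatusChecker statusChecker) throws InterruptedException {
  long endWaitTimeMs = System.currentTimeMillis() + realtimeConsumptionCatchupWaitMs;
  while (System.currentTimeMillis() < endWaitTimeMs) {
    if (statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset() == 0) {
      break;  // all consuming segments caught up before the hard timeout
    }
    Thread.sleep(10_000L);  // status check interval (10s)
  }
  // Either way, the server is declared ready once we fall out of the loop.
}
```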

@kishoreg
Member

Is it possible to make the status checker an interface and have this implemented within the LinkedIn Pinot wrapper? We have also seen some need for customizing health status checks in Kubernetes, and it might be a good idea to make it pluggable.
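For illustration, a pluggable checker along these lines might look like the minimal sketch below (the interface and method names are hypothetical, not an existing Pinot API):

```java
// Hypothetical plug-in point for startup/readiness checks; an implementation could be
// loaded by class name from server config, similar to other Pinot plugins.
public interface ConsumptionStatusChecker {
  /**
   * @return the number of consuming segments that have not yet caught up; 0 means the
   *         server is ready to serve queries.
   */
  int getNumConsumingSegmentsNotCaughtUp();
}
```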

@mcvsubbu
Contributor

Is it possible to make the status checker an interface and have this implemented within the LinkedIn Pinot wrapper? We have also seen some need for customizing health status checks in Kubernetes, and it might be a good idea to make it pluggable.

I think that is a useful thing to have. The static status checker has always been a bit of a pain.

However, it not only increases the scope of this PR a lot, it also requires us to change all our internal software to start using the new interface for health checks. Can we file a separate issue for that and get it done? Thanks.

// TODO: Once the performance of offset based consumption checker is validated:
// - remove the log line
// - uncomment the status & statusDescription lines
LOGGER.info("All consuming segments have reached their latest offsets!");
Contributor

Suggested change
LOGGER.info("All consuming segments have reached their latest offsets!");
LOGGER.info("All consuming segments have reached their latest offsets. End time is {} in the {}", Math.abs(now-_endWaitTime), now > _endWaitTime ? "past" : "future" );

This way, we can easily see how far off we are with the new algorithm.

Contributor Author

Agree. Updated the log line. Please note that when we get to this point, now is always less than _endWaitTime.

return _currentOffset;
}

public StreamPartitionMsgOffset fetchLatestStreamOffset() {
Contributor

You may want to add a comment here that this creates a new partition metadata provider each time, so that whoever calls this (public) method is aware.

Contributor Author

Done.

try (StreamMetadataProvider metadataProvider = _streamConsumerFactory
.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
return metadataProvider
.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs=*/5000);
Contributor

It is better to take the max wait time as a parameter, so that the caller can decide on this. Please javadoc the error cases, etc. (do you want to throw a timeout exception?)

Contributor

Another way to do this is to make it a private method that gets the offset when the consumer starts (or runs in another thread that tries a few times until we get a value). That way, the waiting period is not very important. The public method can then return whatever we have already obtained as the high-water-mark (or null if we don't have anything). I prefer this, so that external callers do not expect any waiting time here.
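For clarity, the alternative being proposed could look roughly like the sketch below. Field and method names are illustrative; only the fetchStreamPartitionOffset call mirrors the code under review:

```java
// Sketch of the cached high-water-mark approach: fetch the latest stream offset once
// when the consumer starts and cache it, so the public accessor never blocks.
private volatile StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;

private void fetchLatestStreamOffsetAtStartup(long maxWaitTimeMs) {
  try (StreamMetadataProvider metadataProvider =
      _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
    _latestStreamOffsetAtStartupTime =
        metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
  } catch (Exception e) {
    // Leave the field null; callers treat null as "unknown" and simply report not caught up.
  }
}

/** Non-blocking accessor: returns the cached high-water-mark, or null if none was obtained. */
public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
  return _latestStreamOffsetAtStartupTime;
}
```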

Contributor Author

For now this method is only called by OffsetBasedConsumptionStatusChecker at startup, probably a few times for each stream partition, and once the startup health check passes, it is no longer called. IMO the complexity of 1) having a separate thread periodically fetch the latest offset and 2) stopping the updates once the catch-up period is finished is not worth it unless there are more usages of this method. I added a note about this to the method's javadoc.

return _instanceDataManager.getTableDataManager(tableNameWithType);
}

private static Set<String> findConsumingSegments(HelixAdmin helixAdmin, String helixClusterName, String instanceId) {
Contributor

Why don't we query the local RealtimeTableDataManager instead of getting all the tables and filtering out only those that have this instance for some of the partitions?

Contributor Author

I actually wanted to do this originally, but then realized that TableDataManagers and SegmentDataManagers only get added when there's a Helix transition message moving a segment from OFFLINE to CONSUMING state. If, for some reason, Helix transition messages for some segments are temporarily lost or delayed at startup, then their SegmentDataManagers (or even TableDataManagers) won't be available. The ideal state in ZK, on the other hand, is the source of truth.
That being said, I can add one more check before fetching the ideal state for every table, and that is checking whether the resource is indeed a realtime table, as in the sketch below.
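To make the ideal-state approach concrete, here is a minimal sketch using org.apache.helix.HelixAdmin and org.apache.helix.model.IdealState. The "_REALTIME" suffix and the "CONSUMING" state string follow Pinot conventions; the details differ from the actual PR code:

```java
private static Set<String> findConsumingSegments(HelixAdmin helixAdmin, String helixClusterName,
    String instanceId) {
  Set<String> consumingSegments = new HashSet<>();
  for (String resource : helixAdmin.getResourcesInCluster(helixClusterName)) {
    // Skip anything that is not a realtime table resource
    if (!resource.endsWith("_REALTIME")) {
      continue;
    }
    IdealState idealState = helixAdmin.getResourceIdealState(helixClusterName, resource);
    if (idealState == null) {
      continue;
    }
    for (String segmentName : idealState.getPartitionSet()) {
      // Only count segments assigned to this instance in CONSUMING state
      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
      if (instanceStateMap != null && "CONSUMING".equals(instanceStateMap.get(instanceId))) {
        consumingSegments.add(segmentName);
      }
    }
  }
  return consumingSegments;
}
```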

private final InstanceDataManager _instanceDataManager;
private Supplier<Set<String>> _consumingSegmentFinder;

private Set<String> _alreadyProcessedSegments = new HashSet<>();
Contributor

Looks like this has the segments that have reached the target offset. Can we name it appropriately? Maybe targetOffsetReachedSegments? Or _caughtUpSegments, readySegments, or _segmentsReadyForQueries?

Contributor Author

I liked caughtUpSegment better than the others. Updated.

// There's a small chance that after getting the list of consuming segment names at the beginning of this method
// up to this point, a consuming segment gets converted to a committed segment. In that case status check is
// returned as false and in the next round the new consuming segment will be used for fetching offsets.
LOGGER.info("Segment {} is already committed. Will check consumption status later", segName);
Contributor

Suggested change
LOGGER.info("Segment {} is already committed. Will check consumption status later", segName);
LOGGER.info("Segment {} is already committed. Will check consumption status later on the next segment", segName);

Contributor Author

Done.

// up to this point, a consuming segment gets converted to a committed segment. In that case status check is
// returned as false and in the next round the new consuming segment will be used for fetching offsets.
LOGGER.info("Segment {} is already committed. Will check consumption status later", segName);
tableDataManager.releaseSegment(segmentDataManager);
Contributor

You should release with a try/catch/finally.

Also, can it happen that after the release the segment has moved over to an OfflineSegmentDataManager? Then we would see an exception for a wrong cast. So, grab the offset while you have the lock.

Contributor Author

Added a try/catch.
For the 2nd part of your comment: at this line of code, we already know that the segment has moved over to be an offline segment, and we return false. We don't continue with the casting and the rest of the method.
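For reference, the acquire/check/release shape being agreed on here looks roughly like the sketch below. isCaughtUp() stands in for the offset comparison from the PR description, and the names are illustrative:

```java
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName);
if (segmentDataManager == null) {
  return false;  // segment no longer served by this instance; re-check next round
}
try {
  if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) {
    // Committed between listing and acquiring it; treat as not caught up and retry later.
    return false;
  }
  return isCaughtUp((LLRealtimeSegmentDataManager) segmentDataManager);
} finally {
  // Release in finally so an exception cannot leak the reference.
  tableDataManager.releaseSegment(segmentDataManager);
}
```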

@sajjad-moradi force-pushed the feature/offset.based.consumption.status.checker branch from 99c2228 to 9665abd on September 9, 2021 23:48
String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
return Status.STARTING;
if (now >= _endWaitTime) {
_statusDescription = String.format("Consuming segments status GOOD since %dms", _endWaitTime);
Contributor

suggest adding the value of _consumptionNotYetCaughtUp to the status description. Or, even the number of segments yet to catch up.

Contributor Author

Added the number of segments yet to catch up.

// return Status.GOOD;
}
_statusDescription =
String.format("Waiting for consuming segments to catchup, timeRemaining=%dms", _endWaitTime - now);
Contributor

suggest adding the value of _consumptionNotYetCaughtUp to the status description. Or, even the number of segments yet to catch up.

Contributor Author

Added the number of segments yet to catch up.

_consumingSegments = consumingSegments;
}

public boolean haveAllConsumingSegmentsReachedStreamLatestOffset() {
Contributor

Maybe returning the number of segments yet to reach offset will give us more information? Just a thought. But that means you cannot return early, so it is a double-edged sword.

Contributor Author

That's actually helpful. Updated it.
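The counting variant that was adopted could be sketched as follows. Here isCaughtUp() is a stand-in for the per-segment offset check, while _consumingSegments and _caughtUpSegments follow the names discussed in this review:

```java
// Rather than returning early on the first lagging segment, count how many consuming
// segments have not yet reached the stream offset captured at startup.
// Zero means the server has caught up.
public int getNumConsumingSegmentsNotReachedTheirLatestOffset() {
  int numLaggingSegments = 0;
  for (String segName : _consumingSegments) {
    if (_caughtUpSegments.contains(segName)) {
      continue;  // already verified in an earlier status check
    }
    if (isCaughtUp(segName)) {
      _caughtUpSegments.add(segName);
    } else {
      numLaggingSegments++;
    }
  }
  return numLaggingSegments;
}
```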

LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
releaseSegment(tableDataManager, segmentDataManager);
Contributor

Isn't it better to do the release in a try/finally, so that in case there are exceptions, we still release the segment?

Contributor Author

Done.

return allSegsReachedLatest;
}

void releaseSegment(TableDataManager tableDataManager, SegmentDataManager segmentDataManager) {
Contributor

why a separate method?

Contributor Author

Because it was used in two places. Now, with the refactoring, it's only used once, so there is no need for a separate method.

when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(100));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1000));
assertFalse(statusChecker.haveAllConsumingSegmentsReachedStreamLatestOffset());
Contributor

It will be nice to also call the haveAll method when one or two segments have reached the offset but another has not.

But I see you have a case like that below, so it is ok.

@sajjad-moradi force-pushed the feature/offset.based.consumption.status.checker branch from 1b388ca to c12e1cb on September 14, 2021 16:08
@sajjad-moradi force-pushed the feature/offset.based.consumption.status.checker branch from c12e1cb to 9f58f6c on September 15, 2021 21:06
@jackjlli (Member) left a comment

Minor but LGTM.

LOGGER.info("All consuming segments have reached their latest offsets! "
+ "Finished {} msec earlier than time threshold.", _endWaitTime - now);
// _statusDescription = "Consuming segments status GOOD as all consuming segments have reached the latest offset";
// return Status.GOOD;
Member

Shouldn't we uncomment it here?

Contributor Author

We'll uncomment this part after we've analyzed the performance of the new status checker. It's explained in the TODO a few lines above.

.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
} catch (Exception e) {
_segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
_partitionGroupId);
Member

would it be good to log the exception here?

Contributor Author

I'm not sure. The only thing that can go wrong is a timeout, and if we log the exception, we'll clutter the logs with an unhelpful stack trace of the timeout. If you still think it's better to log the exception, I can add it.
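One middle ground, if desired (an assumption, not what the PR currently does), is to log only the exception message so an expected timeout does not dump a full stack trace:

```java
// Include the exception message, but not the throwable itself, to keep the log compact.
_segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}: {}",
    _clientId, _partitionGroupId, e.getMessage());
```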

StreamPartitionMsgOffset latestIngestedOffset = rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
if (latestStreamOffset == null || latestIngestedOffset == null) {
LOGGER.info("Null offset found for segment {} - latest stream offset: {}, latest ingested offset: {}. "
Member

It seems these messages will be printed per segment. How about collecting all the segment names and grouping them into different categories, so that they are printed only once?
Another approach is to change the log level.

Contributor Author

Good suggestion, but we shouldn't need these logs; we should rely on the service status message. These logs will only be used if we want to debug specific partitions/tables. Because of that, I think it's better to keep the code simple and not add extra logic just for logging purposes.

when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(3000));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedTheirLatestOffset(), 1);
}
}
Member

Missing tail empty line.

@jackjlli (Member) left a comment

Thanks for the prompt reply. I guess we'll have the 2nd PR soon. You can address my comments in the next PR.

kriti-sc pushed a commit to kriti-sc/incubator-pinot that referenced this pull request Dec 12, 2021
* Add offset based realtime consumption status checker

* Applied PR suggestions

* One log line when consumption catches up

* Return numConsumingSegmentsNotCaughtUp

* Also add num of outstanding segments to timeout scenario