
Adding consumer lag as a metric via a periodic task in controller #9800

Merged: 7 commits merged into apache:master on Nov 18, 2022

Conversation

navina (Contributor) commented Nov 15, 2022

Description

This PR enables Pinot to publish consumer lag as a metric for realtime tables. The lag is emitted via a periodic task in the controller, which periodically queries the /consumingSegmentsInfo API and records the max consuming lag among each partition's replicas.
Currently, it publishes the following metrics:

  • MAX_RECORDS_LAG
  • MAX_AVAILABILITY_LAG_MS

The task can be configured to run at a given frequency so as not to overwhelm the servers (and, through them, the upstream data source).
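For illustration, here is a minimal, self-contained sketch of what such a periodic task does: poll the consuming-segments info endpoint on a schedule and publish the maximum lag as a gauge. The class, method names, endpoint path, and parsing below are placeholders for this sketch, not the actual RealtimeConsumerMonitor code in this PR.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: class, method, and endpoint details are placeholders,
// not Pinot's actual RealtimeConsumerMonitor implementation.
public class ConsumerLagMonitorSketch {
  private final HttpClient _httpClient = HttpClient.newHttpClient();
  private final ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();

  public void start(String controllerBaseUrl, String tableNameWithType, long frequencySeconds) {
    _scheduler.scheduleAtFixedRate(() -> {
      try {
        // Query the consuming-segments info for the table (path is illustrative).
        String json = fetchConsumingSegmentsInfo(controllerBaseUrl, tableNameWithType);
        // The real task parses per-partition, per-replica lag from the response and publishes
        // the max across replicas as MAX_RECORDS_LAG / MAX_AVAILABILITY_LAG_MS gauges.
        long maxRecordsLag = parseMaxRecordsLag(json);
        recordGauge(tableNameWithType, "MAX_RECORDS_LAG", maxRecordsLag);
      } catch (Exception e) {
        // A periodic task should survive transient failures; log and retry on the next run.
        System.err.println("Failed to read consuming segments info: " + e.getMessage());
      }
    }, 0, frequencySeconds, TimeUnit.SECONDS);
  }

  private String fetchConsumingSegmentsInfo(String baseUrl, String table) throws Exception {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(baseUrl + "/tables/" + table + "/consumingSegmentsInfo"))
        .timeout(Duration.ofSeconds(30))
        .GET()
        .build();
    return _httpClient.send(request, HttpResponse.BodyHandlers.ofString()).body();
  }

  private long parseMaxRecordsLag(String json) {
    // Placeholder: parse the per-replica lag values and return the maximum.
    return 0L;
  }

  private void recordGauge(String table, String gauge, long value) {
    // Placeholder for publishing a controller gauge (e.g. via ControllerMetrics in the real code).
    System.out.println(table + " " + gauge + " = " + value);
  }
}
```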

Labels: observability

Release Notes

  • Consumer lag for realtime tables can be monitored using metrics MAX_RECORDS_LAG and MAX_AVAILABILITY_LAG_MS

mcvsubbu (Contributor) left a comment

Why do we need to emit this from the controller? We can aggregate the metrics that are being emitted from the servers, right?

cc: @sajjad-moradi

codecov-commenter commented Nov 15, 2022

Codecov Report

Merging #9800 (a5a1dca) into master (73e6129) will increase coverage by 45.61%.
The diff coverage is 77.04%.

@@              Coverage Diff              @@
##             master    #9800       +/-   ##
=============================================
+ Coverage     24.55%   70.17%   +45.61%     
- Complexity       53     5000     +4947     
=============================================
  Files          1952     1965       +13     
  Lines        104676   105105      +429     
  Branches      15856    15904       +48     
=============================================
+ Hits          25700    73753    +48053     
+ Misses        76347    26211    -50136     
- Partials       2629     5141     +2512     
Flag Coverage Δ
integration1 25.26% <27.86%> (?)
integration2 24.37% <27.86%> (-0.18%) ⬇️
unittests1 67.85% <0.00%> (?)
unittests2 15.79% <72.13%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...g/apache/pinot/common/metrics/AbstractMetrics.java 80.83% <0.00%> (+3.67%) ⬆️
...ot/controller/util/ConsumingSegmentInfoReader.java 89.36% <ø> (+89.36%) ⬆️
...org/apache/pinot/spi/stream/PartitionLagState.java 0.00% <ø> (ø)
...inot/controller/helix/RealtimeConsumerMonitor.java 80.43% <80.43%> (ø)
...g/apache/pinot/common/metrics/ControllerGauge.java 98.03% <100.00%> (+0.08%) ⬆️
...apache/pinot/controller/BaseControllerStarter.java 82.53% <100.00%> (+6.16%) ⬆️
...va/org/apache/pinot/controller/ControllerConf.java 58.11% <100.00%> (+8.88%) ⬆️
...pache/pinot/core/query/utils/idset/EmptyIdSet.java 25.00% <0.00%> (ø)
...ot/common/function/scalar/ComparisonFunctions.java 42.85% <0.00%> (ø)
...anager/realtime/SegmentBuildTimeLeaseExtender.java 63.23% <0.00%> (ø)
... and 1439 more


navina (Contributor, Author) commented Nov 15, 2022

> Why do we need to emit this from the controller? We can aggregate the metrics that are being emitted from the servers, right?
>
> cc: @sajjad-moradi

We could emit metrics from the servers and then try to compute the lag in the monitoring layer. This is hard, as we would have to find the max lag among all replicas that ever existed for a given partition. I am not familiar with a way to find the max value among only the current replica set for a given partition. Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale values that will mostly cause noise.

We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task.

navina marked this pull request as ready for review on November 15, 2022 19:35
sajjad-moradi (Contributor)

> Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale values that will mostly cause noise.

When a table is rebalanced, each server can get notified, and if it no longer serves a partition, it can remove the corresponding gauge metric.

sajjad-moradi (Contributor) commented Nov 15, 2022

> We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task.

This is a useful metric to have. Ideally we'd want the gauge metric to be updated pretty frequently, like in a matter of one minute. I'm not sure running the periodic task every minute or so is a good idea!
If we choose to emit the metric on the server side, then we can update the gauge as soon as the events are consumed. It's then up to the metrics & monitoring system (outside Pinot) to aggregate the metric values (e.g. by finding the max value) across the different replicas of each partition.

mcvsubbu (Contributor)

> Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale values that will mostly cause noise.
>
> When a table is rebalanced, each server can get notified, and if it no longer serves a partition, it can remove the corresponding gauge metric.

I believe we do invoke the code to remove a metric each time a partition completes consumption.

mcvsubbu (Contributor)

> We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task.
>
> This is a useful metric to have. Ideally we'd want the gauge metric to be updated pretty frequently, like in a matter of one minute. I'm not sure running the periodic task every minute or so is a good idea! If we choose to emit the metric on the server side, then we can update the gauge as soon as the events are consumed. It's then up to the metrics & monitoring system (outside Pinot) to aggregate the metric values (e.g. by finding the max value) across the different replicas of each partition.

Agreed.

We should be emitting this metric every few minutes so as to detect lags quickly and act on them.

snleee (Contributor) left a comment

Minor comments. LGTM otherwise.

mcvsubbu (Contributor)

I see that this has already been approved for merge. We intend to submit a PR soon that will handle this at the server level, since we need alerting on lag sooner rather than later. If you choose to, you can wait for that PR before merging this.

navina (Contributor, Author) commented Nov 16, 2022

> I'm not sure running the periodic task every minute or so is a good idea!

Agreed here :) We will likely not set it to query every minute.

> If we choose to emit the metric on the server side, then we can update the gauge as soon as the events are consumed. It's then up to the metrics & monitoring system (outside Pinot) to aggregate the metric values (e.g. by finding the max value) across the different replicas of each partition.

Agreed that we can detect it sooner, but there doesn't seem to be a good way to aggregate it in the monitoring layer in the presence of a rebalance (clean or unclean) or a redistribution of consuming segments for any other reason.
We have also noted that sometimes all consuming segments get into ERROR state (maybe the consumer crashed or hung) and yet the monitoring metric LLC_PARTITION_CONSUMING doesn't detect it [ @npawar may have more context ].
Moreover, adding a metric in the segment data manager feels like tip-toeing across a minefield.

A much cleaner way would be to emit at the partition level from the connector plugin directly, or from the server (without involving the server tag, but using a stable replica-id tag). I believe there are some dependency issues to be sorted out before getting there.

> I believe we do invoke the code to remove a metric each time a partition completes consumption.

This works well in a stable state and under clean operations. But it doesn't cover cases of unclean shutdowns / crashes in production, and it has generally been observed to be not very reliable.

sajjad-moradi (Contributor)

> This works well in a stable state and under clean operations. But it doesn't cover cases of unclean shutdowns / crashes in production, and it has generally been observed to be not very reliable.

Just for my understanding, can you elaborate on what you mean by an unclean shutdown?

navina (Contributor, Author) commented Nov 17, 2022

> Just for my understanding, can you elaborate on what you mean by an unclean shutdown?

Say the server crashed for whatever reason (maybe memory) while the user was rebalancing the table and the rebalance had moved the consuming segments away to a different server.

navina (Contributor, Author) commented Nov 17, 2022

Summarizing the discussion (or re-discussion) with @mayankshriv / @snleee / @npawar:

  1. We all agree that periodic tasks on the controller are not built for continuously / frequently running jobs. Moreover, this job will increase intra-cluster traffic and can have a negative impact on the performance of Pinot components and possibly even on the upstream source (e.g. Kafka).
  2. We all agree that this is not the best approach for emitting consumer lag metrics. It will be better to emit metrics on the server side and aggregate them in the monitoring layer. Aggregating in the monitoring layer and defining the alerting rules has its challenges, especially during ongoing cluster operations. This has been a challenge in the past with other server-side metrics like LLC_PARTITION_CONSUMING.

Here is the plan of action:

  1. Let's leave this periodic task as an "opt-in" task in the controller. I will add a controller config that defines whether to enable this task or not. By default, it will be turned off (see the sketch below).
  2. I will take a stab at adding the lag metric from the server side and create a follow-up PR.

I would like to keep both options open for use in production so that we can observe how these metrics/handlers work out under various scenarios.
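To make the opt-in behavior concrete, a rough sketch is shown below; the property keys and class names are hypothetical placeholders, not the actual configs or classes added in this PR.

```java
import java.util.Properties;

// Hypothetical sketch of the opt-in gate described in item 1 above. The property keys
// are illustrative placeholders, not the actual config names added by this PR.
public class ConsumerMonitorOptInSketch {
  private static final String ENABLE_KEY = "controller.realtime.consumer.monitor.enable";             // hypothetical key
  private static final String FREQUENCY_KEY = "controller.realtime.consumer.monitor.frequencyPeriod"; // hypothetical key

  public static void main(String[] args) {
    Properties controllerConf = new Properties();
    controllerConf.putAll(System.getProperties());

    // Off by default: the periodic task only runs if the operator explicitly opts in.
    boolean enabled = Boolean.parseBoolean(controllerConf.getProperty(ENABLE_KEY, "false"));
    // Keep the frequency coarse (minutes, not seconds) to limit intra-cluster traffic.
    long frequencySeconds = Long.parseLong(controllerConf.getProperty(FREQUENCY_KEY, "300"));

    if (enabled) {
      System.out.println("Would schedule RealtimeConsumerMonitor every " + frequencySeconds + "s");
    } else {
      System.out.println("RealtimeConsumerMonitor is disabled (default)");
    }
  }
}
```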

@mcvsubbu if LinkedIn is also working on the lag metrics, can you please share the design and the timeline for this? I want to make sure the design aligns and works with the existing OSS APIs.

mcvsubbu (Contributor)

[I thought all periodic tasks are opt-in. Just don't configure it at all, or set the time interval to 0 or something like that?]

Anyway, yes, we are going to work on it; the timeline is the next few weeks.

No design doc, but what we will be doing is:

  • One metric per table per consuming host -- basically the worst partition.
  • The metric gets destroyed when the partition is not consuming (as all metrics do today).
  • When a new host starts to consume, the metric will get created on the new host (as metrics do today).

I will let Juan add more details once he has them (or just open a PR).

snleee (Contributor) commented Nov 18, 2022

@mcvsubbu @jugomezv I will check this in since this PR doesn't introduce any invasive change, given that the feature is disabled by default.

@navina Let's loop back once the server side solution is posted. We can review it together. Thanks for driving the discussion!

snleee merged commit 3724ba2 into apache:master on Nov 18, 2022