Skip to content

Commit

Permalink
Merge branch 'fix/DEX-2892/consumer-lag-metrics-after-rebalance' into…
Browse files Browse the repository at this point in the history
… 'master'

[DEX-2892] fix: properly report metrics for partition offset lag after rebalance

Closes DEX-2892

See merge request nstmrt/rubygems/sbmt-kafka_consumer!77
  • Loading branch information
Сатаров Юрий Сергеевич committed Mar 6, 2025
2 parents b13e683 + c9604fa commit c2822b1
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.4.1] - 2025-03-06

### Fixed
- reset consumer offset lag metric to zero after cg rebalance to avoid reporting stale metrics

## [3.4.0] - 2025-01-27

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ def report_topic_stats(group_id, topic_stats)
offset_lag = partition_statistics["consumer_lag"]
next if offset_lag == -1

next unless partition_owned?(partition_statistics)
unless partition_owned?(partition_statistics)
# reset offset lag after cg rebalance
offset_lag = 0
end

Yabeda.kafka_consumer.offset_lag
.set({
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_consumer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaConsumer
VERSION = "3.4.0"
VERSION = "3.4.1"
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"partitions" => {
"0" => {
"partition" => 0,
"consumer_lag" => 0
"consumer_lag" => 10
},
"1" => {
"partition" => 1,
Expand All @@ -113,9 +113,10 @@
it "reports topic metrics" do
expect {
described_class.new.send(:report_rdkafka_stats, event, async: false)
}.to update_yabeda_gauge(Yabeda.kafka_consumer.offset_lag)
.with_tags(client: "some-name", group_id: "consumer-group-id",
partition: "0", topic: "topic_with_json_data")
}.to update_yabeda_gauge(Yabeda.kafka_consumer.offset_lag).with_tags(client: "some-name", group_id: "consumer-group-id", partition: "0", topic: "topic_with_json_data").with(10)
.and update_yabeda_gauge(Yabeda.kafka_consumer.offset_lag).with_tags(client: "some-name", group_id: "consumer-group-id", partition: "1", topic: "topic_with_json_data").with(0)
.and update_yabeda_gauge(Yabeda.kafka_consumer.offset_lag).with_tags(client: "some-name", group_id: "consumer-group-id", partition: "2", topic: "topic_with_json_data").with(0)
.and not_update_yabeda_gauge(Yabeda.kafka_consumer.offset_lag).with_tags(partition: "-1")
end
end
end
Expand Down

0 comments on commit c2822b1

Please sign in to comment.