Skip to content

Commit

Permalink
Feat[plugins]: report queue depth per appId to prometheus (#446)
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 authored Oct 14, 2024
1 parent 03d9f65 commit 07fa5d1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 25 deletions.
7 changes: 6 additions & 1 deletion src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,12 @@ void StatController::snapshotAndNotify()
// StatConsumers will report all stats
bsl::vector<StatConsumerMp>::iterator it = d_statConsumers.begin();
for (; it != d_statConsumers.end(); ++it) {
(*it)->onSnapshot();
try {
(*it)->onSnapshot();
}
catch (const bsl::exception& e) {
BALL_LOG_ERROR << "#PLUGIN_ERROR " << e.what();
}
}

const bool willPrint = d_printer_mp->nextSnapshotWillPrint();
Expand Down
62 changes: 40 additions & 22 deletions src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ void PrometheusStatConsumer::captureQueueStats()
{"queue_gc_msgs", Stat::e_GC_MSGS_DELTA, true},
{"queue_cfg_msgs", Stat::e_CFG_MSGS, false},
{"queue_cfg_bytes", Stat::e_CFG_BYTES, false},
{"queue_content_msgs", Stat::e_MESSAGES_MAX, false},
{"queue_content_bytes", Stat::e_BYTES_MAX, false},
{"queue_content_msgs_max", Stat::e_MESSAGES_MAX, false},
{"queue_content_bytes_max", Stat::e_BYTES_MAX, false},
{"queue_queue_time_avg", Stat::e_QUEUE_TIME_AVG, false},
{"queue_queue_time_max", Stat::e_QUEUE_TIME_MAX, false},
{"queue_reject_msgs", Stat::e_REJECT_DELTA, true},
Expand Down Expand Up @@ -390,35 +390,53 @@ void PrometheusStatConsumer::captureQueueStats()
}
}

// Add `appId` tag to `queue_confirm_time_max` and
// `queue_queue_time_max` metrics.
static const DatapointDef confirmTimeDataPoint = {
"queue_confirm_time_max",
Stat::e_CONFIRM_TIME_MAX,
false};
static const DatapointDef queueTimeDataPoint = {
"queue_queue_time_max",
Stat::e_QUEUE_TIME_MAX,
false};
// Add `appId` tag to metrics.

// These per-appId metrics exist for both primary and replica
static const DatapointDef defsCommon[] = {
{"queue_confirm_time_max", Stat::e_CONFIRM_TIME_MAX, false},
};

// These per-appId metrics exist only for primary
static const DatapointDef defsPrimary[] = {
{"queue_queue_time_max", Stat::e_QUEUE_TIME_MAX, false},
{"queue_content_msgs_max", Stat::e_MESSAGES_MAX, false},
{"queue_content_bytes_max", Stat::e_BYTES_MAX, false},
};
for (mwcst::StatContextIterator appIdIt =
queueIt->subcontextIterator();
appIdIt;
++appIdIt) {
tagger.setAppId(appIdIt->name());
const auto labels = tagger.getLabels();

auto value = mqbstat::QueueStatsDomain::getValue(
*appIdIt,
d_snapshotId,
mqbstat::QueueStatsDomain::Stat::e_CONFIRM_TIME_MAX);
updateMetric(&confirmTimeDataPoint, labels, value);
for (DatapointDefCIter dpIt =
bdlb::ArrayUtil::begin(defsCommon);
dpIt != bdlb::ArrayUtil::end(defsCommon);
++dpIt) {
const bsls::Types::Int64 value =
mqbstat::QueueStatsDomain::getValue(
*appIdIt,
d_snapshotId,
static_cast<mqbstat::QueueStatsDomain::Stat::Enum>(
dpIt->d_stat));
updateMetric(dpIt, labels, value);
}

if (role == mqbstat::QueueStatsDomain::Role::e_PRIMARY) {
value = mqbstat::QueueStatsDomain::getValue(
*appIdIt,
d_snapshotId,
mqbstat::QueueStatsDomain::Stat::e_QUEUE_TIME_MAX);
updateMetric(&queueTimeDataPoint, labels, value);
for (DatapointDefCIter dpIt =
bdlb::ArrayUtil::begin(defsPrimary);
dpIt != bdlb::ArrayUtil::end(defsPrimary);
++dpIt) {
const bsls::Types::Int64 value =
mqbstat::QueueStatsDomain::getValue(
*appIdIt,
d_snapshotId,
static_cast<
mqbstat::QueueStatsDomain::Stat::Enum>(
dpIt->d_stat));
updateMetric(dpIt, labels, value);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@
"queue_push_bytes",
"queue_ack_msgs",
]
QUEUE_PRIMARY_NODE_METRICS = ["queue_gc_msgs", "queue_cfg_msgs", "queue_content_msgs"]
QUEUE_PRIMARY_NODE_METRICS = [
"queue_gc_msgs",
"queue_cfg_msgs",
"queue_content_msgs_max",
]
CLUSTER_METRICS = ["cluster_healthiness"]
BROKER_METRICS = ["brkr_summary_queues_count", "brkr_summary_clients_count"]

Expand Down Expand Up @@ -330,7 +334,7 @@ def _check_statistic(prometheus_host):
value = response["result"][1]["value"][-1]
assert value == "1", _assert_message(metric, "1", value)
# Queue primary node statistic
elif metric == "queue_content_msgs":
elif metric == "queue_content_msgs_max":
# For first queue
assert value == "2", _assert_message(metric, "2", value)
# For second queue
Expand Down

0 comments on commit 07fa5d1

Please sign in to comment.