From 7423bc64d50b77537eae31f30fd1b782bdfe588e Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 23 May 2024 10:28:26 +0200 Subject: [PATCH] Provide per-exchange/queue metrics w/out channelID --- .../include/rabbit_core_metrics.hrl | 8 ++ .../rabbit_common/src/rabbit_core_metrics.erl | 44 ++++++--- ...etheus_rabbitmq_core_metrics_collector.erl | 62 +++++++++++- .../test/rabbit_prometheus_http_SUITE.erl | 94 ++++++++++++++++++- 4 files changed, 188 insertions(+), 20 deletions(-) diff --git a/deps/rabbit_common/include/rabbit_core_metrics.hrl b/deps/rabbit_common/include/rabbit_core_metrics.hrl index 59743b4ec7da..e64c7c4b8246 100644 --- a/deps/rabbit_common/include/rabbit_core_metrics.hrl +++ b/deps/rabbit_common/include/rabbit_core_metrics.hrl @@ -28,6 +28,14 @@ {auth_attempt_metrics, set}, {auth_attempt_detailed_metrics, set}]). +% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the +% same info as some of the channel_queue_metrics, channel_exchange_metrics and +% channel_queue_exchange_metrics but without including the channel ID in the +% key. +-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set}, + {exchange_metrics, set}, + {queue_exchange_metrics, set}]). + -define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}). %% connection_created :: {connection_id, proplist} diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 0c46b41db456..f872a6bc278d 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -111,13 +111,15 @@ create_table({Table, Type}) -> {read_concurrency, true}]). init() -> - _ = [create_table({Table, Type}) - || {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, + _ = [create_table({Table, Type}) + || {Table, Type} <- Tables], ok. 
terminate() -> + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, [ets:delete(Table) - || {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + || {Table, _Type} <- Tables], ok. connection_created(Pid, Infos) -> @@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) -> ets:insert(channel_process_metrics, {Id, Value}), ok. -channel_stats(exchange_stats, publish, Id, Value) -> +channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, confirm, Id, Value) -> +channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, return_unroutable, Id, Value) -> +channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, drop_unroutable, Id, Value) -> +channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_exchange_stats, publish, Id, Value) -> +channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}), + _ = 
ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}), ok; -channel_stats(queue_stats, get, Id, Value) -> +channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_no_ack, Id, Value) -> +channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver, Id, Value) -> +channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver_no_ack, Id, Value) -> +channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, redeliver, Id, Value) -> +channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, ack, Id, Value) -> +channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ 
= ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_empty, Id, Value) -> +channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok. delete(Table, Key) -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index d2198ece681e..3dc34db77263 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -162,7 +162,15 @@ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, {2, undefined, stream_segments, counter, "Total number of stream segment files", segments} ]}, - + {queue_counter_metrics, [ + {2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"}, + {3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"}, + {4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"}, + {5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"}, + {6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"}, + {7, undefined, queue_messages_acked_total, counter, "Total number of messages 
acknowledged by consumers"}, + {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"} + ]}, %%% Metrics that contain reference to a channel. Some of them also have %%% a queue name, but in this case filtering on it doesn't make any %%% sense, as the queue is not an object of interest here. @@ -176,6 +184,13 @@ {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count} ]}, + {exchange_metrics, [ + {2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"}, + {3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"}, + {4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, + {5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} + ]}, + {channel_exchange_metrics, [ {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, @@ -210,6 +225,10 @@ {2, undefined, connection_channels, gauge, "Channels on a connection", channels} ]}, + {queue_exchange_metrics, [ + {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"} + ]}, + {channel_queue_exchange_metrics, [ {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"} ]} @@ -544,8 +563,11 @@ get_data(queue_metrics = Table, false, VHostsFilter) -> {disk_reads, A15}, {disk_writes, A16}, {segments,
A17}]}]; get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; + Table == queue_counter_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; + Table == exchange_metrics; + Table == queue_exchange_metrics; Table == channel_queue_exchange_metrics; Table == ra_metrics; Table == channel_process_metrics -> @@ -553,6 +575,8 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; %% For queue_coarse_metrics ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -579,6 +603,36 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; _ -> [Result] end; +get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({{ + #resource{kind = queue, virtual_host = VHost}, + #resource{kind = exchange, virtual_host = VHost} + }, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind =
queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> @@ -671,15 +725,15 @@ division(A, B) -> accumulate_count_and_sum(Value, {Count, Sum}) -> {Count + 1, Sum + Value}. -empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> +empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; -empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> +empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> {T, 0, 0, 0, 0}; empty(T) when T == ra_metrics -> {T, 0, 0, 0, 0, 0, {0, 0}}; -empty(T) when T == channel_queue_metrics; T == channel_metrics -> +empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. 
diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 033723507a8f..50bf0b1ad62a 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -34,7 +34,7 @@ groups() -> {config_path, [], generic_tests()}, {global_labels, [], generic_tests()}, {aggregated_metrics, [], [ - aggregated_metrics_test, + aggregated_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, global_metrics_single_metric_family_test @@ -57,6 +57,9 @@ groups() -> queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, queue_coarse_metrics_per_object_test, + queue_counter_metrics_per_object_test, + queue_exchange_metrics_per_object_test, + exchange_metrics_per_object_test, queue_metrics_per_object_test, queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, @@ -523,6 +526,96 @@ queue_coarse_metrics_per_object_test(Config) -> map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))), ok. +queue_counter_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]}, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_counter_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body1))), + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_counter_metrics", + [], 200), + Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]}, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body2))), + + %% The queue_exchange_metrics family is covered by queue_exchange_metrics_per_object_test. + ok.
+ + +queue_exchange_metrics_per_object_test(Config) -> + Expected1 = #{ + #{ + queue => "vhost-1-queue-with-messages", + vhost => "vhost-1", + exchange => "" + } => [7], + #{ + exchange => "", + queue => "vhost-1-queue-with-consumer", + vhost => "vhost-1" + } => [7] + }, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body1))), + + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics", + [], 200), + + + Expected2 = #{ + #{ + queue => "vhost-2-queue-with-messages", + vhost => "vhost-2", + exchange => "" + } => [11], + #{ + exchange => "", + queue => "vhost-2-queue-with-consumer", + vhost => "vhost-2" + } => [11] + }, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body2))), + + ok. + +exchange_metrics_per_object_test(Config) -> + {_, Body} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=exchange_metrics", + [], 200), + + %% NOTE(review): only the presence of per-exchange series is asserted; + %% pin exact counts once the vhost-1 fixture totals are confirmed. + Published = map_get( + rabbitmq_detailed_exchange_messages_published_total, + parse_response(Body)), + ?assertNotEqual(0, map_size(Published)), + ok. + queue_metrics_per_object_test(Config) -> Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7], #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]},