diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl index f5696ed9d1e6..a29ea4c65754 100644 --- a/deps/rabbit/src/rabbit_osiris_metrics.erl +++ b/deps/rabbit/src/rabbit_osiris_metrics.erl @@ -27,7 +27,8 @@ members, memory, readers, - consumers + consumers, + segments ]). -record(state, {timeout :: non_neg_integer()}). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index b0f94dd4fd92..5ca7ecb154e6 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -65,7 +65,7 @@ -define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, messages, messages_ready, messages_unacknowledged, committed_offset, policy, operator_policy, effective_policy_definition, type, memory, - consumers]). + consumers, segments]). -type appender_seq() :: non_neg_integer(). @@ -726,6 +726,14 @@ i(committed_offset, Q) -> Data -> maps:get(committed_offset, Data, '') end; +i(segments, Q) -> + Key = {osiris_writer, amqqueue:get_name(Q)}, + case osiris_counters:overview(Key) of + undefined -> + ''; + Data -> + maps:get(segments, Data, '') + end; i(policy, Q) -> case rabbit_policy:name(Q) of none -> ''; diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 5d1c172d6129..1e6d7bfd5556 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -160,7 +160,8 @@ {2, undefined, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out}, {2, undefined, queue_head_message_timestamp, gauge, "Timestamp of the first message in the queue, if any", head_message_timestamp}, {2, undefined, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads}, - {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes} + {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, + {2, undefined, stream_segments, counter, "Total number of stream segment files", segments} ]}, %%% Metrics that contain reference to a channel. Some of them also have @@ -538,7 +539,7 @@ get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) -> end, empty(MF), Table), [{Table, [{consumers, A1}]}]; get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) -> - {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} = + {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17} = ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; @@ -559,7 +560,7 @@ get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) -> {messages_bytes_persistent, A9}, {message_bytes, A10}, {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12}, {messages_paged_out, A13}, {message_bytes_paged_out, A14}, - {disk_reads, A15}, {disk_writes, A16}]}]; + {disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}]; get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; Table == channel_queue_metrics; @@ -668,7 +669,7 @@ get_data(Table, _, _, _) -> sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, - A12, A13, A14, A15, A16}) -> + A12, A13, A14, A15, A16, A17}) -> {T, sum(proplists:get_value(consumers, Props), A1), sum(proplists:get_value(consumer_utilisation, Props), A2), @@ -685,7 +686,8 @@ sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, sum(proplists:get_value(messages_paged_out, Props), A13), sum(proplists:get_value(message_bytes_paged_out, Props), A14), sum(proplists:get_value(disk_reads, Props), A15), - sum(proplists:get_value(disk_writes, Props), A16) + sum(proplists:get_value(disk_writes, Props), A16), + sum(proplists:get_value(segments, Props), A17) }. division(0, 0) -> @@ -707,7 +709,7 @@ empty(T) when T == ra_metrics -> empty(T) when T == channel_queue_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> - {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. + {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. sum(undefined, B) -> B;