Skip to content

Commit

Permalink
Emit histogram metric for received message sizes per protocol (#12342)
Browse files Browse the repository at this point in the history
* Add global histogram metrics for received message sizes per-protocol

fixup: add new files to bazel

fixup: expose message_size_bytes as prometheus classic histogram type

`rabbit_msg_size_metrics` does not use `seshat` any more, but
`counters` directly.

fixup: add msg_size_metrics unit test

* Improve message size histogram

1.
Avoid unnecessary time series emitted for stream protocol
The stream protocol cannot observe message sizes.
This commit ensures that the following time series are omitted:
```
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="64"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="256"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1024"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4096"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16384"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="65536"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="262144"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="1048576"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="4194304"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="16777216"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="67108864"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="268435456"} 0
rabbitmq_global_message_size_bytes_bucket{protocol="stream",le="+Inf"} 0
rabbitmq_global_message_size_bytes_count{protocol="stream"} 0
rabbitmq_global_message_size_bytes_sum{protocol="stream"} 0
```

This reduces the number of time series by 15.

2.
Further reduce the number of time series by reducing the number of
buckets. Instead of 13 bucktes, emit only 9 buckets. Buckets are not
free, each is an extra time series stored.

Prior to this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
      92
```

After this commit:
```
curl -s -u guest:guest localhost:15692/metrics | ag message_size | wc -l
      57
```

3.
The emitted metric should be called
`rabbitmq_message_size_bytes_bucket` instead of `rabbitmq_global_message_size_bytes_bucket`.
The latter is poor naming. There is no need to use `global` in
the metric name given that this metric doesn't exist in the old flawed
aggregated metrics.

4.
This commit simplies module `rabbit_global_counters`.

5.
Avoid garbage collecting the 10-elements list of buckets per message
being received.

---------

Co-authored-by: Péter Gömöri <[email protected]>
  • Loading branch information
2 people authored and michaelklishin committed Sep 25, 2024
1 parent 8377eda commit 1e3f4e5
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 34 deletions.
12 changes: 12 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "msg_size_metrics_SUITE",
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "list_consumers_sanity_check_SUITE",
size = "medium",
Expand Down Expand Up @@ -993,6 +1000,11 @@ rabbitmq_integration_suite(
size = "medium",
)

rabbitmq_suite(
name = "unit_msg_size_metrics_SUITE",
size = "small",
)

rabbitmq_suite(
name = "unit_operator_policy_SUITE",
size = "small",
Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -425,6 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -703,6 +705,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -1714,6 +1717,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "unit_msg_size_metrics_SUITE_beam_files",
testonly = True,
srcs = ["test/unit_msg_size_metrics_SUITE.erl"],
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "unit_operator_policy_SUITE_beam_files",
testonly = True,
Expand Down Expand Up @@ -2183,3 +2194,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "msg_size_metrics_SUITE_beam_files",
testonly = True,
srcs = ["test/msg_size_metrics_SUITE.erl"],
outs = ["test/msg_size_metrics_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
13 changes: 8 additions & 5 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,9 @@ incoming_link_transfer(
{MsgBin0, FirstDeliveryId, FirstSettled}
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_message_size(PayloadBin, MaxMessageSize),
PayloadSize = iolist_size(PayloadBin),
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
Expand Down Expand Up @@ -3066,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) ->

validate_message_size(_, unlimited) ->
ok;
validate_message_size(Message, MaxMsgSize)
when is_integer(MaxMsgSize) ->
MsgSize = iolist_size(Message),
validate_message_size(MsgSize, MaxMsgSize)
when is_integer(MsgSize) ->
case MsgSize =< MaxMsgSize of
true ->
ok;
Expand All @@ -3082,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize)
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
"message size (~b bytes) > maximum message size (~b bytes)",
[MsgSize, MaxMsgSize])
end.
end;
validate_message_size(Msg, MaxMsgSize) ->
validate_message_size(iolist_size(Msg), MaxMsgSize).

-spec ensure_terminus(source | target,
term(),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ check_msg_size(Content, GCThreshold) ->
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size =< MaxMessageSize of
true ->
ok;
rabbit_msg_size_metrics:observe(amqp091, Size);
false ->
Fmt = case MaxMessageSize of
?MAX_MSG_SIZE ->
Expand Down
21 changes: 14 additions & 7 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
boot_step/0,
init/1,
init/2,
overview/0,
prometheus_format/0,
increase_protocol_counter/3,
messages_received/2,
Expand All @@ -38,6 +37,10 @@
messages_dead_lettered_confirmed/3
]).

-ifdef(TEST).
-export([overview/0]).
-endif.

%% PROTOCOL COUNTERS:
-define(MESSAGES_RECEIVED, 1).
-define(MESSAGES_RECEIVED_CONFIRM, 2).
Expand Down Expand Up @@ -132,12 +135,14 @@
boot_step() ->
[begin
%% Protocol counters
init([{protocol, Proto}]),
Protocol = {protocol, Proto},
init([Protocol]),
rabbit_msg_size_metrics:init(Proto),

%% Protocol & Queue Type counters
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
init([Protocol, {queue_type, rabbit_classic_queue}]),
init([Protocol, {queue_type, rabbit_quorum_queue}]),
init([Protocol, {queue_type, rabbit_stream_queue}])
end || Proto <- [amqp091, amqp10]],

%% Dead Letter counters
Expand Down Expand Up @@ -192,8 +197,10 @@ init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetter
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
persistent_term:put({?MODULE, QueueType, DLS}, Counters).

-ifdef(TEST).
overview() ->
seshat:overview(?MODULE).
-endif.

prometheus_format() ->
seshat:format(?MODULE).
Expand Down Expand Up @@ -247,13 +254,13 @@ publisher_created(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, 1).

publisher_deleted(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).

consumer_created(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, 1).

consumer_deleted(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, -1).
counters:sub(fetch(Protocol), ?CONSUMERS, 1).

messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
Index = case Reason of
Expand Down
143 changes: 143 additions & 0 deletions deps/rabbit/src/rabbit_msg_size_metrics.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% This module tracks received message size distribution as histogram.
%% (A histogram is represented by a set of counters, one for each bucket.)
-module(rabbit_msg_size_metrics).

-export([init/1,
observe/2,
prometheus_format/0]).

%% Integration tests.
-export([raw_buckets/1,
diff_raw_buckets/2]).

-ifdef(TEST).
-export([cleanup/1]).
-endif.

-define(BUCKET_1, 100).
-define(BUCKET_2, 1_000).
-define(BUCKET_3, 10_000).
-define(BUCKET_4, 100_000).
-define(BUCKET_5, 1_000_000).
-define(BUCKET_6, 10_000_000).
%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB.
%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB.
%% To help finding an appropriate rabbit.max_message_size we also add a bucket for 50 MB.
-define(BUCKET_7, 50_000_000).
-define(BUCKET_8, 100_000_000).
%% 'infinity' means practically 512 MiB as hard limited in
%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257
-define(BUCKET_9, 'infinity').

-define(MSG_SIZE_BUCKETS,
[{1, ?BUCKET_1},
{2, ?BUCKET_2},
{3, ?BUCKET_3},
{4, ?BUCKET_4},
{5, ?BUCKET_5},
{6, ?BUCKET_6},
{7, ?BUCKET_7},
{8, ?BUCKET_8},
{9, ?BUCKET_9}]).

-define(POS_MSG_SIZE_SUM, 10).

-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
NumObservations :: non_neg_integer()}].

-spec init(atom()) -> ok.
init(Protocol) ->
Size = ?POS_MSG_SIZE_SUM,
Counters = counters:new(Size, [write_concurrency]),
put_counters(Protocol, Counters).

-spec observe(atom(), non_neg_integer()) -> ok.
observe(Protocol, MessageSize) ->
BucketPos = find_bucket_pos(MessageSize),
Counters = get_counters(Protocol),
counters:add(Counters, BucketPos, 1),
counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize).

-spec prometheus_format() -> #{atom() => map()}.
prometheus_format() ->
Values = [prometheus_values(Counters) || Counters <- get_labels_counters()],
#{message_size_bytes => #{type => histogram,
help => "Size of messages received from publishers",
values => Values}}.

find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1;
find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2;
find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3;
find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4;
find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5;
find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6;
find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7;
find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8;
find_bucket_pos(_Size) -> 9.

raw_buckets(Protocol)
when is_atom(Protocol) ->
Counters = get_counters(Protocol),
raw_buckets(Counters);
raw_buckets(Counters) ->
[{UpperBound, counters:get(Counters, Pos)}
|| {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS].

-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets().
diff_raw_buckets(After, Before) ->
diff_raw_buckets(After, Before, []).

diff_raw_buckets([], [], Acc) ->
lists:reverse(Acc);
diff_raw_buckets([{UpperBound, CounterAfter} | After],
[{UpperBound, CounterBefore} | Before],
Acc) ->
case CounterAfter - CounterBefore of
0 ->
diff_raw_buckets(After, Before, Acc);
Diff ->
diff_raw_buckets(After, Before, [{UpperBound, Diff} | Acc])
end.

%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets
%% aren’t just a count of events that fall into them. The buckets also include a count of
%% events in all the smaller buckets, all the way up to the +Inf, bucket which is the total
%% number of events. This is known as a cumulative histogram, and why the bucket label
%% is called le, standing for less than or equal to.
%% This is in addition to buckets being counters, so Prometheus histograms are cumula‐
%% tive in two different ways."
%% [Prometheus: Up & Running]
prometheus_values({Labels, Counters}) ->
{Buckets, Count} = lists:mapfoldl(
fun({UpperBound, NumObservations}, Acc0) ->
Acc = Acc0 + NumObservations,
{{UpperBound, Acc}, Acc}
end, 0, raw_buckets(Counters)),
Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM),
{Labels, Buckets, Count, Sum}.

put_counters(Protocol, Counters) ->
persistent_term:put({?MODULE, Protocol}, Counters).

get_counters(Protocol) ->
persistent_term:get({?MODULE, Protocol}).

get_labels_counters() ->
[{[{protocol, Protocol}], Counters}
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].

-ifdef(TEST).
%% "Counters are not tied to the current process and are automatically
%% garbage collected when they are no longer referenced."
-spec cleanup(atom()) -> ok.
cleanup(Protocol) ->
persistent_term:erase({?MODULE, Protocol}),
ok.
-endif.
Loading

0 comments on commit 1e3f4e5

Please sign in to comment.