From b1d46393e00d074381aaa45a97b480fd157894ad Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Wed, 31 Jan 2024 20:14:02 +0000 Subject: [PATCH 1/2] Check consumer size for clean strats --- deps/rabbit/src/rabbit_channel.erl | 5 ++- .../src/rabbit_mgmt_metrics_gc.erl | 38 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index c30e4f32f7ce..5b098c09030d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -824,8 +824,10 @@ terminate(_Reason, rabbit_global_counters:consumer_deleted(amqp091) end, CM), rabbit_core_metrics:channel_closed(self()), + CMSize = maps:size(CM), rabbit_event:notify(channel_closed, [{pid, self()}, - {user_who_performed_action, Username}]), + {user_who_performed_action, Username}, + {consumer_count, CMSize}]), case rabbit_confirms:size(State#ch.unconfirmed) of 0 -> ok; NumConfirms -> @@ -2860,4 +2862,3 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) -> is_global_qos_permitted() -> rabbit_deprecated_features:is_permitted(global_qos). - diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl index 260b2137615a..71ea2faa0fa3 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl @@ -19,6 +19,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(LARGE_CONSUMER_COUNT, 1000). + name(EventType) -> list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")). @@ -42,7 +44,8 @@ handle_cast({event, #event{type = connection_closed, props = Props}}, handle_cast({event, #event{type = channel_closed, props = Props}}, State = #state{basic_i = BIntervals}) -> Pid = pget(pid, Props), - remove_channel(Pid, BIntervals), + CMCount = pget(consumer_count, Props), + remove_channel(Pid, CMCount, BIntervals), {noreply, State}; handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) -> remove_consumer(Props), @@ -82,13 +85,13 @@ remove_connection(Id, BIntervals) -> delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals), ok. -remove_channel(Id, BIntervals) -> +remove_channel(Id, CMCount, BIntervals) -> ets:delete(channel_created_stats, Id), ets:delete(channel_stats, Id), delete_samples(channel_process_stats, Id, BIntervals), delete_samples(channel_stats_fine_stats, Id, BIntervals), delete_samples(channel_stats_deliver_stats, Id, BIntervals), - index_delete(consumer_stats, channel, Id), + index_delete(consumer_stats, {channel, CMCount}, Id), index_delete(channel_exchange_stats_fine_stats, channel, Id), index_delete(channel_queue_stats_deliver_stats, channel, Id), ok. @@ -137,18 +140,23 @@ delete_samples(Table, Id, Intervals) -> [ets:delete(Table, {Id, I}) || I <- Intervals], ok. -index_delete(consumer_stats = Table, channel = Type, Id) -> - IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type), - MatchPattern = {'_', Id, '_'}, - %% Delete consumer_stats_queue_index - ets:match_delete(consumer_stats_queue_index, - {'_', MatchPattern}), - %% Delete consumer_stats - ets:match_delete(consumer_stats, - {MatchPattern,'_'}), - %% Delete consumer_stats_channel_index - ets:delete(IndexTable, Id), - ok; +index_delete(consumer_stats = Table, {channel = Type, Count}, Id) -> + case Count > ?LARGE_CONSUMER_COUNT of + true -> + IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type), + MatchPattern = {'_', Id, '_'}, + %% Delete consumer_stats_queue_index + ets:match_delete(consumer_stats_queue_index, + {'_', MatchPattern}), + %% Delete consumer_stats + ets:match_delete(consumer_stats, + {MatchPattern,'_'}), + %% Delete consumer_stats_channel_index + ets:delete(IndexTable, Id), + ok; + false -> + index_delete(Table, Type, Id) + end; index_delete(Table, Type, Id) -> IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type), Keys = ets:lookup(IndexTable, Id), From 5d5cf2cd23f6e2cb3d80235e1c3e328d9ee65dff Mon Sep 17 00:00:00 2001 From: Simon Unge Date: Thu, 1 Feb 2024 19:07:12 +0000 Subject: [PATCH 2/2] Better variable names, and comments --- deps/rabbit/src/rabbit_channel.erl | 3 +-- .../src/rabbit_mgmt_metrics_gc.erl | 16 ++++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 5b098c09030d..16655b28d5a0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -824,10 +824,9 @@ terminate(_Reason, rabbit_global_counters:consumer_deleted(amqp091) end, CM), rabbit_core_metrics:channel_closed(self()), - CMSize = maps:size(CM), rabbit_event:notify(channel_closed, [{pid, self()}, {user_who_performed_action, Username}, - {consumer_count, CMSize}]), + {consumer_count, maps:size(CM)}]), case rabbit_confirms:size(State#ch.unconfirmed) of 0 -> ok; NumConfirms -> diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl index 71ea2faa0fa3..a5f02e550def 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl @@ -44,8 +44,8 @@ handle_cast({event, #event{type = connection_closed, props = Props}}, handle_cast({event, #event{type = channel_closed, props = Props}}, State = #state{basic_i = BIntervals}) -> Pid = pget(pid, Props), - CMCount = pget(consumer_count, Props), - remove_channel(Pid, CMCount, BIntervals), + ConsumerCount = pget(consumer_count, Props), + remove_channel(Pid, ConsumerCount, BIntervals), {noreply, State}; handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) -> remove_consumer(Props), @@ -85,13 +85,13 @@ remove_connection(Id, BIntervals) -> delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals), ok. -remove_channel(Id, CMCount, BIntervals) -> +remove_channel(Id, ConsumerCount, BIntervals) -> ets:delete(channel_created_stats, Id), ets:delete(channel_stats, Id), delete_samples(channel_process_stats, Id, BIntervals), delete_samples(channel_stats_fine_stats, Id, BIntervals), delete_samples(channel_stats_deliver_stats, Id, BIntervals), - index_delete(consumer_stats, {channel, CMCount}, Id), + index_delete(consumer_stats, {channel, ConsumerCount}, Id), index_delete(channel_exchange_stats_fine_stats, channel, Id), index_delete(channel_queue_stats_deliver_stats, channel, Id), ok. @@ -140,8 +140,12 @@ delete_samples(Table, Id, Intervals) -> [ets:delete(Table, {Id, I}) || I <- Intervals], ok. -index_delete(consumer_stats = Table, {channel = Type, Count}, Id) -> - case Count > ?LARGE_CONSUMER_COUNT of +index_delete(consumer_stats = Table, {channel = Type, ConsumerCount}, Id) -> + %% In case of very large amount of consumers on a single channel, + %% we use the more memoroy costly bulk operation `ets:match_delete` to reduce + %% CPU load. For the more common case, few consumers per channel, we loop + %% through the consumers and remove them one by one + case ConsumerCount > ?LARGE_CONSUMER_COUNT of true -> IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type), MatchPattern = {'_', Id, '_'},