From f3cfb57e2e83436e51d7d065edd04cc9197b6539 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sat, 13 Feb 2016 01:33:50 +0300 Subject: [PATCH] Use a counter column to track number of connections per vhost Limit query time is now 50-70 microseconds for 50M connections. --- src/rabbit_connection_tracking.erl | 59 +++++++++++++++++++------ src/rabbit_table.erl | 7 ++- src/rabbit_upgrade_functions.erl | 71 ++++++++++++++++-------------- 3 files changed, 90 insertions(+), 47 deletions(-) diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index e450ab23ef54..4dbd5dc5b427 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -45,30 +45,63 @@ %% API %% -register_connection(Conn) -> - rabbit_misc:execute_mnesia_transaction(fun() -> - mnesia:write(?TABLE, Conn, write) - end). +register_connection(#tracked_connection{vhost = VHost} = Conn) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + mnesia:write(?TABLE, Conn, write), + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, VHost, 1), + ok + end). unregister_connection(ConnId = {_Node, _Name}) -> - rabbit_misc:execute_mnesia_transaction(fun() -> - mnesia:delete({?TABLE, ConnId}) - end). + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> ok; + [Row] -> + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, + Row#tracked_connection.vhost, -1), + mnesia:delete({?TABLE, ConnId}) + end + end). is_over_connection_limit(VirtualHost) -> ConnectionCount = count_connections_in(VirtualHost), case rabbit_vhost_limit:connection_limit(VirtualHost) of undefined -> false; - {ok, Limit} -> case ConnectionCount > Limit of - false -> false; - true -> {true, Limit} + {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of + %% 0 = no limit + {0, _} -> false; + %% the limit hasn't been reached + {_, false} -> false; + {_N, true} -> {true, Limit} end end. count_connections_in(VirtualHost) -> - ets:select_count(?TABLE, [{#tracked_connection{vhost = '$1', _ = '_'}, - [{'=:=','$1', VirtualHost}], - [true]}]). + try + case mnesia:transaction( + fun() -> + case mnesia:dirty_read( + {rabbit_tracked_connection_per_vhost, + VirtualHost}) of + [] -> 0; + [Val] -> + Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 + end + catch + _:Err -> + rabbit_log:error( + "Failed to fetch number of connections in vhost ~p:~n~p~n", + [VirtualHost, Err]), + 0 + end. %% Returns a #tracked_connection from connection_created %% event details. diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 9352f0937494..a9105c5c56e0 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -319,7 +319,12 @@ definitions() -> {rabbit_tracked_connection, [{record_name, tracked_connection}, {attributes, record_info(fields, tracked_connection)}, - {match, #tracked_connection{_ = '_'}}]} + {match, #tracked_connection{_ = '_'}}]}, + + {rabbit_tracked_connection_per_vhost, + [{record_name, tracked_connection_per_vhost}, + {attributes, record_info(fields, tracked_connection_per_vhost)}, + {match, #tracked_connection_per_vhost{_ = '_'}}]} ] ++ gm:table_definitions() ++ mirrored_supervisor:table_definitions(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 22e6ab296cd4..59be1fddd195 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -54,42 +54,44 @@ -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). -rabbit_upgrade({vhost_limits, mnesia, []}). -rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}). +-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}). %% ------------------------------------------------------------------- -ifdef(use_specs). --spec(remove_user_scope/0 :: () -> 'ok'). --spec(hash_passwords/0 :: () -> 'ok'). --spec(add_ip_to_listener/0 :: () -> 'ok'). --spec(internal_exchanges/0 :: () -> 'ok'). --spec(user_to_internal_user/0 :: () -> 'ok'). --spec(topic_trie/0 :: () -> 'ok'). --spec(semi_durable_route/0 :: () -> 'ok'). --spec(exchange_event_serial/0 :: () -> 'ok'). --spec(trace_exchanges/0 :: () -> 'ok'). --spec(user_admin_to_tags/0 :: () -> 'ok'). --spec(ha_mirrors/0 :: () -> 'ok'). --spec(gm/0 :: () -> 'ok'). --spec(exchange_scratch/0 :: () -> 'ok'). --spec(mirrored_supervisor/0 :: () -> 'ok'). --spec(topic_trie_node/0 :: () -> 'ok'). --spec(runtime_parameters/0 :: () -> 'ok'). --spec(policy/0 :: () -> 'ok'). --spec(sync_slave_pids/0 :: () -> 'ok'). --spec(no_mirror_nodes/0 :: () -> 'ok'). --spec(gm_pids/0 :: () -> 'ok'). --spec(exchange_decorators/0 :: () -> 'ok'). --spec(policy_apply_to/0 :: () -> 'ok'). --spec(queue_decorators/0 :: () -> 'ok'). --spec(internal_system_x/0 :: () -> 'ok'). --spec(cluster_name/0 :: () -> 'ok'). --spec(down_slave_nodes/0 :: () -> 'ok'). --spec(queue_state/0 :: () -> 'ok'). --spec(recoverable_slaves/0 :: () -> 'ok'). --spec(user_password_hashing/0 :: () -> 'ok'). --spec(vhost_limits/0 :: () -> 'ok'). --spec(tracked_connection/0 :: () -> 'ok'). +-spec(remove_user_scope/0 :: () -> 'ok'). +-spec(hash_passwords/0 :: () -> 'ok'). +-spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). +-spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). +-spec(exchange_event_serial/0 :: () -> 'ok'). +-spec(trace_exchanges/0 :: () -> 'ok'). +-spec(user_admin_to_tags/0 :: () -> 'ok'). +-spec(ha_mirrors/0 :: () -> 'ok'). +-spec(gm/0 :: () -> 'ok'). +-spec(exchange_scratch/0 :: () -> 'ok'). +-spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). +-spec(runtime_parameters/0 :: () -> 'ok'). +-spec(policy/0 :: () -> 'ok'). +-spec(sync_slave_pids/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). +-spec(exchange_decorators/0 :: () -> 'ok'). +-spec(policy_apply_to/0 :: () -> 'ok'). +-spec(queue_decorators/0 :: () -> 'ok'). +-spec(internal_system_x/0 :: () -> 'ok'). +-spec(cluster_name/0 :: () -> 'ok'). +-spec(down_slave_nodes/0 :: () -> 'ok'). +-spec(queue_state/0 :: () -> 'ok'). +-spec(recoverable_slaves/0 :: () -> 'ok'). +-spec(user_password_hashing/0 :: () -> 'ok'). +-spec(vhost_limits/0 :: () -> 'ok'). +-spec(tracked_connection/0 :: () -> 'ok'). +-spec(tracked_connection_per_vhost/0 :: () -> 'ok'). -endif. @@ -100,8 +102,11 @@ tracked_connection() -> {attributes, [id, node, vhost, name, pid, protocol, peer_host, peer_port, - username, connected_at]}], - [vhost, username]). + username, connected_at]}]). + +tracked_connection_per_vhost() -> + create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost}, + {attributes, [vhost, connection_count]}]). %% replaces vhost.dummy (used to avoid having a single-field record %% which Mnesia doesn't like) with vhost.limits (which is actually