Skip to content

Commit

Permalink
Use a counter column to track number of connections per vhost
Browse files Browse the repository at this point in the history
Limit query time is now 50-70 microseconds for
50M connections.
  • Loading branch information
michaelklishin committed Feb 12, 2016
1 parent e9132f1 commit f3cfb57
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 47 deletions.
59 changes: 46 additions & 13 deletions src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,63 @@
%% API
%%

register_connection(Conn) ->
rabbit_misc:execute_mnesia_transaction(fun() ->
mnesia:write(?TABLE, Conn, write)
end).
register_connection(#tracked_connection{vhost = VHost} = Conn) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
mnesia:write(?TABLE, Conn, write),
mnesia:dirty_update_counter(
rabbit_tracked_connection_per_vhost, VHost, 1),
ok
end).

unregister_connection(ConnId = {_Node, _Name}) ->
rabbit_misc:execute_mnesia_transaction(fun() ->
mnesia:delete({?TABLE, ConnId})
end).
rabbit_misc:execute_mnesia_transaction(
fun() ->
case mnesia:dirty_read(?TABLE, ConnId) of
[] -> ok;
[Row] ->
mnesia:dirty_update_counter(
rabbit_tracked_connection_per_vhost,
Row#tracked_connection.vhost, -1),
mnesia:delete({?TABLE, ConnId})
end
end).

is_over_connection_limit(VirtualHost) ->
ConnectionCount = count_connections_in(VirtualHost),
case rabbit_vhost_limit:connection_limit(VirtualHost) of
undefined -> false;
{ok, Limit} -> case ConnectionCount > Limit of
false -> false;
true -> {true, Limit}
{ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of
%% 0 = no limit
{0, _} -> false;
%% the limit hasn't been reached
{_, false} -> false;
{_N, true} -> {true, Limit}
end
end.

count_connections_in(VirtualHost) ->
ets:select_count(?TABLE, [{#tracked_connection{vhost = '$1', _ = '_'},
[{'=:=','$1', VirtualHost}],
[true]}]).
try
case mnesia:transaction(
fun() ->
case mnesia:dirty_read(
{rabbit_tracked_connection_per_vhost,
VirtualHost}) of
[] -> 0;
[Val] ->
Val#tracked_connection_per_vhost.connection_count
end
end) of
{atomic, Val} -> Val;
{aborted, _Reason} -> 0
end
catch
_:Err ->
rabbit_log:error(
"Failed to fetch number of connections in vhost ~p:~n~p~n",
[VirtualHost, Err]),
0
end.

%% Returns a #tracked_connection from connection_created
%% event details.
Expand Down
7 changes: 6 additions & 1 deletion src/rabbit_table.erl
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,12 @@ definitions() ->
{rabbit_tracked_connection,
[{record_name, tracked_connection},
{attributes, record_info(fields, tracked_connection)},
{match, #tracked_connection{_ = '_'}}]}
{match, #tracked_connection{_ = '_'}}]},

{rabbit_tracked_connection_per_vhost,
[{record_name, tracked_connection_per_vhost},
{attributes, record_info(fields, tracked_connection_per_vhost)},
{match, #tracked_connection_per_vhost{_ = '_'}}]}

] ++ gm:table_definitions()
++ mirrored_supervisor:table_definitions().
Expand Down
71 changes: 38 additions & 33 deletions src/rabbit_upgrade_functions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,42 +54,44 @@
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}).
-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}).

%% -------------------------------------------------------------------

-ifdef(use_specs).

-spec(remove_user_scope/0 :: () -> 'ok').
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
-spec(semi_durable_route/0 :: () -> 'ok').
-spec(exchange_event_serial/0 :: () -> 'ok').
-spec(trace_exchanges/0 :: () -> 'ok').
-spec(user_admin_to_tags/0 :: () -> 'ok').
-spec(ha_mirrors/0 :: () -> 'ok').
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
-spec(queue_state/0 :: () -> 'ok').
-spec(recoverable_slaves/0 :: () -> 'ok').
-spec(user_password_hashing/0 :: () -> 'ok').
-spec(vhost_limits/0 :: () -> 'ok').
-spec(tracked_connection/0 :: () -> 'ok').
-spec(remove_user_scope/0 :: () -> 'ok').
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
-spec(semi_durable_route/0 :: () -> 'ok').
-spec(exchange_event_serial/0 :: () -> 'ok').
-spec(trace_exchanges/0 :: () -> 'ok').
-spec(user_admin_to_tags/0 :: () -> 'ok').
-spec(ha_mirrors/0 :: () -> 'ok').
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
-spec(queue_state/0 :: () -> 'ok').
-spec(recoverable_slaves/0 :: () -> 'ok').
-spec(user_password_hashing/0 :: () -> 'ok').
-spec(vhost_limits/0 :: () -> 'ok').
-spec(tracked_connection/0 :: () -> 'ok').
-spec(tracked_connection_per_vhost/0 :: () -> 'ok').

-endif.

Expand All @@ -100,8 +102,11 @@ tracked_connection() ->
{attributes, [id, node, vhost, name,
pid, protocol,
peer_host, peer_port,
username, connected_at]}],
[vhost, username]).
username, connected_at]}]).

tracked_connection_per_vhost() ->
create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost},
{attributes, [vhost, connection_count]}]).

%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
Expand Down

0 comments on commit f3cfb57

Please sign in to comment.