diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 34b7cc912cab..a2ebc0b817ad 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1212,6 +1212,55 @@
+
+ Virtual Host Limits
+
+ It is possible to enforce certain limits on virtual hosts.
+
+
+
+ set_vhost_limits -p vhostpath definition
+
+
+ Sets virtual host limits
+
+
+
+ definition
+
+ The definition of the limits, as a
+ JSON term. In most shells you are very likely to
+ need to quote this.
+
+ Recognised limits: max-connections (0 means "no limit").
+
+
+
+ For example:
+ rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 1024}'
+
+ This command limits the max number of concurrent connections in vhost qa_env
+ to 1024.
+
+
+
+
+
+ clear_vhost_limits -p vhostpath
+
+
+ Clears virtual host limits
+
+ For example:
+ rabbitmqctl clear_vhost_limits -p qa_env
+
+ This command clears vhost limits in vhost qa_env.
+
+
+
+
+
+
Server Status
@@ -2069,9 +2118,9 @@
fraction
- Limit relative to the total amount available RAM
- as a non-negative floating point number.
- Values lower than 1.0 can be dangerous and
+ Limit relative to the total amount available RAM
+ as a non-negative floating point number.
+ Values lower than 1.0 can be dangerous and
should be used carefully.
diff --git a/include/rabbit_cli.hrl b/include/rabbit_cli.hrl
index b1cf41261f10..19cae2fc0a06 100644
--- a/include/rabbit_cli.hrl
+++ b/include/rabbit_cli.hrl
@@ -31,7 +31,6 @@
-define(ONLINE_OPT, "--online").
-define(LOCAL_OPT, "--local").
-
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2fa4cdee71ef..5db2c40b666a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -178,6 +178,11 @@
{mfa, {rabbit_direct, boot, []}},
{requires, log_relay}]}).
+-rabbit_boot_step({connection_tracking,
+ [{description, "sets up internal storage for node-local connections"},
+ {mfa, {rabbit_connection_tracking, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay}]}).
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
new file mode 100644
index 000000000000..ab945abc2f4f
--- /dev/null
+++ b/src/rabbit_connection_tracking.erl
@@ -0,0 +1,337 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+
+-export([boot/0,
+ ensure_tracked_connections_table_for_node/1,
+ ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_tracked_connections_table_for_this_node/0,
+ ensure_per_vhost_tracked_connections_table_for_this_node/0,
+ tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1,
+ delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
+ clear_tracked_connection_tables_for_this_node/0,
+ register_connection/1, unregister_connection/1,
+ list/0, list/1, list_on_node/1,
+ tracked_connection_from_connection_created/1,
+ tracked_connection_from_connection_state/1,
+ is_over_connection_limit/1, count_connections_in/1]).
+
+-include_lib("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+%%
+%% API
+%%
+
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ clear_tracked_connection_tables_for_this_node(),
+ ok.
+
+
+-spec ensure_tracked_connections_table_for_this_node() -> ok.
+
+ensure_tracked_connections_table_for_this_node() ->
+ ensure_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_this_node() ->
+ ensure_per_vhost_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection},
+ {attributes, record_info(fields, tracked_connection)}]) of
+ {atomic, ok} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
+ {attributes, record_info(fields, tracked_connection_per_vhost)}]) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ %% TODO: propagate errors
+ end.
+
+
+-spec clear_tracked_connection_tables_for_this_node() -> ok.
+
+clear_tracked_connection_tables_for_this_node() ->
+ case mnesia:clear_table(tracked_connection_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end,
+ case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end.
+
+
+-spec delete_tracked_connections_table_for_node(node()) -> ok.
+
+delete_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:delete_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, {no_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+
+-spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+delete_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ case mnesia:delete_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, {no_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+
+-spec tracked_connection_table_name_for(node()) -> atom().
+
+tracked_connection_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])).
+
+-spec tracked_connection_per_vhost_table_name_for(node()) -> atom().
+
+tracked_connection_per_vhost_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+
+
+-spec register_connection(rabbit_types:tracked_connection()) -> ok.
+
+register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ %% upsert
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] ->
+ mnesia:write(TableName, Conn, write),
+ mnesia:dirty_update_counter(
+ PerVhostTableName, VHost, 1);
+ [_Row] ->
+ ok
+ end,
+ ok
+ end).
+
+-spec unregister_connection(rabbit_types:connection_name()) -> ok.
+
+unregister_connection(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
+ [Row] ->
+ mnesia:dirty_update_counter(
+ PerVhostTableName, Row#tracked_connection.vhost, -1),
+ mnesia:delete({TableName, ConnId})
+ end
+ end).
+
+
+-spec list() -> [rabbit_types:tracked_connection()].
+
+list() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_connection_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
+
+-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list(VHost) ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_connection_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'})
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node) ->
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_connection_limit(VirtualHost) ->
+ case rabbit_vhost_limit:connection_limit(VirtualHost) of
+ %% no limit configured
+ undefined -> false;
+ %% with limit = 0, no connections are allowed
+ {ok, 0} -> {true, 0};
+ {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
+ ConnectionCount = count_connections_in(VirtualHost),
+ case ConnectionCount >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end;
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
+ %% ignore non-integer limits
+ {ok, _Limit} -> false
+ end.
+
+
+-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
+
+count_connections_in(VirtualHost) ->
+ lists:foldl(fun (Node, Acc) ->
+ Tab = tracked_connection_per_vhost_table_name_for(Node),
+ try
+ N = case mnesia:transaction(
+ fun() ->
+ case mnesia:dirty_read({Tab, VirtualHost}) of
+ [] -> 0;
+ [Val] -> Val#tracked_connection_per_vhost.connection_count
+ end
+ end) of
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
+ end,
+ Acc + N
+ catch _:Err ->
+ rabbit_log:error(
+ "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n",
+ [VirtualHost, Err, Node]),
+ Acc
+ end
+ end, 0, rabbit_mnesia:cluster_nodes(running)).
+
+%% Returns a #tracked_connection from connection_created
+%% event details.
+%%
+%% @see rabbit_connection_tracking_handler.
+tracked_connection_from_connection_created(EventDetails) ->
+ %% Example event:
+ %%
+ %% [{type,network},
+ %% {pid,<0.329.0>},
+ %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
+ %% {port,5672},
+ %% {peer_port,60998},
+ %% {host,{0,0,0,0,0,65535,32512,1}},
+ %% {peer_host,{0,0,0,0,0,65535,32512,1}},
+ %% {ssl,false},
+ %% {peer_cert_subject,''},
+ %% {peer_cert_issuer,''},
+ %% {peer_cert_validity,''},
+ %% {auth_mechanism,<<"PLAIN">>},
+ %% {ssl_protocol,''},
+ %% {ssl_key_exchange,''},
+ %% {ssl_cipher,''},
+ %% {ssl_hash,''},
+ %% {protocol,{0,9,1}},
+ %% {user,<<"guest">>},
+ %% {vhost,<<"/">>},
+ %% {timeout,14},
+ %% {frame_max,131072},
+ %% {channel_max,65535},
+ %% {client_properties,
+ %% [{<<"capabilities">>,table,
+ %% [{<<"publisher_confirms">>,bool,true},
+ %% {<<"consumer_cancel_notify">>,bool,true},
+ %% {<<"exchange_exchange_bindings">>,bool,true},
+ %% {<<"basic.nack">>,bool,true},
+ %% {<<"connection.blocked">>,bool,true},
+ %% {<<"authentication_failure_close">>,bool,true}]},
+ %% {<<"product">>,longstr,<<"Bunny">>},
+ %% {<<"platform">>,longstr,
+ %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
+ %% {<<"version">>,longstr,<<"2.3.0.pre">>},
+ %% {<<"information">>,longstr,
+ %% <<"http://rubybunny.info">>}]},
+ %% {connected_at,1453214290847}]
+ Name = pget(name, EventDetails),
+ Node = pget(node, EventDetails),
+ #tracked_connection{id = {Node, Name},
+ name = Name,
+ node = Node,
+ vhost = pget(vhost, EventDetails),
+ username = pget(user, EventDetails),
+ connected_at = pget(connected_at, EventDetails),
+ pid = pget(pid, EventDetails),
+ peer_host = pget(peer_host, EventDetails),
+ peer_port = pget(peer_port, EventDetails)}.
+
+tracked_connection_from_connection_state(#connection{
+ vhost = VHost,
+ connected_at = Ts,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ user = Username,
+ name = Name
+ }) ->
+ tracked_connection_from_connection_created(
+ [{name, Name},
+ {node, node()},
+ {vhost, VHost},
+ {user, Username},
+ {connected_at, Ts},
+ {pid, self()},
+ {peer_port, PeerPort},
+ {peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
new file mode 100644
index 000000000000..fd1df8c88adc
--- /dev/null
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking_handler).
+
+%% This module keeps track of connection creation and termination events
+%% on its local node. The primary goal here is to decouple connection
+%% tracking from rabbit_reader in rabbit_common.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+-import(rabbit_misc, [pget/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "connection tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [rabbit_event, rabbit_node_monitor]},
+ {enables, recovery}]}).
+
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = connection_created, props = Details}, State) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
+ );
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ rabbit_connection_tracking:unregister_connection(
+ {pget(node, Details),
+ pget(name, Details)});
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
+ {ok, State};
+handle_event(#event{type = vhost_deleted, props = Details}, State) ->
+ VHost = pget(name, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
+ [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) ||
+ #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)],
+ {ok, State};
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Username = pget(name, Details),
+ %% TODO: force close and unregister connections from
+ %% this user. Moved to rabbitmq/rabbitmq-server#628.
+ {ok, State};
+%% A node had been deleted from the cluster.
+handle_event(#event{type = node_deleted, props = Details}, State) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
+ rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node),
+ rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index c96f662dda2d..0e99ac0f7ef0 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -74,6 +74,8 @@
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
+ {set_vhost_limits, [?VHOST_DEF]},
+ {clear_vhost_limits, [?VHOST_DEF]},
{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -544,6 +546,17 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+action(set_vhost_limits, Node, [Defn], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Setting vhost limits for vhost ~p", [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]),
+ ok;
+
+action(clear_vhost_limits, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing vhost ~p limits", [VHostArg]),
+ rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]);
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 596eb62b0310..43dd2c3bb809 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -148,6 +148,7 @@ auto_cluster(TryNodes, NodeType) ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning(
@@ -194,6 +195,7 @@ join_cluster(DiscoveryNode, NodeType) ->
[ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType,
true, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster(),
ok;
{error, Reason} ->
@@ -295,6 +297,9 @@ update_cluster_nodes(DiscoveryNode) ->
%% the last or second to last after the node we're removing to go
%% down
forget_cluster_node(Node, RemoveWhenOffline) ->
+ forget_cluster_node(Node, RemoveWhenOffline, true).
+
+forget_cluster_node(Node, RemoveWhenOffline, EmitNodeDeletedEvent) ->
case lists:member(Node, cluster_nodes(all)) of
true -> ok;
false -> e(not_a_cluster_node)
@@ -306,6 +311,9 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
{false, true} -> rabbit_log:info(
"Removing node ~p from cluster~n", [Node]),
case remove_node_if_mnesia_running(Node) of
+ ok when EmitNodeDeletedEvent ->
+ rabbit_event:notify(node_deleted, [{node, Node}]),
+ ok;
ok -> ok;
{error, _} = Err -> throw(Err)
end
@@ -326,7 +334,10 @@ remove_node_offline_node(Node) ->
%% they are loaded.
rabbit_table:force_load(),
rabbit_table:wait_for_replicated(),
- forget_cluster_node(Node, false),
+ %% We skip the 'node_deleted' event because the
+ %% application is stopped and thus, rabbit_event is not
+ %% enabled.
+ forget_cluster_node(Node, false, false),
force_load_next_boot()
after
stop_mnesia()
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 0945e3152234..0c3e7c236600 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -124,7 +124,13 @@ prepare(Node, NodeMapList) ->
take_backup(Backup) ->
start_mnesia(),
- ok = mnesia:backup(Backup),
+ %% We backup only local tables: in particular, this excludes the
+ %% connection tracking tables which have no local replica.
+ LocalTables = mnesia:system_info(local_tables),
+ {ok, Name, _Nodes} = mnesia:activate_checkpoint([
+ {max, LocalTables}
+ ]),
+ ok = mnesia:backup_checkpoint(Name, Backup),
stop_mnesia().
restore_backup(Backup) ->
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index 90ab1d528611..00e83d275718 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -16,7 +16,7 @@
-module(rabbit_parameter_validation).
--export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]).
+-export([number/2, integer/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -24,6 +24,12 @@ number(_Name, Term) when is_number(Term) ->
number(Name, Term) ->
{error, "~s should be number, actually was ~p", [Name, Term]}.
+integer(_Name, Term) when is_integer(Term) ->
+ ok;
+
+integer(Name, Term) ->
+ {error, "~s should be number, actually was ~p", [Name, Term]}.
+
binary(_Name, Term) when is_binary(Term) ->
ok;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 3d624752ea65..3c6f2a4f1f72 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -55,6 +55,7 @@
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
+-rabbit_upgrade({vhost_limits, mnesia, []}).
%% -------------------------------------------------------------------
@@ -88,9 +89,22 @@
-spec queue_state() -> 'ok'.
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
+-spec vhost_limits() -> 'ok'.
+
%%--------------------------------------------------------------------
+%% replaces vhost.dummy (used to avoid having a single-field record
+%% which Mnesia doesn't like) with vhost.limits (which is actually
+%% used)
+vhost_limits() ->
+ transform(
+ rabbit_vhost,
+ fun ({vhost, VHost, _Dummy}) ->
+ {vhost, VHost, undefined}
+ end,
+ [virtual_host, limits]).
+
%% It's a bad idea to use records or record_info here, even for the
%% destination form. Because in the future, the destination form of
%% your current transform may not match the record any more, and it
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index df2f8423b48a..01f1046fb8b9 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,11 +20,14 @@
%%----------------------------------------------------------------------------
--export([add/1, delete/1, exists/1, list/0, with/2, assert/1]).
+-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
+ set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
+
-spec add(rabbit_types:vhost()) -> 'ok'.
-spec delete(rabbit_types:vhost()) -> 'ok'.
+-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
-spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
@@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of
false -> throw({error, {no_such_vhost, VHostPath}})
end.
+update(VHostPath, Fun) ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [V] ->
+ V1 = Fun(V),
+ ok = mnesia:write(rabbit_vhost, V1, write),
+ V1
+ end.
+
+limits_of(VHostPath) when is_binary(VHostPath) ->
+ assert(VHostPath),
+ case mnesia:dirty_read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [#vhost{limits = Limits}] ->
+ Limits
+ end;
+limits_of(#vhost{virtual_host = Name}) ->
+ limits_of(Name).
+
+set_limits(VHost = #vhost{}, undefined) ->
+ VHost#vhost{limits = undefined};
+set_limits(VHost = #vhost{}, Limits) ->
+ VHost#vhost{limits = Limits}.
+
%%----------------------------------------------------------------------------
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
new file mode 100644
index 000000000000..2d9a2f075e2d
--- /dev/null
+++ b/src/rabbit_vhost_limit.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_limit).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-export([register/0]).
+-export([parse_set/2, clear/1]).
+-export([validate/5, notify/4, notify_clear/3]).
+-export([connection_limit/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "vhost limit parameters"},
+ {mfa, {rabbit_vhost_limit, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+%%----------------------------------------------------------------------------
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE).
+
+validate(_VHost, <<"vhost-limits">>, Name, Term, _User) ->
+ rabbit_parameter_validation:proplist(
+ Name, vhost_limit_validation(), Term).
+
+notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) ->
+ rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]),
+ update_vhost(VHost, Limits).
+
+notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
+ rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]),
+ update_vhost(VHost, undefined).
+
+connection_limit(VirtualHost) ->
+ get_limit(VirtualHost, <<"max-connections">>).
+
+%%----------------------------------------------------------------------------
+
+parse_set(VHost, Defn) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set(VHost, rabbit_misc:json_to_term(JSON));
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Defn) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>,
+ <<"limits">>, Defn, none).
+
+clear(VHost) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>,
+ <<"limits">>).
+
+vhost_limit_validation() ->
+ [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}].
+
+update_vhost(VHostName, Limits) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_vhost:update(VHostName,
+ fun(VHost) ->
+ rabbit_vhost:set_limits(VHost, Limits)
+ end)
+ end),
+ ok.
+
+get_limit(VirtualHost, Limit) ->
+ case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of
+ [] -> undefined;
+ [Param] -> case pget(value, Param) of
+ undefined -> undefined;
+ Val -> case pget(Limit, Val) of
+ undefined -> undefined;
+ %% no limit
+ N when N < 0 -> undefined;
+ N when N >= 0 -> {ok, N}
+ end
+ end
+ end.
diff --git a/test/cluster_rename_SUITE.erl b/test/cluster_rename_SUITE.erl
index 8ce29a6695e7..9521e04eb534 100644
--- a/test/cluster_rename_SUITE.erl
+++ b/test/cluster_rename_SUITE.erl
@@ -43,6 +43,12 @@ groups() ->
]}
].
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -245,7 +251,7 @@ rename_node(Config, Nodename, Map) ->
Config1.
rename_node_fail(Config, Nodename, Map) ->
- error = do_rename_node(Config, Nodename, Map),
+ {error, _, _} = do_rename_node(Config, Nodename, Map),
ok.
do_rename_node(Config, Nodename, Map) ->
@@ -265,8 +271,8 @@ do_rename_node(Config, Nodename, Map) ->
{ok, _} ->
Config1 = update_config_after_rename(Config, Map1),
{ok, Config1};
- {error, _, _} ->
- error
+ {error, _, _} = Error ->
+ Error
end.
update_config_after_rename(Config, [Old, New | Rest]) ->
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
new file mode 100644
index 000000000000..4fae129a35be
--- /dev/null
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -0,0 +1,795 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(per_vhost_connection_limit_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1},
+ {group, cluster_size_2}
+ ].
+
+groups() ->
+ [
+ {cluster_size_1, [], [
+ most_basic_single_node_connection_count,
+ single_node_single_vhost_connection_count,
+ single_node_multiple_vhosts_connection_count,
+ single_node_list_in_vhost,
+ single_node_single_vhost_limit,
+ single_node_single_vhost_zero_limit,
+ single_node_multiple_vhosts_limit,
+ single_node_multiple_vhosts_zero_limit,
+ single_node_vhost_deletion_forces_connection_closure
+ ]},
+ {cluster_size_2, [], [
+ most_basic_cluster_connection_count,
+ cluster_single_vhost_connection_count,
+ cluster_multiple_vhosts_connection_count,
+ cluster_node_restart_connection_count,
+ cluster_node_list_on_node,
+ cluster_single_vhost_limit,
+ cluster_single_vhost_limit2,
+ cluster_single_vhost_zero_limit,
+ cluster_multiple_vhosts_zero_limit,
+ cluster_vhost_deletion_forces_connection_closure
+ ]},
+ {cluster_rename, [], [
+ vhost_limit_after_node_renamed
+ ]}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% see partitions_SUITE
+-define(DELAY, 9000).
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1, Config) ->
+ init_per_multinode_group(cluster_size_1, Config, 1);
+init_per_group(cluster_size_2, Config) ->
+ init_per_multinode_group(cluster_size_2, Config, 2);
+init_per_group(cluster_rename, Config) ->
+ init_per_multinode_group(cluster_rename, Config, 2).
+
+init_per_multinode_group(Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ case Group of
+ cluster_rename ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config1;
+ _ ->
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps())
+ end.
+
+end_per_group(cluster_rename, Config) ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config;
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ Config.
+
+end_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
+ Config1 = ?config(save_config, Config),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracked_connection_tables_for_this_node,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+most_basic_single_node_connection_count(Config) ->
+ VHost = <<"/">>,
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+ [Conn] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+ close_connections([Conn]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+single_node_single_vhost_connection_count(Config) ->
+ VHost = <<"/">>,
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn1] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn2] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+
+ [Conn3] = open_connections(Config, [0]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn4] = open_connections(Config, [0]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ kill_connections([Conn4]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn5] = open_connections(Config, [0]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+single_node_multiple_vhosts_connection_count(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ [Conn2] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ [Conn3] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ [Conn4] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(2, count_connections_in(Config, VHost1)),
+
+ kill_connections([Conn4]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [Conn5] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(2, count_connections_in(Config, VHost2)),
+
+ [Conn6] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(3, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+single_node_list_in_vhost(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, length(connections_in(Config, VHost1))),
+ ?assertEqual(0, length(connections_in(Config, VHost2))),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),
+ close_connections([Conn1]),
+ ?assertEqual(0, length(connections_in(Config, VHost1))),
+
+ [Conn2] = open_connections(Config, [{0, VHost2}]),
+ [#tracked_connection{vhost = VHost2}] = connections_in(Config, VHost2),
+
+ [Conn3] = open_connections(Config, [{0, VHost1}]),
+ [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),
+
+ [Conn4] = open_connections(Config, [{0, VHost1}]),
+ kill_connections([Conn4]),
+ [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),
+
+ [Conn5, Conn6] = open_connections(Config, [{0, VHost2}, {0, VHost2}]),
+ [<<"vhost1">>, <<"vhost2">>] =
+ lists:usort(lists:map(fun (#tracked_connection{vhost = V}) -> V end,
+ all_connections(Config))),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ ?assertEqual(0, length(all_connections(Config))),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+most_basic_cluster_connection_count(Config) ->
+ VHost = <<"/">>,
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+ [Conn1] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+
+ [Conn2] = open_connections(Config, [1]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn3] = open_connections(Config, [1]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ close_connections([Conn1, Conn2, Conn3]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+cluster_single_vhost_connection_count(Config) ->
+ VHost = <<"/">>,
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn1] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn2] = open_connections(Config, [1]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+
+ [Conn3] = open_connections(Config, [0]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn4] = open_connections(Config, [1]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ kill_connections([Conn4]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn5] = open_connections(Config, [1]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+cluster_multiple_vhosts_connection_count(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ [Conn2] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ [Conn3] = open_connections(Config, [{1, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ [Conn4] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(2, count_connections_in(Config, VHost1)),
+
+ kill_connections([Conn4]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [Conn5] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(2, count_connections_in(Config, VHost2)),
+
+ [Conn6] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(3, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+cluster_node_restart_connection_count(Config) ->
+ VHost = <<"/">>,
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn1] = open_connections(Config, [0]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn2] = open_connections(Config, [1]),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+
+ [Conn3] = open_connections(Config, [0]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+
+ [Conn4] = open_connections(Config, [1]),
+ ?assertEqual(3, count_connections_in(Config, VHost)),
+
+ [Conn5] = open_connections(Config, [1]),
+ ?assertEqual(4, count_connections_in(Config, VHost)),
+
+ rabbit_ct_broker_helpers:restart_broker(Config, 1),
+ ?assertEqual(1, count_connections_in(Config, VHost)),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+cluster_node_list_on_node(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ ?assertEqual(0, length(all_connections(Config))),
+ ?assertEqual(0, length(connections_on_node(Config, 0))),
+
+ [Conn1] = open_connections(Config, [0]),
+ [#tracked_connection{node = A}] = connections_on_node(Config, 0),
+ close_connections([Conn1]),
+ ?assertEqual(0, length(connections_on_node(Config, 0))),
+
+ [_Conn2] = open_connections(Config, [1]),
+ [#tracked_connection{node = B}] = connections_on_node(Config, 1),
+
+ [Conn3] = open_connections(Config, [0]),
+ ?assertEqual(1, length(connections_on_node(Config, 0))),
+
+ [Conn4] = open_connections(Config, [1]),
+ ?assertEqual(2, length(connections_on_node(Config, 1))),
+
+ kill_connections([Conn4]),
+ ?assertEqual(1, length(connections_on_node(Config, 1))),
+
+ [Conn5] = open_connections(Config, [0]),
+ ?assertEqual(2, length(connections_on_node(Config, 0))),
+
+ rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ await_running_node_refresh(Config, 0),
+
+ ?assertEqual(2, length(all_connections(Config))),
+ ?assertEqual(0, length(connections_on_node(Config, 0, B))),
+
+ close_connections([Conn3, Conn5]),
+ ?assertEqual(0, length(all_connections(Config, 0))),
+
+ rabbit_ct_broker_helpers:start_broker(Config, 1).
+
+single_node_single_vhost_limit(Config) ->
+ single_node_single_vhost_limit_with(Config, 5),
+ single_node_single_vhost_limit_with(Config, -1).
+
+single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 3),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn1, Conn2, Conn3] = open_connections(Config, [0, 0, 0]),
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+
+ set_vhost_connection_limit(Config, VHost, WatermarkLimit),
+ [Conn4, Conn5] = open_connections(Config, [0, 0]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4, Conn5]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ set_vhost_connection_limit(Config, VHost, -1).
+
+single_node_single_vhost_zero_limit(Config) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 0),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config),
+ expect_that_client_connection_is_rejected(Config),
+ expect_that_client_connection_is_rejected(Config),
+
+ set_vhost_connection_limit(Config, VHost, -1),
+ [Conn1, Conn2] = open_connections(Config, [0, 0]),
+
+ close_connections([Conn1, Conn2]),
+ ?assertEqual(0, count_connections_in(Config, VHost)).
+
+
+single_node_multiple_vhosts_limit(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ set_vhost_connection_limit(Config, VHost1, 2),
+ set_vhost_connection_limit(Config, VHost2, 2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1, Conn2, Conn3, Conn4] = open_connections(Config, [
+ {0, VHost1},
+ {0, VHost1},
+ {0, VHost2},
+ {0, VHost2}]),
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, VHost1),
+ expect_that_client_connection_is_rejected(Config, 0, VHost2),
+
+ [Conn5] = open_connections(Config, [0]),
+
+ set_vhost_connection_limit(Config, VHost1, 5),
+ set_vhost_connection_limit(Config, VHost2, -10),
+
+ [Conn6, Conn7, Conn8, Conn9, Conn10] = open_connections(Config, [
+ {0, VHost1},
+ {0, VHost1},
+ {0, VHost1},
+ {0, VHost2},
+ {0, VHost2}]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4, Conn5,
+ Conn6, Conn7, Conn8, Conn9, Conn10]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ set_vhost_connection_limit(Config, VHost1, -1),
+ set_vhost_connection_limit(Config, VHost2, -1),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+
+single_node_multiple_vhosts_zero_limit(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ set_vhost_connection_limit(Config, VHost1, 0),
+ set_vhost_connection_limit(Config, VHost2, 0),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, VHost1),
+ expect_that_client_connection_is_rejected(Config, 0, VHost2),
+ expect_that_client_connection_is_rejected(Config, 0, VHost1),
+
+ set_vhost_connection_limit(Config, VHost1, -1),
+ [Conn1, Conn2] = open_connections(Config, [{0, VHost1}, {0, VHost1}]),
+
+ close_connections([Conn1, Conn2]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ set_vhost_connection_limit(Config, VHost1, -1),
+ set_vhost_connection_limit(Config, VHost2, -1).
+
+
+cluster_single_vhost_limit(Config) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ %% here connections are opened to different nodes
+ [Conn1, Conn2] = open_connections(Config, [{0, VHost}, {1, VHost}]),
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, VHost),
+ expect_that_client_connection_is_rejected(Config, 1, VHost),
+
+ set_vhost_connection_limit(Config, VHost, 5),
+
+ [Conn3, Conn4] = open_connections(Config, [{0, VHost}, {0, VHost}]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ set_vhost_connection_limit(Config, VHost, -1).
+
+cluster_single_vhost_limit2(Config) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ %% here a limit is reached on one node first
+ [Conn1, Conn2] = open_connections(Config, [{0, VHost}, {0, VHost}]),
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, VHost),
+ expect_that_client_connection_is_rejected(Config, 1, VHost),
+
+ set_vhost_connection_limit(Config, VHost, 5),
+
+ [Conn3, Conn4, Conn5, {error, not_allowed}] = open_connections(Config, [
+ {1, VHost},
+ {1, VHost},
+ {1, VHost},
+ {1, VHost}]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4, Conn5]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ set_vhost_connection_limit(Config, VHost, -1).
+
+
+cluster_single_vhost_zero_limit(Config) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 0),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 1),
+ expect_that_client_connection_is_rejected(Config, 0),
+
+ set_vhost_connection_limit(Config, VHost, -1),
+ [Conn1, Conn2, Conn3, Conn4] = open_connections(Config, [0, 1, 0, 1]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ set_vhost_connection_limit(Config, VHost, -1).
+
+
+cluster_multiple_vhosts_zero_limit(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ set_vhost_connection_limit(Config, VHost1, 0),
+ set_vhost_connection_limit(Config, VHost2, 0),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, VHost1),
+ expect_that_client_connection_is_rejected(Config, 0, VHost2),
+ expect_that_client_connection_is_rejected(Config, 1, VHost1),
+ expect_that_client_connection_is_rejected(Config, 1, VHost2),
+
+ set_vhost_connection_limit(Config, VHost1, -1),
+ set_vhost_connection_limit(Config, VHost2, -1),
+
+ [Conn1, Conn2, Conn3, Conn4] = open_connections(Config, [
+ {0, VHost1},
+ {0, VHost2},
+ {1, VHost1},
+ {1, VHost2}]),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ set_vhost_connection_limit(Config, VHost1, -1),
+ set_vhost_connection_limit(Config, VHost2, -1).
+
+
+single_node_vhost_deletion_forces_connection_closure(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+cluster_vhost_deletion_forces_connection_closure(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+vhost_limit_after_node_renamed(Config) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ VHost = <<"/renaming_node">>,
+ set_up_vhost(Config, VHost),
+ set_vhost_connection_limit(Config, VHost, 2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ [Conn1, Conn2, {error, not_allowed}] = open_connections(Config,
+ [{0, VHost}, {1, VHost}, {0, VHost}]),
+ ?assertEqual(2, count_connections_in(Config, VHost)),
+ close_connections([Conn1, Conn2]),
+
+ Config1 = cluster_rename_SUITE:stop_rename_start(Config, A, [A, 'new-A']),
+
+ ?assertEqual(0, count_connections_in(Config1, VHost)),
+
+ [Conn3, Conn4, {error, not_allowed}] = open_connections(Config,
+ [{0, VHost}, {1, VHost}, {0, VHost}]),
+ ?assertEqual(2, count_connections_in(Config1, VHost)),
+ close_connections([Conn3, Conn4]),
+
+ set_vhost_connection_limit(Config1, VHost, -1),
+ {save_config, Config1}.
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndVHosts) ->
+ Conns = lists:map(fun
+ ({Node, VHost}) ->
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ VHost);
+ (Node) ->
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node)
+ end, NodesAndVHosts),
+ timer:sleep(500),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns),
+ timer:sleep(500).
+
+kill_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ (catch exit(Conn, please_terminate))
+ end, Conns),
+ timer:sleep(500).
+
+count_connections_in(Config, VHost) ->
+ count_connections_in(Config, VHost, 0).
+count_connections_in(Config, VHost, NodeIndex) ->
+ timer:sleep(200),
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ count_connections_in, [VHost]).
+
+connections_in(Config, VHost) ->
+ connections_in(Config, 0, VHost).
+connections_in(Config, NodeIndex, VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list, [VHost]).
+
+connections_on_node(Config) ->
+ connections_on_node(Config, 0).
+connections_on_node(Config, NodeIndex) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list_on_node, [Node]).
+connections_on_node(Config, NodeIndex, NodeForListing) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list_on_node, [NodeForListing]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list, []).
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_connection_limit(Config, VHost, -1).
+
+set_vhost_connection_limit(Config, VHost, Count) ->
+ set_vhost_connection_limit(Config, 0, VHost, Count).
+
+set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+await_running_node_refresh(_Config, _NodeIndex) ->
+ timer:sleep(250).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).
diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl
new file mode 100644
index 000000000000..051f3f13b758
--- /dev/null
+++ b/test/per_vhost_connection_limit_partitions_SUITE.erl
@@ -0,0 +1,170 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(per_vhost_connection_limit_partitions_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2,
+ open_unmanaged_connection/3]).
+
+
+all() ->
+ [
+ {group, net_ticktime_1}
+ ].
+
+groups() ->
+ [
+ {net_ticktime_1, [], [
+ cluster_full_partition_with_autoheal
+ ]}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% see partitions_SUITE
+-define(DELAY, 12000).
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(net_ticktime_1 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]),
+ init_per_multinode_group(Group, Config1, 3).
+
+init_per_multinode_group(_Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix},
+ {rmq_nodes_clustered, false}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++ [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1
+ ]).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+cluster_full_partition_with_autoheal(Config) ->
+ VHost = <<"/">>,
+ rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% 6 connections, 2 per node
+ Conn1 = open_unmanaged_connection(Config, A),
+ Conn2 = open_unmanaged_connection(Config, A),
+ Conn3 = open_unmanaged_connection(Config, B),
+ Conn4 = open_unmanaged_connection(Config, B),
+ Conn5 = open_unmanaged_connection(Config, C),
+ Conn6 = open_unmanaged_connection(Config, C),
+ ?assertEqual(6, count_connections_in(Config, VHost)),
+
+ %% B drops off the network, non-reachable by either A or C
+ rabbit_ct_broker_helpers:block_traffic_between(A, B),
+ rabbit_ct_broker_helpers:block_traffic_between(B, C),
+ timer:sleep(?DELAY),
+
+ %% A and C are still connected, so 4 connections are tracked
+ ?assertEqual(4, count_connections_in(Config, VHost)),
+
+ rabbit_ct_broker_helpers:allow_traffic_between(A, B),
+ rabbit_ct_broker_helpers:allow_traffic_between(B, C),
+ timer:sleep(?DELAY),
+
+ %% during autoheal B's connections were dropped
+ ?assertEqual(4, count_connections_in(Config, VHost)),
+
+ lists:foreach(fun (Conn) ->
+ (catch rabbit_ct_client_helpers:close_connection(Conn))
+ end, [Conn1, Conn2, Conn3, Conn4,
+ Conn5, Conn6]),
+
+ passed.
+
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+count_connections_in(Config, VHost) ->
+ count_connections_in(Config, VHost, 0).
+count_connections_in(Config, VHost, NodeIndex) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ count_connections_in, [VHost]).
+
+connections_in(Config, VHost) ->
+ connections_in(Config, 0, VHost).
+connections_in(Config, NodeIndex, VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list, [VHost]).
+
+connections_on_node(Config) ->
+ connections_on_node(Config, 0).
+connections_on_node(Config, NodeIndex) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list_on_node, [Node]).
+connections_on_node(Config, NodeIndex, NodeForListing) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list_on_node, [NodeForListing]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ list, []).