Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Keep track of connections, introduce per-vhost limits #121

Merged
merged 9 commits into from
Aug 26, 2016
84 changes: 83 additions & 1 deletion include/rabbit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,65 @@
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).

-record(vhost, {virtual_host, dummy}).
%% Represents a vhost.
%%
%% Historically this record had 2 arguments although the 2nd
%% was never used (`dummy`, always undefined). This is because
%% single field records were/are illegal in OTP.
%%
%% As of 3.6.x, the second argument is vhost limits,
%% which is actually used and has the same default.
%% Nonetheless, this required a migration, see rabbit_upgrade_functions.
-record(vhost, {
%% vhost name as a binary
virtual_host,
%% proplist of limits configured, if any
limits}).

%% Client connection, used by rabbit_reader
%% and related modules.
-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(content,
{class_id,
Expand Down Expand Up @@ -138,6 +196,30 @@
dependency_version_requirements %% [{atom(), [string()]}]
}).

%% used to track connections across virtual hosts
%% so that limits can be enforced
-record(tracked_connection_per_vhost,
{vhost, connection_count}).

%% Used to track detailed information
%% about connections.
-record(tracked_connection, {
%% {Node, Name}
id,
node,
vhost,
name,
pid,
protocol,
%% client host
peer_host,
%% client port
peer_port,
username,
%% time of connection
connected_at
}).

%%----------------------------------------------------------------------------

-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2016 Pivotal Software, Inc.").
Expand Down
26 changes: 26 additions & 0 deletions src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
set_parameter/5,
clear_parameter/4,

add_vhost/2,
delete_vhost/2,

set_permissions/6,
set_full_permissions/2,
set_full_permissions/3,

enable_plugin/3,
disable_plugin/3,

Expand Down Expand Up @@ -719,6 +726,25 @@ format_ipaddr_for_uri(
Res1 = re:replace(Res0, "(^0(:0)+$|^(0:)+|(:0)+$)|:(0:)+", "::"),
"[" ++ Res1 ++ "]".


%% Virtual host management

add_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, add, [VHost]).

delete_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, delete, [VHost]).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please not hard-code the node and take it as an argument instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can introduce a new function head but since vhost and user operations are cluster-wide, would they be used often?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodes may not be clustered. As this is the responsability of the testcase, I would indicate the node in the test, not in rabbit_ct_broker_helpers.erl.


set_full_permissions(Config, VHost) ->
set_permissions(Config, <<"guest">>, VHost, <<".*">>, <<".*">>, <<".*">>).
set_full_permissions(Config, Username, VHost) ->
set_permissions(Config, Username, VHost, <<".*">>, <<".*">>, <<".*">>).
set_permissions(Config, Username, VHost, ConfigurePerm, WritePerm, ReadPerm) ->
rabbit_ct_broker_helpers:rpc(Config, 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

rabbit_auth_backend_internal,
set_permissions,
[Username, VHost, ConfigurePerm, WritePerm, ReadPerm]).

%% Functions to execute code on a remote node/broker.

add_code_path_to_node(Node, Module) ->
Expand Down
10 changes: 7 additions & 3 deletions src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,13 @@ node_listeners(Node) ->

on_node_down(Node) ->
case lists:member(Node, nodes()) of
false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
true -> rabbit_log:info(
"Keep ~s listeners: the node is already back~n", [Node])
false ->
rabbit_log:info(
"Node ~s is down, deleting its listeners~n", [Node]),
ok = mnesia:dirty_delete(rabbit_listener, Node);
true ->
rabbit_log:info(
"Keeping ~s listeners: the node is already back~n", [Node])
end.

register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
Expand Down
63 changes: 18 additions & 45 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,49 +103,6 @@
%% credit- and resource-driven flow control
throttle}).

-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol framing implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(throttle, {
%% never | timestamp()
last_blocked_at,
Expand All @@ -171,7 +128,8 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
timeout, frame_max, channel_max, client_properties, connected_at]).
timeout, frame_max, channel_max, client_properties, connected_at,
node]).

-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).

Expand Down Expand Up @@ -409,7 +367,9 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
%% socket w/o delay before termination.
rabbit_net:fast_close(Sock),
rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
rabbit_event:notify(connection_closed, [{name, Name},
{pid, self()},
{node, node()}])
end,
done.

Expand Down Expand Up @@ -566,6 +526,11 @@ handle_other({channel_exit, Channel, Reason}, State) ->
handle_exception(State, Channel, Reason);
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
handle_dependent_exit(ChPid, Reason, State);
%% We were asked to re-register in rabbit_tracked_connection.
%% See rabbit_connection_tracking, rabbit_connection_tracker.
handle_other(reregister, State = #v1{connection = ConnState}) ->
rabbit_event:notify(connection_reregistered, [{state, ConnState}]),
State;
handle_other(terminate_connection, State) ->
maybe_emit_stats(State),
stop;
Expand Down Expand Up @@ -1167,6 +1132,13 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
case rabbit_connection_tracking:is_over_connection_limit(VHostPath) of
false -> ok;
{true, Limit} -> rabbit_misc:protocol_error(not_allowed,
"access to vhost '~s' refused for user '~s': "
"connection limit (~p) is reached",
[VHostPath, User#user.username, Limit])
end,
ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Expand Down Expand Up @@ -1344,6 +1316,7 @@ notify_auth_result(Username, AuthResult, ExtraProps, State) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].

i(pid, #v1{}) -> self();
i(node, #v1{}) -> node();
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
Expand Down
22 changes: 21 additions & 1 deletion src/rabbit_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
proc_type_and_name/0, timestamp/0]).
proc_type_and_name/0, timestamp/0,
tracked_connection/0]).

-type(maybe(T) :: T | 'none').
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
Expand Down Expand Up @@ -126,10 +127,29 @@
auto_delete :: boolean(),
arguments :: rabbit_framing:amqp_table()}).

-type(connection_name() :: binary()).

%% used e.g. by rabbit_networking
-type(connection() :: pid()).

%% used e.g. by rabbit_connection_tracking
-type(tracked_connection() ::
#tracked_connection{id :: {node(), connection_name()},
node :: node(),
vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol_name(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port(),
username :: username(),
connected_at :: integer()}).

%% old AMQP 0-9-1-centric type, avoid when possible
-type(protocol() :: rabbit_framing:protocol()).

-type(protocol_name() :: 'amqp0_8' | 'amqp0_9_1' | 'amqp1_0' | 'mqtt' | 'stomp' | any()).

-type(auth_user() ::
#auth_user{username :: username(),
tags :: [atom()],
Expand Down