Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Keep track of connections, introduce per-vhost limits
Browse files Browse the repository at this point in the history
Part of rabbitmq/rabbitmq-server#500.

Squashed commit of the following:

commit 2f0a08d
Author: Michael Klishin <[email protected]>
Date:   Thu Jul 21 03:20:07 2016 +0300

    Name is already a binary

commit 0678f00
Merge: f16db88 b3468c5
Author: Michael Klishin <[email protected]>
Date:   Thu Jul 21 03:06:02 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit f16db88
Author: Michael Klishin <[email protected]>
Date:   Wed Jul 20 18:30:13 2016 +0300

    Missing file from earlier commit

commit f284cf9
Merge: 6998e6a 3aa3e93
Author: Michael Klishin <[email protected]>
Date:   Wed Jul 20 18:29:57 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 6998e6a
Author: Michael Klishin <[email protected]>
Date:   Mon Jul 18 11:15:19 2016 +0300

    Move connection record to rabbit.hrl

    So that it can be used outside of rabbit_reader.

commit 1f1f6a1
Merge: d35bb6e 2f3d2b4
Author: Michael Klishin <[email protected]>
Date:   Thu Jul 14 15:26:03 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit d35bb6e
Merge: 26bff83 bd25b0e
Author: Michael Klishin <[email protected]>
Date:   Thu Jul 7 13:45:17 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 26bff83
Author: Michael Klishin <[email protected]>
Date:   Wed Jul 6 12:31:47 2016 +0300

    Connection re-registration after network split WIP

commit 9cc96f0
Author: Michael Klishin <[email protected]>
Date:   Sun Jul 3 15:25:15 2016 +0300

    Move set_partition_handling_mode_globally/2 and set_partition_handling_mode/3 to broker helpers

commit 5997e06
Author: Michael Klishin <[email protected]>
Date:   Sun Jul 3 03:54:02 2016 +0300

    Move block_traffic_between/2, allow_traffic_between/2 from partition_SUITE

commit c9bb2d2
Merge: 1395888 b518e99
Author: Michael Klishin <[email protected]>
Date:   Sun Jul 3 03:14:36 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 1395888
Author: Michael Klishin <[email protected]>
Date:   Sun Jul 3 02:38:51 2016 +0300

    Move dist_proxy helpers from partitions_SUITE

commit af11e9e
Author: Michael Klishin <[email protected]>
Date:   Sat Jul 2 22:27:39 2016 +0300

    Test helpers for managing permissions

commit e56b20c
Author: Michael Klishin <[email protected]>
Date:   Sat Jul 2 17:28:01 2016 +0300

    Missing exports

commit 7a458e1
Author: Michael Klishin <[email protected]>
Date:   Sat Jul 2 17:26:07 2016 +0300

    Introduce rabbit_ct_broker_helpers:{add,delete}_vhost/2

commit 6bbcaa2
Author: Michael Klishin <[email protected]>
Date:   Sat Jul 2 15:02:09 2016 +0300

    Export tracked_connection/0

commit 6381608
Merge: c082ad9 a5d1a4f
Author: Michael Klishin <[email protected]>
Date:   Sat Jul 2 02:44:05 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit c082ad9
Merge: 0ba62eb f846d9c
Author: Michael Klishin <[email protected]>
Date:   Wed Jun 29 14:26:53 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 0ba62eb
Author: Michael Klishin <[email protected]>
Date:   Thu Mar 31 01:53:36 2016 +0300

    Move new types from rabbitmq-server

commit d4a9eca
Merge: b78d3d2 11233ff
Author: Michael Klishin <[email protected]>
Date:   Thu Mar 31 01:50:45 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit b78d3d2
Author: Michael Klishin <[email protected]>
Date:   Thu Feb 18 17:08:33 2016 +0300

    Enforce per-vhost connection limit

commit 7e34dca
Merge: c7f941d dff2b14
Author: Michael Klishin <[email protected]>
Date:   Wed Feb 10 12:23:43 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit c7f941d
Merge: 827b854 3da8ad8
Author: Michael Klishin <[email protected]>
Date:   Fri Feb 5 23:48:31 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 827b854
Merge: 9720d12 56c8d46
Author: Michael Klishin <[email protected]>
Date:   Wed Feb 3 11:20:21 2016 +0300

    Merge branch 'master' into rabbitmq-server-500

commit 9720d12
Author: Michael Klishin <[email protected]>
Date:   Wed Feb 3 11:19:01 2016 +0300

    Track connection node and username

commit 7375646
Author: Michael Klishin <[email protected]>
Date:   Tue Jan 19 18:14:28 2016 +0300

    Include connection name into connection_closed events

commit d1f96c4
Author: Michael Klishin <[email protected]>
Date:   Tue Jan 19 17:53:35 2016 +0300

    Add protocol to tracked_connection

commit 56db86a
Author: Michael Klishin <[email protected]>
Date:   Tue Jan 19 14:45:26 2016 +0300

    Introduce tracked_connection

commit 0f765dc
Author: Michael Klishin <[email protected]>
Date:   Fri Jan 8 19:13:23 2016 +0300

    Change second vhost record field to be limits
  • Loading branch information
michaelklishin committed Jul 22, 2016
1 parent b3468c5 commit 59d8c33
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 50 deletions.
84 changes: 83 additions & 1 deletion include/rabbit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,65 @@
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).

-record(vhost, {virtual_host, dummy}).
%% Represents a vhost.
%%
%% Historically this record had 2 arguments although the 2nd
%% was never used (`dummy`, always undefined). This is because
%% single field records were/are illegal in OTP.
%%
%% As of 3.6.x, the second argument is vhost limits,
%% which is actually used and has the same default.
%% Nonetheless, this required a migration, see rabbit_upgrade_functions.
-record(vhost, {
%% vhost name as a binary
virtual_host,
%% proplist of limits configured, if any
limits}).

%% Client connection, used by rabbit_reader
%% and related modules.
-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(content,
{class_id,
Expand Down Expand Up @@ -138,6 +196,30 @@
dependency_version_requirements %% [{atom(), [string()]}]
}).

%% used to track connections across virtual hosts
%% so that limits can be enforced
-record(tracked_connection_per_vhost,
{vhost, connection_count}).

%% Used to track detailed information
%% about connections.
-record(tracked_connection, {
%% {Node, Name}
id,
node,
vhost,
name,
pid,
protocol,
%% client host
peer_host,
%% client port
peer_port,
username,
%% time of connection
connected_at
}).

%%----------------------------------------------------------------------------

-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2016 Pivotal Software, Inc.").
Expand Down
26 changes: 26 additions & 0 deletions src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
set_parameter/5,
clear_parameter/4,

add_vhost/2,
delete_vhost/2,

set_permissions/6,
set_full_permissions/2,
set_full_permissions/3,

enable_plugin/3,
disable_plugin/3,

Expand Down Expand Up @@ -719,6 +726,25 @@ format_ipaddr_for_uri(
Res1 = re:replace(Res0, "(^0(:0)+$|^(0:)+|(:0)+$)|:(0:)+", "::"),
"[" ++ Res1 ++ "]".


%% Virtual host management

add_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, add, [VHost]).

delete_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, delete, [VHost]).

set_full_permissions(Config, VHost) ->
set_permissions(Config, <<"guest">>, VHost, <<".*">>, <<".*">>, <<".*">>).
set_full_permissions(Config, Username, VHost) ->
set_permissions(Config, Username, VHost, <<".*">>, <<".*">>, <<".*">>).
set_permissions(Config, Username, VHost, ConfigurePerm, WritePerm, ReadPerm) ->
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_auth_backend_internal,
set_permissions,
[Username, VHost, ConfigurePerm, WritePerm, ReadPerm]).

%% Functions to execute code on a remote node/broker.

add_code_path_to_node(Node, Module) ->
Expand Down
10 changes: 7 additions & 3 deletions src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,13 @@ node_listeners(Node) ->

on_node_down(Node) ->
case lists:member(Node, nodes()) of
false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
true -> rabbit_log:info(
"Keep ~s listeners: the node is already back~n", [Node])
false ->
rabbit_log:info(
"Node ~s is down, deleting its listeners~n", [Node]),
ok = mnesia:dirty_delete(rabbit_listener, Node);
true ->
rabbit_log:info(
"Keeping ~s listeners: the node is already back~n", [Node])
end.

register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
Expand Down
63 changes: 18 additions & 45 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,49 +103,6 @@
%% credit- and resource-driven flow control
throttle}).

-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol framing implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(throttle, {
%% never | timestamp()
last_blocked_at,
Expand All @@ -171,7 +128,8 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
timeout, frame_max, channel_max, client_properties, connected_at]).
timeout, frame_max, channel_max, client_properties, connected_at,
node]).

-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).

Expand Down Expand Up @@ -409,7 +367,9 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
%% socket w/o delay before termination.
rabbit_net:fast_close(Sock),
rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
rabbit_event:notify(connection_closed, [{name, Name},
{pid, self()},
{node, node()}])
end,
done.

Expand Down Expand Up @@ -566,6 +526,11 @@ handle_other({channel_exit, Channel, Reason}, State) ->
handle_exception(State, Channel, Reason);
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
handle_dependent_exit(ChPid, Reason, State);
%% We were asked to re-register in rabbit_tracked_connection.
%% See rabbit_connection_tracking, rabbit_connection_tracker.
handle_other(reregister, State = #v1{connection = ConnState}) ->
rabbit_event:notify(connection_reregistered, [{state, ConnState}]),
State;
handle_other(terminate_connection, State) ->
maybe_emit_stats(State),
stop;
Expand Down Expand Up @@ -1167,6 +1132,13 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
case rabbit_connection_tracking:is_over_connection_limit(VHostPath) of
false -> ok;
{true, Limit} -> rabbit_misc:protocol_error(not_allowed,
"access to vhost '~s' refused for user '~s': "
"connection limit (~p) is reached",
[VHostPath, User#user.username, Limit])
end,
ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Expand Down Expand Up @@ -1344,6 +1316,7 @@ notify_auth_result(Username, AuthResult, ExtraProps, State) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].

i(pid, #v1{}) -> self();
i(node, #v1{}) -> node();
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
Expand Down
22 changes: 21 additions & 1 deletion src/rabbit_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
proc_type_and_name/0, timestamp/0]).
proc_type_and_name/0, timestamp/0,
tracked_connection/0]).

-type(maybe(T) :: T | 'none').
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
Expand Down Expand Up @@ -126,10 +127,29 @@
auto_delete :: boolean(),
arguments :: rabbit_framing:amqp_table()}).

-type(connection_name() :: binary()).

%% used e.g. by rabbit_networking
-type(connection() :: pid()).

%% used e.g. by rabbit_connection_tracking
-type(tracked_connection() ::
#tracked_connection{id :: {node(), connection_name()},
node :: node(),
vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol_name(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port(),
username :: username(),
connected_at :: integer()}).

%% old AMQP 0-9-1-centric type, avoid when possible
-type(protocol() :: rabbit_framing:protocol()).

-type(protocol_name() :: 'amqp0_8' | 'amqp0_9_1' | 'amqp1_0' | 'mqtt' | 'stomp' | any()).

-type(auth_user() ::
#auth_user{username :: username(),
tags :: [atom()],
Expand Down

0 comments on commit 59d8c33

Please sign in to comment.