Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge pull request #121 from rabbitmq/rabbitmq-server-500-squashed
Browse files Browse the repository at this point in the history
Keep track of connections, introduce per-vhost limits
  • Loading branch information
michaelklishin authored Aug 26, 2016
2 parents 09402b0 + 4f067fd commit 8b975b1
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 50 deletions.
84 changes: 83 additions & 1 deletion include/rabbit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,65 @@
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).

-record(vhost, {virtual_host, dummy}).
%% Represents a vhost.
%%
%% Historically this record had 2 arguments although the 2nd
%% was never used (`dummy`, always undefined). This is because
%% single field records were/are illegal in OTP.
%%
%% As of 3.6.x, the second argument is vhost limits,
%% which is actually used and has the same default.
%% Nonetheless, this required a migration, see rabbit_upgrade_functions.
-record(vhost, {
%% vhost name as a binary
virtual_host,
%% proplist of limits configured, if any
limits}).

%% Client connection, used by rabbit_reader
%% and related modules.
-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(content,
{class_id,
Expand Down Expand Up @@ -138,6 +196,30 @@
dependency_version_requirements %% [{atom(), [string()]}]
}).

%% used to track connections across virtual hosts
%% so that limits can be enforced
-record(tracked_connection_per_vhost,
{vhost, connection_count}).

%% Used to track detailed information
%% about connections.
-record(tracked_connection, {
%% {Node, Name}
id,
node,
vhost,
name,
pid,
protocol,
%% client host
peer_host,
%% client port
peer_port,
username,
%% time of connection
connected_at
}).

%%----------------------------------------------------------------------------

-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2016 Pivotal Software, Inc.").
Expand Down
41 changes: 41 additions & 0 deletions src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@
set_parameter/5,
clear_parameter/4,

add_vhost/2,
add_vhost/3,
delete_vhost/2,
delete_vhost/3,

set_permissions/6,
set_permissions/7,
set_full_permissions/2,
set_full_permissions/3,
set_full_permissions/4,

enable_plugin/3,
disable_plugin/3,

Expand Down Expand Up @@ -760,6 +771,36 @@ format_ipaddr_for_uri(
Res1 = re:replace(Res0, "(^0(:0)+$|^(0:)+|(:0)+$)|:(0:)+", "::"),
"[" ++ Res1 ++ "]".


%% Virtual host management

add_vhost(Config, VHost) ->
add_vhost(Config, 0, VHost).

add_vhost(Config, Node, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, add, [VHost]).

delete_vhost(Config, VHost) ->
delete_vhost(Config, 0, VHost).

delete_vhost(Config, Node, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, delete, [VHost]).

set_full_permissions(Config, VHost) ->
set_permissions(Config, 0, <<"guest">>, VHost, <<".*">>, <<".*">>, <<".*">>).
set_full_permissions(Config, Username, VHost) ->
set_permissions(Config, 0, Username, VHost, <<".*">>, <<".*">>, <<".*">>).
set_full_permissions(Config, Node, Username, VHost) ->
set_permissions(Config, Node, Username, VHost, <<".*">>, <<".*">>, <<".*">>).

set_permissions(Config, Username, VHost, ConfigurePerm, WritePerm, ReadPerm) ->
set_permissions(Config, 0, Username, VHost, ConfigurePerm, WritePerm, ReadPerm).
set_permissions(Config, Node, Username, VHost, ConfigurePerm, WritePerm, ReadPerm) ->
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_auth_backend_internal,
set_permissions,
[Username, VHost, ConfigurePerm, WritePerm, ReadPerm]).

%% Functions to execute code on a remote node/broker.

add_code_path_to_node(Node, Module) ->
Expand Down
10 changes: 7 additions & 3 deletions src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,13 @@ node_listeners(Node) ->

on_node_down(Node) ->
case lists:member(Node, nodes()) of
false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
true -> rabbit_log:info(
"Keep ~s listeners: the node is already back~n", [Node])
false ->
rabbit_log:info(
"Node ~s is down, deleting its listeners~n", [Node]),
ok = mnesia:dirty_delete(rabbit_listener, Node);
true ->
rabbit_log:info(
"Keeping ~s listeners: the node is already back~n", [Node])
end.

register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
Expand Down
58 changes: 13 additions & 45 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,49 +103,6 @@
%% credit- and resource-driven flow control
throttle}).

-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
peer_host,
%% server port
port,
%% client port
peer_port,
%% protocol framing implementation module,
%% e.g. rabbit_framing_amqp_0_9_1
protocol,
user,
%% heartbeat timeout value used, 0 means
%% heartbeats are disabled
timeout_sec,
%% maximum allowed frame size,
%% see frame_max in the AMQP 0-9-1 spec
frame_max,
%% greatest channel number allowed,
%% see channel_max in the AMQP 0-9-1 spec
channel_max,
vhost,
%% client name, version, platform, etc
client_properties,
%% what lists protocol extensions
%% does this client support?
capabilities,
%% authentication mechanism used
%% as a pair of {Name, Module}
auth_mechanism,
%% authentication mechanism state,
%% initialised by rabbit_auth_mechanism:init/1
%% implementations
auth_state,
%% time of connection
connected_at}).

-record(throttle, {
%% never | timestamp()
last_blocked_at,
Expand All @@ -171,7 +128,8 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
timeout, frame_max, channel_max, client_properties, connected_at]).
timeout, frame_max, channel_max, client_properties, connected_at,
node]).

-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).

Expand Down Expand Up @@ -409,7 +367,9 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
%% socket w/o delay before termination.
rabbit_net:fast_close(Sock),
rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
rabbit_event:notify(connection_closed, [{name, Name},
{pid, self()},
{node, node()}])
end,
done.

Expand Down Expand Up @@ -1167,6 +1127,13 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
case rabbit_connection_tracking:is_over_connection_limit(VHostPath) of
false -> ok;
{true, Limit} -> rabbit_misc:protocol_error(not_allowed,
"access to vhost '~s' refused for user '~s': "
"connection limit (~p) is reached",
[VHostPath, User#user.username, Limit])
end,
ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Expand Down Expand Up @@ -1344,6 +1311,7 @@ notify_auth_result(Username, AuthResult, ExtraProps, State) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].

i(pid, #v1{}) -> self();
i(node, #v1{}) -> node();
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
Expand Down
22 changes: 21 additions & 1 deletion src/rabbit_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
proc_type_and_name/0, timestamp/0]).
proc_type_and_name/0, timestamp/0,
tracked_connection/0]).

-type(maybe(T) :: T | 'none').
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
Expand Down Expand Up @@ -126,10 +127,29 @@
auto_delete :: boolean(),
arguments :: rabbit_framing:amqp_table()}).

-type(connection_name() :: binary()).

%% used e.g. by rabbit_networking
-type(connection() :: pid()).

%% used e.g. by rabbit_connection_tracking
-type(tracked_connection() ::
#tracked_connection{id :: {node(), connection_name()},
node :: node(),
vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol_name(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port(),
username :: username(),
connected_at :: integer()}).

%% old AMQP 0-9-1-centric type, avoid when possible
-type(protocol() :: rabbit_framing:protocol()).

-type(protocol_name() :: 'amqp0_8' | 'amqp0_9_1' | 'amqp1_0' | 'mqtt' | 'stomp' | any()).

-type(auth_user() ::
#auth_user{username :: username(),
tags :: [atom()],
Expand Down

0 comments on commit 8b975b1

Please sign in to comment.