Skip to content

Commit

Permalink
Switch to a handler for connection tracking (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jan 19, 2016
1 parent 3e1d2b4 commit 504adde
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 3 deletions.
51 changes: 51 additions & 0 deletions src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
%%

-module(rabbit_connection_tracking).

%% Abstracts away how tracked connection records are stored
%% and queried.
%%
%% See also:
%%
%% * rabbit_connection_tracking_handler
%% * rabbit_reader
%% * rabbit_event

-export([register_connection/1, unregister_connection/1]).

-ifdef(use_specs).

-spec(register_connection/1 :: (rabbit_types:tracked_connection()) -> ok).
-spec(unregister_connection/1 :: (rabbit_types:connection_name()) -> ok).

-endif.

-include_lib("rabbit.hrl").

-define(TABLE, rabbit_tracked_connection).
-define(SERVER, ?MODULE).

%%
%% API
%%

register_connection(Conn) ->
mnesia:write(?TABLE, Conn, write).

unregister_connection(ConnName) ->
mnesia:delete({?TABLE, ConnName}).

112 changes: 112 additions & 0 deletions src/rabbit_connection_tracking_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
%%

-module(rabbit_connection_tracking_handler).

%% This module keeps track of connection creation and termination events
%% on its local node. The primary goal here is to decouple connection
%% tracking from rabbit_reader in rabbit_common.
%%
%% Events from other nodes are ignored.

%% This module keeps track of connection creation and termination events
%% on its local node. The primary goal here is to decouple connection
%% tracking from rabbit_reader in rabbit_common.
%%
%% Events from other nodes are ignored.

-behaviour(gen_event).

-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).

-ifdef(use_specs).
-endif.

-include_lib("rabbit.hrl").

-rabbit_boot_step({?MODULE,
[{description, "connection tracking event handler"},
{mfa, {gen_event, add_handler,
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
{requires, rabbit_event},
{enables, recovery}]}).


%%
%% API
%%

init([]) ->
{ok, []}.

handle_event(#event{type = connection_created, props = Details}, State) ->
%% [{type,network},
%% {pid,<0.329.0>},
%% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
%% {port,5672},
%% {peer_port,60998},
%% {host,{0,0,0,0,0,65535,32512,1}},
%% {peer_host,{0,0,0,0,0,65535,32512,1}},
%% {ssl,false},
%% {peer_cert_subject,''},
%% {peer_cert_issuer,''},
%% {peer_cert_validity,''},
%% {auth_mechanism,<<"PLAIN">>},
%% {ssl_protocol,''},
%% {ssl_key_exchange,''},
%% {ssl_cipher,''},
%% {ssl_hash,''},
%% {protocol,{0,9,1}},
%% {user,<<"guest">>},
%% {vhost,<<"/">>},
%% {timeout,14},
%% {frame_max,131072},
%% {channel_max,65535},
%% {client_properties,
%% [{<<"capabilities">>,table,
%% [{<<"publisher_confirms">>,bool,true},
%% {<<"consumer_cancel_notify">>,bool,true},
%% {<<"exchange_exchange_bindings">>,bool,true},
%% {<<"basic.nack">>,bool,true},
%% {<<"connection.blocked">>,bool,true},
%% {<<"authentication_failure_close">>,bool,true}]},
%% {<<"product">>,longstr,<<"Bunny">>},
%% {<<"platform">>,longstr,
%% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
%% {<<"version">>,longstr,<<"2.3.0.pre">>},
%% {<<"information">>,longstr,
%% <<"http://rubybunny.info">>}]},
%% {connected_at,1453214290847}]
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
{ok, State};
handle_event(_Event, State) ->
{ok, State}.

handle_call(_Request, State) ->
{ok, not_understood, State}.

handle_info(_Info, State) ->
{ok, State}.

terminate(_Arg, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
15 changes: 14 additions & 1 deletion src/rabbit_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, auth_user/0, user/0, internal_user/0,
connection/0, connection_name/0, tracked_connection/0,
protocol/0, auth_user/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
Expand Down Expand Up @@ -128,8 +129,20 @@
auto_delete :: boolean(),
arguments :: rabbit_framing:amqp_table()}).

-type(connection_name() :: binary()).

%% used e.g. by rabbit_networking
-type(connection() :: pid()).

%% used e.g. by rabbit_connection_tracking
-type(tracked_connection() ::
#tracked_connection{vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port()}).

-type(protocol() :: rabbit_framing:protocol()).

-type(auth_user() ::
Expand Down
5 changes: 3 additions & 2 deletions src/rabbit_upgrade_functions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@

tracked_connection() ->
create(rabbit_tracked_connection, [{record_name, tracked_connection},
{attributes, [vhost, name, pid, peer_host,
peer_port, connected_at]}]).
{attributes, [vhost, name, pid, protocol,
peer_host, peer_port,
connected_at]}]).

%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
Expand Down

0 comments on commit 504adde

Please sign in to comment.