Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Notify reductions, garbage_collection, recv_oct and send_oct in the core metrics #103

Merged
merged 7 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/rabbit_stomp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,26 @@
ssl_cert_login}).

-define(SUPPORTED_VERSIONS, ["1.0", "1.1", "1.2"]).

-define(INFO_ITEMS,
[conn_name,
connection,
connection_state,
session_id,
channel,
version,
ssl_cert_login,
implicit_connect,
default_login,
default_passcode,
ssl_login_name,
peer_addr,
host,
port,
peer_host,
peer_port,
protocol,
channels,
channel_max,
frame_max,
client_properties]).
9 changes: 9 additions & 0 deletions src/rabbit_stomp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
-behaviour(application).
-export([start/2, stop/1]).
-export([parse_default_user/2]).
-export([list/0]).

-define(DEFAULT_CONFIGURATION,
#stomp_configuration{
Expand Down Expand Up @@ -86,3 +87,11 @@ report_configuration(#stomp_configuration{
end,

ok.

list() ->
[Client
|| {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stomp_sup),
{_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid),
{ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup),
{_, CliSup, _, _} <- supervisor:which_children(ConnSup),
{rabbit_stomp_reader, Client, _, _} <- supervisor:which_children(CliSup)].
35 changes: 32 additions & 3 deletions src/rabbit_stomp_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
send_delivery/5]).

-export([adapter_name/1]).
-export([info/2]).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
Expand Down Expand Up @@ -105,6 +106,30 @@ process_frame(Frame = #stomp_frame{command = Command}, State) ->
flush_and_die(State) ->
close_connection(State).

info(session_id, #proc_state{session_id = Val}) ->
Val;
info(channel, #proc_state{channel = Val}) -> Val;
info(version, #proc_state{version = Val}) -> Val;
info(ssl_cert_login, #proc_state{config = #stomp_configuration{ssl_cert_login = Val}}) -> Val;
info(implicit_connect, #proc_state{config = #stomp_configuration{implicit_connect = Val}}) -> Val;
info(default_login, #proc_state{config = #stomp_configuration{default_login = Val}}) -> Val;
info(default_passcode, #proc_state{config = #stomp_configuration{default_passcode = Val}}) -> Val;
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
info(peer_addr, #proc_state{peer_addr = Val}) -> Val;
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val;
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val;
info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val;
info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) ->
case Val of
{Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
Other -> Other
end;
info(channels, PState) -> additional_info(channels, PState);
info(channel_max, PState) -> additional_info(channel_max, PState);
info(frame_max, PState) -> additional_info(frame_max, PState);
info(client_properties, PState) -> additional_info(client_properties, PState).

initial_state(Configuration,
{SendFun, AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
SSLLoginName, PeerAddr}) ->
Expand Down Expand Up @@ -204,7 +229,7 @@ process_request(ProcessFun, State) ->
process_request(ProcessFun, fun (StateM) -> StateM end, State).


process_request(ProcessFun, SuccessFun, State=#proc_state{connection=Conn}) ->
process_request(ProcessFun, SuccessFun, State) ->
Res = case catch ProcessFun(State) of
{'EXIT',
{{shutdown,
Expand All @@ -217,13 +242,13 @@ process_request(ProcessFun, SuccessFun, State=#proc_state{connection=Conn}) ->
Result
end,
case Res of
{ok, Frame, NewState} ->
{ok, Frame, NewState = #proc_state{connection = Conn}} ->
case Frame of
none -> ok;
_ -> send_frame(Frame, NewState)
end,
{ok, SuccessFun(NewState), Conn};
{error, Message, Detail, NewState} ->
{error, Message, Detail, NewState = #proc_state{connection = Conn}} ->
{ok, send_error(Message, Detail, NewState), Conn};
{stop, normal, NewState} ->
{stop, normal, SuccessFun(NewState)};
Expand Down Expand Up @@ -1112,3 +1137,7 @@ send_error(Message, Detail, State) ->
send_error(Message, Format, Args, State) ->
send_error(Message, rabbit_misc:format(Format, Args), State).

additional_info(Key,
#proc_state{adapter_info =
#amqp_adapter_info{additional_info = AddInfo}}) ->
proplists:get_value(Key, AddInfo).
68 changes: 59 additions & 9 deletions src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
-export([start_heartbeats/2]).
-export([info/2]).

-include("rabbit_stomp.hrl").
-include("rabbit_stomp_frame.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state]).

-record(reader_state, {socket, conn_name, parse_state, processor_state, state,
conserve_resources, recv_outstanding, stats_timer,
parent, connection, heartbeat_sup, heartbeat}).
Expand All @@ -50,6 +54,13 @@ start_link(SupHelperPid, Ref, Sock, Configuration) ->

log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).

info(Pid, InfoItems) ->
case InfoItems -- ?INFO_ITEMS of
[] ->
gen_server2:call(Pid, {info, InfoItems});
UnknownItems -> throw({bad_argument, UnknownItems})
end.

init([SupHelperPid, Ref, Sock, Configuration]) ->
process_flag(trap_exit, true),
rabbit_net:accept_ack(Ref, Sock),
Expand Down Expand Up @@ -90,6 +101,13 @@ init([SupHelperPid, Ref, Sock, Configuration]) ->
end.


handle_call({info, InfoItems}, _From, State) ->
Infos = lists:map(
fun(InfoItem) ->
{InfoItem, info_internal(InfoItem, State)}
end,
InfoItems),
{reply, Infos, State};
handle_call(Msg, From, State) ->
{stop, {stomp_unexpected_call, Msg, From}, State}.

Expand Down Expand Up @@ -356,15 +374,18 @@ maybe_emit_stats(State) ->
rabbit_event:if_enabled(State, #reader_state.stats_timer,
fun() -> emit_stats(State) end).

emit_stats(State=#reader_state{socket = Sock, state = ConnState, connection = Conn}) ->
SockInfos = case rabbit_net:getstat(Sock,
[recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of
{ok, SI} -> SI;
{error, _} -> []
end,
Infos = [{pid, Conn}, {state, ConnState} | SockInfos],
rabbit_core_metrics:connection_stats(Conn, Infos),
rabbit_event:notify(connection_stats, Infos),
emit_stats(State=#reader_state{connection = C}) when C == none; C == undefined ->
%% Avoid emitting stats on terminate when the connection has not yet been
%% established, as this causes orphan entries on the stats database
State1 = rabbit_event:reset_stats_timer(State, #reader_state.stats_timer),
ensure_stats_timer(State1);
emit_stats(State) ->
[{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
= infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_event:notify(connection_stats, Infos ++ I),
State1 = rabbit_event:reset_stats_timer(State, #reader_state.stats_timer),
ensure_stats_timer(State1).

Expand All @@ -377,3 +398,32 @@ ensure_stats_timer(State = #reader_state{}) ->
processor_state(#reader_state{ processor_state = ProcState }) -> ProcState.
processor_state(ProcState, #reader_state{} = State) ->
State#reader_state{ processor_state = ProcState}.

%%----------------------------------------------------------------------------

infos(Items, State) -> [{Item, info_internal(Item, State)} || Item <- Items].

info_internal(pid, State) -> info_internal(connection, State);
info_internal(SockStat, #reader_state{socket = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{_, I}]} -> I;
{error, _} -> ''
end;
info_internal(state, State) -> info_internal(connection_state, State);
info_internal(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
info_internal(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
info_internal(conn_name, #reader_state{conn_name = Val}) ->
rabbit_data_coercion:to_binary(Val);
info_internal(connection, #reader_state{connection = Val}) ->
Val;
info_internal(connection_state, #reader_state{state = Val}) ->
Val;
info_internal(Key, #reader_state{processor_state = ProcState}) ->
rabbit_stomp_processor:info(Key, ProcState).
40 changes: 39 additions & 1 deletion test/connections_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@
all() ->
[
messages_not_dropped_on_disconnect,
direct_client_connections_are_not_leaked
direct_client_connections_are_not_leaked,
stats_are_not_leaked,
stats
].

merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config,
{rabbit, [
{collect_statistics, basic},
{collect_statistics_interval, 100}
]}).

init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
[ fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()).


Expand Down Expand Up @@ -111,3 +121,31 @@ messages_not_dropped_on_disconnect(Config) ->
get_stomp_port(Config) ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp).

stats_are_not_leaked(Config) ->
StompPort = get_stomp_port(Config),
N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]),
{ok, C} = gen_tcp:connect("localhost", StompPort, []),
Bin = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*\n\n">>,
gen_tcp:send(C, Bin),
gen_tcp:close(C),
timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]),
ok.

stats(Config) ->
StompPort = get_stomp_port(Config),
{ok, Client} = rabbit_stomp_client:connect(StompPort),
timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
%% Retrieve the connection Pid
[Reader] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp, list, []),
[{_, Pid}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp_reader,
info, [Reader, [connection]]),
%% Verify the content of the metrics, garbage_collection must be present
[{Pid, Props}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup,
[connection_metrics, Pid]),
true = proplists:is_defined(garbage_collection, Props),
%% If the coarse entry is present, stats were successfully emitted
[{Pid, _, _, _}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup,
[connection_coarse_metrics, Pid]),
rabbit_stomp_client:disconnect(Client),
ok.