Skip to content

Commit

Permalink
Merge branch 'master' into rabbitmq-server-500-squashed
Browse files Browse the repository at this point in the history
Conflicts:
	src/rabbit_control_main.erl
  • Loading branch information
michaelklishin committed Aug 18, 2016
2 parents dad8511 + 9f1c12f commit 754feab
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 66 deletions.
41 changes: 37 additions & 4 deletions docs/rabbitmqctl.1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -744,9 +744,7 @@

<varlistentry>
<term>
<cmdsynopsis>
<command>authenticate_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg>
</cmdsynopsis>
<cmdsynopsis><command>authenticate_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis>
</term>
<listitem>
<variablelist>
Expand Down Expand Up @@ -1281,13 +1279,48 @@

<variablelist>
<varlistentry role="usage-has-option-list">
<term><cmdsynopsis><command>list_queues</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>queueinfoitem</replaceable> ...</arg></cmdsynopsis></term>
<term>
<cmdsynopsis><command>list_queues</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg> <group choice="opt"><arg>--offline</arg><arg>--online</arg><arg>--local</arg></group> <arg choice="opt" role="usage-option-list"><replaceable>queueinfoitem</replaceable> ...</arg></cmdsynopsis>
</term>
<listitem>
<para>
Returns queue details. Queue details of the <command>/</command> virtual host
are returned if the "-p" flag is absent. The "-p" flag can be used to
override this default.
</para>
<para>
Displayed queues can be filtered by their status or
location using one of the following mutually exclusive
options:
<variablelist>
<varlistentry>
<term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term>
<listitem>
<para>
List only those durable queues that are not
currently available (more specifically, their master node isn't).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><cmdsynopsis><arg choice="opt">--online</arg></cmdsynopsis></term>
<listitem>
<para>
List queues that are currently available (their master node is).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><cmdsynopsis><arg choice="opt">--local</arg></cmdsynopsis></term>
<listitem>
<para>
List only those queues whose master process is
located on the current node.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
The <command>queueinfoitem</command> parameter is used to indicate which queue
information items to include in the results. The column order in the
Expand Down
10 changes: 9 additions & 1 deletion docs/usage.xsl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
encoding="UTF-8"
indent="no"/>
<xsl:strip-space elements="*"/>
<xsl:preserve-space elements="cmdsynopsis arg" />
<xsl:preserve-space elements="cmdsynopsis arg group" />

<xsl:template match="/">
<!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit!
Expand Down Expand Up @@ -68,6 +68,14 @@ usage() -> %QUOTE%Usage:
<!-- Don't show anything else in command usage -->
<xsl:template match="text()" mode="command-usage"/>

<xsl:template match="group[@choice='opt']">
<xsl:text>[</xsl:text>
<xsl:for-each select="arg">
<xsl:apply-templates/>
<xsl:if test="not(position() = last())"><xsl:text>|</xsl:text></xsl:if>
</xsl:for-each>
<xsl:text>]</xsl:text>
</xsl:template>
<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template>
<xsl:template match="replaceable">&lt;<xsl:value-of select="."/>&gt;</xsl:template>

Expand Down
2 changes: 2 additions & 0 deletions include/rabbit_cli.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
-define(RAM_OPT, "--ram").
-define(OFFLINE_OPT, "--offline").
-define(ONLINE_OPT, "--online").
-define(LOCAL_OPT, "--local").

-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(QUIET_DEF, {?QUIET_OPT, flag}).
Expand All @@ -45,6 +46,7 @@
-define(RAM_DEF, {?RAM_OPT, flag}).
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
-define(ONLINE_DEF, {?ONLINE_OPT, flag}).
-define(LOCAL_DEF, {?LOCAL_OPT, flag}).

%% Subset of standartized exit codes from sysexits.h, see
%% https://github.com/rabbitmq/rabbitmq-server/issues/396 for discussion.
Expand Down
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ifeq ($(SOURCE_DIST_FILE),)
$(error Cannot find source archive; please specify SOURCE_DIST_FILE)
endif
ifneq ($(words $(SOURCE_DIST_FILE)),1)
$(error Multile source archives found; please specify SOURCE_DIST_FILE)
$(error Multiple source archives found; please specify SOURCE_DIST_FILE)
endif
ifeq ($(filter %.tar.xz %.txz,$(SOURCE_DIST_FILE)),)
$(error The source archive must a tar.xz archive)
Expand Down
8 changes: 4 additions & 4 deletions scripts/rabbitmq-server-ha.ocf
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,9 @@ block_client_access()
# do not add temporary RMQ blocking rule, if it is already exist
# otherwise, try to add a blocking rule with max of 5 retries
local tries=5
until $(iptables -nvL | grep -q 'temporary RMQ block') || [ $tries -eq 0 ]; do
until $(iptables -nvL --wait | grep -q 'temporary RMQ block') || [ $tries -eq 0 ]; do
tries=$((tries-1))
iptables -I INPUT -p tcp -m tcp --dport ${OCF_RESKEY_node_port} -m state --state NEW,RELATED,ESTABLISHED \
iptables --wait -I INPUT -p tcp -m tcp --dport ${OCF_RESKEY_node_port} -m state --state NEW,RELATED,ESTABLISHED \
-m comment --comment 'temporary RMQ block' -j REJECT --reject-with tcp-reset
sleep 1
done
Expand All @@ -707,8 +707,8 @@ block_client_access()
unblock_client_access()
{
# remove all temporary RMQ blocking rules, if there are more than one exist
for i in $(iptables -nvL --line-numbers | awk '/temporary RMQ block/ {print $1}'); do
iptables -D INPUT -p tcp -m tcp --dport ${OCF_RESKEY_node_port} -m state --state NEW,RELATED,ESTABLISHED \
for i in $(iptables -nvL --wait --line-numbers | awk '/temporary RMQ block/ {print $1}'); do
iptables --wait -D INPUT -p tcp -m tcp --dport ${OCF_RESKEY_node_port} -m state --state NEW,RELATED,ESTABLISHED \
-m comment --comment 'temporary RMQ block' -j REJECT --reject-with tcp-reset
done
}
Expand Down
5 changes: 4 additions & 1 deletion src/gm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
catch
lost_membership ->
{stop, normal, State}
end.
end;
handle_info(_, State) ->
%% Discard any unexpected messages, such as late replies from neighbour_call/2
noreply(State).

terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
Expand Down
11 changes: 11 additions & 0 deletions src/rabbit_autoheal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,17 @@ winner_finish(Notify) ->
send(leader(), {autoheal_finished, node()}),
not_healing.

%% XXX This can enter infinite loop, if mnesia was somehow restarted
%% outside of our control - i.e. somebody started app back by hand or
%% completely restarted node. One possible solution would be something
%% like this (but it needs some more pondering and is left for some
%% other patch):
%% - monitor top-level mnesia supervisors of all losers
%% - notify loosers about the fact that they are indeed loosers
%% - wait for all monitors to go 'DOWN' (+ maybe some timeout on the whole process)
%% - do one round of parallel rpc calls to check whether mnesia is still stoppend on all
%% loosers
%% - If everything is still stopped, continue autoheall process. Or cancel it otherwise.
wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
case rpc:call(Node, mnesia, system_info, [is_running]) of
no ->
Expand Down
35 changes: 18 additions & 17 deletions src/rabbit_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-include("rabbit_cli.hrl").

-export([main/3, start_distribution/0, start_distribution/1,
parse_arguments/4, filter_opts/2,
parse_arguments/4, mutually_exclusive_flags/3,
rpc_call/4, rpc_call/5, rpc_call/7]).

%%----------------------------------------------------------------------------
Expand All @@ -42,8 +42,7 @@
[{string(), optdef()}], string(), [string()]) ->
parse_result().

-spec filter_opts([{option_name(), option_value()}], [option_name()]) ->
[boolean()].
-spec mutually_exclusive_flags([{option_name(), option_value()}], term(), [{option_name(), term()}]) -> {ok, term()} | {error, string()}.

-spec rpc_call(node(), atom(), atom(), [any()]) -> any().
-spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
Expand Down Expand Up @@ -266,20 +265,22 @@ process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
{none, _, _} -> no_command
end.

%% When we have a set of flags that are used for filtering, we want by
%% default to include every such option in our output. But if a user
%% explicitly specified any such flag, we want to include only items
%% which he has requested.
filter_opts(CurrentOptionValues, AllOptionNames) ->
Explicit = lists:map(fun(OptName) ->
proplists:get_bool(OptName, CurrentOptionValues)
end,
AllOptionNames),
case lists:member(true, Explicit) of
true ->
Explicit;
false ->
lists:duplicate(length(AllOptionNames), true)
mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
PresentFlags = lists:filtermap(fun({OptName, _} = _O) ->
proplists:get_bool(OptName, CurrentOptionValues)
end,
FlagsAndValues),
case PresentFlags of
[] ->
{ok, Default};
[{_, Value}] ->
{ok, Value};
_ ->
Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
CommaSeparated = string:join(lists:droplast(Names), ", "),
AndOneMore = lists:last(Names),
Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
{error, lists:flatten(Msg)}
end.

%%----------------------------------------------------------------------------
Expand Down
73 changes: 51 additions & 22 deletions src/rabbit_control_main.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@

{set_vhost_limits, [?VHOST_DEF]},
{clear_vhost_limits, [?VHOST_DEF]},

{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
{list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
{list_connections, [?VHOST_DEF]},
Expand Down Expand Up @@ -647,26 +646,47 @@ action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout)
[{timeout, Timeout}, to_bin_utf8, is_escaped]);

action(list_queues, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing queues", []),
%% User options
[Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),

%% Data for emission
Nodes = nodes_in_cluster(Node, Timeout),
OnlineChunks = if Online -> length(Nodes); true -> 0 end,
OfflineChunks = if Offline -> 1; true -> 0 end,
ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
TimeoutOpt = {timeout, Timeout},
EmissionRef = make_ref(),
EmissionRefOpt = {ref, EmissionRef},

_ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]),
_ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]),
display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
case rabbit_cli:mutually_exclusive_flags(
Opts, all, [{?ONLINE_OPT, online}
,{?OFFLINE_OPT, offline}
,{?LOCAL_OPT, local}]) of
{ok, Filter} ->
Inform("Listing queues", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),

%% Data for emission
Nodes = nodes_in_cluster(Node, Timeout),
ChunksOpt = {chunks, get_number_of_chunks(Filter, Nodes)},
TimeoutOpt = {timeout, Timeout},
EmissionRef = make_ref(),
EmissionRefOpt = {ref, EmissionRef},

case Filter of
all ->
start_emission(Node, {rabbit_amqqueue, emit_info_all,
[Nodes, VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]),
start_emission(Node, {rabbit_amqqueue, emit_info_down,
[VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]);
online ->
start_emission(Node, {rabbit_amqqueue, emit_info_all,
[Nodes, VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]);
offline ->
start_emission(Node, {rabbit_amqqueue, emit_info_down,
[VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt]);
local ->
start_emission(Node, {rabbit_amqqueue, emit_info_local,
[VHostArg, ArgAtoms]},
[TimeoutOpt, EmissionRefOpt])
end,
display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
{error, ErrStr} ->
{error_string, ErrStr}
end;

action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
Expand Down Expand Up @@ -1008,3 +1028,12 @@ alarms_by_node(Name) ->
{_, As} = lists:keyfind(alarms, 1, Status),
{Name, As}
end.

get_number_of_chunks(all, Nodes) ->
length(Nodes) + 1;
get_number_of_chunks(online, Nodes) ->
length(Nodes);
get_number_of_chunks(offline, _) ->
1;
get_number_of_chunks(local, _) ->
1.
11 changes: 10 additions & 1 deletion test/gm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ all() ->
broadcast,
confirmed_broadcast,
member_death,
receive_in_order
receive_in_order,
unexpected_msg
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -114,6 +115,14 @@ receive_in_order(_Config) ->
passed
end).

unexpected_msg(_Config) ->
passed = with_two_members(
fun(Pid, _) ->
Pid ! {make_ref(), old_gen_server_answer},
true = erlang:is_process_alive(Pid),
passed
end).

do_broadcast(Fun) ->
with_two_members(broadcast_fun(Fun)).

Expand Down
19 changes: 19 additions & 0 deletions test/health_check_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
,ignores_remote_alarms/1
,detects_local_alarm/1
,honors_timeout_argument/1
,detects_stuck_local_node_monitor/1
,ignores_stuck_remote_node_monitor/1
]).

all() ->
Expand All @@ -47,6 +49,8 @@ groups() ->
,ignores_remote_alarms
,detects_local_alarm
,honors_timeout_argument
,detects_stuck_local_node_monitor
,ignores_stuck_remote_node_monitor
]}].

init_per_suite(Config) ->
Expand Down Expand Up @@ -123,6 +127,21 @@ detects_local_alarm(Config) ->
{match, _} = re:run(Str, "resource alarm.*in effect"),
ok.

detects_stuck_local_node_monitor(Config) ->
[A|_] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]),
{error, 75, Str} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, ["-t", "5", "node_health_check"]),
{match, _} = re:run(Str, "operation node_health_check.*timed out"),
resume_sys_process(Config, A, rabbit_node_monitor),
ok.

ignores_stuck_remote_node_monitor(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:rpc(Config, A, sys, suspend, [rabbit_node_monitor]),
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["-t", "5", "node_health_check"]),
resume_sys_process(Config, A, rabbit_node_monitor),
ok.

honors_timeout_argument(Config) ->
[A|_] = open_channel_and_declare_queue_everywhere(Config),
QPid = suspend_single_queue(Config, A),
Expand Down
Loading

0 comments on commit 754feab

Please sign in to comment.