Commit

Merge branch 'stable'
michaelklishin committed Aug 24, 2016
2 parents ea8df21 + ce0381c commit f057a8c
Showing 3 changed files with 138 additions and 51 deletions.
113 changes: 78 additions & 35 deletions src/rabbit_autoheal.erl
@@ -180,6 +180,18 @@ node_down(_Node, not_healing) ->
node_down(Node, {winner_waiting, _, Notify}) ->
abort([Node], Notify);

node_down(Node, {leader_waiting, Node, _Notify}) ->
%% The winner went down; we don't know what to do, so we simply abort.
rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]),
not_healing;

node_down(Node, {leader_waiting, _, _} = St) ->
%% If it is a partial partition, the winner might continue with the
%% healing process. If it is a full partition, the winner will also
%% see it and abort. Let's wait for it.
rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]),
St;

node_down(Node, _State) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
not_healing.
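
Note the non-linear pattern in the first new clause above: Node appears both as the function argument and inside the leader_waiting tuple, so that clause fires only when the node that went down is the winner itself. A minimal sketch of this match-as-equality behaviour, independent of RabbitMQ (module and atom names are illustrative):

-module(node_down_sketch).
-export([classify/2]).

%% A repeated variable in a clause head acts as an equality constraint:
%% the first clause matches only when the down node *is* the winner.
classify(Node, {leader_waiting, Node, _Notify}) -> winner_down;
classify(_Node, {leader_waiting, _Winner, _Notify}) -> other_node_down;
classify(_Node, _State) -> unrelated.

%% 1> node_down_sketch:classify(b@h, {leader_waiting, b@h, []}).
%% winner_down
%% 2> node_down_sketch:classify(c@h, {leader_waiting, b@h, []}).
%% other_node_down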
@@ -218,14 +230,24 @@ handle_msg({become_winner, Losers},
not_healing, _Partitions) ->
rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
[Losers]),
%% The leader said everything was ready - do we agree? If not then
%% give up.
Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
case Down of
[] -> [send(L, {winner_is, node()}) || L <- Losers],
{winner_waiting, Losers, Losers};
_ -> abort(Down, Losers)
end;
stop_partition(Losers);

handle_msg({become_winner, Losers},
{winner_waiting, _, Losers}, _Partitions) ->
%% The leader has aborted the healing; it might have seen us down but
%% we didn't see the same. Let's try again, as it is the same partition.
rabbit_log:info("Autoheal: I am the winner and received a duplicate "
"request, waiting again for ~p to stop~n", [Losers]),
stop_partition(Losers);

handle_msg({become_winner, _},
{winner_waiting, _, Losers}, _Partitions) ->
%% Something has happened to the leader; it might have seen us down but we
%% are still alive. Partitions have changed, so we cannot continue.
rabbit_log:info("Autoheal: I am the winner and received another healing "
"request; partitions have changed to ~p. Aborting~n", [Losers]),
winner_finish(Losers),
not_healing;

handle_msg({winner_is, Winner}, State = not_healing,
_Partitions) ->
@@ -269,6 +291,14 @@ handle_msg({autoheal_finished, Winner}, not_healing, _Partitions)
%% We are the leader and the winner. The state already transitioned
%% to "not_healing" at the end of the autoheal process.
rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]),
not_healing;

handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) ->
%% We might have seen the winner down during a partial partition and
%% transitioned to not_healing. However, the winner was still able
%% to finish. Let it pass.
rabbit_log:info("Autoheal finished according to winner ~p."
" Unexpected, I might have previously seen the winner down~n", [Winner]),
not_healing.
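
Clause order matters in the become_winner handlers above: the head whose state repeats Losers from the message catches a duplicate request for the same partition and retries, and only then does the looser {become_winner, _} head catch a request for a changed partition and abort. A compact standalone sketch of that ordering (function and atom names are illustrative):

%% First head: same Losers in message and state -> same partition, retry.
%% Second head: any other become_winner while waiting -> partitions changed, abort.
on_become_winner(Losers, {winner_waiting, _, Losers}) -> retry;
on_become_winner(_Other, {winner_waiting, _, _Losers}) -> abort.

%% 1> sketch:on_become_winner([b@h], {winner_waiting, [b@h], [b@h]}).
%% retry
%% 2> sketch:on_become_winner([c@h], {winner_waiting, [b@h], [b@h]}).
%% abort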

%%----------------------------------------------------------------------------
@@ -279,7 +309,9 @@ abort(Down, Notify) ->
rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]),
%% Make sure any nodes waiting for us start - it won't necessarily
%% heal the partition but at least they won't get stuck.
winner_finish(Notify).
%% If we are executing this, we are not stopping. Thus, don't wait
%% for ourselves!
winner_finish(Notify -- [node()]).

winner_finish(Notify) ->
%% There is a race in Mnesia causing a starting loser to hang
@@ -297,32 +329,33 @@ winner_finish(Notify) ->
send(leader(), {autoheal_finished, node()}),
not_healing.

%% XXX This can enter an infinite loop, if mnesia was somehow restarted
%% outside of our control - i.e. somebody started the app back by hand or
%% completely restarted the node. One possible solution would be something
%% like this (but it needs some more pondering and is left for some
%% other patch):
%% - monitor top-level mnesia supervisors of all losers
%% - notify losers about the fact that they are indeed losers
%% - wait for all monitors to go 'DOWN' (+ maybe some timeout on the whole process)
%% - do one round of parallel rpc calls to check whether mnesia is still stopped on all
%%   losers
%% - If everything is still stopped, continue the autoheal process. Otherwise cancel it.
wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
case rpc:call(Node, mnesia, system_info, [is_running]) of
no ->
wait_for_mnesia_shutdown(Rest);
Running when
Running =:= yes orelse
Running =:= starting orelse
Running =:= stopping ->
timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
wait_for_mnesia_shutdown(AllNodes);
_ ->
wait_for_mnesia_shutdown(Rest)
end;
wait_for_mnesia_shutdown([]) ->
ok.
%% This improves on the previous implementation, but could still potentially enter an
%% infinite loop. It is also possible that, by the time it finishes, some of the nodes
%% have been manually restarted, but we can't do much more (apart from stopping them
%% again). So let it continue and notify all the losers to restart.
wait_for_mnesia_shutdown(AllNodes) ->
Monitors = lists:foldl(fun(Node, Monitors0) ->
pmon:monitor({mnesia_sup, Node}, Monitors0)
end, pmon:new(), AllNodes),
wait_for_supervisors(Monitors).

wait_for_supervisors(Monitors) ->
case pmon:is_empty(Monitors) of
true ->
ok;
false ->
receive
{'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} ->
wait_for_supervisors(pmon:erase(I, Monitors))
after
60000 ->
AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)],
rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending "
"winner notification again to these ~n", [AliveLosers]),
[send(L, {winner_is, node()}) || L <- AliveLosers],
wait_for_mnesia_shutdown(AliveLosers)
end
end.
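
The shape of wait_for_mnesia_shutdown/1 and wait_for_supervisors/1 above - monitor a registered process on each remote node, then drain 'DOWN' messages until none remain - can be reproduced with bare erlang:monitor/2, which delivers an immediate 'DOWN' (reason noproc or noconnection) if the target is already gone. A minimal sketch under that assumption; the pmon bookkeeping is replaced by a plain list of references, and the registered name and timeout are illustrative:

-module(wait_down_sketch).
-export([wait_for_all/2]).

%% Monitor {Name, Node} on every node, then block until each monitor fires.
wait_for_all(Name, Nodes) ->
    Refs = [erlang:monitor(process, {Name, N}) || N <- Nodes],
    collect(Refs).

collect([]) -> ok;
collect(Refs) ->
    receive
        {'DOWN', Ref, process, _Object, _Reason} ->
            collect(lists:delete(Ref, Refs))
    after 60000 ->
        {error, {still_up, length(Refs)}}
    end.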

restart_loser(State, Winner) ->
rabbit_log:warning(
@@ -402,3 +435,13 @@ fmt_error({remote_down, RemoteDown}) ->
rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]);
fmt_error({nodes_down, NodesDown}) ->
rabbit_misc:format("Local nodes down: ~p", [NodesDown]).

stop_partition(Losers) ->
%% The leader said everything was ready - do we agree? If not then
%% give up.
Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
case Down of
[] -> [send(L, {winner_is, node()}) || L <- Losers],
{winner_waiting, Losers, Losers};
_ -> abort(Down, Losers)
end.
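
stop_partition/1 re-checks the leader's claim with plain list subtraction: any loser missing from the alive set vetoes the heal. A hedged standalone sketch of that check, with the alive set passed in explicitly instead of queried via rabbit_node_monitor:alive_rabbit_nodes/1 (names are illustrative):

%% Returns {ok, Losers} when every loser is still reachable,
%% or {error, {down, Down}} naming the nodes that vanished.
confirm_losers_alive(Losers, AliveNodes) ->
    case Losers -- AliveNodes of
        []   -> {ok, Losers};
        Down -> {error, {down, Down}}
    end.

%% 1> sketch:confirm_losers_alive([b@h, c@h], [a@h, b@h, c@h]).
%% {ok,[b@h,c@h]}
%% 2> sketch:confirm_losers_alive([b@h, c@h], [a@h, c@h]).
%% {error,{down,[b@h]}}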
53 changes: 37 additions & 16 deletions src/rabbit_node_monitor.erl
@@ -336,7 +336,17 @@ init([]) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
{ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
%% If the node has been restarted, Mnesia can trigger system notifications
%% before the monitor subscribes to receive them. To avoid autoheal blocking due
%% to the inconsistent database event never arriving, we begin monitoring all
%% running nodes as early as possible. The rest of the monitoring ops will only
%% be triggered when notifications arrive.
Nodes = possibly_partitioned_nodes(),
startup_log(Nodes),
Monitors = lists:foldl(fun(Node, Monitors0) ->
pmon:monitor({rabbit, Node}, Monitors0)
end, pmon:new(), Nodes),
{ok, ensure_keepalive_timer(#state{monitors = Monitors,
subscribers = pmon:new(),
partitions = [],
guid = rabbit_guid:gen(),
@@ -486,20 +496,22 @@ handle_cast({partial_partition_disconnect, Other}, State) ->
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
State = #state{monitors = Monitors}) ->
case pmon:is_monitored({rabbit, Node}, Monitors) of
true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
case NodeType of
disc -> add_node(Node, DiscNodes);
ram -> DiscNodes
end,
add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
Monitors1 = pmon:monitor({rabbit, Node}, Monitors),
{noreply, maybe_autoheal(State#state{monitors = Monitors1})}
end;
rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
case NodeType of
disc -> add_node(Node, DiscNodes);
ram -> DiscNodes
end,
add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
true ->
Monitors;
false ->
pmon:monitor({rabbit, Node}, Monitors)
end,
{noreply, maybe_autoheal(State#state{monitors = Monitors1})};

handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -572,7 +584,7 @@ handle_info({mnesia_system_event,
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
true -> State;
false -> State#state{
monitors = pmon:monitor({rabbit, Node}, Monitors)}
monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
Partitions1 = lists:usort([Node | Partitions]),
@@ -873,3 +885,12 @@ alive_rabbit_nodes(Nodes) ->
ping_all() ->
[net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
ok.

possibly_partitioned_nodes() ->
alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).

startup_log([]) ->
rabbit_log:info("Starting rabbit_node_monitor~n", []);
startup_log(Nodes) ->
rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
[Nodes]).
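
possibly_partitioned_nodes/0 reduces to a set difference: nodes whose rabbit application answers right now, minus the nodes the local cluster status records as running. Anything left over came up while this node was not watching - exactly the nodes it may be partitioned from. A shell-style illustration with stand-in node lists (the real inputs come from alive_rabbit_nodes/0 and rabbit_mnesia:cluster_nodes(running)):

1> AliveNow = [a@h, b@h, c@h].
[a@h,b@h,c@h]
2> RecordedRunning = [a@h, c@h].
[a@h,c@h]
3> AliveNow -- RecordedRunning.  % b@h answers, but this node never saw it start
[b@h]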
23 changes: 23 additions & 0 deletions test/partitions_SUITE.erl
@@ -45,6 +45,8 @@ groups() ->
{cluster_size_3, [], [
autoheal,
autoheal_after_pause_if_all_down,
autoheal_multiple_partial_partitions,
autoheal_unexpected_finish,
ignore,
pause_if_all_down_on_blocked,
pause_if_all_down_on_down,
@@ -307,6 +309,27 @@ do_autoheal(Config) ->
Test([{A, B}, {A, C}, {B, C}]),
ok.

autoheal_multiple_partial_partitions(Config) ->
set_mode(Config, autoheal),
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
block_unblock([{A, B}]),
block_unblock([{A, C}]),
block_unblock([{A, B}]),
block_unblock([{A, C}]),
block_unblock([{A, B}]),
block_unblock([{A, C}]),
[await_listening(N, true) || N <- [A, B, C]],
[await_partitions(N, []) || N <- [A, B, C]],
ok.

autoheal_unexpected_finish(Config) ->
set_mode(Config, autoheal),
[A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
Pid ! {autoheal_msg, {autoheal_finished, B}},
Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
ok.
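
The repeated Pid = rpc:call(...) in autoheal_unexpected_finish doubles as the assertion: = is a match, not an assignment, so if rabbit_node_monitor had crashed on the unexpected autoheal_finished message and been restarted under a new pid, the second match would raise badmatch and fail the test. The idiom in isolation (shell transcript, pids illustrative):

1> Pid = self().
<0.84.0>
2> Pid = self().                    % same pid, match succeeds
<0.84.0>
3> Pid = spawn(fun() -> ok end).    % different pid
** exception error: no match of right hand side value <0.87.0>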

partial_false_positive(Config) ->
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
block([{A, B}]),
