Skip to content

Commit

Permalink
Merge branch 'stable'
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Aug 19, 2016
2 parents 80efcc7 + d4020f5 commit e08e2b9
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 18 deletions.
25 changes: 15 additions & 10 deletions scripts/rabbitmq-server-ha.ocf
Original file line number Diff line number Diff line change
Expand Up @@ -1362,25 +1362,18 @@ check_timeouts() {
local op_name=$3

if [ $op_rc -ne 124 -a $op_rc -ne 137 ]; then
ocf_run attrd_updater -p --name $timeouts_attr_name --update 0
ocf_update_private_attr $timeouts_attr_name 0
return 0
fi

local count
count=`attrd_updater --name $timeouts_attr_name --query 2>/dev/null`
if [ $? -ne 0 ]; then
# the attrd_updater exited with error. In that case most probably it printed garbage
# instead of the number we need. So defensively assume that it is zero.

count=0
fi
count=`echo "${count}" | awk '{print $3}' | awk -F "=" '{print $2}' | sed -e '/(null)/d'`
count=$(ocf_get_private_attr $timeouts_attr_name 0)

count=$((count+1))
# There is a slight chance that this piece of code will be executed twice simultaneously.
# As a result, $timeouts_attr_name's value will be one less than it should be. But we don't need
# precise calculation here.
ocf_run attrd_updater -p --name $timeouts_attr_name --update $count
ocf_update_private_attr $timeouts_attr_name $count

if [ $count -lt $OCF_RESKEY_max_rabbitmqctl_timeouts ]; then
ocf_log warn "${LH} 'rabbitmqctl $op_name' timed out $count of max. $OCF_RESKEY_max_rabbitmqctl_timeouts time(s) in a row. Doing nothing for now."
Expand Down Expand Up @@ -1634,6 +1627,18 @@ get_monitor() {
return $rc
}

ocf_get_private_attr() {
local attr_name="${1:?}"
local attr_default_value="${2:?}"
local count
count=$(attrd_updater -p --name "$attr_name" --query)
if [ $? -ne 0 ]; then
echo $attr_default_value
else
echo "$count" | awk -vdef_val="$attr_default_value" '{ gsub(/"/, "", $3); split($3, vals, "="); if (vals[2] != "(null)") print vals[2]; else print def_val }'
fi
}

ocf_update_private_attr() {
local attr_name="${1:?}"
local attr_value="${2:?}"
Expand Down
7 changes: 6 additions & 1 deletion src/gm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,9 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
end;
handle_info(_, State) ->
%% Discard any unexpected messages, such as late replies from neighbour_call/2
%% TODO: For #gm_group{} related info messages, it could be worthwhile to
%% change_view/2, as this might reflect an alteration in the gm group, meaning
%% we now need to update our state. see rabbitmq-server#914.
noreply(State).

terminate(Reason, #state { module = Module, callback_args = Args }) ->
Expand Down Expand Up @@ -1596,7 +1599,9 @@ check_membership(Self, #gm_group{members = M} = Group) ->
Group;
false ->
throw(lost_membership)
end.
end;
check_membership(_Self, {error, not_found}) ->
throw(lost_membership).

check_membership(GroupName) ->
case dirty_read_group(GroupName) of
Expand Down
9 changes: 9 additions & 0 deletions src/rabbit_mirror_queue_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
noreply(State);
{ok, _MPid0, DeadPids, _ExtraNodes} ->
%% see rabbitmq-server#914;
%% Different slave is now master, stop current coordinator normally.
%% Initiating queue is now slave and the least we could do is report
%% deaths which we 'think' we saw.
%% NOTE: Reported deaths here, could be inconsistant.
rabbit_mirror_queue_misc:report_deaths(MPid, false, QueueName,
DeadPids),
{stop, normal, State};
{error, not_found} ->
{stop, normal, State}
end;
Expand Down
16 changes: 13 additions & 3 deletions src/rabbit_mirror_queue_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
%% get here.
%% get here. Or, gm group could've altered. see rabbitmq-server#914
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
Expand All @@ -92,15 +92,25 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
{QPid1, SPids1} = promote_slave(Alive),
{QPid1, SPids1} = case Alive of
[] ->
%% GM altered, & if all pids are
%% perceived as dead, rather do
%% do nothing here, & trust the
%% promoted slave to have updated
%% mnesia during the alteration.
{QPid, SPids};
_ -> promote_slave(Alive)
end,
Extra =
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
[];
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
%% become the master. If gm altered,
%% we have no choice but to proceed.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
gm_pids = AliveGM},
Expand Down
12 changes: 9 additions & 3 deletions src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,15 @@ handle_call({gm_deaths, DeadGMPids}, From,
_ ->
%% master has changed to not us
gen_server2:reply(From, ok),
%% assertion, we don't need to add_mirrors/2 in this
%% branch, see last clause in remove_from_queue/2
[] = ExtraNodes,
%% see rabbitmq-server#914;
%% It's not always guaranteed that we won't have ExtraNodes.
%% If gm alters, master can change to not us with extra nodes,
%% in which case we attempt to add mirrors on those nodes.
case ExtraNodes of
[] -> void;
_ -> rabbit_mirror_queue_misc:add_mirrors(
QName, ExtraNodes, async)
end,
%% Since GM is by nature lazy we need to make sure
%% there is some traffic when a master dies, to
%% make sure all slaves get informed of the
Expand Down
38 changes: 37 additions & 1 deletion test/gm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ all() ->
confirmed_broadcast,
member_death,
receive_in_order,
unexpected_msg
unexpected_msg,
down_in_members_change
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -123,6 +124,41 @@ unexpected_msg(_Config) ->
passed
end).

down_in_members_change(_Config) ->
%% Setup
ok = gm:create_tables(),
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
{ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),

%% Test. Simulate that the gm group is deleted (forget_group) while
%% processing the 'DOWN' message from the neighbour
process_flag(trap_exit, true),
ok = meck:new(mnesia, [passthrough]),
ok = meck:expect(mnesia, read, fun({gm_group, ?MODULE}) ->
[];
(Key) ->
meck:passthrough([Key])
end),
gm:leave(Pid2),
Passed = receive
{'EXIT', Pid, normal} ->
passed;
{'EXIT', Pid, _} ->
crashed
after 15000 ->
timeout
end,
%% Cleanup
meck:unload(mnesia),
process_flag(trap_exit, false),
passed = Passed.


do_broadcast(Fun) ->
with_two_members(broadcast_fun(Fun)).

Expand Down

0 comments on commit e08e2b9

Please sign in to comment.