Skip to content

Commit

Permalink
Merge pull request #8328 from rabbitmq/mergify/bp/v3.12.x/pr-8282
Browse files Browse the repository at this point in the history
Fix quorum queue queue federation bugs (backport #8282)
  • Loading branch information
michaelklishin authored May 25, 2023
2 parents 57a2a0a + 1c59dd5 commit f33965e
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 23 deletions.
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,10 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation,
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
Expand Down Expand Up @@ -1299,9 +1301,8 @@ query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
MaxActivePriority = maps:fold(
fun(_, #consumer{credit = C,
status = up,
cfg = #consumer_cfg{priority = P0}},
cfg = #consumer_cfg{priority = P}},
MaxP) when C > 0 ->
P = -P0,
case MaxP of
empty -> P;
MaxP when MaxP > P -> MaxP;
Expand Down Expand Up @@ -2477,6 +2478,10 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.

notify_decorators_startup(QName) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, startup, []]}.

convert(To, To, State) ->
State;
convert(0, To, State) ->
Expand Down
29 changes: 15 additions & 14 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -655,11 +655,10 @@ state_enter_file_handle_leader_reservation_test(_) ->

Resource = {resource, <<"/">>, queue, <<"test">>},
Effects = rabbit_fifo:state_enter(leader, S0),
?assertMatch([
{mod_call, m, f, [a, the_name]},
_Timer,
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
], Effects),
?assertMatch([{mod_call, m, f, [a, the_name]},
_Timer,
{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}
| _], Effects),
ok.

state_enter_file_handle_other_reservation_test(_) ->
Expand Down Expand Up @@ -1161,20 +1160,22 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(C) ->
Meta = meta(C, 1),
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
Meta,
make_checkout({CTag, ChannelId},
{once, 1, simple_prefetch}, #{}),
State),
NewState
{NewState, _, _} = apply(
Meta,
make_checkout({CTag, ChannelId},
{once, 1, simple_prefetch}, #{}),
State),
NewState
end,
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1},
{<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2},
{<<"ctag4">>, Pid3}]),

Effects = rabbit_fifo:state_enter(leader, State1),
%% 2 effects for each consumer process (channel process), 1 effect for the node,
%% 1 effect for file handle reservation
?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)).
?assertEqual(2 * 3 + 1 + 1 + 1 + 1, length(Effects)).

single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) ->
Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_federation/src/rabbit_federation_link_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,13 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
Reason = case E of
{error, Value} -> Value;
Other -> Other
end,
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
[rabbit_federation_upstream:params_to_string(UParams),
E]),
Reason]),
{stop, {shutdown, restart}, State};

connection_error(remote, E, Upstream, UParams, XorQName, State) ->
Expand Down
14 changes: 11 additions & 3 deletions deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ handle_cast(pause, State = #state{run = false}) ->
handle_cast(pause, State = #not_started{}) ->
{noreply, State#not_started{run = false}};

handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{
name = UpName, queue_name = QName
}}) ->
rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause",
[QName, UpName]),
cancel(Ch, Upstream),
{noreply, State#state{run = false}};

Expand Down Expand Up @@ -305,18 +309,22 @@ visit_match(_ ,_) ->
consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
ConsumerTag.

consume(Ch, Upstream, UQueue) ->
consume(Ch, Upstream = #upstream{name = UpName}, UQueue) ->
ConsumerTag = consumer_tag(Upstream),
NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'",
[rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]),
amqp_channel:cast(
Ch, #'basic.consume'{queue = name(UQueue),
no_ack = NoAck,
nowait = true,
consumer_tag = ConsumerTag,
arguments = [{<<"x-priority">>, long, -1}]}).

cancel(Ch, Upstream) ->
cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) ->
ConsumerTag = consumer_tag(Upstream),
rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'",
[QName, ConsumerTag, UpName]),
amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
consumer_tag = ConsumerTag}).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) ->

params_to_string(#upstream_params{safe_uri = SafeURI,
x_or_q = XorQ}) ->
print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]).
print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]).

remove_credentials(URI) ->
list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).
Expand Down

0 comments on commit f33965e

Please sign in to comment.