Skip to content

Commit

Permalink
Fix quorum queue queue federation bugs
Browse files Browse the repository at this point in the history
Quorum queues would due to a misunderstanding whilst porting
queue decorators from classic queues calculate the wrong maximum
consumer priority leading to strange behaviour when using queue
federation.

Also notify decorators on startup. In this case we had to do this
when a member becomes leader which isn't always at startup so
will have to rely on the mirroried supervisor to ensure we don't
start multiple federation links.
  • Loading branch information
kjnilsson authored and lukebakken committed May 23, 2023
1 parent f83ed85 commit 21b9eb5
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,10 @@ state_enter0(leader, #?MODULE{consumers = Cons,
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation,
FHReservation = [{mod_call, rabbit_quorum_queue,
file_handle_leader_reservation, [Resource]}],
NotifyDecs = notify_decorators_startup(Resource),
Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs],
case BLH of
undefined ->
Effects;
Expand Down Expand Up @@ -1299,9 +1301,8 @@ query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
MaxActivePriority = maps:fold(
fun(_, #consumer{credit = C,
status = up,
cfg = #consumer_cfg{priority = P0}},
cfg = #consumer_cfg{priority = P}},
MaxP) when C > 0 ->
P = -P0,
case MaxP of
empty -> P;
MaxP when MaxP > P -> MaxP;
Expand Down Expand Up @@ -2477,6 +2478,10 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.

notify_decorators_startup(QName) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, startup, []]}.

convert(To, To, State) ->
State;
convert(0, To, State) ->
Expand Down

0 comments on commit 21b9eb5

Please sign in to comment.