From 26521eb93c35a95a9b8e3305f5af6fa52f1c5d3b Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 23 May 2023 11:51:29 +0100 Subject: [PATCH 1/3] Fix quorum queue queue federation bugs Quorum queues would due to a misunderstanding whilst porting queue decorators from classic queues calculate the wrong maximum consumer priority leading to strange behaviour when using queue federation. Also notify decorators on startup. In this case we had to do this when a member becomes leader which isn't always at startup so will have to rely on the mirroried supervisor to ensure we don't start multiple federation links. (cherry picked from commit 21b9eb5e984449425ba5cd695d95e56a6988bdf8) --- deps/rabbit/src/rabbit_fifo.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index eba9be0d3528..c371611acf3c 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -861,8 +861,10 @@ state_enter0(leader, #?MODULE{consumers = Cons, Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), - FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}], - Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation, + FHReservation = [{mod_call, rabbit_quorum_queue, + file_handle_leader_reservation, [Resource]}], + NotifyDecs = notify_decorators_startup(Resource), + Effects = TimerEffs ++ Mons ++ Nots ++ NodeMons ++ FHReservation ++ [NotifyDecs], case BLH of undefined -> Effects; @@ -1299,9 +1301,8 @@ query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) -> MaxActivePriority = maps:fold( fun(_, #consumer{credit = C, status = up, - cfg = #consumer_cfg{priority = P0}}, + cfg = #consumer_cfg{priority = P}}, MaxP) when C > 0 -> - P = -P0, case MaxP of empty -> P; MaxP when MaxP > P -> MaxP; @@ -2477,6 +2478,10 @@ notify_decorators_effect(QName, MaxActivePriority, IsEmpty) -> {mod_call, rabbit_quorum_queue, spawn_notify_decorators, [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}. +notify_decorators_startup(QName) -> + {mod_call, rabbit_quorum_queue, spawn_notify_decorators, + [QName, startup, []]}. + convert(To, To, State) -> State; convert(0, To, State) -> From 4b17d59c6aca4c253ba307e9a41d5befa1b339eb Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 24 May 2023 10:51:58 +0100 Subject: [PATCH 2/3] test fix (cherry picked from commit 1c727a4ee4c281c229ddb6be5dd8f7d87167e920) --- deps/rabbit/test/rabbit_fifo_SUITE.erl | 29 +++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 12eaa04138cb..aaf82be3de79 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -655,11 +655,10 @@ state_enter_file_handle_leader_reservation_test(_) -> Resource = {resource, <<"/">>, queue, <<"test">>}, Effects = rabbit_fifo:state_enter(leader, S0), - ?assertMatch([ - {mod_call, m, f, [a, the_name]}, - _Timer, - {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} - ], Effects), + ?assertMatch([{mod_call, m, f, [a, the_name]}, + _Timer, + {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} + | _], Effects), ok. state_enter_file_handle_other_reservation_test(_) -> @@ -1161,20 +1160,22 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(C) -> Meta = meta(C, 1), % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> - {NewState, _, _} = apply( - Meta, - make_checkout({CTag, ChannelId}, - {once, 1, simple_prefetch}, #{}), - State), - NewState + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1}, + {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, + {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(leader, State1), %% 2 effects for each consumer process (channel process), 1 effect for the node, %% 1 effect for file handle reservation - ?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)). + ?assertEqual(2 * 3 + 1 + 1 + 1 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(C) -> Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), From 1c59dd5361bacadaf6ca24ed84fddf21ad1fd46c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 24 May 2023 14:27:09 +0400 Subject: [PATCH 3/3] Queue federation: improve logging around federated queue link state transitions (pause/unpause). References #8297. (cherry picked from commit 958043caaa548e0e0b07b80e7a7ffcc06bf676a3) --- .../src/rabbit_federation_link_util.erl | 6 +++++- .../src/rabbit_federation_queue_link.erl | 14 +++++++++++--- .../src/rabbit_federation_upstream.erl | 2 +- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl index fb84ee6bf7fc..ad61f5cb6a9f 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl @@ -140,9 +140,13 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag connection_error(remote_start, E, Upstream, UParams, XorQName, State) -> rabbit_federation_status:report( Upstream, UParams, XorQName, clean_reason(E)), + Reason = case E of + {error, Value} -> Value; + Other -> Other + end, log_warning(XorQName, "did not connect to ~ts. Reason: ~tp", [rabbit_federation_upstream:params_to_string(UParams), - E]), + Reason]), {stop, {shutdown, restart}, State}; connection_error(remote, E, Upstream, UParams, XorQName, State) -> diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl index f4b2290ba885..095441f40566 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl @@ -104,7 +104,11 @@ handle_cast(pause, State = #state{run = false}) -> handle_cast(pause, State = #not_started{}) -> {noreply, State#not_started{run = false}}; -handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) -> +handle_cast(pause, State = #state{ch = Ch, upstream = Upstream = #upstream{ + name = UpName, queue_name = QName + }}) -> + rabbit_log_federation:debug("Federation link of ~s (upstream: '~s'): asked to pause", + [QName, UpName]), cancel(Ch, Upstream), {noreply, State#state{run = false}}; @@ -305,9 +309,11 @@ visit_match(_ ,_) -> consumer_tag(#upstream{consumer_tag = ConsumerTag}) -> ConsumerTag. -consume(Ch, Upstream, UQueue) -> +consume(Ch, Upstream = #upstream{name = UpName}, UQueue) -> ConsumerTag = consumer_tag(Upstream), NoAck = Upstream#upstream.ack_mode =:= 'no-ack', + rabbit_log_federation:debug("Federation link of ~ts: will consume from the upstream '~ts'", + [rabbit_misc:rs(amqqueue:get_name(UQueue)), UpName]), amqp_channel:cast( Ch, #'basic.consume'{queue = name(UQueue), no_ack = NoAck, @@ -315,8 +321,10 @@ consume(Ch, Upstream, UQueue) -> consumer_tag = ConsumerTag, arguments = [{<<"x-priority">>, long, -1}]}). -cancel(Ch, Upstream) -> +cancel(Ch, Upstream = #upstream{name = UpName, queue_name = QName}) -> ConsumerTag = consumer_tag(Upstream), + rabbit_log_federation:debug("Federation queue '~ts' link: will cancel consumer '~ts' on upstream '~ts'", + [QName, ConsumerTag, UpName]), amqp_channel:cast(Ch, #'basic.cancel'{nowait = true, consumer_tag = ConsumerTag}). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 161264c86a1a..6626400da864 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -61,7 +61,7 @@ params_table(SafeURI, XorQ) -> params_to_string(#upstream_params{safe_uri = SafeURI, x_or_q = XorQ}) -> - print("~ts on ~ts", [rabbit_misc:rs(r(XorQ)), SafeURI]). + print("~ts on '~ts'", [rabbit_misc:rs(r(XorQ)), SafeURI]). remove_credentials(URI) -> list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).