diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 1960eaf03a65..b0f0a43967fb 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -265,15 +265,27 @@ apply(Meta, #settle{msg_ids = MsgIds, _ -> {State, ok} end; -apply(Meta, #discard{consumer_key = ConsumerKey, - msg_ids = MsgIds}, +apply(#{machine_version := 4} = Meta, + #discard{consumer_key = ConsumerKey, + msg_ids = MsgIds}, #?STATE{consumers = Consumers } = State0) -> + %% buggy version that would have not found the consumer if the ConsumerKey + %% was a consumer_id() case find_consumer(ConsumerKey, Consumers) of {ConsumerKey, #consumer{} = Con} -> discard(Meta, MsgIds, ConsumerKey, Con, true, #{}, State0); _ -> {State0, ok} end; +apply(Meta, #discard{consumer_key = ConsumerKey, + msg_ids = MsgIds}, + #?STATE{consumers = Consumers } = State0) -> + case find_consumer(ConsumerKey, Consumers) of + {ActualConsumerKey, #consumer{} = Con} -> + discard(Meta, MsgIds, ActualConsumerKey, Con, true, #{}, State0); + _ -> + {State0, ok} + end; apply(Meta, #return{consumer_key = ConsumerKey, msg_ids = MsgIds}, #?STATE{consumers = Cons} = State) -> @@ -291,13 +303,14 @@ apply(Meta, #modify{consumer_key = ConsumerKey, msg_ids = MsgIds}, #?STATE{consumers = Cons} = State) -> case find_consumer(ConsumerKey, Cons) of - {ConsumerKey, #consumer{checked_out = Checked}} + {ActualConsumerKey, #consumer{checked_out = Checked}} when Undel == false -> - return(Meta, ConsumerKey, MsgIds, DelFailed, + return(Meta, ActualConsumerKey, MsgIds, DelFailed, Anns, Checked, [], State); - {ConsumerKey, #consumer{} = Con} + {ActualConsumerKey, #consumer{} = Con} when Undel == true -> - discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State); + discard(Meta, MsgIds, ActualConsumerKey, + Con, DelFailed, Anns, State); _ -> {State, ok} end; @@ -898,13 +911,14 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) -> end. -spec version() -> pos_integer(). -version() -> 4. +version() -> 5. which_module(0) -> rabbit_fifo_v0; which_module(1) -> rabbit_fifo_v1; which_module(2) -> rabbit_fifo_v3; which_module(3) -> rabbit_fifo_v3; -which_module(4) -> ?MODULE. +which_module(4) -> ?MODULE; +which_module(5) -> ?MODULE. -define(AUX, aux_v3). @@ -2520,7 +2534,7 @@ make_checkout({_, _} = ConsumerId, Spec0, Meta) -> make_settle(ConsumerKey, MsgIds) when is_list(MsgIds) -> #settle{consumer_key = ConsumerKey, msg_ids = MsgIds}. --spec make_return(consumer_id(), [msg_id()]) -> protocol(). +-spec make_return(consumer_key(), [msg_id()]) -> protocol(). make_return(ConsumerKey, MsgIds) -> #return{consumer_key = ConsumerKey, msg_ids = MsgIds}. @@ -2528,7 +2542,7 @@ make_return(ConsumerKey, MsgIds) -> is_return(Command) -> is_record(Command, return). --spec make_discard(consumer_id(), [msg_id()]) -> protocol(). +-spec make_discard(consumer_key(), [msg_id()]) -> protocol(). make_discard(ConsumerKey, MsgIds) -> #discard{consumer_key = ConsumerKey, msg_ids = MsgIds}. @@ -2701,7 +2715,10 @@ convert(Meta, 1, To, State) -> convert(Meta, 2, To, State) -> convert(Meta, 3, To, rabbit_fifo_v3:convert_v2_to_v3(State)); convert(Meta, 3, To, State) -> - convert(Meta, 4, To, convert_v3_to_v4(Meta, State)). + convert(Meta, 4, To, convert_v3_to_v4(Meta, State)); +convert(Meta, 4, To, State) -> + %% no conversion needed, this version only includes a logic change + convert(Meta, 5, To, State). smallest_raft_index(#?STATE{messages = Messages, ra_indexes = Indexes, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 0643842bf511..06341e37b851 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -174,6 +174,7 @@ all_tests() -> per_message_ttl_expiration_too_high, consumer_priorities, cancel_consumer_gh_3729, + cancel_consumer_gh_12424, cancel_and_consume_with_same_tag, validate_messages_on_queue, amqpl_headers, @@ -3600,6 +3601,37 @@ cancel_consumer_gh_3729(Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch). +cancel_consumer_gh_12424(Config) -> + QQ = ?config(queue_name, Config), + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0}, + DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0), + + ok = publish(Ch, QQ), + + ok = subscribe(Ch, QQ, false), + + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = DT}, _} -> + DT + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = cancel(Ch), + + R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}, + ok = amqp_channel:cast(Ch, R), + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + + ok. + + %% Test the scenario where a message is published to a quorum queue cancel_and_consume_with_same_tag(Config) -> %% https://github.com/rabbitmq/rabbitmq-server/issues/5927 QQ = ?config(queue_name, Config), diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 8d45aecca10f..e14b9406eee8 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -42,12 +42,12 @@ groups() -> ]. init_per_group(tests, Config) -> - [{machine_version, 4} | Config]; + [{machine_version, 5} | Config]; init_per_group(machine_version_conversion, Config) -> Config. init_per_testcase(_Testcase, Config) -> - FF = ?config(machine_version, Config) == 4, + FF = ?config(machine_version, Config) == 5, ok = meck:new(rabbit_feature_flags, [passthrough]), meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end), Config. @@ -804,6 +804,19 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(Config) -> ok. +discard_after_cancel_test(Config) -> + Cid = {?FUNCTION_NAME_B, self()}, + {State0, _} = enq(Config, 1, 1, first, test_init(test)), + {State1, #{key := _CKey, + next_msg_id := MsgId}, _Effects1} = + checkout(Config, ?LINE, Cid, 10, State0), + {State2, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + {State, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_discard(Cid, [MsgId]), State2), + ct:pal("State ~p", [State]), + ok. + enqueued_msg_with_delivery_count_test(Config) -> State00 = init(#{name => test, queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), @@ -2786,45 +2799,9 @@ modify_test(Config) -> ok. -ttb_test(Config) -> - S0 = init(#{name => ?FUNCTION_NAME, - queue_resource => - rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}), - - - S1 = do_n(5_000_000, - fun (N, Acc) -> - I = (5_000_000 - N), - element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc)) - end, S0), - - - - {T1, _Res} = timer:tc(fun () -> - do_n(100, fun (_, S) -> - term_to_binary(S), - S1 end, S1) - end), - ct:pal("T1 took ~bus", [T1]), - - - {T2, _} = timer:tc(fun () -> - do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1) - end), - ct:pal("T2 took ~bus", [T2]), - - ok. - %% Utility %% -do_n(0, _, A) -> - A; -do_n(N, Fun, A0) -> - A = Fun(N, A0), - do_n(N-1, Fun, A). - - init(Conf) -> rabbit_fifo:init(Conf). make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).