diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index c45730406a22..15036697b989 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -72,7 +72,8 @@ ]). -ifdef(TEST). --export([update_header/4]). +-export([update_header/4, + chunk_disk_msgs/3]). -endif. %% command records representing all the protocol actions that are supported @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta, end. checkout0(Meta, {success, ConsumerId, MsgId, - ?MSG(RaftIdx, Header), ExpiredMsg, State, Effects}, + ?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects}, SendAcc0) -> - DelMsg = {RaftIdx, {MsgId, Header}}, + DelMsg = {MsgId, Msg}, SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of undefined -> SendAcc0#{ConsumerId => [DelMsg]}; @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId, end, checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc); checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) -> - Effects = append_delivery_effects(Effects0, SendAcc, State0), + Effects = add_delivery_effects(Effects0, SendAcc, State0), {State0, ExpiredMsg, lists:reverse(Effects)}. evaluate_limit(_Index, Result, _BeforeState, @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState, {State0, Result, Effects0} end. -append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 -> + +%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]] +chunk_disk_msgs([], _Bytes, [[] | Chunks]) -> + Chunks; +chunk_disk_msgs([], _Bytes, Chunks) -> + Chunks; +chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], + Bytes, Chunks) + when Bytes >= ?DELIVERY_CHUNK_LIMIT_B -> + Size = get_header(size, Header), + chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]); +chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes, + [CurChunk | Chunks]) -> + Size = get_header(size, Header), + chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]). + +add_delivery_effects(Effects0, AccMap, _State) + when map_size(AccMap) == 0 -> %% does this ever happen? Effects0; -append_delivery_effects(Effects0, AccMap, State) -> - maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) -> - [delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef] +add_delivery_effects(Effects0, AccMap, State) -> + maps:fold(fun (C, DiskMsgs, Efs) + when is_list(DiskMsgs) -> + lists:foldl( + fun (Msgs, E) -> + [delivery_effect(C, Msgs, State) | E] + end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]])) end, Effects0, AccMap). take_next_msg(#?MODULE{returns = Returns0, @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0, Msg end. -delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}], +delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}], #?MODULE{msg_cache = {Idx, RawMsg}}) -> {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]}, [local, ra_event]}; delivery_effect({CTag, CPid}, Msgs, _State) -> - {RaftIdxs, Data} = lists:unzip(Msgs), + RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) -> + [I | Acc] + end, [], Msgs), {log, RaftIdxs, fun(Log) -> DelMsgs = lists:zipwith( - fun (Cmd, {MsgId, Header}) -> + fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) -> {MsgId, {Header, get_msg(Cmd)}} - end, Log, Data), + end, Log, Msgs), [{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}] end, {local, node(CPid)}}. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 1d90610a3fc2..b4c37c70731a 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -96,6 +96,7 @@ -define(MB, 1_048_576). -define(LOW_LIMIT, 0.8). +-define(DELIVERY_CHUNK_LIMIT_B, 128_000). -record(consumer_cfg, {meta = #{} :: consumer_meta(), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 15f15102a68f..e445ee69b5a0 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -153,7 +153,9 @@ all_tests() -> per_message_ttl_expiration_too_high, consumer_priorities, cancel_consumer_gh_3729, - cancel_and_consume_with_same_tag + cancel_and_consume_with_same_tag, + validate_messages_on_queue + ]. memory_tests() -> @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) -> + ok. + +validate_messages_on_queue(Config) -> + QQ = ?config(queue_name, Config), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Messages = [begin + M = <>, + publish(Ch, QQ, M), + M + end || I <- lists:seq(1, 200)], + amqp_channel:wait_for_confirms_or_die(Ch), + validate_queue(Ch, QQ, Messages), + ok. leader_locator_client_local(Config) -> diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index b24673e7b661..9bc387176e25 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1971,6 +1971,27 @@ header_test(_) -> ?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)), ok. +chunk_disk_msgs_test(_Config) -> + %% NB: this does test an internal function + %% input to this function is a reversed list of MSGs + Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)], + Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]), + ?assertMatch([_, _], Chunks), + [Chunk1, Chunk2] = Chunks, + ?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1), + %% the chunks are worked out in backwards order, hence the first chunk + %% will be a "remainder" chunk + ?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2), + ?assertEqual(128, length(Chunk2)), + ?assertEqual(72, length(Chunk1)), + + TwoBigMsgs = [{124, ?MSG(124, 200_000)}, + {123, ?MSG(123, 200_000)}], + ?assertMatch([[{123, ?MSG(123, 200_000)}], + [{124, ?MSG(124, 200_000)}]], + rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf).