Skip to content

Commit

Permalink
Chunk quorum queue deliveries
Browse files Browse the repository at this point in the history
This puts a limit to the amount of message data that is added
to the process heap at the same time to around 128KB.

Large prefetch values combined with large messages could cause
excessive garbage collection work.

Also similify the intermediate delivery message format to avoid
allocations that aren't necessary.
  • Loading branch information
kjnilsson committed Feb 27, 2023
1 parent 4f25779 commit ede4f67
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 13 deletions.
48 changes: 36 additions & 12 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
]).

-ifdef(TEST).
-export([update_header/4]).
-export([update_header/4,
chunk_disk_msgs/3]).
-endif.

%% command records representing all the protocol actions that are supported
Expand Down Expand Up @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta,
end.

checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
Expand All @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId,
end,
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = append_delivery_effects(Effects0, SendAcc, State0),
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.

evaluate_limit(_Index, Result, _BeforeState,
Expand Down Expand Up @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.

append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->

%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
Bytes, Chunks)
when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
[CurChunk | Chunks]) ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).

add_delivery_effects(Effects0, AccMap, _State)
when map_size(AccMap) == 0 ->
%% does this ever happen?
Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) ->
[delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef]
add_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Efs)
when is_list(DiskMsgs) ->
lists:foldl(
fun (Msgs, E) ->
[delivery_effect(C, Msgs, State) | E]
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
end, Effects0, AccMap).

take_next_msg(#?MODULE{returns = Returns0,
Expand Down Expand Up @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.

delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}],
delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}],
#?MODULE{msg_cache = {Idx, RawMsg}}) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
[local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
{RaftIdxs, Data} = lists:unzip(Msgs),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{log, RaftIdxs,
fun(Log) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, Header}) ->
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Data),
end, Log, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).

-record(consumer_cfg,
{meta = #{} :: consumer_meta(),
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
cancel_and_consume_with_same_tag,
validate_messages_on_queue

].

memory_tests() ->
Expand Down Expand Up @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) ->



ok.

validate_messages_on_queue(Config) ->
QQ = ?config(queue_name, Config),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Messages = [begin
M = <<I:8000/integer>>,
publish(Ch, QQ, M),
M
end || I <- lists:seq(1, 200)],
amqp_channel:wait_for_confirms_or_die(Ch),
validate_queue(Ch, QQ, Messages),

ok.

leader_locator_client_local(Config) ->
Expand Down
21 changes: 21 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,27 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.

chunk_disk_msgs_test(_Config) ->
%% NB: this does test an internal function
%% input to this function is a reversed list of MSGs
Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)],
Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]),
?assertMatch([_, _], Chunks),
[Chunk1, Chunk2] = Chunks,
?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1),
%% the chunks are worked out in backwards order, hence the first chunk
%% will be a "remainder" chunk
?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2),
?assertEqual(128, length(Chunk2)),
?assertEqual(72, length(Chunk1)),

TwoBigMsgs = [{124, ?MSG(124, 200_000)},
{123, ?MSG(123, 200_000)}],
?assertMatch([[{123, ?MSG(123, 200_000)}],
[{124, ?MSG(124, 200_000)}]],
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down

0 comments on commit ede4f67

Please sign in to comment.