Skip to content

Commit

Permalink
Merge pull request #7444 from rabbitmq/mergify/bp/v3.12.x/pr-7175
Browse files Browse the repository at this point in the history
Quorum queues: chunk consumer log effects (backport #7175)
  • Loading branch information
michaelklishin authored Feb 27, 2023
2 parents c83f3e1 + 786c198 commit 7e10327
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 13 deletions.
48 changes: 36 additions & 12 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
]).

-ifdef(TEST).
-export([update_header/4]).
-export([update_header/4,
chunk_disk_msgs/3]).
-endif.

%% command records representing all the protocol actions that are supported
Expand Down Expand Up @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta,
end.

checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
Expand All @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId,
end,
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = append_delivery_effects(Effects0, SendAcc, State0),
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.

evaluate_limit(_Index, Result, _BeforeState,
Expand Down Expand Up @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.

append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->

%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
Bytes, Chunks)
when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
[CurChunk | Chunks]) ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).

add_delivery_effects(Effects0, AccMap, _State)
when map_size(AccMap) == 0 ->
%% does this ever happen?
Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) ->
[delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef]
add_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Efs)
when is_list(DiskMsgs) ->
lists:foldl(
fun (Msgs, E) ->
[delivery_effect(C, Msgs, State) | E]
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
end, Effects0, AccMap).

take_next_msg(#?MODULE{returns = Returns0,
Expand Down Expand Up @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.

delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}],
delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}],
#?MODULE{msg_cache = {Idx, RawMsg}}) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
[local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
{RaftIdxs, Data} = lists:unzip(Msgs),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{log, RaftIdxs,
fun(Log) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, Header}) ->
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Data),
end, Log, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).

-record(consumer_cfg,
{meta = #{} :: consumer_meta(),
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
cancel_and_consume_with_same_tag,
validate_messages_on_queue

].

memory_tests() ->
Expand Down Expand Up @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) ->



ok.

validate_messages_on_queue(Config) ->
QQ = ?config(queue_name, Config),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Messages = [begin
M = <<I:8000/integer>>,
publish(Ch, QQ, M),
M
end || I <- lists:seq(1, 200)],
amqp_channel:wait_for_confirms_or_die(Ch),
validate_queue(Ch, QQ, Messages),

ok.

leader_locator_client_local(Config) ->
Expand Down
21 changes: 21 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,27 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.

chunk_disk_msgs_test(_Config) ->
%% NB: this does test an internal function
%% input to this function is a reversed list of MSGs
Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)],
Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]),
?assertMatch([_, _], Chunks),
[Chunk1, Chunk2] = Chunks,
?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1),
%% the chunks are worked out in backwards order, hence the first chunk
%% will be a "remainder" chunk
?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2),
?assertEqual(128, length(Chunk2)),
?assertEqual(72, length(Chunk1)),

TwoBigMsgs = [{124, ?MSG(124, 200_000)},
{123, ?MSG(123, 200_000)}],
?assertMatch([[{123, ?MSG(123, 200_000)}],
[{124, ?MSG(124, 200_000)}]],
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down

0 comments on commit 7e10327

Please sign in to comment.