Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quorum queues: chunk consumer log effects #7175

Merged
merged 1 commit into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
]).

-ifdef(TEST).
-export([update_header/4]).
-export([update_header/4,
chunk_disk_msgs/3]).
-endif.

%% command records representing all the protocol actions that are supported
Expand Down Expand Up @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta,
end.

checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
Expand All @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId,
end,
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = append_delivery_effects(Effects0, SendAcc, State0),
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.

evaluate_limit(_Index, Result, _BeforeState,
Expand Down Expand Up @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.

append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->

%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
Bytes, Chunks)
when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
[CurChunk | Chunks]) ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).

add_delivery_effects(Effects0, AccMap, _State)
when map_size(AccMap) == 0 ->
%% does this ever happen?
Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) ->
[delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef]
add_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Efs)
when is_list(DiskMsgs) ->
lists:foldl(
fun (Msgs, E) ->
[delivery_effect(C, Msgs, State) | E]
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
end, Effects0, AccMap).

take_next_msg(#?MODULE{returns = Returns0,
Expand Down Expand Up @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.

delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}],
delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}],
#?MODULE{msg_cache = {Idx, RawMsg}}) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
[local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
{RaftIdxs, Data} = lists:unzip(Msgs),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{log, RaftIdxs,
fun(Log) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, Header}) ->
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Data),
end, Log, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).

-record(consumer_cfg,
{meta = #{} :: consumer_meta(),
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
cancel_and_consume_with_same_tag,
validate_messages_on_queue

].

memory_tests() ->
Expand Down Expand Up @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) ->



ok.

validate_messages_on_queue(Config) ->
QQ = ?config(queue_name, Config),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Messages = [begin
M = <<I:8000/integer>>,
publish(Ch, QQ, M),
M
end || I <- lists:seq(1, 200)],
amqp_channel:wait_for_confirms_or_die(Ch),
validate_queue(Ch, QQ, Messages),

ok.

leader_locator_client_local(Config) ->
Expand Down
21 changes: 21 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,27 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.

chunk_disk_msgs_test(_Config) ->
%% NB: this does test an internal function
%% input to this function is a reversed list of MSGs
Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)],
Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]),
?assertMatch([_, _], Chunks),
[Chunk1, Chunk2] = Chunks,
?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1),
%% the chunks are worked out in backwards order, hence the first chunk
%% will be a "remainder" chunk
?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2),
?assertEqual(128, length(Chunk2)),
?assertEqual(72, length(Chunk1)),

TwoBigMsgs = [{124, ?MSG(124, 200_000)},
{123, ?MSG(123, 200_000)}],
?assertMatch([[{123, ?MSG(123, 200_000)}],
[{124, ?MSG(124, 200_000)}]],
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down