Skip to content

Commit

Permalink
Tests for shared message store file scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoguin committed Mar 11, 2024
1 parent f771754 commit 74b9811
Showing 1 changed file with 186 additions and 0 deletions.
186 changes: 186 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ groups() ->
[
{backing_queue_tests, [], [
msg_store,
msg_store_file_scan,
{backing_queue_v2, [], Common ++ V2Only},
{backing_queue_v1, [], Common}
]}
Expand Down Expand Up @@ -535,6 +536,191 @@ test_msg_store_client_delete_and_terminate(GenRef) ->
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
passed.

%% -------------------------------------------------------------------
%% Message store file scanning.
%% -------------------------------------------------------------------

%% While it is possible although very unlikely that this test case
%% produces false positives, all failures of this test case should
%% be investigated thoroughly as they test an algorithm that is
%% central to the reliability of the data in the shared message store.
%% Failing files can be found in the CT private data.
msg_store_file_scan(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, msg_store_file_scan1, [Config]).

msg_store_file_scan1(Config) ->
Scan = fun (Blocks) ->
Expected = gen_result(Blocks),
Path = gen_msg_file(Config, Blocks),
Result = rabbit_msg_store:scan_file_for_valid_messages(Path),
case Result of
Expected -> ok;
_ -> {expected, Expected, got, Result}
end
end,
%% Empty files.
ok = Scan([]),
ok = Scan([{pad, 1024}]),
ok = Scan([{pad, 1024 * 1024}]),
%% One-message files.
ok = Scan([{msg, gen_id(), <<0>>}]),
ok = Scan([{msg, gen_id(), <<255>>}]),
ok = Scan([{msg, gen_id(), gen_msg()}]),
ok = Scan([{pad, 1024}, {msg, gen_id(), gen_msg()}]),
ok = Scan([{pad, 1024 * 1024}, {msg, gen_id(), gen_msg()}]),
ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024}]),
ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024 * 1024}]),
%% Multiple messages.
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 2)]),
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 5)]),
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 20)]),
ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 100)]),
%% Multiple messages with padding.
ok = Scan([
{pad, 1024},
{msg, gen_id(), gen_msg()},
{msg, gen_id(), gen_msg()}
]),
ok = Scan([
{msg, gen_id(), gen_msg()},
{pad, 1024},
{msg, gen_id(), gen_msg()}
]),
ok = Scan([
{msg, gen_id(), gen_msg()},
{msg, gen_id(), gen_msg()},
{pad, 1024}
]),
ok = Scan([
{pad, 1024},
{msg, gen_id(), gen_msg()},
{pad, 1024},
{msg, gen_id(), gen_msg()}
]),
ok = Scan([
{msg, gen_id(), gen_msg()},
{pad, 1024},
{msg, gen_id(), gen_msg()},
{pad, 1024}
]),
ok = Scan([
{pad, 1024},
{msg, gen_id(), gen_msg()},
{msg, gen_id(), gen_msg()},
{pad, 1024}
]),
ok = Scan([
{pad, 1024},
{msg, gen_id(), gen_msg()},
{pad, 1024},
{msg, gen_id(), gen_msg()},
{pad, 1024}
]),
OneOf = fun(A, B) ->
case rand:uniform() of
F when F < +0.5 -> A;
_ -> B
end
end,
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 2)]),
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 5)]),
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 20)]),
ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)]),
%% Duplicate messages.
Msg = {msg, gen_id(), gen_msg()},
ok = Scan([Msg, Msg]),
ok = Scan([Msg, Msg, Msg, Msg, Msg]),
ok = Scan([Msg, {pad, 1024}, Msg]),
ok = Scan([Msg]
++ [OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)]
++ [Msg]),
%% Truncated start of message.
ok = Scan([{bin, <<21:56, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:48, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:40, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:32, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:24, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:16, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<21:8, "deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<"deadbeefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<"beefdeadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<"deadbeef", "hello", 255>>}]),
ok = Scan([{bin, <<"beef", "hello", 255>>}]),
ok = Scan([{bin, <<"hello", 255>>}]),
ok = Scan([{bin, <<255>>}]),
%% Truncated end of message (unlikely).
ok = Scan([{bin, <<255>>}]),
ok = Scan([{bin, <<255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255, 255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255>>}]),
ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255, 255>>}]),
ok = Scan([{bin, <<15:64, "deadbeefdeadbee">>}]),
ok = Scan([{bin, <<16:64, "deadbeefdeadbeef">>}]),
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 0>>}]),
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255>>}]),
ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255, 254>>}]),
%% Messages with no content.
ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]),
ok = Scan([{msg, gen_id(), <<>>}]),
%% All good!!
passed.

gen_id() ->
rand:bytes(16).

gen_msg() ->
gen_msg(1024 * 1024).

gen_msg(MaxSize) ->
%% This might generate false positives but very rarely
%% so we don't do anything to prevent them.
rand:bytes(rand:uniform(MaxSize)).

gen_msg_file(Config, Blocks) ->
PrivDir = ?config(priv_dir, Config),
TmpFile = integer_to_list(erlang:unique_integer([positive])),
Path = filename:join(PrivDir, TmpFile),
ok = file:write_file(Path, [case Block of
{bin, Bin} ->
Bin;
{pad, Size} ->
%% This might generate false positives although very unlikely.
rand:bytes(Size);
{msg, MsgId, Msg} ->
Size = 16 + byte_size(Msg),
[<<Size:64>>, MsgId, Msg, <<255>>]
end || Block <- Blocks]),
Path.

gen_result(Blocks) ->
Messages = gen_result(Blocks, 0, []),
case Messages of
[] ->
{ok, [], 0};
[{_, TotalSize, Offset}|_] ->
{ok, Messages, Offset + TotalSize}
end.

gen_result([], _, Acc) ->
Acc;
gen_result([{bin, Bin}|Tail], Offset, Acc) ->
gen_result(Tail, Offset + byte_size(Bin), Acc);
gen_result([{pad, Size}|Tail], Offset, Acc) ->
gen_result(Tail, Offset + Size, Acc);
gen_result([{msg, MsgId, Msg}|Tail], Offset, Acc) ->
Size = 9 + 16 + byte_size(Msg),
%% Only the first MsgId found is returned when duplicates exist.
case lists:keymember(MsgId, 1, Acc) of
false ->
gen_result(Tail, Offset + Size, [{MsgId, Size, Offset}|Acc]);
true ->
gen_result(Tail, Offset + Size, Acc)
end.

%% -------------------------------------------------------------------
%% Backing queue.
%% -------------------------------------------------------------------
Expand Down

0 comments on commit 74b9811

Please sign in to comment.