diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 994ce03902ad..f20b524002cf 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -66,6 +66,7 @@ groups() -> [ {backing_queue_tests, [], [ msg_store, + msg_store_file_scan, {backing_queue_v2, [], Common ++ V2Only}, {backing_queue_v1, [], Common} ]} @@ -535,6 +536,191 @@ test_msg_store_client_delete_and_terminate(GenRef) -> ok = rabbit_msg_store:client_delete_and_terminate(MSCState), passed. +%% ------------------------------------------------------------------- +%% Message store file scanning. +%% ------------------------------------------------------------------- + +%% While it is possible although very unlikely that this test case +%% produces false positives, all failures of this test case should +%% be investigated thoroughly as they test an algorithm that is +%% central to the reliability of the data in the shared message store. +%% Failing files can be found in the CT private data. +msg_store_file_scan(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, msg_store_file_scan1, [Config]). + +msg_store_file_scan1(Config) -> + Scan = fun (Blocks) -> + Expected = gen_result(Blocks), + Path = gen_msg_file(Config, Blocks), + Result = rabbit_msg_store:scan_file_for_valid_messages(Path), + case Result of + Expected -> ok; + _ -> {expected, Expected, got, Result} + end + end, + %% Empty files. + ok = Scan([]), + ok = Scan([{pad, 1024}]), + ok = Scan([{pad, 1024 * 1024}]), + %% One-message files. + ok = Scan([{msg, gen_id(), <<0>>}]), + ok = Scan([{msg, gen_id(), <<255>>}]), + ok = Scan([{msg, gen_id(), gen_msg()}]), + ok = Scan([{pad, 1024}, {msg, gen_id(), gen_msg()}]), + ok = Scan([{pad, 1024 * 1024}, {msg, gen_id(), gen_msg()}]), + ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024}]), + ok = Scan([{msg, gen_id(), gen_msg()}, {pad, 1024 * 1024}]), + %% Multiple messages. + ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 2)]), + ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 5)]), + ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 20)]), + ok = Scan([{msg, gen_id(), gen_msg()} || _ <- lists:seq(1, 100)]), + %% Multiple messages with padding. + ok = Scan([ + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {msg, gen_id(), gen_msg()} + ]), + ok = Scan([ + {msg, gen_id(), gen_msg()}, + {pad, 1024}, + {msg, gen_id(), gen_msg()} + ]), + ok = Scan([ + {msg, gen_id(), gen_msg()}, + {msg, gen_id(), gen_msg()}, + {pad, 1024} + ]), + ok = Scan([ + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {pad, 1024}, + {msg, gen_id(), gen_msg()} + ]), + ok = Scan([ + {msg, gen_id(), gen_msg()}, + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {pad, 1024} + ]), + ok = Scan([ + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {msg, gen_id(), gen_msg()}, + {pad, 1024} + ]), + ok = Scan([ + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {pad, 1024}, + {msg, gen_id(), gen_msg()}, + {pad, 1024} + ]), + OneOf = fun(A, B) -> + case rand:uniform() of + F when F < +0.5 -> A; + _ -> B + end + end, + ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 2)]), + ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 5)]), + ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 20)]), + ok = Scan([OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)]), + %% Duplicate messages. + Msg = {msg, gen_id(), gen_msg()}, + ok = Scan([Msg, Msg]), + ok = Scan([Msg, Msg, Msg, Msg, Msg]), + ok = Scan([Msg, {pad, 1024}, Msg]), + ok = Scan([Msg] + ++ [OneOf({msg, gen_id(), gen_msg()}, {pad, 1024}) || _ <- lists:seq(1, 100)] + ++ [Msg]), + %% Truncated start of message. + ok = Scan([{bin, <<21:56, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:48, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:40, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:32, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:24, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:16, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<21:8, "deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<"deadbeefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<"beefdeadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<"deadbeef", "hello", 255>>}]), + ok = Scan([{bin, <<"beef", "hello", 255>>}]), + ok = Scan([{bin, <<"hello", 255>>}]), + ok = Scan([{bin, <<255>>}]), + %% Truncated end of message (unlikely). + ok = Scan([{bin, <<255>>}]), + ok = Scan([{bin, <<255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255, 255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255, 255, 255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255>>}]), + ok = Scan([{bin, <<255, 255, 255, 255, 255, 255, 255, 255>>}]), + ok = Scan([{bin, <<15:64, "deadbeefdeadbee">>}]), + ok = Scan([{bin, <<16:64, "deadbeefdeadbeef">>}]), + ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 0>>}]), + ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255>>}]), + ok = Scan([{bin, <<17:64, "deadbeefdeadbeef", 255, 254>>}]), + %% Messages with no content. + ok = Scan([{bin, <<0:64, "deadbeefdeadbeef", 255>>}]), + ok = Scan([{msg, gen_id(), <<>>}]), + %% All good!! + passed. + +gen_id() -> + rand:bytes(16). + +gen_msg() -> + gen_msg(1024 * 1024). + +gen_msg(MaxSize) -> + %% This might generate false positives but very rarely + %% so we don't do anything to prevent them. + rand:bytes(rand:uniform(MaxSize)). + +gen_msg_file(Config, Blocks) -> + PrivDir = ?config(priv_dir, Config), + TmpFile = integer_to_list(erlang:unique_integer([positive])), + Path = filename:join(PrivDir, TmpFile), + ok = file:write_file(Path, [case Block of + {bin, Bin} -> + Bin; + {pad, Size} -> + %% This might generate false positives although very unlikely. + rand:bytes(Size); + {msg, MsgId, Msg} -> + Size = 16 + byte_size(Msg), + [<>, MsgId, Msg, <<255>>] + end || Block <- Blocks]), + Path. + +gen_result(Blocks) -> + Messages = gen_result(Blocks, 0, []), + case Messages of + [] -> + {ok, [], 0}; + [{_, TotalSize, Offset}|_] -> + {ok, Messages, Offset + TotalSize} + end. + +gen_result([], _, Acc) -> + Acc; +gen_result([{bin, Bin}|Tail], Offset, Acc) -> + gen_result(Tail, Offset + byte_size(Bin), Acc); +gen_result([{pad, Size}|Tail], Offset, Acc) -> + gen_result(Tail, Offset + Size, Acc); +gen_result([{msg, MsgId, Msg}|Tail], Offset, Acc) -> + Size = 9 + 16 + byte_size(Msg), + %% Only the first MsgId found is returned when duplicates exist. + case lists:keymember(MsgId, 1, Acc) of + false -> + gen_result(Tail, Offset + Size, [{MsgId, Size, Offset}|Acc]); + true -> + gen_result(Tail, Offset + Size, Acc) + end. + %% ------------------------------------------------------------------- %% Backing queue. %% -------------------------------------------------------------------