Skip to content


CQv2: Fix property suite
Browse files Browse the repository at this point in the history
Also always check the CRC32 even if not currently configured
to do so, if the CRC is available in the data.
  • Loading branch information
lhoguin committed Jul 18, 2022
1 parent be3e439 commit fd1d7f1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 54 deletions.
110 changes: 60 additions & 50 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ init(#resource{ virtual_host = VHost } = Name) ->

terminate(State0 = #qs{ read_fd = ReadFd }) ->
?DEBUG("~0p", [State0]),
State = flush_buffer(State0),
State = flush_buffer(State0, fun(Fd) -> ok = file:sync(Fd) end),
State#qs{ write_segment = undefined,
write_offset = ?HEADER_SIZE,
cache = #{},
read_segment = undefined,
read_fd = undefined }.

Expand Down Expand Up @@ -159,35 +160,41 @@ get_write_offset(Segment, Size, State = #qs{ write_segment = Segment,
write_offset = Offset }) ->
{Offset, State#qs{ write_offset = Offset + ?ENTRY_HEADER_SIZE + Size }};
get_write_offset(Segment, Size, State = #qs{ write_segment = WriteSegment })
when Segment > WriteSegment; WriteSegment =:= undefined ->
SegmentEntryCount = segment_entry_count(),
FromSeqId = Segment * SegmentEntryCount,
ToSeqId = FromSeqId + SegmentEntryCount,
ok = file:write_file(segment_file(Segment, State),
<< ?MAGIC:32,
0:344 >>,
when Segment > WriteSegment ->
{?HEADER_SIZE, State#qs{ write_segment = Segment,
write_offset = ?HEADER_SIZE + ?ENTRY_HEADER_SIZE + Size }}.
write_offset = ?HEADER_SIZE + ?ENTRY_HEADER_SIZE + Size }};
%% The first time we write we have to figure out the write_offset by
%% looking at the segment file directly.
get_write_offset(Segment, Size, State = #qs{ write_segment = undefined }) ->
Offset = case file:open(segment_file(Segment, State), [read, raw, binary]) of
{ok, Fd} ->
{ok, Offset0} = file:position(Fd, eof),
ok = file:close(Fd),
case Offset0 of
_ -> Offset0
{error, enoent} ->
{Offset, State#qs{ write_segment = Segment,
write_offset = Offset + ?ENTRY_HEADER_SIZE + Size }}.

-spec sync(State) -> State when State::state().

sync(State) ->
?DEBUG("~0p", [State]),
flush_buffer(State, fun(_) -> ok end).

maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
case WriteBufferSize >= max_cache_size() of
true -> flush_buffer(State);
true -> flush_buffer(State, fun(_) -> ok end);
false -> State

flush_buffer(State = #qs{ write_buffer_size = 0 }) ->
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }) ->
flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
CheckCRC32 = check_crc32(),
SegmentEntryCount = segment_entry_count(),
%% First we prepare the writes sorted by segment.
Expand All @@ -196,7 +203,22 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }) ->
%% Then we do the writes for each segment.
State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
{ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]),
case file:position(Fd, eof) of
{ok, 0} ->
%% We write the file header if it does not exist.
FromSeqId = Segment * SegmentEntryCount,
ToSeqId = FromSeqId + SegmentEntryCount,
ok = file:write(Fd,
<< ?MAGIC:32,
0:344 >>);
_ ->
ok = file:pwrite(Fd, lists:sort(LocBytes)),
ok = file:close(Fd),
end, State0, Writes),
Expand All @@ -222,17 +244,17 @@ flush_buffer_build(Tail = [{SeqId, _}|_], _, SegmentThreshold, _, WriteOffset, W
_ -> {Tail, [{WriteOffset, lists:reverse(WriteAcc)}|Acc]}
flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail],
CheckCRC32, SegmentEntryCount, Offset, WriteOffset, WriteAcc, Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount,
CheckCRC32, SegmentThreshold, Offset, WriteOffset, WriteAcc, Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentThreshold,
Offset + ?ENTRY_HEADER_SIZE + Size, WriteOffset,
[build_data(Entry, CheckCRC32)|WriteAcc], Acc);
flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail],
CheckCRC32, SegmentEntryCount, _, _, [], Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount,
CheckCRC32, SegmentThreshold, _, _, [], Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentThreshold,
Offset + ?ENTRY_HEADER_SIZE + Size, Offset, [build_data(Entry, CheckCRC32)], Acc);
flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail],
CheckCRC32, SegmentEntryCount, _, WriteOffset, WriteAcc, Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount,
CheckCRC32, SegmentThreshold, _, WriteOffset, WriteAcc, Acc) ->
flush_buffer_build(Tail, CheckCRC32, SegmentThreshold,
Offset + ?ENTRY_HEADER_SIZE + Size, Offset, [build_data(Entry, CheckCRC32)],
[{WriteOffset, lists:reverse(WriteAcc)}|Acc]);
flush_buffer_build([], _, _, _, _, [], Acc) ->
Expand Down Expand Up @@ -423,7 +445,8 @@ get_read_fd(Segment, State = #qs{ read_fd = OldFd }) ->
read_fd = undefined }}
{error, enoent} ->
{{error, no_file}, State}
{{error, no_file}, State#qs{ read_segment = undefined,
read_fd = undefined }}

-spec check_msg_on_disk(rabbit_variable_queue:seq_id(), msg_location(), State)
Expand All @@ -444,16 +467,10 @@ check_msg_on_disk(SeqId, {?MODULE, Offset, Size}, State0) ->
0 ->
{ok, State};
1 ->
%% We only want to check the CRC32 if configured to do so.
case check_crc32() of
false ->
{ok, State};
true ->
CRC32 = erlang:crc32(MsgBin),
case <<CRC32:16>> of
CRC32Expected -> {ok, State};
_ -> {{error, bad_crc}, State}
CRC32 = erlang:crc32(MsgBin),
case <<CRC32:16>> of
CRC32Expected -> {ok, State};
_ -> {{error, bad_crc}, State}
_ ->
Expand Down Expand Up @@ -518,23 +535,16 @@ delete_segments(Segments, State0 = #qs{ write_buffer = WriteBuffer0,
|| Segment <- Segments],
%% Finally, we remove any entries from the buffer that fall within
%% the segments that were deleted. For simplicity's sake, we take
%% the highest SeqId from these files and remove any SeqId lower
%% than or equal to this SeqId from the buffer.
%% @todo If this becomes a performance issue we may take inspiration
%% from sets:filter/2.
HighestSegment = lists:foldl(fun
(S, SAcc) when S > SAcc -> S;
(_, SAcc) -> SAcc
end, -1, Segments),
HighestSeqId = (1 + HighestSegment) * segment_entry_count(),
%% the segments that were deleted.
SegmentEntryCount = segment_entry_count(),
{WriteBuffer, WriteBufferSize} = maps:fold(fun
(SeqId, {_, MsgSize, _}, {WriteBufferAcc, WriteBufferSize1})
when SeqId =< HighestSeqId ->
{WriteBufferAcc, WriteBufferSize1 - MsgSize};
(SeqId, Value, {WriteBufferAcc, WriteBufferSize1}) ->
{WriteBufferAcc#{SeqId => Value}, WriteBufferSize1}
(SeqId, Value = {_, MsgSize, _}, {WriteBufferAcc, WriteBufferSize1}) ->
case lists:member(SeqId div SegmentEntryCount, Segments) of
true ->
{WriteBufferAcc, WriteBufferSize1 - MsgSize};
false ->
{WriteBufferAcc#{SeqId => Value}, WriteBufferSize1}
end, {#{}, WriteBufferSize0}, WriteBuffer0),
State#qs{ write_buffer = WriteBuffer,
write_buffer_size = WriteBufferSize }.
Expand Down
7 changes: 3 additions & 4 deletions deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ convert_from_v2_to_v1(State0 = #vqstate{ index_mod = rabbit_classic_queue_inde
%% We have already closed the v2 index/store FDs when deleting the files.
State#vqstate{ index_mod = rabbit_queue_index,
index_state = V1Index,
store_state = V2Store }.
store_state = rabbit_classic_queue_store_v2:terminate(V2Store) }.

convert_from_v2_to_v1_in_memory(State0 = #vqstate{ q1 = Q1b,
q2 = Q2b,
Expand Down Expand Up @@ -2406,13 +2406,12 @@ purge_pending_ack(KeepPersistent,
store_state = StoreState0,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, State1} = purge_pending_ack1(State),
StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore),
%% @todo Sounds like we might want to remove only transients from the cache?
case KeepPersistent of
true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
State1 #vqstate { store_state = StoreState1 };
false -> {DeletedSegments, IndexState1} =
IndexMod:ack(IndexOnDiskSeqIds, IndexState),
StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore),
StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1),
remove_vhost_msgs_by_id(MsgIdsByStore, MSCState),
State1 #vqstate { index_state = IndexState1,
Expand Down

0 comments on commit fd1d7f1

Please sign in to comment.