diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 2a9709148cf4..0ce02dbb37dc 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -123,10 +123,11 @@ init(#resource{ virtual_host = VHost } = Name) -> terminate(State0 = #qs{ read_fd = ReadFd }) -> ?DEBUG("~0p", [State0]), - State = flush_buffer(State0), + State = flush_buffer(State0, fun(Fd) -> ok = file:sync(Fd) end), maybe_close_fd(ReadFd), State#qs{ write_segment = undefined, write_offset = ?HEADER_SIZE, + cache = #{}, read_segment = undefined, read_fd = undefined }. @@ -159,35 +160,41 @@ get_write_offset(Segment, Size, State = #qs{ write_segment = Segment, write_offset = Offset }) -> {Offset, State#qs{ write_offset = Offset + ?ENTRY_HEADER_SIZE + Size }}; get_write_offset(Segment, Size, State = #qs{ write_segment = WriteSegment }) - when Segment > WriteSegment; WriteSegment =:= undefined -> - SegmentEntryCount = segment_entry_count(), - FromSeqId = Segment * SegmentEntryCount, - ToSeqId = FromSeqId + SegmentEntryCount, - ok = file:write_file(segment_file(Segment, State), - << ?MAGIC:32, - ?VERSION:8, - FromSeqId:64/unsigned, - ToSeqId:64/unsigned, - 0:344 >>, - [raw]), + when Segment > WriteSegment -> {?HEADER_SIZE, State#qs{ write_segment = Segment, - write_offset = ?HEADER_SIZE + ?ENTRY_HEADER_SIZE + Size }}. + write_offset = ?HEADER_SIZE + ?ENTRY_HEADER_SIZE + Size }}; +%% The first time we write we have to figure out the write_offset by +%% looking at the segment file directly. +get_write_offset(Segment, Size, State = #qs{ write_segment = undefined }) -> + Offset = case file:open(segment_file(Segment, State), [read, raw, binary]) of + {ok, Fd} -> + {ok, Offset0} = file:position(Fd, eof), + ok = file:close(Fd), + case Offset0 of + 0 -> ?HEADER_SIZE; + _ -> Offset0 + end; + {error, enoent} -> + ?HEADER_SIZE + end, + {Offset, State#qs{ write_segment = Segment, + write_offset = Offset + ?ENTRY_HEADER_SIZE + Size }}. -spec sync(State) -> State when State::state(). sync(State) -> ?DEBUG("~0p", [State]), - flush_buffer(State). + flush_buffer(State, fun(_) -> ok end). maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) -> case WriteBufferSize >= max_cache_size() of - true -> flush_buffer(State); + true -> flush_buffer(State, fun(_) -> ok end); false -> State end. -flush_buffer(State = #qs{ write_buffer_size = 0 }) -> +flush_buffer(State = #qs{ write_buffer_size = 0 }, _) -> State; -flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }) -> +flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) -> CheckCRC32 = check_crc32(), SegmentEntryCount = segment_entry_count(), %% First we prepare the writes sorted by segment. @@ -196,7 +203,22 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }) -> %% Then we do the writes for each segment. State = lists:foldl(fun({Segment, LocBytes}, FoldState) -> {ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]), + case file:position(Fd, eof) of + {ok, 0} -> + %% We write the file header if it does not exist. + FromSeqId = Segment * SegmentEntryCount, + ToSeqId = FromSeqId + SegmentEntryCount, + ok = file:write(Fd, + << ?MAGIC:32, + ?VERSION:8, + FromSeqId:64/unsigned, + ToSeqId:64/unsigned, + 0:344 >>); + _ -> + ok + end, ok = file:pwrite(Fd, lists:sort(LocBytes)), + FsyncFun(Fd), ok = file:close(Fd), FoldState end, State0, Writes), @@ -222,17 +244,17 @@ flush_buffer_build(Tail = [{SeqId, _}|_], _, SegmentThreshold, _, WriteOffset, W _ -> {Tail, [{WriteOffset, lists:reverse(WriteAcc)}|Acc]} end; flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail], - CheckCRC32, SegmentEntryCount, Offset, WriteOffset, WriteAcc, Acc) -> - flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount, + CheckCRC32, SegmentThreshold, Offset, WriteOffset, WriteAcc, Acc) -> + flush_buffer_build(Tail, CheckCRC32, SegmentThreshold, Offset + ?ENTRY_HEADER_SIZE + Size, WriteOffset, [build_data(Entry, CheckCRC32)|WriteAcc], Acc); flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail], - CheckCRC32, SegmentEntryCount, _, _, [], Acc) -> - flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount, + CheckCRC32, SegmentThreshold, _, _, [], Acc) -> + flush_buffer_build(Tail, CheckCRC32, SegmentThreshold, Offset + ?ENTRY_HEADER_SIZE + Size, Offset, [build_data(Entry, CheckCRC32)], Acc); flush_buffer_build([{_, Entry = {Offset, Size, _}}|Tail], - CheckCRC32, SegmentEntryCount, _, WriteOffset, WriteAcc, Acc) -> - flush_buffer_build(Tail, CheckCRC32, SegmentEntryCount, + CheckCRC32, SegmentThreshold, _, WriteOffset, WriteAcc, Acc) -> + flush_buffer_build(Tail, CheckCRC32, SegmentThreshold, Offset + ?ENTRY_HEADER_SIZE + Size, Offset, [build_data(Entry, CheckCRC32)], [{WriteOffset, lists:reverse(WriteAcc)}|Acc]); flush_buffer_build([], _, _, _, _, [], Acc) -> @@ -423,7 +445,8 @@ get_read_fd(Segment, State = #qs{ read_fd = OldFd }) -> read_fd = undefined }} end; {error, enoent} -> - {{error, no_file}, State} + {{error, no_file}, State#qs{ read_segment = undefined, + read_fd = undefined }} end. -spec check_msg_on_disk(rabbit_variable_queue:seq_id(), msg_location(), State) @@ -444,16 +467,10 @@ check_msg_on_disk(SeqId, {?MODULE, Offset, Size}, State0) -> 0 -> {ok, State}; 1 -> - %% We only want to check the CRC32 if configured to do so. - case check_crc32() of - false -> - {ok, State}; - true -> - CRC32 = erlang:crc32(MsgBin), - case <> of - CRC32Expected -> {ok, State}; - _ -> {{error, bad_crc}, State} - end + CRC32 = erlang:crc32(MsgBin), + case <> of + CRC32Expected -> {ok, State}; + _ -> {{error, bad_crc}, State} end end; _ -> @@ -518,23 +535,16 @@ delete_segments(Segments, State0 = #qs{ write_buffer = WriteBuffer0, end || Segment <- Segments], %% Finally, we remove any entries from the buffer that fall within - %% the segments that were deleted. For simplicity's sake, we take - %% the highest SeqId from these files and remove any SeqId lower - %% than or equal to this SeqId from the buffer. - %% - %% @todo If this becomes a performance issue we may take inspiration - %% from sets:filter/2. - HighestSegment = lists:foldl(fun - (S, SAcc) when S > SAcc -> S; - (_, SAcc) -> SAcc - end, -1, Segments), - HighestSeqId = (1 + HighestSegment) * segment_entry_count(), + %% the segments that were deleted. + SegmentEntryCount = segment_entry_count(), {WriteBuffer, WriteBufferSize} = maps:fold(fun - (SeqId, {_, MsgSize, _}, {WriteBufferAcc, WriteBufferSize1}) - when SeqId =< HighestSeqId -> - {WriteBufferAcc, WriteBufferSize1 - MsgSize}; - (SeqId, Value, {WriteBufferAcc, WriteBufferSize1}) -> - {WriteBufferAcc#{SeqId => Value}, WriteBufferSize1} + (SeqId, Value = {_, MsgSize, _}, {WriteBufferAcc, WriteBufferSize1}) -> + case lists:member(SeqId div SegmentEntryCount, Segments) of + true -> + {WriteBufferAcc, WriteBufferSize1 - MsgSize}; + false -> + {WriteBufferAcc#{SeqId => Value}, WriteBufferSize1} + end end, {#{}, WriteBufferSize0}, WriteBuffer0), State#qs{ write_buffer = WriteBuffer, write_buffer_size = WriteBufferSize }. diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index e40fa6aa890c..da261ac53dd0 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -1192,7 +1192,7 @@ convert_from_v2_to_v1(State0 = #vqstate{ index_mod = rabbit_classic_queue_inde %% We have already closed the v2 index/store FDs when deleting the files. State#vqstate{ index_mod = rabbit_queue_index, index_state = V1Index, - store_state = V2Store }. + store_state = rabbit_classic_queue_store_v2:terminate(V2Store) }. convert_from_v2_to_v1_in_memory(State0 = #vqstate{ q1 = Q1b, q2 = Q2b, @@ -2406,13 +2406,12 @@ purge_pending_ack(KeepPersistent, store_state = StoreState0, msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, State1} = purge_pending_ack1(State), - StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore), - %% @todo Sounds like we might want to remove only transients from the cache? case KeepPersistent of true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { store_state = StoreState1 }; + State1; false -> {DeletedSegments, IndexState1} = IndexMod:ack(IndexOnDiskSeqIds, IndexState), + StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore), StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1), remove_vhost_msgs_by_id(MsgIdsByStore, MSCState), State1 #vqstate { index_state = IndexState1,