diff --git a/.github/workflows/rabbitmq-oci.yaml b/.github/workflows/rabbitmq-oci.yaml
index 6244e4e24d07..552b50312376 100644
--- a/.github/workflows/rabbitmq-oci.yaml
+++ b/.github/workflows/rabbitmq-oci.yaml
@@ -28,7 +28,7 @@ jobs:
       - name: Inject the git sha as the osiris version
         working-directory: osiris
         run: |
-          sed -i"_orig" "/vsn,/ s/2\\.[0-9]\\.[0-9]/${{ github.event.pull_request.head.sha || github.sha }}/" src/osiris.app.src
+          sed -i"_orig" -E '/VERSION/ s/1\.[0-9]+\.[0-9]+/${{ github.event.pull_request.head.sha || github.sha }}/' BUILD.bazel
       - name: Checkout RabbitMQ
         uses: actions/checkout@v3
@@ -137,7 +137,7 @@ jobs:
           RABBIT_SHA=${{ steps.load-rabbitmq-info.outputs.RABBITMQ_SHA }}
           OSIRIS_SHA=${{ github.event.pull_request.head.sha || github.sha }}
-          OSIRIS_ABBREV=ra-${OSIRIS_SHA:0:7}
+          OSIRIS_ABBREV=osiris-${OSIRIS_SHA:0:7}
           TAG_1=rabbitmq-${RABBIT_REF}-${OSIRIS_ABBREV}-${{ steps.load-info.outputs.otp }}
           TAG_2=rabbitmq-${RABBIT_REF}-${OSIRIS_ABBREV}-${{ matrix.image_tag_suffix }}
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index b76cde42554a..fdc7364f0b0f 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -8,6 +8,7 @@
 -module(osiris_log).
 -include("osiris.hrl").
+-include_lib("kernel/include/file.hrl").
 -export([init/1,
          init/2,
@@ -338,7 +339,11 @@
          tracking_config => osiris_tracking:config(),
          counter_spec => counter_spec(),
          %% used when initialising a log from an offset other than 0
-         initial_offset => osiris:offset()}.
+         initial_offset => osiris:offset(),
+         %% a cached list of the index files for a given log;
+         %% avoids scanning the disk for files multiple times if they are
+         %% already known, e.g. in init_acceptor
+         index_files => [filename:filename()]}.
 -type record() :: {offset(), osiris:data()}.
 -type offset_spec() :: osiris:offset_spec().
 -type retention_spec() :: osiris:retention_spec().
@@ -432,16 +437,17 @@ init(Config) ->
 -spec init(config(), writer | acceptor) -> state().
init(#{dir := Dir, name := Name, - epoch := Epoch} = - Config, + epoch := Epoch} = Config, WriterType) -> %% scan directory for segments if in write mode MaxSizeBytes = maps:get(max_segment_size_bytes, Config, ?DEFAULT_MAX_SEGMENT_SIZE_B), - MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, ?DEFAULT_MAX_SEGMENT_SIZE_C), + MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, + ?DEFAULT_MAX_SEGMENT_SIZE_C), Retention = maps:get(retention, Config, []), ?INFO("Will use ~s for osiris log data directory", [Dir]), - ?DEBUG("osiris_log:init/1 max_segment_size_bytes: ~b, max_segment_size_chunks ~b, retention ~w", + ?DEBUG("osiris_log:init/1 max_segment_size_bytes: ~b, + max_segment_size_chunks ~b, retention ~w", [MaxSizeBytes, MaxSizeChunks, Retention]), ok = filelib:ensure_dir(Dir), case file:make_dir(Dir) of @@ -469,8 +475,8 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), first_offset_fun = FirstOffsetFun}, - case lists:reverse(build_log_overview(Dir)) of - [] -> + case first_and_last_seginfos(Config) of + none -> NextOffset = case Config of #{initial_offset := IO} when WriterType == acceptor -> @@ -483,18 +489,17 @@ init(#{dir := Dir, #write{type = WriterType, tail_info = {NextOffset, empty}, current_epoch = Epoch}}); - [#seg_info{file = Filename, + {NumSegments, + #seg_info{first = #chunk_info{id = FstChId, + timestamp = FstTs}}, + #seg_info{file = Filename, index = IdxFilename, size = Size, last = - #chunk_info{epoch = LastEpoch, - timestamp = LastTs, - id = LastChId, - num = LastNum}} - | _] = Infos -> - [#seg_info{first = #chunk_info{id = FstChId, - timestamp = FstTs}} | _] = - lists:reverse(Infos), + #chunk_info{epoch = LastEpoch, + timestamp = LastTs, + id = LastChId, + num = LastNum}}} -> %% assert epoch is same or larger %% than last known epoch case LastEpoch > Epoch of @@ -509,7 +514,7 @@ init(#{dir := Dir, counters:put(Cnt, ?C_FIRST_OFFSET, FstChId), counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs), counters:put(Cnt, ?C_OFFSET, LastChId + LastNum - 1), - counters:put(Cnt, ?C_SEGMENTS, length(Infos)), + counters:put(Cnt, ?C_SEGMENTS, NumSegments), ?DEBUG("~s:~s/~b: next offset ~b first offset ~b", [?MODULE, ?FUNCTION_NAME, @@ -530,10 +535,9 @@ init(#{dir := Dir, current_epoch = Epoch}, fd = Fd, index_fd = IdxFd}; - [#seg_info{file = Filename, - index = IdxFilename, - last = undefined} - | _] -> + {1, #seg_info{file = Filename, + index = IdxFilename, + last = undefined}, _} -> %% the empty log case {ok, Fd} = open(Filename, ?FILE_OPTS_WRITE), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), @@ -685,23 +689,23 @@ init_acceptor(Range, EpochOffsets0, lists:sort(EpochOffsets0)), %% then truncate to - SegInfos = build_log_overview(Dir), + IdxFiles = sorted_index_files(Dir), ?DEBUG("~s: ~s from epoch offsets: ~w range ~w", [?MODULE, ?FUNCTION_NAME, EpochOffsets, Range]), - ok = truncate_to(Name, Range, EpochOffsets, SegInfos), + RemIdxFiles = truncate_to(Name, Range, EpochOffsets, IdxFiles), %% after truncation we can do normal init InitOffset = case Range of empty -> 0; {O, _} -> O end, - init(Conf#{initial_offset => InitOffset}, acceptor). + init(Conf#{initial_offset => InitOffset, + index_files => RemIdxFiles}, acceptor). chunk_id_index_scan(IdxFile, ChunkId) when is_list(IdxFile) -> Fd = open_index_read(IdxFile), chunk_id_index_scan0(Fd, ChunkId). 
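
For reference, every index record matched throughout this file shares one fixed layout: offset (64 bits), timestamp (64 bits, signed), epoch (64 bits), file position (32 bits) and chunk type (8 bits), i.e. 29 bytes per record. A minimal decoding sketch (the macro value is inferred from those patterns, not quoted from osiris.hrl):

%% decode a single index record; mirrors the <<...>> patterns used by
%% chunk_id_index_scan0/2 and the other index scans in this diff
-define(INDEX_RECORD_SIZE_B, 29). %% (64+64+64+32+8) div 8

parse_idx_record(<<Offset:64/unsigned,
                   Timestamp:64/signed,
                   Epoch:64/unsigned,
                   FilePos:32/unsigned,
                   ChType:8/unsigned>>) ->
    {Offset, Timestamp, Epoch, FilePos, ChType}.
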
chunk_id_index_scan0(Fd, ChunkId) ->
-    {ok, IdxPos} = file:position(Fd, cur),
     case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
         {ok, <<ChunkId:64/unsigned,
                _Timestamp:64/signed,
                Epoch:64/unsigned,
                FilePos:32/unsigned,
                _ChType:8/unsigned>>} ->
+            {ok, IdxPos} = file:position(Fd, cur),
             ok = file:close(Fd),
-            {ChunkId, Epoch, FilePos, IdxPos};
+            {ChunkId, Epoch, FilePos, IdxPos - ?INDEX_RECORD_SIZE_B};
         {ok, _} ->
             chunk_id_index_scan0(Fd, ChunkId);
         eof ->
@@ -718,7 +723,8 @@ chunk_id_index_scan0(Fd, ChunkId) ->
             eof
     end.
 
-delete_segment(#seg_info{file = File, index = Index}) ->
+delete_segment_from_index(Index) ->
+    File = segment_from_index_file(Index),
     ?DEBUG("osiris_log: deleting segment ~s in ~s",
            [filename:basename(File), filename:dirname(File)]),
     ok = file:delete(File),
@@ -727,27 +733,24 @@
 truncate_to(_Name, _Range, _EpochOffsets, []) ->
     %% the target log is empty
-    ok;
-truncate_to(_Name, _Range, [], SegInfos) ->
+    [];
+truncate_to(_Name, _Range, [], IdxFiles) ->
     %% ????? this means the entire log is out
-    [begin ok = delete_segment(I) end || I <- SegInfos],
-    ok;
-truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) ->
-    case find_segment_for_offset(ChId, SegInfos) of
+    [begin ok = delete_segment_from_index(I) end || I <- IdxFiles],
+    [];
+truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
+    case find_segment_for_offset(ChId, IdxFiles) of
         not_found ->
-            case lists:last(SegInfos) of
-                #seg_info{last = #chunk_info{epoch = E,
-                                             id = LastChId,
-                                             num = Num}}
+            case build_seg_info(lists:last(IdxFiles)) of
+                {ok, #seg_info{last = #chunk_info{epoch = E,
+                                                  id = LastChId,
+                                                  num = Num}}}
                   when ChId > LastChId + Num ->
                     %% the last available local chunk id is smaller than the
                     %% sources last chunk id but is in the same epoch
                     %% check if there is any overlap
-                    LastOffsLocal = case offset_range_from_segment_infos(SegInfos) of
-                                        empty -> 0;
-                                        {_, L} -> L
-                                    end,
-                    FstOffsetRemote = case Range of
+                    LastOffsLocal = LastChId + Num,
+                    FstOffsetRemote = case RemoteRange of
                                           empty -> 0;
                                           {F, _} -> F
                                       end,
@@ -755,19 +758,22 @@
                         true ->
                             %% there is no overlap, need to delete all
                             %% local segments
-                            [begin ok = delete_segment(I) end || I <- SegInfos],
-                            ok;
+                            [begin ok = delete_segment_from_index(I) end
+                             || I <- IdxFiles],
+                            [];
                         false ->
                             %% there is overlap
                             %% no truncation needed
-                            ok
+                            IdxFiles
                     end;
-                _ ->
-                    truncate_to(Name, Range, NextEOs, SegInfos)
+                {ok, _} ->
+                    truncate_to(Name, RemoteRange, NextEOs, IdxFiles)
+                    %% TODO: what to do if error is returned from
+                    %% build_seg_info/1?
end; {end_of_log, _Info} -> - ok; - {found, #seg_info{file = File, index = Idx}} -> + IdxFiles; + {found, #seg_info{file = File, index = IdxFile}} -> ?DEBUG("osiris_log: ~s on node ~s truncating to chunk " "id ~b in epoch ~b", [Name, node(), ChId, E]), @@ -775,31 +781,38 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> %% next offset needs to be a chunk offset %% if it is not found we know the offset requested isn't a chunk %% id and thus isn't valid - case chunk_id_index_scan(Idx, ChId) of + case chunk_id_index_scan(IdxFile, ChId) of {ChId, E, Pos, IdxPos} when is_integer(Pos) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset {ok, Fd} = file:open(File, [read, write, binary, raw]), - {ok, IdxFd} = file:open(Idx, [read, write, binary, raw]), + _ = file:advise(Fd, 0, 0, random), + {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), - {_ChType, ChId, E, _Num, Size, TSize} = - header_info(Fd, Pos), + {_ChType, ChId, E, _Num, Size, TSize} = header_info(Fd, Pos), %% position at end of chunk {ok, _Pos} = file:position(Fd, {cur, Size + TSize}), ok = file:truncate(Fd), - {ok, _} = - file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), + {ok, _} = file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), ok = file:truncate(IdxFd), ok = file:close(Fd), ok = file:close(IdxFd), %% delete all segments with a first offset larger then ChId - [begin ok = delete_segment(I) end - || I <- SegInfos, I#seg_info.first#chunk_info.id > ChId], - ok; + %% and return the remainder + lists:filter( + fun (I) -> + case index_file_first_offset(I) > ChId of + true -> + ok = delete_segment_from_index(I), + false; + false -> + true + end + end, IdxFiles); _ -> - truncate_to(Name, Range, NextEOs, SegInfos) + truncate_to(Name, RemoteRange, NextEOs, IdxFiles) end end. @@ -811,8 +824,8 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> {error, {invalid_last_offset_epoch, epoch(), offset()}}. init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) -> - SegInfos = build_log_overview(Dir), - Range = offset_range_from_segment_infos(SegInfos), + IdxFiles = sorted_index_files(Dir), + Range = offset_range_from_idx_files(IdxFiles), ?DEBUG("osiris_segment:init_data_reader/2 at ~b prev " "~w local range: ~w", [StartChunkId, PrevEOT, Range]), @@ -829,31 +842,31 @@ init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) -> %% first we need to validate PrevEO {ok, init_data_reader_from( StartChunkId, - find_segment_for_offset(StartChunkId, SegInfos), + find_segment_for_offset(StartChunkId, IdxFiles), Config)}; _ -> {PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT, - case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, SegInfos) of + case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, IdxFiles) of ok -> {ok, init_data_reader_from( StartChunkId, - find_segment_for_offset(StartChunkId, SegInfos), + find_segment_for_offset(StartChunkId, IdxFiles), Config)}; {error, _} = Err -> Err end end. -check_chunk_has_expected_epoch(ChunkId, Epoch, SegInfos) -> - case find_segment_for_offset(ChunkId, SegInfos) of +check_chunk_has_expected_epoch(ChunkId, Epoch, IdxFiles) -> + case find_segment_for_offset(ChunkId, IdxFiles) of not_found -> %% this is unexpected and thus an error {error, {invalid_last_offset_epoch, Epoch, unknown}}; - {found, SegmentInfo = #seg_info{file = _PrevSeg}} -> + {found, #seg_info{} = SegmentInfo} -> %% prev segment exists, does it have the correct %% epoch? 
-            case scan_idx(ChunkId, SegmentInfo) of
+            case offset_idx_scan(ChunkId, SegmentInfo) of
                 {ChunkId, Epoch, _PrevPos} ->
                     ok;
                 {ChunkId, OtherEpoch, _} ->
@@ -893,7 +906,7 @@ init_data_reader_from(ChunkId,
 init_data_reader_from(ChunkId,
                       {found, #seg_info{file = File} = SegInfo},
                       Config) ->
-    {ChunkId, _Epoch, FilePos} = scan_idx(ChunkId, SegInfo),
+    {ChunkId, _Epoch, FilePos} = offset_idx_scan(ChunkId, SegInfo),
     init_data_reader_at(ChunkId, FilePos, File, Config).
 
 %% @doc Initialise a new offset reader
@@ -927,7 +940,8 @@ init_offset_reader(OffsetSpec, Conf) ->
     end.
 
 init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) ->
-    Range = offset_range_from_segment_infos(build_log_overview(Dir)),
+    IdxFiles = sorted_index_files(Dir),
+    Range = offset_range_from_idx_files(IdxFiles),
     case Range of
         empty ->
             {error, {offset_out_of_range, Range}};
@@ -937,121 +951,104 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) ->
             %% it is in range, convert to standard offset
             init_offset_reader0(Offs, Conf)
     end;
-init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) ->
-    case build_log_overview(Dir) of
+init_offset_reader0({timestamp, Ts}, #{} = Conf) ->
+    case sorted_index_files_rev(Conf) of
         [] ->
             init_offset_reader0(next, Conf);
-        [#seg_info{file = SegmentFile,
-                   first = #chunk_info{timestamp = Fst,
-                                       pos = FilePos,
-                                       id = ChunkId}} | _]
-            when is_integer(Fst) andalso Fst > Ts ->
-            %% timestamp is lower than the first timestamp available
-            open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
-        SegInfos ->
-            case lists:search(fun (#seg_info{first = #chunk_info{timestamp = F},
-                                             last = #chunk_info{timestamp = L}})
-                                      when is_integer(F)
-                                           andalso is_integer(L) ->
-                                      Ts >= F andalso Ts =< L;
-                                  (_) ->
-                                      false
-                              end,
-                              SegInfos)
-            of
-                {value, #seg_info{file = SegmentFile} = Info} ->
+        IdxFilesRev ->
+            case timestamp_idx_file_search(Ts, IdxFilesRev) of
+                {scan, IdxFile} ->
                     %% segment was found, now we need to scan index to
                     %% find nearest offset
-                    {ChunkId, FilePos} = chunk_location_for_timestamp(Info, Ts),
+                    {ChunkId, FilePos} = chunk_location_for_timestamp(IdxFile, Ts),
+                    SegmentFile = segment_from_index_file(IdxFile),
                     open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
-                false ->
+                {first_in, IdxFile} ->
+                    {ok, Fd} = file:open(IdxFile, [raw, binary, read]),
+                    {ok, <<ChunkId:64/unsigned,
+                           _Timestamp:64/signed,
+                           _Epoch:64/unsigned,
+                           FilePos:32/unsigned,
+                           _ChType:8/unsigned>>} = first_idx_record(Fd),
+                    SegmentFile = segment_from_index_file(IdxFile),
+                    open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
+                next ->
                     %% segment was not found, attach next
                     %% this should be rare so no need to call the more optimal
                     %% open_offset_reader_at/4 function
                     init_offset_reader0(next, Conf)
             end
     end;
-init_offset_reader0(first, #{dir := Dir} = Conf) ->
-    case build_log_overview(Dir) of
-        [#seg_info{file = File,
-                   first = undefined}] ->
+init_offset_reader0(first, #{} = Conf) ->
+    [FstIdxFile | _ ] = sorted_index_files(Conf),
+    case build_seg_info(FstIdxFile) of
+        {ok, #seg_info{file = File,
+                       first = undefined}} ->
             %% empty log, attach at 0
             open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf);
-        [#seg_info{file = File,
-                   first = #chunk_info{id = FirstChunkId,
-                                       pos = FilePos}} | _] ->
+        {ok, #seg_info{file = File,
+                       first = #chunk_info{id = FirstChunkId,
+                                           pos = FilePos}}} ->
             open_offset_reader_at(File, FirstChunkId, FilePos, Conf);
-        _ ->
-            exit(no_segments_found)
+        {error, _} = Err ->
+            exit(Err)
     end;
-init_offset_reader0(next, #{dir := Dir} = Conf) ->
-    SegInfos = build_log_overview(Dir),
-    case lists:reverse(SegInfos) of
-        [#seg_info{file = File,
-                   last = LastChunk} | _] ->
+init_offset_reader0(next, #{} = Conf) ->
+    [LastIdxFile | _ ] = sorted_index_files_rev(Conf),
+    case build_seg_info(LastIdxFile) of
+        {ok, #seg_info{file = File,
+                       last = LastChunk}} ->
             {NextChunkId, FilePos} = next_location(LastChunk),
             open_offset_reader_at(File, NextChunkId, FilePos, Conf);
-        _ ->
-            exit(no_segments_found)
+        Err ->
+            exit(Err)
     end;
-init_offset_reader0(last, #{dir := Dir} = Conf) ->
-    SegInfos = build_log_overview(Dir),
-    case lists:reverse(SegInfos) of
-        [#seg_info{file = File,
-                   last = undefined}] ->
-            %% empty log, attach at 0
-            open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf);
-        [#seg_info{file = File,
-                   last = #chunk_info{type = ?CHNK_USER,
-                                      id = LastChunkId,
-                                      pos = FilePos}} | _] ->
-            open_offset_reader_at(File, LastChunkId, FilePos, Conf);
-        _ ->
-            case last_user_chunk_location(SegInfos) of
-                not_found ->
-                    ?DEBUG("~s:~s use chunk not found, fall back to next",
-                           [?MODULE, ?FUNCTION_NAME]),
-                    %% no user chunks in stream, this is awkward, fall back to next
-                    init_offset_reader0(next, Conf);
-                {ChunkId, FilePos, #seg_info{file = File}} ->
-                    open_offset_reader_at(File, ChunkId, FilePos, Conf)
-            end
+init_offset_reader0(last, #{} = Conf) ->
+    IdxFiles = sorted_index_files_rev(Conf),
+    case last_user_chunk_location(IdxFiles) of
+        not_found ->
+            ?DEBUG("~s:~s user chunk not found, fall back to next",
+                   [?MODULE, ?FUNCTION_NAME]),
+            %% no user chunks in stream, this is awkward, fall back to next
+            init_offset_reader0(next, Conf);
+        {ChunkId, FilePos, IdxFile} ->
+            File = segment_from_index_file(IdxFile),
+            open_offset_reader_at(File, ChunkId, FilePos, Conf)
     end;
-init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf)
+init_offset_reader0(OffsetSpec, #{} = Conf)
     when is_integer(OffsetSpec) ->
-    SegInfos = build_log_overview(Dir),
-    ChunkRange = chunk_range_from_segment_infos(SegInfos),
-    Range = offset_range_from_chunk_range(ChunkRange),
+    IdxFiles = sorted_index_files(Conf),
+    Range = offset_range_from_idx_files(IdxFiles),
     ?DEBUG("osiris_log:init_offset_reader0/2 spec ~w range ~w ",
-          [OffsetSpec, Range]),
+           [OffsetSpec, Range]),
     try
-        StartOffset =
-            case {OffsetSpec, Range} of
-                {_, empty} ->
-                    0;
-                {Offset, {_, LastOffs}}
-                    when Offset == LastOffs + 1 ->
-                    %% next but we can't use `next` due to race conditions
-                    Offset;
-                {Offset, {_, LastOffs}}
-                    when Offset > LastOffs + 1 ->
-                    %% out of range, clamp as `next`
-                    throw({retry_with, next, Conf});
-                {Offset, {FirstOffs, _LastOffs}} ->
-                    max(FirstOffs, Offset)
-            end,
-        %% find the appopriate segment and scan the index to find the
-        %% postition of the next chunk to read
-        case find_segment_for_offset(StartOffset, SegInfos) of
+        %% clamp start offset
+        StartOffset = case {OffsetSpec, Range} of
+                          {_, empty} ->
+                              0;
+                          {Offset, {_, LastOffs}}
+                            when Offset == LastOffs + 1 ->
+                              %% next but we can't use `next`
+                              %% due to race conditions
+                              Offset;
+                          {Offset, {_, LastOffs}}
+                            when Offset > LastOffs + 1 ->
+                              %% out of range, clamp as `next`
+                              throw({retry_with, next, Conf});
+                          {Offset, {FirstOffs, _LastOffs}} ->
+                              max(FirstOffs, Offset)
+                      end,
+
+        case find_segment_for_offset(StartOffset, IdxFiles) of
             not_found ->
                 {error, {offset_out_of_range, Range}};
             {end_of_log, #seg_info{file = SegmentFile,
                                    last = LastChunk}} ->
                 {ChunkId, FilePos} = next_location(LastChunk),
                 open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
-            {found, SegmentInfo = #seg_info{file = SegmentFile}} ->
+            {found, #seg_info{file = SegmentFile} = SegmentInfo} ->
                 {ChunkId, _Epoch, FilePos} =
-                    case scan_idx(StartOffset, SegmentInfo) of
+                    case offset_idx_scan(StartOffset,
+                                         SegmentInfo) of
                         eof ->
                             exit(offset_out_of_range);
                         enoent ->
@@ -1071,7 +1068,8 @@
         missing_file ->
             %% Retention policies are likely being applied, let's try again
             %% TODO: should we limit the number of retries?
-            init_offset_reader0(OffsetSpec, Conf);
+            %% Remove cached index_files from config
+            init_offset_reader0(OffsetSpec, maps:remove(index_files, Conf));
         {retry_with, NewOffsSpec, NewConf} ->
             init_offset_reader0(NewOffsSpec, NewConf)
     end.
@@ -1102,10 +1100,10 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
                  fd = Fd}}.
 
 %% Searches the index files backwards for the ID of the last user chunk.
-last_user_chunk_location(SegInfos) when is_list(SegInfos) ->
+last_user_chunk_location(RevdIdxFiles) when is_list(RevdIdxFiles) ->
     {Time, Result} = timer:tc(
                        fun() ->
-                               last_user_chunk_id0(lists:reverse(SegInfos))
+                               last_user_chunk_id0(RevdIdxFiles)
                        end),
     ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1_000_000]),
     Result.
@@ -1113,16 +1111,16 @@
 last_user_chunk_id0([]) ->
     %% There are no user chunks in any index files.
     not_found;
-last_user_chunk_id0([#seg_info{index = IdxFile} = Info | Rest]) ->
+last_user_chunk_id0([IdxFile | Rest]) ->
     try
         %% Do not read-ahead since we read the index file backwards chunk by chunk.
         {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
-        {ok, _} = position_at_idx_record_boundary(IdxFd, eof),
-        Last = last_user_chunk_id_in_index(IdxFd),
+        {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof),
+        Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd),
         _ = file:close(IdxFd),
         case Last of
             {ok, Id, Pos} ->
-                {Id, Pos, Info};
+                {Id, Pos, IdxFile};
             {error, Reason} ->
                 ?DEBUG("Could not find user chunk in index file ~s (~p)", [IdxFile, Reason]),
                 last_user_chunk_id0(Rest)
@@ -1134,29 +1132,22 @@
     end.
 
 %% Searches the index file backwards for the ID of the last user chunk.
-last_user_chunk_id_in_index(IdxFd) ->
-    case file:position(IdxFd, {cur, -2*?INDEX_RECORD_SIZE_B}) of
+last_user_chunk_id_in_index(NextPos, IdxFd) ->
+    case file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of
+        {ok, <<Offset:64/unsigned,
+               _Timestamp:64/signed,
+               _Epoch:64/unsigned,
+               FilePos:32/unsigned,
+               ?CHNK_USER:8/unsigned>>} ->
+            {ok, Offset, FilePos};
+        {ok, <<_Offset:64/unsigned,
+               _Timestamp:64/signed,
+               _Epoch:64/unsigned,
+               _FilePos:32/unsigned,
+               _ChType:8/unsigned>>} ->
+            last_user_chunk_id_in_index(NextPos - ?INDEX_RECORD_SIZE_B, IdxFd);
         {error, _} = Error ->
-            Error;
-        {ok, _NewPos} ->
-            case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of
-                {ok,
-                 <<Offset:64/unsigned,
-                   _Timestamp:64/signed,
-                   _Epoch:64/unsigned,
-                   FilePos:32/unsigned,
-                   ?CHNK_USER:8/unsigned>>} ->
-                    {ok, Offset, FilePos};
-                {ok,
-                 <<_Offset:64/unsigned,
-                   _Timestamp:64/signed,
-                   _Epoch:64/unsigned,
-                   _FilePos:32/unsigned,
-                   _ChType:8/unsigned>>} ->
-                    last_user_chunk_id_in_index(IdxFd);
-                {error, _} = Error ->
-                    Error
-            end
+            Error
     end.
 
 -spec committed_offset(state()) -> undefined | offset().
@@ -1295,7 +1286,7 @@ read_chunk_parsed(#?MODULE{mode = #read{type = RType,
     {error, term()} | {end_of_stream, state()}.
 send_file(Sock, State) ->
-    send_file(Sock, State, fun(_, S) -> S end).
+    send_file(Sock, State, fun(_, _) -> ok end).
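
The rewritten last_user_chunk_id_in_index/2 is an instance of a backwards scan over a file of fixed-size records, driven by file:pread/3 so no seek state has to be maintained between reads. The access pattern in isolation (a generic sketch; rscan and Pred are illustrative names, not part of osiris):

%% walk records from byte position Pos towards the start of the file
%% and return the first one satisfying Pred; gives up once Pos goes
%% below the first record
rscan(Fd, Pos, RecSize, Pred) when Pos >= 0 ->
    case file:pread(Fd, Pos, RecSize) of
        {ok, Bin} ->
            case Pred(Bin) of
                true -> {ok, Bin};
                false -> rscan(Fd, Pos - RecSize, RecSize, Pred)
            end;
        Other ->
            Other
    end;
rscan(_Fd, _Pos, _RecSize, _Pred) ->
    not_found.
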
 -spec send_file(gen_tcp:socket(), state(),
                 fun((header_map(), non_neg_integer()) -> term())) ->
@@ -1314,10 +1305,8 @@ send_file(Sock,
           num_records := NumRecords,
           data_size := DataSize,
           trailer_size := TrailerSize,
-          position := Pos} =
-             Header,
-         #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} =
-             State1} ->
+          position := Pos} = Header,
+         #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
             %% read header
             %% used to write frame headers to socket
             %% and return the number of bytes to sendfile
@@ -1338,9 +1327,13 @@ send_file(Sock,
             %% or the chunk is a user type (for offset readers)
             case needs_handling(RType, Selector, ChType) of
                 true ->
+                    %% this avoids any data sent in the Callback being
+                    %% dispatched in its own TCP frame
+                    ok = setopts(Transport, Sock, [{nopush, true}]),
                     _ = Callback(Header, ToSend),
                     case sendfile(Transport, Fd, Sock, Pos, ToSend) of
                         ok ->
+                            ok = setopts(Transport, Sock, [{nopush, false}]),
                             {ok, _} = file:position(Fd, NextFilePos),
                             {ok, State};
                         Err ->
@@ -1458,58 +1451,96 @@ parse_records(Offs,
     parse_records(Offs + NumRecs, Rem,
                   [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]).
 
-build_log_overview(Dir) when is_list(Dir) ->
-    {Time, Result} = timer:tc(
-                       fun() ->
-                               try
-                                   IdxFiles =
-                                       lists:sort(
-                                         filelib:wildcard(
-                                           filename:join(Dir, "*.index"))),
-                                   build_log_overview0(IdxFiles, [])
-                               catch
-                                   missing_file ->
-                                       build_log_overview(Dir)
-                               end
-                       end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
-    Result.
+sorted_index_files(#{index_files := IdxFiles}) ->
+    %% cached
+    IdxFiles;
+sorted_index_files(#{dir := Dir}) ->
+    sorted_index_files(Dir);
+sorted_index_files(Dir) when is_list(Dir) ->
+    Files = index_files_unsorted(Dir),
+    lists:sort(Files).
+
+sorted_index_files_rev(#{index_files := IdxFiles}) ->
+    %% cached
+    lists:reverse(IdxFiles);
+sorted_index_files_rev(#{dir := Dir}) ->
+    sorted_index_files_rev(Dir);
+sorted_index_files_rev(Dir) ->
+    Files = index_files_unsorted(Dir),
+    lists:sort(fun erlang:'>'/2, Files).
+
+index_files_unsorted(Dir) ->
+    case prim_file:list_dir(Dir) of
+        {error, enoent} ->
+            [];
+        {ok, Files} ->
+            [filename:join(Dir, F)
+             || F <- Files, filename:extension(F) == ".index"]
+    end.
 
-build_log_overview0([], Acc) ->
-    lists:reverse(Acc);
-build_log_overview0([IdxFile | IdxFiles], Acc0) ->
-    IdxFd = open_index_read(IdxFile),
-    case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of
-        {error, einval} when IdxFiles == [] andalso Acc0 == [] ->
-            %% this would happen if the file only contained a header
+first_and_last_seginfos(#{index_files := IdxFiles}) ->
+    first_and_last_seginfos0(IdxFiles);
+first_and_last_seginfos(#{dir := Dir}) ->
+    first_and_last_seginfos0(sorted_index_files(Dir)).
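
The new {nopush, true}/{nopush, false} bracket corks the socket (TCP_CORK on Linux, TCP_NOPUSH on the BSDs), so the frame header written by the Callback and the chunk body spliced by sendfile can leave in the same TCP segment. The same pattern in isolation, for the plain-TCP transport (a sketch; send_framed is an illustrative name):

%% write a small header, then splice Len bytes of file data starting
%% at Pos, letting the kernel coalesce header and body into full segments
send_framed(Sock, Fd, Header, Pos, Len) ->
    ok = inet:setopts(Sock, [{nopush, true}]),
    ok = gen_tcp:send(Sock, Header),
    {ok, _} = file:sendfile(Fd, Sock, Pos, Len, []),
    ok = inet:setopts(Sock, [{nopush, false}]).
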
+
+first_and_last_seginfos0([]) ->
+    none;
+first_and_last_seginfos0([FstIdxFile]) ->
+    %% this function is only used by init
+    {ok, SegInfo} = build_seg_info(FstIdxFile),
+    {1, SegInfo, SegInfo};
+first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) ->
+    %% this function is only used by init
+    {ok, FstSegInfo} = build_seg_info(FstIdxFile),
+    LastIdxFile = lists:last(Rem),
+    case build_seg_info(LastIdxFile) of
+        {ok, #seg_info{first = undefined,
+                       last = undefined}} ->
+            %% the last index file doesn't have any index records yet
+            %% retry without it
+            [_ | RetryIndexFiles] = lists:reverse(IdxFiles),
+            first_and_last_seginfos0(lists:reverse(RetryIndexFiles));
+        {ok, LastSegInfo} ->
+            {length(Rem) + 1, FstSegInfo, LastSegInfo};
+        {error, Err} ->
+            ?ERROR("~s: failed to build seg_info from file ~s, error: ~w",
+                   [?MODULE, LastIdxFile, Err]),
+            error(Err)
+    end.
+
+build_seg_info(IdxFile) ->
+    %% do not need read_ahead here
+    {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+    case last_idx_record(IdxFd) of
+        {ok, <<_Offset:64/unsigned,
+               _Timestamp:64/signed,
+               _Epoch:64/unsigned,
+               LastChunkPos:32/unsigned,
+               _ChType:8/unsigned>>} ->
             ok = file:close(IdxFd),
             SegFile = segment_from_index_file(IdxFile),
-            [#seg_info{file = SegFile, index = IdxFile}];
+            build_segment_info(SegFile, LastChunkPos, IdxFile);
         {error, einval} ->
+            %% this would happen if the file only contained a header
+            ok = file:close(IdxFd),
+            SegFile = segment_from_index_file(IdxFile),
+            {ok, #seg_info{file = SegFile, index = IdxFile}};
+        {error, _} = Err ->
             ok = file:close(IdxFd),
-            build_log_overview0(IdxFiles, Acc0);
+            Err
+    end.
+
+last_idx_record(IdxFd) ->
+    case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of
         {ok, _} ->
-            case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of
-                {ok,
-                 <<_Offset:64/unsigned,
-                   _Timestamp:64/signed,
-                   _Epoch:64/unsigned,
-                   LastChunkPos:32/unsigned,
-                   _ChType:8/unsigned>>} ->
-                    ok = file:close(IdxFd),
-                    SegFile = segment_from_index_file(IdxFile),
-                    Acc = build_segment_info(SegFile,
-                                             LastChunkPos,
-                                             IdxFile,
-                                             Acc0),
-                    build_log_overview0(IdxFiles, Acc);
-                {error, enoent} ->
-                    %% The retention policy could have just been applied
-                    ok = file:close(IdxFd),
-                    build_log_overview0(IdxFiles, Acc0)
-            end
+            file:read(IdxFd, ?INDEX_RECORD_SIZE_B);
+        Err ->
+            Err
     end.
 
+first_idx_record(IdxFd) ->
+    file:pread(IdxFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B).
+
 %% Some file:position/2 operations are subject to race conditions. In particular, `eof` may position the Fd
 %% in the middle of a record being written concurrently. If that happens, we need to re-position at the nearest
 %% record boundry. See https://github.com/rabbitmq/osiris/issues/73
@@ -1523,15 +1554,15 @@ position_at_idx_record_boundary(IdxFd, At) ->
         Error -> Error
     end.
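
position_at_idx_record_boundary/2 (unchanged above) guards against `eof` landing mid-record while a writer is appending concurrently. The re-positioning boils down to integer arithmetic along these lines (a sketch under the assumption that records start immediately after the fixed-size index header; not the module's literal implementation):

%% largest record-aligned position at or below Pos
snap_to_boundary(Pos, IdxHeaderSize, RecSize) when Pos >= IdxHeaderSize ->
    IdxHeaderSize + ((Pos - IdxHeaderSize) div RecSize) * RecSize.
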
-build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
+build_segment_info(SegFile, LastChunkPos, IdxFile) ->
     try
         {ok, Fd} = open(SegFile, [read, binary, raw]),
-        %% skip header,
-        {ok, ?LOG_HEADER_SIZE} = file:position(Fd, ?LOG_HEADER_SIZE),
-        case file:read(Fd, ?HEADER_SIZE_B) of
+        %% we don't want to read blocks into page cache we are unlikely to need
+        _ = file:advise(Fd, 0, 0, random),
+        case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of
             eof ->
                 _ = file:close(Fd),
-                Acc0;
+                eof;
             {ok, <<?MAGIC:4/unsigned,
                    ?VERSION:4/unsigned,
                    FirstChType:8/unsigned,
                    _NumEntries:16/unsigned,
                    FirstNumRecords:32/unsigned,
                    FirstTs:64/signed,
                    FirstEpoch:64/unsigned,
                    FirstChId:64/unsigned,
                    _FirstCrc:32/integer,
                    FirstSize:32/unsigned,
                    FirstTSize:32/unsigned,
                    _/binary>>} ->
-                {ok, LastChunkPos} = file:position(Fd, LastChunkPos),
                 {ok, <<?MAGIC:4/unsigned,
                        ?VERSION:4/unsigned,
                        LastChType:8/unsigned,
                        _LastNumEntries:16/unsigned,
                        LastNumRecords:32/unsigned,
                        LastTs:64/signed,
                        LastEpoch:64/unsigned,
                        LastChId:64/unsigned,
                        _LastCrc:32/integer,
                        LastSize:32/unsigned,
                        LastTSize:32/unsigned,
-                       _Reserved:32>>} =
-                    file:read(Fd, ?HEADER_SIZE_B),
+                       _Reserved:32>>} = file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B),
                 Size = LastChunkPos + LastSize + LastTSize + ?HEADER_SIZE_B,
                 {ok, Eof} = file:position(Fd, eof),
                 ?DEBUG_IF("~s: segment ~s has trailing data ~w ~w",
                           [?MODULE, filename:basename(SegFile), Size, Eof],
                           Size =/= Eof),
                 _ = file:close(Fd),
-                [#seg_info{file = SegFile,
+                {ok, #seg_info{file = SegFile,
                            index = IdxFile,
                            size = Size,
                            first =
@@ -1584,24 +1613,23 @@
                                num = LastNumRecords,
                                type = LastChType,
                                size = LastSize + LastTSize,
-                               pos = LastChunkPos}}
-                 | Acc0]
+                               pos = LastChunkPos}}}
         end
     catch
         missing_file ->
             %% Indexes and segments could be deleted by retention policies while
             %% the log overview is being built. Ignore those segments and keep going
-            Acc0
+            missing_file
     end.
 
 -spec overview(term()) -> {range(), [{epoch(), offset()}]}.
 overview(Dir) ->
-    case build_log_overview(Dir) of
+    case sorted_index_files(Dir) of
         [] ->
             {empty, []};
-        SegInfos ->
-            Range = offset_range_from_segment_infos(SegInfos),
-            EpochOffsets = last_epoch_offsets(SegInfos),
+        IdxFiles ->
+            Range = offset_range_from_idx_files(IdxFiles),
+            EpochOffsets = last_epoch_offsets(IdxFiles),
             {Range, EpochOffsets}
     end.
@@ -1650,100 +1678,139 @@ update_retention(Retention,
 evaluate_retention(Dir, Specs) ->
     {Time, Result} = timer:tc(
                        fun() ->
-                               SegInfos0 = build_log_overview(Dir),
-                               SegInfos = evaluate_retention0(SegInfos0, Specs),
-                               OffsetRange = offset_range_from_segment_infos(SegInfos),
-                               FirstTs = first_timestamp_from_segment_infos(SegInfos),
-                               {OffsetRange, FirstTs, length(SegInfos)}
+                               IdxFiles0 = sorted_index_files(Dir),
+                               IdxFiles = evaluate_retention0(IdxFiles0, Specs),
+                               OffsetRange = offset_range_from_idx_files(IdxFiles),
+                               FirstTs = first_timestamp_from_index_files(IdxFiles),
+                               {OffsetRange, FirstTs, length(IdxFiles)}
                        end),
-    ?DEBUG("~s:~s/~b (~w) completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Specs, Time/1_000_000]),
+    ?DEBUG("~s:~s/~b (~w) completed in ~fs",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Specs, Time/1_000_000]),
     Result.
 
-evaluate_retention0(Infos, []) ->
-    %% we should never hit empty infos as one should always be left
-    Infos;
-evaluate_retention0(Infos, [{max_bytes, MaxSize} | Specs]) ->
-    RemSegs = eval_max_bytes(Infos, MaxSize),
-    evaluate_retention0(RemSegs, Specs);
-evaluate_retention0(Infos, [{max_age, Age} | Specs]) ->
-    RemSegs = eval_age(Infos, Age),
-    evaluate_retention0(RemSegs, Specs).
- -eval_age([#seg_info{last = #chunk_info{timestamp = Ts}, - size = Size} = - Old - | Rem] = - Infos, - Age) -> - Now = erlang:system_time(millisecond), - case Ts < Now - Age - andalso length(Rem) > 0 - andalso Size > ?LOG_HEADER_SIZE - of - true -> - %% the oldest timestamp is older than retention - %% and there are other segments available - %% we can delete - ok = delete_segment(Old), - eval_age(Rem, Age); - false -> - Infos - end; -eval_age(Infos, _Age) -> - Infos. - -eval_max_bytes(SegInfos, MaxSize) -> - TotalSize = - lists:foldl(fun(#seg_info{size = Size}, Acc) -> Acc + Size end, 0, - SegInfos), - case SegInfos of - _ when length(SegInfos) =< 1 -> - SegInfos; - [_, #seg_info{size = 0}] -> - SegInfos; +evaluate_retention0(IdxFiles, []) -> + IdxFiles; +evaluate_retention0(IdxFiles, [{max_bytes, MaxSize} | Specs]) -> + RemIdxFiles = eval_max_bytes(IdxFiles, MaxSize), + evaluate_retention0(RemIdxFiles, Specs); +evaluate_retention0(IdxFiles, [{max_age, Age} | Specs]) -> + RemIdxFiles = eval_age(IdxFiles, Age), + evaluate_retention0(RemIdxFiles, Specs). + +eval_age([_] = IdxFiles, _Age) -> + IdxFiles; +eval_age([IdxFile | IdxFiles] = AllIdxFiles, Age) -> + case last_timestamp_in_index_file(IdxFile) of + {ok, Ts} -> + Now = erlang:system_time(millisecond), + case Ts < Now - Age of + true -> + %% the oldest timestamp is older than retention + %% and there are other segments available + %% we can delete + ok = delete_segment_from_index(IdxFile), + eval_age(IdxFiles, Age); + false -> + AllIdxFiles + end; + _Err -> + AllIdxFiles + end. + +eval_max_bytes(IdxFiles, MaxSize) -> + case IdxFiles of + [] -> + []; + [_] -> + IdxFiles; _ -> + TotalSize = + lists:foldl(fun(IdxFile, Acc) -> + SegFile = segment_from_index_file(IdxFile), + case prim_file:read_file_info(SegFile) of + {ok, #file_info{size = Size}} -> + Acc + Size; + _ -> + Acc + end + end, 0, IdxFiles), case TotalSize > MaxSize of true -> %% we can delete at least one segment - [Old | Rem] = SegInfos, - ok = delete_segment(Old), + [Old | Rem] = IdxFiles, + ok = delete_segment_from_index(Old), eval_max_bytes(Rem, MaxSize); false -> - SegInfos + IdxFiles end end. 
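
With this change retention works purely on index files, using prim_file:read_file_info/1 for segment sizes, and specs still compose left to right. Usage as exercised by the new test further down (Dir assumed bound to the stream directory):

%% apply an age limit, then a size budget; returns the remaining
%% offset range, first timestamp and segment count
Specs = [{max_age, 60_000}, {max_bytes, 5_000_000_000}],
{OffsetRange, FirstTs, NumSegments} =
    osiris_log:evaluate_retention(Dir, Specs).
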
-%% returns a list of the last offset by epoch
-last_epoch_offsets([#seg_info{first = undefined,
-                              last = undefined}]) ->
-    [];
-last_epoch_offsets([#seg_info{index = IdxFile,
-                              first = #chunk_info{epoch = FstE, id = FstChId}}
-                    | SegInfos]) ->
-    {Time, Result} =
-        timer:tc(
-          fun() ->
-                  FstFd = open_index_read(IdxFile),
-                  {LastE, LastO, Res} =
-                      lists:foldl(
-                        fun(#seg_info{index = I}, Acc) ->
-                                Fd = open_index_read(I),
-                                last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B),
-                                                  Fd, Acc)
-                        end,
-                        last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B),
-                                          FstFd, {FstE, FstChId, []}),
-                        SegInfos),
-                  lists:reverse([{LastE, LastO} | Res])
-          end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME,
-                                         ?FUNCTION_ARITY, Time/1000000]),
+last_epoch_offsets([IdxFile]) ->
+    Fd = open_index_read(IdxFile),
+    _ = file:advise(Fd, 0, 0, sequential),
+    case last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, undefined) of
+        undefined ->
+            [];
+        {LastE, LastO, Res} ->
+            lists:reverse([{LastE, LastO} | Res])
+    end;
+last_epoch_offsets([FstIdxFile | _] = IdxFiles) ->
+    F = fun() ->
+                {ok, FstFd} = open(FstIdxFile, [read, raw, binary]),
+                %% on linux this disables read-ahead so should only
+                %% bring a single block into memory
+                %% having the first block of index files in page cache
+                %% should generally be a good thing
+                _ = file:advise(FstFd, 0, 0, random),
+                {ok, <<FstO:64/unsigned,
+                       _FstTimestamp:64/signed,
+                       FstE:64/unsigned,
+                       _FstFilePos:32/unsigned,
+                       _FstChType:8/unsigned>>} = first_idx_record(FstFd),
+                ok = file:close(FstFd),
+                {LastE, LastO, Res} =
+                    lists:foldl(
+                      fun(IdxFile, {E, _, EOs} = Acc) ->
+                              Fd = open_index_read(IdxFile),
+                              {ok, <<Offset:64/unsigned,
+                                     _Timestamp:64/signed,
+                                     Epoch:64/unsigned,
+                                     _FilePos:32/unsigned,
+                                     _ChType:8/unsigned>>} = last_idx_record(Fd),
+                              case Epoch > E of
+                                  true ->
+                                      %% we need to scan as the last index record
+                                      %% has a greater epoch
+                                      _ = file:advise(Fd, 0, 0, sequential),
+                                      {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE),
+                                      last_epoch_offset(
+                                        file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd,
+                                        Acc);
+                                  false ->
+                                      ok = file:close(Fd),
+                                      {Epoch, Offset, EOs}
+                              end
+                      end, {FstE, FstO, []}, IdxFiles),
+                lists:reverse([{LastE, LastO} | Res])
+        end,
+    {Time, Result} = timer:tc(F),
+    ?DEBUG("~s:~s/~b completed in ~bms",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]),
     Result.
 
 %% aggregates the chunk offsets for each epoch
 last_epoch_offset(eof, Fd, Acc) ->
     ok = file:close(Fd),
     Acc;
+last_epoch_offset({ok,
+                   <<Offset:64/unsigned,
+                     _Timestamp:64/signed,
+                     Epoch:64/unsigned,
+                     _FilePos:32/unsigned,
+                     _ChType:8/unsigned>>},
+                  Fd, undefined) ->
+    last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd,
+                      {Epoch, Offset, []});
 last_epoch_offset({ok,
                    <<Offset:64/unsigned,
 
-segment_from_index_file(IdxFile) ->
+segment_from_index_file(IdxFile) when is_list(IdxFile) ->
     Basename = filename:basename(IdxFile, ".index"),
     BaseDir = filename:dirname(IdxFile),
-    SegFile0 = filename:join([BaseDir, Basename]),
-    SegFile0 ++ ".segment".
+    filename:join([BaseDir, Basename ++ ".segment"]).
 
 make_chunk(Blobs, TData, ChType, Timestamp, Epoch, Next) ->
     {NumEntries, NumRecords, EData} =
@@ -1853,24 +1920,31 @@ write_chunk(Chunk,
                 ok = file:close(IdxFd),
                 State#?MODULE{fd = undefined,
                               index_fd = undefined,
-                              mode =
-                                  Write#write{tail_info =
-                                                  {NextOffset,
-                                                   {Epoch, Next, Timestamp}},
-                                              segment_size = {0, 0}}};
+                              mode = Write#write{tail_info =
+                                                     {NextOffset,
+                                                      {Epoch, Next, Timestamp}},
+                                                 segment_size = {0, 0}}};
             false ->
                 State#?MODULE{mode =
                                   Write#write{tail_info =
                                                   {NextOffset,
                                                    {Epoch, Next, Timestamp}},
-                                              segment_size = {SegSizeBytes + Size, SegSizeChunks + 1}}}
+                                              segment_size = {SegSizeBytes + Size,
+                                                              SegSizeChunks + 1}}}
     end.
max_segment_size_reached(SegFd, CurrentSizeChunks, #cfg{max_segment_size_bytes = MaxSizeBytes, max_segment_size_chunks = MaxSizeChunks}) -> {ok, CurrentSizeBytes} = file:position(SegFd, cur), - CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks - 1. + CurrentSizeBytes >= MaxSizeBytes orelse + CurrentSizeChunks >= MaxSizeChunks - 1. + + +setopts(tcp, Sock, Opts) -> + ok = inet:setopts(Sock, Opts); +setopts(ssl, Sock, Opts) -> + ok = ssl:setopts(Sock, Opts). sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> ok; @@ -1892,18 +1966,57 @@ sendfile(ssl, Fd, Sock, Pos, ToSend) -> Err end. -first_timestamp_from_segment_infos( - [#seg_info{first = #chunk_info{timestamp = Ts}} | _ ]) -> - Ts; -first_timestamp_from_segment_infos(_) -> - 0. +last_timestamp_in_index_file(IdxFile) -> + case file:open(IdxFile, [raw, binary, read]) of + {ok, IdxFd} -> + case last_idx_record(IdxFd) of + {ok, <<_O:64/unsigned, + LastTimestamp:64/signed, + _E:64/unsigned, + _ChunkPos:32/unsigned, + _ChType:8/unsigned>>} -> + _ = file:close(IdxFd), + {ok, LastTimestamp}; + Err -> + _ = file:close(IdxFd), + Err + end; + Err -> + Err + end. + +first_timestamp_from_index_files([IdxFile | _]) -> + case file:open(IdxFile, [raw, binary, read]) of + {ok, IdxFd} -> + case first_idx_record(IdxFd) of + {ok, <<_FstO:64/unsigned, + FstTimestamp:64/signed, + _FstE:64/unsigned, + _FstChunkPos:32/unsigned, + _FstChType:8/unsigned>>} -> + _ = file:close(IdxFd), + FstTimestamp; + _ -> + _ = file:close(IdxFd), + 0 + end; + _Err -> + 0 + end. + +offset_range_from_idx_files([IdxFile]) -> + {ok, #seg_info{first = First, + last = Last}} = build_seg_info(IdxFile), + offset_range_from_chunk_range({First, Last}); +offset_range_from_idx_files(IdxFiles) when is_list(IdxFiles) -> + {_, FstSI, LstSI} = first_and_last_seginfos0(IdxFiles), + ChunkRange = chunk_range_from_segment_infos([FstSI, LstSI]), + offset_range_from_chunk_range(ChunkRange). offset_range_from_segment_infos(SegInfos) -> ChunkRange = chunk_range_from_segment_infos(SegInfos), offset_range_from_chunk_range(ChunkRange). -chunk_range_from_segment_infos([]) -> - empty; chunk_range_from_segment_infos([#seg_info{first = undefined, last = undefined}]) -> empty; @@ -1914,42 +2027,47 @@ chunk_range_from_segment_infos(SegInfos) when is_list(SegInfos) -> offset_range_from_chunk_range(empty) -> empty; +offset_range_from_chunk_range({undefined, undefined}) -> + empty; offset_range_from_chunk_range({#chunk_info{id = FirstChId}, #chunk_info{id = LastChId, num = LastNumRecs}}) -> {FirstChId, LastChId + LastNumRecs - 1}. 
-%% find the segment the offset is in _or_ if the offset is the very next -%% chunk offset it will return the last segment -find_segment_for_offset(0, - [#seg_info{first = undefined, last = undefined} = - Info]) -> - {end_of_log, Info}; -find_segment_for_offset(Offset, - [#seg_info{last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = - Info]) - when Offset == LastChId + LastNumRecs -> - %% the last segment and offset is the next offset - {end_of_log, Info}; -find_segment_for_offset(Offset, - [#seg_info{first = #chunk_info{id = FirstChId}, - last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = - Info - | Rem]) -> - NextChId = LastChId + LastNumRecs, - case Offset >= FirstChId andalso Offset < NextChId of - true -> - %% we found it - {found, Info}; +find_segment_for_offset(Offset, IdxFiles) -> + %% we assume index files are in the default low-> high order here + case lists:search( + fun(IdxFile) -> + Offset >= index_file_first_offset(IdxFile) + end, lists:reverse(IdxFiles)) of + {value, File} -> + case build_seg_info(File) of + {ok, #seg_info{first = undefined, + last = undefined} = Info} -> + {end_of_log, Info}; + {ok, #seg_info{last = + #chunk_info{id = LastChId, + num = LastNumRecs}} = Info} + when Offset == LastChId + LastNumRecs -> + %% the last segment and offset is the next offset + {end_of_log, Info}; + {ok, #seg_info{first = #chunk_info{id = FirstChId}, + last = #chunk_info{id = LastChId, + num = LastNumRecs}} = Info} -> + NextChId = LastChId + LastNumRecs, + case Offset >= FirstChId andalso Offset < NextChId of + true -> + %% we found it + {found, Info}; + false -> + not_found + end; + {error, _} = Err -> + Err + end; false -> - find_segment_for_offset(Offset, Rem) - end; -find_segment_for_offset(_Offset, _) -> - not_found. + not_found + end. can_read_next_offset(#read{type = offset, next_offset = NextOffset, @@ -1965,15 +2083,13 @@ make_file_name(N, Suff) -> lists:flatten( io_lib:format("~20..0B.~s", [N, Suff])). -open_new_segment(#?MODULE{cfg = - #cfg{directory = Dir, - counter = Cnt}, +open_new_segment(#?MODULE{cfg = #cfg{directory = Dir, + counter = Cnt}, fd = undefined, index_fd = undefined, - mode = - #write{type = _WriterType, - tail_info = {NextOffset, _}}} = - State0) -> + mode = #write{type = _WriterType, + tail_info = {NextOffset, _}}} = + State0) -> Filename = make_file_name(NextOffset, "segment"), IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG("~s: ~s : ~s", [?MODULE, ?FUNCTION_NAME, Filename]), @@ -1999,81 +2115,56 @@ open_index_read(File) -> %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied - %% {ok, ?IDX_HEADER} = file:read(Fd, ?IDX_HEADER_SIZE) - _ = file:read(Fd, ?IDX_HEADER_SIZE), + {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), Fd. 
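
Because a segment's first offset is encoded in its file name, the rewritten find_segment_for_offset/2 can pick the candidate file without opening anything and only then parses it with build_seg_info/1. The selection step reduced to plain integers (a sketch; candidate_first_offset is an illustrative name):

%% FirstOffsetsDesc is sorted high -> low, mirroring the
%% lists:search over lists:reverse(IdxFiles) above
candidate_first_offset(Offset, FirstOffsetsDesc) ->
    case lists:search(fun(F) -> Offset >= F end, FirstOffsetsDesc) of
        {value, F} -> {ok, F};
        false -> not_found
    end.
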
-scan_idx(Offset, #seg_info{index = IndexFile, - last = LastChunkInSegment} = SegmentInfo) -> - {Time, Result} = timer:tc( - fun() -> - case offset_range_from_segment_infos([SegmentInfo]) of - empty -> - eof; - {SegmentStartOffs, SegmentEndOffs} -> - case Offset < SegmentStartOffs orelse - Offset > SegmentEndOffs of - true -> - offset_out_of_range; - false -> - IndexFd = open_index_read(IndexFile), - Result = scan_idx(IndexFd, Offset, - LastChunkInSegment), - _ = file:close(IndexFd), - Result - end - end - end), +offset_idx_scan(Offset, #seg_info{index = IndexFile} = SegmentInfo) -> + {Time, Result} = + timer:tc( + fun() -> + case offset_range_from_segment_infos([SegmentInfo]) of + empty -> + eof; + {SegmentStartOffs, SegmentEndOffs} -> + case Offset < SegmentStartOffs orelse + Offset > SegmentEndOffs of + true -> + offset_out_of_range; + false -> + IndexFd = open_index_read(IndexFile), + _ = file:advise(IndexFd, 0, 0, sequential), + offset_idx_scan0(IndexFd, Offset, not_found) + end + end + end), ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]), Result. -scan_idx(Fd, Offset, #chunk_info{id = LastChunkInSegmentId, - num = LastChunkInSegmentNum}) -> +offset_idx_scan0(Fd, Offset, PreviousChunk) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, <>} -> - LastOffsetInSegment = LastChunkInSegmentId + LastChunkInSegmentNum - 1, - case Offset < ChunkId orelse Offset > LastOffsetInSegment of - true -> - %% shouldn't really happen as we check the range above - offset_out_of_range; - false -> - %% offset is in this segment - scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos}) - end; - eof -> - %% this should never happen - offset is in the range and we are reading the first record - eof; - {error, Posix} -> - Posix - end. - -scan_idx0(Fd, Offset, PreviousChunk) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of - {ok, - <>} -> case Offset < ChunkId of true -> %% offset we are looking for is higher or equal %% to the start of the previous chunk %% but lower than the start of the current chunk -> %% return the previous chunk + _ = file:close(Fd), PreviousChunk; false -> - scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos}) + offset_idx_scan0(Fd, Offset, {ChunkId, Epoch, FilePos}) end; eof -> + _ = file:close(Fd), %% Offset must be in the last chunk as there is no more data PreviousChunk; {error, Posix} -> + _ = file:close(Fd), Posix end. @@ -2085,7 +2176,7 @@ throw_missing(Any) -> open(SegFile, Options) -> throw_missing(file:open(SegFile, Options)). -chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) -> +chunk_location_for_timestamp(Idx, Ts) -> Fd = open_index_read(Idx), %% scan index file for nearest timestamp {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts), @@ -2329,6 +2420,93 @@ next_location(#chunk_info{id = Id, size = Size}) -> {Id + Num, Pos + Size + ?HEADER_SIZE_B}. +index_file_first_offset(IdxFile) -> + list_to_integer(filename:basename(IdxFile, ".index")). 
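
index_file_first_offset/1 relies on make_file_name/2 zero-padding the offset to 20 digits, which keeps lexical and numeric sort orders identical and makes the first offset recoverable from the basename:

1> lists:flatten(io_lib:format("~20..0B.~s", [12345, "index"])).
"00000000000000012345.index"
2> list_to_integer(filename:basename("00000000000000012345.index", ".index")).
12345
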
+
+first_last_timestamps(IdxFile) ->
+    case file:open(IdxFile, [raw, read, binary]) of
+        {ok, Fd} ->
+            _ = file:advise(Fd, 0, 0, random),
+            case first_idx_record(Fd) of
+                {ok, <<_:64/unsigned,
+                       FirstTs:64/signed,
+                       _:64/unsigned,
+                       _:32/unsigned,
+                       _:8/unsigned>>} ->
+                    %% if we can get the first we can get the last
+                    {ok, <<_:64/unsigned,
+                           LastTs:64/signed,
+                           _:64/unsigned,
+                           _:32/unsigned,
+                           _:8/unsigned>>} = last_idx_record(Fd),
+                    ok = file:close(Fd),
+                    {FirstTs, LastTs};
+                {error, einval} ->
+                    %% empty index
+                    undefined;
+                eof ->
+                    eof
+            end;
+        _Err ->
+            file_not_found
+    end.
+
+
+
+%% accepts a list of index files in reverse order
+%% [{21, 30}, {12, 20}, {5, 10}]
+%% 11 = {12, 20}
+timestamp_idx_file_search(Ts, [FstIdxFile | Older]) ->
+    case first_last_timestamps(FstIdxFile) of
+        {_FstTs, EndTs}
+          when Ts > EndTs ->
+            %% timestamp greater than the newest timestamp in the stream
+            %% attach at 'next'
+            next;
+        {FstTs, _EndTs}
+          when Ts < FstTs ->
+            %% the requested timestamp is older than the first timestamp in
+            %% this segment, keep scanning
+            timestamp_idx_file_search0(Ts, Older, FstIdxFile);
+        {_, _} ->
+            %% else we must have found it!
+            {scan, FstIdxFile};
+        file_not_found ->
+            %% the requested timestamp is older than the first timestamp in
+            %% this segment, keep scanning
+            timestamp_idx_file_search0(Ts, Older, FstIdxFile);
+        eof ->
+            %% empty log
+            next
+    end.
+
+timestamp_idx_file_search0(Ts, [], IdxFile) ->
+    case first_last_timestamps(IdxFile) of
+        {FstTs, _LastTs}
+          when Ts < FstTs ->
+            {first_in, IdxFile};
+        _ ->
+            {scan, IdxFile}
+    end;
+timestamp_idx_file_search0(Ts, [IdxFile | Older], Prev) ->
+    case first_last_timestamps(IdxFile) of
+        {_FstTs, EndTs}
+          when Ts > EndTs ->
+            %% we should attach to the first chunk in the previous segment
+            %% as the requested timestamp must fall in between the
+            %% current and previous segments
+            {first_in, Prev};
+        {FstTs, _EndTs}
+          when Ts < FstTs ->
+            %% the requested timestamp is older than the first timestamp in
+            %% this segment, keep scanning
+            timestamp_idx_file_search0(Ts, Older, IdxFile);
+        _ ->
+            %% else we must have found it!
+            {scan, IdxFile}
+    end.
+
+
 -ifdef(TEST).
 
 % -include_lib("eunit/include/eunit.hrl").
diff --git a/src/osiris_replica.erl b/src/osiris_replica.erl
index 153f1f1c39ac..d7eb7cb48a45 100644
--- a/src/osiris_replica.erl
+++ b/src/osiris_replica.erl
@@ -225,26 +225,7 @@ init(#{name := Name,
                            start_offset => TailInfo,
                            reference => ExtRef,
                            connection_token => Token},
-    RRPid =
-        case supervisor:start_child({osiris_replica_reader_sup, Node},
-                                    #{id => make_ref(),
-                                      start =>
-                                          {osiris_replica_reader, start_link,
-                                           [ReplicaReaderConf]},
-                                      %% replica readers should never be
-                                      %% restarted by their sups
-                                      %% instead they need to be re-started
-                                      %% by their replica
-                                      restart => temporary,
-                                      shutdown => 5000,
-                                      type => worker,
-                                      modules => [osiris_replica_reader]})
-        of
-            {ok, Pid} ->
-                Pid;
-            {ok, Pid, _} ->
-                Pid
-        end,
+    RRPid = osiris_replica_reader:start(Node, ReplicaReaderConf),
     true = link(RRPid),
     Interval = maps:get(replica_gc_interval, Config, 5000),
     erlang:send_after(Interval, self(), force_gc),
diff --git a/src/osiris_replica_reader.erl b/src/osiris_replica_reader.erl
index ebf90400687a..b8544f3b673e 100644
--- a/src/osiris_replica_reader.erl
+++ b/src/osiris_replica_reader.erl
@@ -17,6 +17,7 @@
 %% API functions
 -export([start_link/1,
+         start/2,
          stop/1]).
 %% gen_server callbacks
 -export([init/1,
@@ -102,6 +103,27 @@ start_link(Conf) ->
 stop(Pid) ->
     gen_server:cast(Pid, stop).
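
With first_last_timestamps/1 the timestamp lookup reads at most two index records per file instead of building a full log overview. The reader API is unchanged, as the timestamp tests in the suite changes below exercise (a usage sketch; Ts and Conf assumed bound):

{ok, Log} = osiris_log:init_offset_reader({timestamp, Ts}, Conf),
NextOffs = osiris_log:next_offset(Log),
osiris_log:close(Log).
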
+start(Node, ReplicaReaderConf) when is_map(ReplicaReaderConf) -> + case supervisor:start_child({osiris_replica_reader_sup, Node}, + #{id => make_ref(), + start => + {osiris_replica_reader, start_link, + [ReplicaReaderConf]}, + %% replica readers should never be + %% restarted by their sups + %% instead they need to be re-started + %% by their replica + restart => temporary, + shutdown => 5000, + type => worker, + modules => [osiris_replica_reader]}) + of + {ok, Pid} -> + Pid; + {ok, Pid, _} -> + Pid + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -203,7 +225,6 @@ handle_call(_Request, _From, State) -> %%-------------------------------------------------------------------- handle_cast({more_data, _LastOffset}, #state{leader_pid = LeaderPid} = State0) -> - % ?DEBUG("MORE DATA ~b", [_LastOffset]), #state{log = Log} = State = do_sendfile(State0), NextOffs = osiris_log:next_offset(Log), ok = osiris_writer:register_data_listener(LeaderPid, NextOffs), diff --git a/src/osiris_util.erl b/src/osiris_util.erl index 6f9e87ab43d4..6252355d82d0 100644 --- a/src/osiris_util.erl +++ b/src/osiris_util.erl @@ -235,7 +235,6 @@ inet_tls_enabled([{proto_dist, ["inet_tls"]} | _]) -> inet_tls_enabled([_Opt | Tail]) -> inet_tls_enabled(Tail). - partition_parallel(F, Es, Timeout) -> Parent = self(), Running = [{spawn_monitor(fun() -> Parent ! {self(), F(E)} end), E} diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 019b207e520e..cabe465e6e78 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -47,7 +47,9 @@ all_tests() -> init_offset_reader_last_chunk_is_not_user_chunk, init_offset_reader_no_user_chunk_in_last_segment, init_offset_reader_no_user_chunk_in_segments, + init_offset_reader_timestamp_empty, init_offset_reader_timestamp, + init_offset_reader_timestamp_multi_segment, init_offset_reader_truncated, init_data_reader_next, init_data_reader_empty_log, @@ -70,7 +72,8 @@ all_tests() -> evaluate_retention_max_bytes, evaluate_retention_max_age, offset_tracking, - offset_tracking_snapshot + offset_tracking_snapshot, + many_segment_overview ]. groups() -> @@ -537,6 +540,25 @@ init_offset_reader_no_user_chunk_in_segments(Config) -> osiris_log:close(L2), ok. +init_offset_reader_timestamp_empty(Config) -> + ok = logger:set_primary_config(level, all), + Now = now_ms(), + EpochChunks = [], + LDir = ?config(leader_dir, Config), + Conf0 = ?config(osiris_conf, Config), + %% TODO: separate multi segment test + % application:set_env(osiris, max_segment_size_chunks, 1), + Conf = Conf0#{max_segment_size_chunks => 1}, + LLog0 = seed_log(LDir, EpochChunks, Config), + set_offset_ref(Conf, 3), + osiris_log:close(LLog0), + RConf = Conf#{dir => LDir}, + + {ok, L1} = osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf), + ?assertEqual(0, osiris_log:next_offset(L1)), + osiris_log:close(L1), + ok. 
+ init_offset_reader_timestamp(Config) -> ok = logger:set_primary_config(level, all), Now = now_ms(), @@ -545,7 +567,8 @@ init_offset_reader_timestamp(Config) -> {1, Now - 8000, [<<"two">>]}, % 1 {1, Now - 5000, [<<"three">>, <<"four">>]}], % 2 LDir = ?config(leader_dir, Config), - Conf = ?config(osiris_conf, Config), + Conf0 = ?config(osiris_conf, Config), + Conf = Conf0#{max_segment_size_chunks => 1}, LLog0 = seed_log(LDir, EpochChunks, Config), set_offset_ref(Conf, 3), osiris_log:close(LLog0), @@ -567,8 +590,56 @@ init_offset_reader_timestamp(Config) -> osiris_log:init_offset_reader({timestamp, Now - 10000}, RConf), ?assertEqual(0, osiris_log:next_offset(L3)), osiris_log:close(L3), + + %% in between + {ok, L4} = + osiris_log:init_offset_reader({timestamp, Now - 6000}, RConf), + ?assertEqual(2, osiris_log:next_offset(L4)), + osiris_log:close(L4), ok. +init_offset_reader_timestamp_multi_segment(Config) -> + ok = logger:set_primary_config(level, all), + Now = now_ms(), + EpochChunks = + [{1, Now - 10000, [<<"one">>]}, % 0 + {1, Now - 8000, [<<"two">>]}, % 1 + {1, Now - 5000, [<<"three">>, <<"four">>]}], % 2 + LDir = ?config(leader_dir, Config), + Conf0 = ?config(osiris_conf, Config), + %% segment per chunk + application:set_env(osiris, max_segment_size_chunks, 1), + Conf = Conf0#{max_segment_size_chunks => 1}, + LLog0 = seed_log(LDir, EpochChunks, Config), + + set_offset_ref(Conf, 3), + osiris_log:close(LLog0), + application:unset_env(osiris, max_segment_size_chunks), + RConf = Conf#{dir => LDir}, + + {ok, L1} = + osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf), + %% next offset is expected to be offset 1 + ?assertEqual(1, osiris_log:next_offset(L1)), + osiris_log:close(L1), + + %% future case + {ok, L2} = osiris_log:init_offset_reader({timestamp, Now}, RConf), + ?assertEqual(4, osiris_log:next_offset(L2)), + osiris_log:close(L2), + + %% past case + {ok, L3} = + osiris_log:init_offset_reader({timestamp, Now - 10000}, RConf), + ?assertEqual(0, osiris_log:next_offset(L3)), + osiris_log:close(L3), + + %% in between + {ok, L4} = + osiris_log:init_offset_reader({timestamp, Now - 6000}, RConf), + ?assertEqual(2, osiris_log:next_offset(L4)), + osiris_log:close(L4), + ok. init_offset_reader_truncated(Config) -> Data = crypto:strong_rand_bytes(1500), EpochChunks = @@ -1043,7 +1114,7 @@ offset_tracking_snapshot(Config) -> make_trailer(offset, <<"id1">>, 1), S0), %% this should create at least two segments - {_, S2} = seed_log(S1, EpochChunks, Config, T0), + {_, S2} = seed_log(Conf, S1, EpochChunks, Config, T0), osiris_log:close(S2), S3 = osiris_log:init(Conf), T = osiris_log:recover_tracking(S3), @@ -1053,6 +1124,111 @@ offset_tracking_snapshot(Config) -> osiris_log:close(S3), ok. 
+many_segment_overview(Config) -> + Data = crypto:strong_rand_bytes(1000), + EpochChunks = + [{1, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{2, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{3, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{4, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{5, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)], + Conf0 = ?config(osiris_conf, Config), + Conf = Conf0#{max_segment_size_bytes => 64000}, + osiris_log:close(seed_log(Conf, EpochChunks, Config)), + %% {40051,{{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}} + {OverviewTaken, LogOverview} = timer:tc(fun () -> + osiris_log:overview(maps:get(dir, Conf)) + end), + ct:pal("OverviewTaken ~p", [OverviewTaken]), + ct:pal("~p", [LogOverview]), + %% {{0,40959},[{-1,-1},{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]} + ?assertEqual({{0,40959}, + [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, LogOverview), + Conf6 = Conf#{epoch => 6}, + + {InitTaken, _} = timer:tc( + fun () -> + osiris_log:close(osiris_log:init(Conf6)) + end), + ct:pal("InitTaken ~p", [InitTaken]), + + {OffsLastTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(last, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsLastTaken ~p", [OffsLastTaken]), + + {OffsFirstTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(first, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsFirstTaken ~p", [OffsFirstTaken]), + + {OffsNextTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(next, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsNextTaken ~p", [OffsNextTaken]), + + {OffsOffsetTakenHi, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(40000, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenHi ~p", [OffsOffsetTakenHi]), + {OffsOffsetTakenLow, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(400, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenLow ~p", [OffsOffsetTakenLow]), + + + {OffsOffsetTakenMid, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(20000, Conf6), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), + + %% TODO: timestamp + Ts = erlang:system_time(millisecond) - 100, + {TimestampTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader({timestamp, Ts}, + Conf6), + osiris_log:close(L) + end), + ct:pal("TimestampTaken ~p", [TimestampTaken]), + + %% acceptor + {Range, EOffs} = LogOverview, + {InitAcceptorTaken, AcceptorLog} = + timer:tc(fun () -> + osiris_log:init_acceptor(Range, EOffs, Conf6) + end), + ct:pal("InitAcceptor took ~bus", [InitAcceptorTaken]), + + {InitDataReaderTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_data_reader( + osiris_log:tail_info(AcceptorLog), Conf6), + osiris_log:close(L) + end), + ct:pal("InitDataReaderTaken ~p", [InitDataReaderTaken]), + + %% evaluate_retention + Specs = [{max_age, 60000}, {max_bytes, 5_000_000_000}], + {RetentionTaken, _} = + timer:tc(fun () -> + osiris_log:evaluate_retention(maps:get(dir, Conf), Specs) + end), + ct:pal("RetentionTaken ~p", [RetentionTaken]), + ok. 
+ %% Utility seed_log(Conf, EpochChunks, Config) -> @@ -1061,22 +1237,23 @@ seed_log(Conf, EpochChunks, Config) -> seed_log(Conf, EpochChunks, Config, Trk) when is_map(Conf) -> Log0 = osiris_log:init(Conf), - seed_log(Log0, EpochChunks, Config, Trk); + seed_log(Conf, Log0, EpochChunks, Config, Trk); seed_log(Dir, EpochChunks, Config, Trk) when is_list(Dir) -> seed_log(#{dir => Dir, epoch => 1, max_segment_size_bytes => 1000 * 1000, name => ?config(test_case, Config)}, - EpochChunks, Config, Trk); -seed_log(Log, EpochChunks, _Config, Trk) -> + EpochChunks, Config, Trk). + +seed_log(Conf, Log, EpochChunks, _Config, Trk) -> lists:foldl(fun ({Epoch, Records}, {T, L}) -> - write_chunk(Epoch, now_ms(), Records, T, L); + write_chunk(Conf, Epoch, now_ms(), Records, T, L); ({Epoch, Ts, Records}, {T, L}) -> - write_chunk(Epoch, Ts, Records, T, L) + write_chunk(Conf, Epoch, Ts, Records, T, L) end, {Trk, Log}, EpochChunks). -write_chunk(Epoch, Now, Records, Trk0, Log0) -> +write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) -> HasTracking = not osiris_tracking:is_empty(Trk0), case osiris_log:is_open(Log0) of false when HasTracking -> @@ -1084,7 +1261,7 @@ write_chunk(Epoch, Now, Records, Trk0, Log0) -> FirstOffset = osiris_log:first_offset(Log0), FirstTs = osiris_log:first_timestamp(Log0), {SnapBin, Trk} = osiris_tracking:snapshot(FirstOffset, FirstTs, Trk0), - write_chunk(Epoch, Now, Records, Trk, + write_chunk(Conf, Epoch, Now, Records, Trk, osiris_log:write([SnapBin], ?CHNK_TRK_SNAPSHOT, Now, @@ -1095,13 +1272,9 @@ write_chunk(Epoch, Now, Records, Trk0, Log0) -> Epoch -> {Trk0, osiris_log:write(Records, Now, Log0)}; _ -> - %% need to re=init - Dir = osiris_log:get_directory(Log0), - Name = osiris_log:get_name(Log0), + %% need to re-init as new epoch osiris_log:close(Log0), - Log = osiris_log:init(#{dir => Dir, - epoch => Epoch, - name => Name}), + Log = osiris_log:init(Conf#{epoch => Epoch}), {Trk0, osiris_log:write(Records, Log)} end end.