From 2e5502c5f552887ff5bb3e6a65c5145667a576e1 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Wed, 13 Apr 2022 14:59:22 +0100
Subject: [PATCH 01/19] Optimise osiris_log:overview/1

Avoid scanning _every_ index file to find the epoch offsets: use the
last chunk_info of each segment to see whether its last epoch differs
from the last epoch seen so far, and only scan the index when it does.
---
 src/osiris_log.erl        | 31 +++++++++++++++++++------------
 test/osiris_log_SUITE.erl | 25 ++++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index b76cde42554a..062375d9c973 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -1472,13 +1472,15 @@ build_log_overview(Dir) when is_list(Dir) ->
                   build_log_overview(Dir)
           end
       end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]),
+    ?DEBUG("~s:~s/~b completed in ~b ms",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]),
     Result.

 build_log_overview0([], Acc) ->
     lists:reverse(Acc);
 build_log_overview0([IdxFile | IdxFiles], Acc0) ->
-    IdxFd = open_index_read(IdxFile),
+    %% do not need read_ahead here
+    {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
     case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of
         {error, einval} when IdxFiles == [] andalso Acc0 == [] ->
             %% this would happen if the file only contained a header
@@ -1526,9 +1528,7 @@ position_at_idx_record_boundary(IdxFd, At) ->
 build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
     try
         {ok, Fd} = open(SegFile, [read, binary, raw]),
-        %% skip header,
-        {ok, ?LOG_HEADER_SIZE} = file:position(Fd, ?LOG_HEADER_SIZE),
-        case file:read(Fd, ?HEADER_SIZE_B) of
+        case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of
             eof ->
                 _ = file:close(Fd),
                 Acc0;
@@ -1545,7 +1545,6 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                    FirstSize:32/unsigned,
                    FirstTSize:32/unsigned,
                    _/binary>>} ->
-                {ok, LastChunkPos} = file:position(Fd, LastChunkPos),
                 {ok, <
                        _LastCrc:32/integer,
                        LastSize:32/unsigned,
                        LastTSize:32/unsigned,
-                       _Reserved:32>>} =
-                    file:read(Fd, ?HEADER_SIZE_B),
+                       _Reserved:32>>} = file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B),
                 Size = LastChunkPos + LastSize + LastTSize + ?HEADER_SIZE_B,
                 {ok, Eof} = file:position(Fd, eof),
                 ?DEBUG_IF("~s: segment ~s has trailing data ~w ~w",
@@ -1726,18 +1724,27 @@ last_epoch_offsets([#seg_info{index = IdxFile,
               FstFd = open_index_read(IdxFile),
               {LastE, LastO, Res} =
                   lists:foldl(
-                    fun(#seg_info{index = I}, Acc) ->
+                    fun(#seg_info{index = I,
+                                  last = #chunk_info{epoch = LE}},
+                        {E, _, _} = Acc) when LE > E ->
                             Fd = open_index_read(I),
                             last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B),
-                                              Fd, Acc)
+                                              Fd, Acc);
+                       (#seg_info{last = #chunk_info{id = LO,
+                                                     epoch = LE}},
+                        {_, _, Acc}) ->
+                            {LE, LO, Acc};
+                       (_, Acc) ->
+                            %% not sure if this can ever happen
+                            Acc
                     end,
                     last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B),
                                       FstFd, {FstE, FstChId, []}),
                     SegInfos),
              lists:reverse([{LastE, LastO} | Res])
          end),
-    ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME,
-                                         ?FUNCTION_ARITY, Time/1000000]),
+    ?DEBUG("~s:~s/~b completed in ~bms",
+           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]),
     Result.
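To make the index access pattern of this patch concrete: every index record is a fixed-size binary of offset, timestamp, epoch, file position and chunk type (29 bytes given the bit widths matched above), so the last record of an index file can be fetched with one positioned read rather than a sequential scan. A minimal, self-contained sketch of that read; the module name is hypothetical and the record size is inferred from the patterns in the diff:

-module(idx_tail_sketch).
-export([last_index_record/1]).

%% 64 + 64 + 64 + 32 + 8 bits = 29 bytes, per the index record
%% patterns matched in the diff above.
-define(INDEX_RECORD_SIZE_B, 29).

%% Read only the final index record of an index file, mirroring what
%% build_log_overview0/2 now does instead of folding over every record.
last_index_record(IdxFile) ->
    {ok, Fd} = file:open(IdxFile, [read, raw, binary]),
    try file:position(Fd, {eof, -?INDEX_RECORD_SIZE_B}) of
        {ok, _Pos} ->
            {ok, <<ChId:64/unsigned,
                   Timestamp:64/signed,
                   Epoch:64/unsigned,
                   FilePos:32/unsigned,
                   ChType:8/unsigned>>} =
                file:read(Fd, ?INDEX_RECORD_SIZE_B),
            {ok, {ChId, Timestamp, Epoch, FilePos, ChType}};
        {error, einval} ->
            %% the file holds only a header (or less): treat as empty
            empty
    after
        ok = file:close(Fd)
    end.

Reading one record per index file turns the overview from a walk over every index record into roughly one read per segment, in the common case where the epoch does not change inside a segment.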
%% aggregates the chunk offsets for each epoch diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 019b207e520e..c8f1f5bc8409 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -70,7 +70,8 @@ all_tests() -> evaluate_retention_max_bytes, evaluate_retention_max_age, offset_tracking, - offset_tracking_snapshot + offset_tracking_snapshot, + many_segment_overview ]. groups() -> @@ -1053,6 +1054,28 @@ offset_tracking_snapshot(Config) -> osiris_log:close(S3), ok. +many_segment_overview(Config) -> + Data = crypto:strong_rand_bytes(1000), + EpochChunks = + [{1, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{2, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{3, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{4, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)] ++ + [{5, [Data || _ <- lists:seq(1, 8)]} || _ <- lists:seq(1, 1024)], + Conf0 = ?config(osiris_conf, Config), + Conf = Conf0#{max_segment_size_bytes => 64000}, + osiris_log:close(seed_log(Conf, EpochChunks, Config)), + %% {40051,{{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}} + {Take, Res} = timer:tc(fun () -> + osiris_log:overview(maps:get(dir, Conf)) + end), + ct:pal("TimeTaken ~p", [Take]), + ct:pal("~p", [Res]), + ?assertEqual({{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, + Res), + + ok. + %% Utility seed_log(Conf, EpochChunks, Config) -> From 7bc1d0290badbc692133074a3a177f8187c1e5df Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 13 Apr 2022 15:29:58 +0100 Subject: [PATCH 02/19] Optimise osiris_log:init/1 By not building full segment info for all segments but only for the first and last segment which is all this function needs. --- src/osiris_log.erl | 24 ++++++++++++++++++------ test/osiris_log_SUITE.erl | 15 +++++++++++---- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 062375d9c973..d12b7283538c 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -469,7 +469,7 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), first_offset_fun = FirstOffsetFun}, - case lists:reverse(build_log_overview(Dir)) of + case lists:reverse(build_log_overview_first_last(Dir)) of [] -> NextOffset = case Config of #{initial_offset := IO} @@ -494,7 +494,7 @@ init(#{dir := Dir, | _] = Infos -> [#seg_info{first = #chunk_info{id = FstChId, timestamp = FstTs}} | _] = - lists:reverse(Infos), + lists:reverse(Infos), %% assert epoch is same or larger %% than last known epoch case LastEpoch > Epoch of @@ -1458,14 +1458,26 @@ parse_records(Offs, parse_records(Offs + NumRecs, Rem, [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]). +sorted_index_files(Dir) when is_list(Dir) -> + lists:sort(filelib:wildcard(filename:join(Dir, "*.index"))). + +build_log_overview_first_last(Dir) when is_list(Dir) -> + case sorted_index_files(Dir) of + [] -> + []; + [Fst] -> + %% this function is only used by init + build_log_overview0([Fst], []); + [Fst | Rem] -> + %% this function is only used by init + build_log_overview0([Fst, lists:last(Rem)], []) + end. 
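The selection logic above never touches the middle segments; only the head and the last element of the sorted index-file list are expanded into full seg_info records. Stripped of the I/O, the shape is simply this (hypothetical function name):

first_and_last([]) -> none;
first_and_last([Single]) -> {1, Single, Single};
first_and_last([First | Rest]) -> {length(Rest) + 1, First, lists:last(Rest)}.

length/1 and lists:last/1 are both O(n) in the number of files, but that is a walk over an in-memory list rather than an open-and-read per segment, which is where the previous implementation spent its time.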
+ build_log_overview(Dir) when is_list(Dir) -> + IdxFiles = sorted_index_files(Dir), {Time, Result} = timer:tc( fun() -> try - IdxFiles = - lists:sort( - filelib:wildcard( - filename:join(Dir, "*.index"))), build_log_overview0(IdxFiles, []) catch missing_file -> diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index c8f1f5bc8409..6b147fb26fc7 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1066,14 +1066,21 @@ many_segment_overview(Config) -> Conf = Conf0#{max_segment_size_bytes => 64000}, osiris_log:close(seed_log(Conf, EpochChunks, Config)), %% {40051,{{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}} - {Take, Res} = timer:tc(fun () -> - osiris_log:overview(maps:get(dir, Conf)) - end), - ct:pal("TimeTaken ~p", [Take]), + {OverviewTaken, Res} = timer:tc(fun () -> + osiris_log:overview(maps:get(dir, Conf)) + end), + ct:pal("OverviewTaken ~p", [OverviewTaken ]), ct:pal("~p", [Res]), ?assertEqual({{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, Res), + {InitTaken, _} = timer:tc( + fun () -> + osiris_log:close( + osiris_log:init(Conf#{epoch => 6})) + end), + ct:pal("InitTaken ~p", [InitTaken]), + ok. %% Utility From 17ffcaf9c5f90b1f999376bd1a9aa31839fd5e06 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 13 Apr 2022 16:49:26 +0100 Subject: [PATCH 03/19] Optimise osiris_log:init_offset_reader For offset specs: first, last and next --- src/osiris_log.erl | 112 +++++++++++++++++++++----------------- test/osiris_log_SUITE.erl | 31 ++++++++++- 2 files changed, 91 insertions(+), 52 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index d12b7283538c..4ed93a825d0f 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -469,8 +469,8 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), first_offset_fun = FirstOffsetFun}, - case lists:reverse(build_log_overview_first_last(Dir)) of - [] -> + case build_log_overview_first_last(Dir) of + none -> NextOffset = case Config of #{initial_offset := IO} when WriterType == acceptor -> @@ -483,18 +483,17 @@ init(#{dir := Dir, #write{type = WriterType, tail_info = {NextOffset, empty}, current_epoch = Epoch}}); - [#seg_info{file = Filename, + {NumSegments, + #seg_info{file = Filename, index = IdxFilename, size = Size, last = - #chunk_info{epoch = LastEpoch, - timestamp = LastTs, - id = LastChId, - num = LastNum}} - | _] = Infos -> - [#seg_info{first = #chunk_info{id = FstChId, - timestamp = FstTs}} | _] = - lists:reverse(Infos), + #chunk_info{epoch = LastEpoch, + timestamp = LastTs, + id = LastChId, + num = LastNum}}, + #seg_info{first = #chunk_info{id = FstChId, + timestamp = FstTs}}} -> %% assert epoch is same or larger %% than last known epoch case LastEpoch > Epoch of @@ -509,7 +508,7 @@ init(#{dir := Dir, counters:put(Cnt, ?C_FIRST_OFFSET, FstChId), counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs), counters:put(Cnt, ?C_OFFSET, LastChId + LastNum - 1), - counters:put(Cnt, ?C_SEGMENTS, length(Infos)), + counters:put(Cnt, ?C_SEGMENTS, NumSegments), ?DEBUG("~s:~s/~b: next offset ~b first offset ~b", [?MODULE, ?FUNCTION_NAME, @@ -530,10 +529,9 @@ init(#{dir := Dir, current_epoch = Epoch}, fd = Fd, index_fd = IdxFd}; - [#seg_info{file = Filename, - index = IdxFilename, - last = undefined} - | _] -> + {1, #seg_info{file = Filename, + index = IdxFilename, + last = undefined}, _} -> %% the empty log case {ok, Fd} = open(Filename, ?FILE_OPTS_WRITE), {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE), @@ -972,48 +970,53 @@ init_offset_reader0({timestamp, Ts}, 
#{dir := Dir} = Conf) -> end end; init_offset_reader0(first, #{dir := Dir} = Conf) -> - case build_log_overview(Dir) of - [#seg_info{file = File, - first = undefined}] -> + case build_log_overview_first_last(Dir) of + {_, #seg_info{file = File, + first = undefined}, _} -> %% empty log, attach at 0 open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); - [#seg_info{file = File, - first = #chunk_info{id = FirstChunkId, - pos = FilePos}} | _] -> + {_, #seg_info{file = File, + first = #chunk_info{id = FirstChunkId, + pos = FilePos}}, _} -> open_offset_reader_at(File, FirstChunkId, FilePos, Conf); _ -> exit(no_segments_found) end; init_offset_reader0(next, #{dir := Dir} = Conf) -> - SegInfos = build_log_overview(Dir), - case lists:reverse(SegInfos) of - [#seg_info{file = File, - last = LastChunk} | _] -> + case build_log_overview_first_last(Dir) of + {_, #seg_info{file = File, + last = LastChunk}, _} -> {NextChunkId, FilePos} = next_location(LastChunk), open_offset_reader_at(File, NextChunkId, FilePos, Conf); _ -> exit(no_segments_found) end; init_offset_reader0(last, #{dir := Dir} = Conf) -> - SegInfos = build_log_overview(Dir), - case lists:reverse(SegInfos) of - [#seg_info{file = File, - last = undefined}] -> - %% empty log, attach at 0 - open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); - [#seg_info{file = File, - last = #chunk_info{type = ?CHNK_USER, - id = LastChunkId, - pos = FilePos}} | _] -> - open_offset_reader_at(File, LastChunkId, FilePos, Conf); + % SegInfos = build_log_overview(Dir), + IdxFiles = sorted_index_files(Dir), + case lists:reverse(IdxFiles) of + [IdxFile] -> + %% just one segment + case index_file_to_seg_info(IdxFile) of + #seg_info{file = File, + last = undefined} -> + %% empty log, attach at 0 + open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + #seg_info{file = File, + last = #chunk_info{type = ?CHNK_USER, + id = LastChunkId, + pos = FilePos}} -> + open_offset_reader_at(File, LastChunkId, FilePos, Conf) + end; _ -> - case last_user_chunk_location(SegInfos) of + case last_user_chunk_location(IdxFiles) of not_found -> ?DEBUG("~s:~s use chunk not found, fall back to next", [?MODULE, ?FUNCTION_NAME]), %% no user chunks in stream, this is awkward, fall back to next init_offset_reader0(next, Conf); - {ChunkId, FilePos, #seg_info{file = File}} -> + {ChunkId, FilePos, IdxFile} -> + File = segment_from_index_file(IdxFile), open_offset_reader_at(File, ChunkId, FilePos, Conf) end end; @@ -1102,10 +1105,10 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, fd = Fd}}. %% Searches the index files backwards for the ID of the last user chunk. -last_user_chunk_location(SegInfos) when is_list(SegInfos) -> +last_user_chunk_location(IdxFiles) when is_list(IdxFiles) -> {Time, Result} = timer:tc( fun() -> - last_user_chunk_id0(lists:reverse(SegInfos)) + last_user_chunk_id0(lists:reverse(IdxFiles)) end), ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1_000_000]), Result. @@ -1113,7 +1116,7 @@ last_user_chunk_location(SegInfos) when is_list(SegInfos) -> last_user_chunk_id0([]) -> %% There are no user chunks in any index files. not_found; -last_user_chunk_id0([#seg_info{index = IdxFile} = Info | Rest]) -> +last_user_chunk_id0([IdxFile | Rest]) -> try %% Do not read-ahead since we read the index file backwards chunk by chunk. 
{ok, IdxFd} = open(IdxFile, [read, raw, binary]), @@ -1122,7 +1125,7 @@ last_user_chunk_id0([#seg_info{index = IdxFile} = Info | Rest]) -> _ = file:close(IdxFd), case Last of {ok, Id, Pos} -> - {Id, Pos, Info}; + {Id, Pos, IdxFile}; {error, Reason} -> ?DEBUG("Could not find user chunk in index file ~s (~p)", [IdxFile, Reason]), last_user_chunk_id0(Rest) @@ -1461,16 +1464,26 @@ parse_records(Offs, sorted_index_files(Dir) when is_list(Dir) -> lists:sort(filelib:wildcard(filename:join(Dir, "*.index"))). +id_index_files(Dir) -> + [{list_to_integer(filename:basename(IdxFile, ".index")), IdxFile} + || IdxFile <- sorted_index_files(Dir)]. + +index_file_to_seg_info(IdxFile) -> + [SegInfo] = build_log_overview0([IdxFile], []), + SegInfo. + build_log_overview_first_last(Dir) when is_list(Dir) -> case sorted_index_files(Dir) of [] -> - []; + none; [Fst] -> %% this function is only used by init - build_log_overview0([Fst], []); + [SegInfo] = build_log_overview0([Fst], []), + {1, SegInfo, SegInfo}; [Fst | Rem] -> %% this function is only used by init - build_log_overview0([Fst, lists:last(Rem)], []) + [FstSegInfo, LastSegInfo] = build_log_overview0([lists:last(Rem), Fst], []), + {length(Rem) + 1, FstSegInfo, LastSegInfo} end. build_log_overview(Dir) when is_list(Dir) -> @@ -1955,10 +1968,9 @@ find_segment_for_offset(Offset, find_segment_for_offset(Offset, [#seg_info{first = #chunk_info{id = FirstChId}, last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = - Info - | Rem]) -> + #chunk_info{id = LastChId, + num = LastNumRecs}} = + Info | Rem]) -> NextChId = LastChId + LastNumRecs, case Offset >= FirstChId andalso Offset < NextChId of true -> diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 6b147fb26fc7..d991fe91feae 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1071,8 +1071,8 @@ many_segment_overview(Config) -> end), ct:pal("OverviewTaken ~p", [OverviewTaken ]), ct:pal("~p", [Res]), - ?assertEqual({{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, - Res), + ?assertEqual({{0,40959}, + [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, Res), {InitTaken, _} = timer:tc( fun () -> @@ -1081,6 +1081,33 @@ many_segment_overview(Config) -> end), ct:pal("InitTaken ~p", [InitTaken]), + {OffsLastTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(last, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsLastTaken ~p", [OffsLastTaken]), + + {OffsFirstTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(first, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsFirstTaken ~p", [OffsFirstTaken]), + + {OffsNextTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(next, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsNextTaken ~p", [OffsNextTaken]), + + % {OffsOffsetTaken, _} = + % timer:tc(fun () -> + % {ok, L} = osiris_log:init_offset_reader(40000, Conf#{epoch => 6}), + % osiris_log:close(L) + % end), + % ct:pal("OffsOffsetTaken ~p", [OffsOffsetTaken]), ok. 
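For reference, the semantics of the offset specs being timed in this test, as a usage sketch built only from calls that appear in this suite:

attach_at(Conf) ->
    %% `first` attaches at the oldest available chunk
    {ok, F} = osiris_log:init_offset_reader(first, Conf),
    %% `last` attaches at the last *user* chunk, skipping tracking chunks
    {ok, L} = osiris_log:init_offset_reader(last, Conf),
    %% `next` attaches after the current tail, so only new writes are read
    {ok, N} = osiris_log:init_offset_reader(next, Conf),
    [osiris_log:close(R) || R <- [F, L, N]],
    ok.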
%% Utility From 5e2af9cc2aee4f0d963aa0c441a0121e15d26b78 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 14 Apr 2022 15:26:32 +0100 Subject: [PATCH 04/19] Optimise osiris_log:init_offset_reader further needs a bit more refactoring --- src/osiris_log.erl | 241 +++++++++++++++++++++++--------------- test/osiris_log_SUITE.erl | 53 +++++---- 2 files changed, 176 insertions(+), 118 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 4ed93a825d0f..cbf694845f8a 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -484,6 +484,8 @@ init(#{dir := Dir, tail_info = {NextOffset, empty}, current_epoch = Epoch}}); {NumSegments, + #seg_info{first = #chunk_info{id = FstChId, + timestamp = FstTs}}, #seg_info{file = Filename, index = IdxFilename, size = Size, @@ -491,9 +493,7 @@ init(#{dir := Dir, #chunk_info{epoch = LastEpoch, timestamp = LastTs, id = LastChId, - num = LastNum}}, - #seg_info{first = #chunk_info{id = FstChId, - timestamp = FstTs}}} -> + num = LastNum}}} -> %% assert epoch is same or larger %% than last known epoch case LastEpoch > Epoch of @@ -984,49 +984,33 @@ init_offset_reader0(first, #{dir := Dir} = Conf) -> end; init_offset_reader0(next, #{dir := Dir} = Conf) -> case build_log_overview_first_last(Dir) of - {_, #seg_info{file = File, - last = LastChunk}, _} -> + {_, _, #seg_info{file = File, + last = LastChunk}} -> {NextChunkId, FilePos} = next_location(LastChunk), open_offset_reader_at(File, NextChunkId, FilePos, Conf); _ -> exit(no_segments_found) end; init_offset_reader0(last, #{dir := Dir} = Conf) -> - % SegInfos = build_log_overview(Dir), - IdxFiles = sorted_index_files(Dir), - case lists:reverse(IdxFiles) of - [IdxFile] -> - %% just one segment - case index_file_to_seg_info(IdxFile) of - #seg_info{file = File, - last = undefined} -> - %% empty log, attach at 0 - open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); - #seg_info{file = File, - last = #chunk_info{type = ?CHNK_USER, - id = LastChunkId, - pos = FilePos}} -> - open_offset_reader_at(File, LastChunkId, FilePos, Conf) - end; - _ -> - case last_user_chunk_location(IdxFiles) of - not_found -> - ?DEBUG("~s:~s use chunk not found, fall back to next", - [?MODULE, ?FUNCTION_NAME]), - %% no user chunks in stream, this is awkward, fall back to next - init_offset_reader0(next, Conf); - {ChunkId, FilePos, IdxFile} -> - File = segment_from_index_file(IdxFile), - open_offset_reader_at(File, ChunkId, FilePos, Conf) - end + IdxFiles = lists:reverse(sorted_index_files(Dir)), + case last_user_chunk_location(IdxFiles) of + not_found -> + ?DEBUG("~s:~s use chunk not found, fall back to next", + [?MODULE, ?FUNCTION_NAME]), + %% no user chunks in stream, this is awkward, fall back to next + init_offset_reader0(next, Conf); + {ChunkId, FilePos, IdxFile} -> + File = segment_from_index_file(IdxFile), + open_offset_reader_at(File, ChunkId, FilePos, Conf) end; init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) when is_integer(OffsetSpec) -> - SegInfos = build_log_overview(Dir), - ChunkRange = chunk_range_from_segment_infos(SegInfos), + IdxFiles = sorted_index_files(Dir), + {_, FstSI, LstSI} = build_log_overview_first_last0(IdxFiles), + ChunkRange = chunk_range_from_segment_infos(lists:usort([FstSI, LstSI])), Range = offset_range_from_chunk_range(ChunkRange), ?DEBUG("osiris_log:init_offset_reader0/2 spec ~w range ~w ", - [OffsetSpec, Range]), + [OffsetSpec, Range]), try StartOffset = case {OffsetSpec, Range} of @@ -1043,9 +1027,36 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) {Offset, {FirstOffs, 
_LastOffs}} -> max(FirstOffs, Offset) end, + %% maybe reverse IdxFiles if the offset requested is in the top half + %% of the range + RevIdxFiles = lists:reverse(IdxFiles), + IdxFiles2 = case lists:search( + fun(IdxFile) -> + FstOffset = list_to_integer( + filename:basename(IdxFile, ".index")), + StartOffset >= FstOffset + end, RevIdxFiles) of + {value, File} -> + [File]; + false -> + case Range of + empty -> + IdxFiles; + {From, To} -> + Pivot = To - ((To - From) div 2), + case StartOffset > Pivot of + true -> + RevIdxFiles; + false -> + IdxFiles + end + end + end, + + %% find the appopriate segment and scan the index to find the %% postition of the next chunk to read - case find_segment_for_offset(StartOffset, SegInfos) of + case find_segment_for_offset2(StartOffset, IdxFiles2) of not_found -> {error, {offset_out_of_range, Range}}; {end_of_log, #seg_info{file = SegmentFile, @@ -1105,10 +1116,10 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos, fd = Fd}}. %% Searches the index files backwards for the ID of the last user chunk. -last_user_chunk_location(IdxFiles) when is_list(IdxFiles) -> +last_user_chunk_location(RevdIdxFiles) when is_list(RevdIdxFiles) -> {Time, Result} = timer:tc( fun() -> - last_user_chunk_id0(lists:reverse(IdxFiles)) + last_user_chunk_id0(RevdIdxFiles) end), ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1_000_000]), Result. @@ -1120,8 +1131,8 @@ last_user_chunk_id0([IdxFile | Rest]) -> try %% Do not read-ahead since we read the index file backwards chunk by chunk. {ok, IdxFd} = open(IdxFile, [read, raw, binary]), - {ok, _} = position_at_idx_record_boundary(IdxFd, eof), - Last = last_user_chunk_id_in_index(IdxFd), + {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof), + Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd), _ = file:close(IdxFd), case Last of {ok, Id, Pos} -> @@ -1137,29 +1148,22 @@ last_user_chunk_id0([IdxFile | Rest]) -> end. %% Searches the index file backwards for the ID of the last user chunk. -last_user_chunk_id_in_index(IdxFd) -> - case file:position(IdxFd, {cur, -2*?INDEX_RECORD_SIZE_B}) of +last_user_chunk_id_in_index(NextPos, IdxFd) -> + case file:pread(IdxFd, NextPos, ?INDEX_RECORD_SIZE_B) of + {ok, <>} -> + {ok, Offset, FilePos}; + {ok, <<_Offset:64/unsigned, + _Timestamp:64/signed, + _Epoch:64/unsigned, + _FilePos:32/unsigned, + _ChType:8/unsigned>>} -> + last_user_chunk_id_in_index(NextPos - ?INDEX_RECORD_SIZE_B, IdxFd); {error, _} = Error -> - Error; - {ok, _NewPos} -> - case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of - {ok, - <>} -> - {ok, Offset, FilePos}; - {ok, - <<_Offset:64/unsigned, - _Timestamp:64/signed, - _Epoch:64/unsigned, - _FilePos:32/unsigned, - _ChType:8/unsigned>>} -> - last_user_chunk_id_in_index(IdxFd); - {error, _} = Error -> - Error - end + Error end. -spec committed_offset(state()) -> undefined | offset(). @@ -1461,30 +1465,34 @@ parse_records(Offs, parse_records(Offs + NumRecs, Rem, [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]). -sorted_index_files(Dir) when is_list(Dir) -> - lists:sort(filelib:wildcard(filename:join(Dir, "*.index"))). +% sorted_index_files(Dir) when is_list(Dir) -> +% {ok, Files} = prim_file:list_dir(Dir), +% lists:sort([filename:join(Dir, F) || F <- Files, +% filename:extension(F) == ".index"]). + +% sorted_index_files(Dir) when is_list(Dir) -> +% filelib:wildcard(filename:join(Dir, "*.index"), prim_file). 
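The lists:search above works because of the segment naming scheme: make_file_name/2 (unchanged, further down in this patch) zero-pads the first chunk id to twenty digits, so lexicographic filename order equals numeric offset order and a segment's first offset can be recovered from its basename alone. A small sketch of that round trip, assuming the scheme:

%% "00000000000000008192.index" encodes first chunk id 8192
name_for_offset(ChId) ->
    lists:flatten(io_lib:format("~20..0B.index", [ChId])).

offset_for_name(IdxFile) ->
    list_to_integer(filename:basename(IdxFile, ".index")).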
-id_index_files(Dir) -> - [{list_to_integer(filename:basename(IdxFile, ".index")), IdxFile} - || IdxFile <- sorted_index_files(Dir)]. +sorted_index_files(Dir) when is_list(Dir) -> + filelib:wildcard(filename:join(Dir, "*.index")). index_file_to_seg_info(IdxFile) -> [SegInfo] = build_log_overview0([IdxFile], []), SegInfo. build_log_overview_first_last(Dir) when is_list(Dir) -> - case sorted_index_files(Dir) of - [] -> - none; - [Fst] -> - %% this function is only used by init - [SegInfo] = build_log_overview0([Fst], []), - {1, SegInfo, SegInfo}; - [Fst | Rem] -> - %% this function is only used by init - [FstSegInfo, LastSegInfo] = build_log_overview0([lists:last(Rem), Fst], []), - {length(Rem) + 1, FstSegInfo, LastSegInfo} - end. + build_log_overview_first_last0(sorted_index_files(Dir)). + +build_log_overview_first_last0([]) -> + none; +build_log_overview_first_last0([Fst]) -> + %% this function is only used by init + [SegInfo] = build_log_overview0([Fst], []), + {1, SegInfo, SegInfo}; +build_log_overview_first_last0([Fst | Rem]) -> + %% this function is only used by init + [FstSegInfo, LastSegInfo] = build_log_overview0([Fst, lists:last(Rem)], []), + {length(Rem) + 1, FstSegInfo, LastSegInfo}. build_log_overview(Dir) when is_list(Dir) -> IdxFiles = sorted_index_files(Dir), @@ -1679,7 +1687,8 @@ evaluate_retention(Dir, Specs) -> FirstTs = first_timestamp_from_segment_infos(SegInfos), {OffsetRange, FirstTs, length(SegInfos)} end), - ?DEBUG("~s:~s/~b (~w) completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Specs, Time/1_000_000]), + ?DEBUG("~s:~s/~b (~w) completed in ~fs", + [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Specs, Time/1_000_000]), Result. evaluate_retention0(Infos, []) -> @@ -1797,11 +1806,10 @@ last_epoch_offset({ok, last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, {Epoch, O, [{CurEpoch, LastOffs} | Acc]}). -segment_from_index_file(IdxFile) -> +segment_from_index_file(IdxFile) when is_list(IdxFile) -> Basename = filename:basename(IdxFile, ".index"), BaseDir = filename:dirname(IdxFile), - SegFile0 = filename:join([BaseDir, Basename]), - SegFile0 ++ ".segment". + filename:join([BaseDir, Basename ++ ".segment"]). make_chunk(Blobs, TData, ChType, Timestamp, Epoch, Next) -> {NumEntries, NumRecords, EData} = @@ -1885,24 +1893,25 @@ write_chunk(Chunk, ok = file:close(IdxFd), State#?MODULE{fd = undefined, index_fd = undefined, - mode = - Write#write{tail_info = - {NextOffset, - {Epoch, Next, Timestamp}}, - segment_size = {0, 0}}}; + mode = Write#write{tail_info = + {NextOffset, + {Epoch, Next, Timestamp}}, + segment_size = {0, 0}}}; false -> State#?MODULE{mode = Write#write{tail_info = {NextOffset, {Epoch, Next, Timestamp}}, - segment_size = {SegSizeBytes + Size, SegSizeChunks + 1}}} + segment_size = {SegSizeBytes + Size, + SegSizeChunks + 1}}} end. max_segment_size_reached(SegFd, CurrentSizeChunks, #cfg{max_segment_size_bytes = MaxSizeBytes, max_segment_size_chunks = MaxSizeChunks}) -> {ok, CurrentSizeBytes} = file:position(SegFd, cur), - CurrentSizeBytes >= MaxSizeBytes orelse CurrentSizeChunks >= MaxSizeChunks - 1. + CurrentSizeBytes >= MaxSizeBytes orelse + CurrentSizeChunks >= MaxSizeChunks - 1. sendfile(_Transport, _Fd, _Sock, _Pos, 0) -> ok; @@ -1982,6 +1991,46 @@ find_segment_for_offset(Offset, find_segment_for_offset(_Offset, _) -> not_found. 
+%% find the segment the offset is in _or_ if the offset is the very next +%% chunk offset it will return the last segment +find_segment_for_offset2(Offset, [IdxFile]) -> + case index_file_to_seg_info(IdxFile) of + #seg_info{first = undefined, + last = undefined} = Info -> + {end_of_log, Info}; + #seg_info{last = + #chunk_info{id = LastChId, + num = LastNumRecs}} = Info + when Offset == LastChId + LastNumRecs -> + %% the last segment and offset is the next offset + {end_of_log, Info}; + #seg_info{first = #chunk_info{id = FirstChId}, + last = #chunk_info{id = LastChId, + num = LastNumRecs}} = Info -> + NextChId = LastChId + LastNumRecs, + case Offset >= FirstChId andalso Offset < NextChId of + true -> + %% we found it + {found, Info}; + false -> + not_found + end + end; +find_segment_for_offset2(Offset, [IdxFile | Rem]) -> + #seg_info{first = #chunk_info{id = FirstChId}, + last = #chunk_info{id = LastChId, + num = LastNumRecs}} = Info = index_file_to_seg_info(IdxFile), + NextChId = LastChId + LastNumRecs, + case Offset >= FirstChId andalso Offset < NextChId of + true -> + %% we found it + {found, Info}; + false -> + find_segment_for_offset2(Offset, Rem) + end; +find_segment_for_offset2(_Offset, _) -> + not_found. + can_read_next_offset(#read{type = offset, next_offset = NextOffset, offset_ref = Ref}) -> @@ -1996,15 +2045,13 @@ make_file_name(N, Suff) -> lists:flatten( io_lib:format("~20..0B.~s", [N, Suff])). -open_new_segment(#?MODULE{cfg = - #cfg{directory = Dir, - counter = Cnt}, +open_new_segment(#?MODULE{cfg = #cfg{directory = Dir, + counter = Cnt}, fd = undefined, index_fd = undefined, - mode = - #write{type = _WriterType, - tail_info = {NextOffset, _}}} = - State0) -> + mode = #write{type = _WriterType, + tail_info = {NextOffset, _}}} = + State0) -> Filename = make_file_name(NextOffset, "segment"), IdxFilename = make_file_name(NextOffset, "index"), ?DEBUG("~s: ~s : ~s", [?MODULE, ?FUNCTION_NAME, Filename]), diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index d991fe91feae..d03dda574161 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1044,7 +1044,7 @@ offset_tracking_snapshot(Config) -> make_trailer(offset, <<"id1">>, 1), S0), %% this should create at least two segments - {_, S2} = seed_log(S1, EpochChunks, Config, T0), + {_, S2} = seed_log(Conf, S1, EpochChunks, Config, T0), osiris_log:close(S2), S3 = osiris_log:init(Conf), T = osiris_log:recover_tracking(S3), @@ -1069,7 +1069,7 @@ many_segment_overview(Config) -> {OverviewTaken, Res} = timer:tc(fun () -> osiris_log:overview(maps:get(dir, Conf)) end), - ct:pal("OverviewTaken ~p", [OverviewTaken ]), + ct:pal("OverviewTaken ~p", [OverviewTaken]), ct:pal("~p", [Res]), ?assertEqual({{0,40959}, [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, Res), @@ -1102,12 +1102,26 @@ many_segment_overview(Config) -> end), ct:pal("OffsNextTaken ~p", [OffsNextTaken]), - % {OffsOffsetTaken, _} = - % timer:tc(fun () -> - % {ok, L} = osiris_log:init_offset_reader(40000, Conf#{epoch => 6}), - % osiris_log:close(L) - % end), - % ct:pal("OffsOffsetTaken ~p", [OffsOffsetTaken]), + {OffsOffsetTakenHi, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(40000, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenHi ~p", [OffsOffsetTakenHi]), + {OffsOffsetTakenLow, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(400, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenLow ~p", [OffsOffsetTakenLow]), + + + {OffsOffsetTakenMid, _} = + 
timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader(20000, Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), ok. %% Utility @@ -1118,22 +1132,23 @@ seed_log(Conf, EpochChunks, Config) -> seed_log(Conf, EpochChunks, Config, Trk) when is_map(Conf) -> Log0 = osiris_log:init(Conf), - seed_log(Log0, EpochChunks, Config, Trk); + seed_log(Conf, Log0, EpochChunks, Config, Trk); seed_log(Dir, EpochChunks, Config, Trk) when is_list(Dir) -> seed_log(#{dir => Dir, epoch => 1, max_segment_size_bytes => 1000 * 1000, name => ?config(test_case, Config)}, - EpochChunks, Config, Trk); -seed_log(Log, EpochChunks, _Config, Trk) -> + EpochChunks, Config, Trk). + +seed_log(Conf, Log, EpochChunks, _Config, Trk) -> lists:foldl(fun ({Epoch, Records}, {T, L}) -> - write_chunk(Epoch, now_ms(), Records, T, L); + write_chunk(Conf, Epoch, now_ms(), Records, T, L); ({Epoch, Ts, Records}, {T, L}) -> - write_chunk(Epoch, Ts, Records, T, L) + write_chunk(Conf, Epoch, Ts, Records, T, L) end, {Trk, Log}, EpochChunks). -write_chunk(Epoch, Now, Records, Trk0, Log0) -> +write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) -> HasTracking = not osiris_tracking:is_empty(Trk0), case osiris_log:is_open(Log0) of false when HasTracking -> @@ -1141,7 +1156,7 @@ write_chunk(Epoch, Now, Records, Trk0, Log0) -> FirstOffset = osiris_log:first_offset(Log0), FirstTs = osiris_log:first_timestamp(Log0), {SnapBin, Trk} = osiris_tracking:snapshot(FirstOffset, FirstTs, Trk0), - write_chunk(Epoch, Now, Records, Trk, + write_chunk(Conf, Epoch, Now, Records, Trk, osiris_log:write([SnapBin], ?CHNK_TRK_SNAPSHOT, Now, @@ -1152,13 +1167,9 @@ write_chunk(Epoch, Now, Records, Trk0, Log0) -> Epoch -> {Trk0, osiris_log:write(Records, Now, Log0)}; _ -> - %% need to re=init - Dir = osiris_log:get_directory(Log0), - Name = osiris_log:get_name(Log0), + %% need to re-init as new epoch osiris_log:close(Log0), - Log = osiris_log:init(#{dir => Dir, - epoch => Epoch, - name => Name}), + Log = osiris_log:init(Conf#{epoch => Epoch}), {Trk0, osiris_log:write(Records, Log)} end end. 
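Patch 05 below reworks how an integer offset spec is resolved; the clamping rules it preserves are easiest to see as a small pure function (a sketch of the logic, not the module's API):

clamp(_Offset, empty) ->
    0;
clamp(Offset, {_First, Last}) when Offset == Last + 1 ->
    %% exactly one past the tail: keep it (cannot rewrite to `next`
    %% here because of races with concurrent writes)
    Offset;
clamp(Offset, {_First, Last}) when Offset > Last + 1 ->
    %% out of range: behave as `next` instead
    retry_with_next;
clamp(Offset, {First, _Last}) ->
    %% below the range clamps up to the first available offset
    max(First, Offset).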
From fd16df65992b2d2e07e6417c0e97056bd24d9930 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 26 Apr 2022 11:04:51 +0100 Subject: [PATCH 05/19] refactor find segment for offset --- src/osiris_log.erl | 129 ++++++++++++++++++++------------------------- 1 file changed, 57 insertions(+), 72 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index cbf694845f8a..3000ccb5f6a1 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1012,50 +1012,40 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) ?DEBUG("osiris_log:init_offset_reader0/2 spec ~w range ~w ", [OffsetSpec, Range]), try - StartOffset = - case {OffsetSpec, Range} of - {_, empty} -> - 0; - {Offset, {_, LastOffs}} - when Offset == LastOffs + 1 -> - %% next but we can't use `next` due to race conditions - Offset; - {Offset, {_, LastOffs}} - when Offset > LastOffs + 1 -> - %% out of range, clamp as `next` - throw({retry_with, next, Conf}); - {Offset, {FirstOffs, _LastOffs}} -> - max(FirstOffs, Offset) - end, - %% maybe reverse IdxFiles if the offset requested is in the top half - %% of the range - RevIdxFiles = lists:reverse(IdxFiles), + StartOffset = case {OffsetSpec, Range} of + {_, empty} -> + 0; + {Offset, {_, LastOffs}} + when Offset == LastOffs + 1 -> + %% next but we can't use `next` + %% due to race conditions + Offset; + {Offset, {_, LastOffs}} + when Offset > LastOffs + 1 -> + %% out of range, clamp as `next` + throw({retry_with, next, Conf}); + {Offset, {FirstOffs, _LastOffs}} -> + max(FirstOffs, Offset) + end, + + %% find the first segment that may contain the start offset IdxFiles2 = case lists:search( fun(IdxFile) -> FstOffset = list_to_integer( filename:basename(IdxFile, ".index")), StartOffset >= FstOffset - end, RevIdxFiles) of + end, lists:reverse(IdxFiles)) of {value, File} -> [File]; false -> - case Range of - empty -> - IdxFiles; - {From, To} -> - Pivot = To - ((To - From) div 2), - case StartOffset > Pivot of - true -> - RevIdxFiles; - false -> - IdxFiles - end - end + %% as we clamp StartOffset to the range of the stream + %% we should never get here + ?INFO("~s: unexpected start offset ~b for range ~w, " + "retry with next strategy", + [?MODULE, StartOffset, Range]), + throw({retry_with, next, Conf}) end, - - %% find the appopriate segment and scan the index to find the - %% postition of the next chunk to read case find_segment_for_offset2(StartOffset, IdxFiles2) of not_found -> {error, {offset_out_of_range, Range}}; @@ -1991,45 +1981,40 @@ find_segment_for_offset(Offset, find_segment_for_offset(_Offset, _) -> not_found. 
-%% find the segment the offset is in _or_ if the offset is the very next
-%% chunk offset it will return the last segment
-find_segment_for_offset2(Offset, [IdxFile]) ->
-    case index_file_to_seg_info(IdxFile) of
-        #seg_info{first = undefined,
-                  last = undefined} = Info ->
-            {end_of_log, Info};
-        #seg_info{last =
-                  #chunk_info{id = LastChId,
-                              num = LastNumRecs}} = Info
-          when Offset == LastChId + LastNumRecs ->
-            %% the last segment and offset is the next offset
-            {end_of_log, Info};
-        #seg_info{first = #chunk_info{id = FirstChId},
-                  last = #chunk_info{id = LastChId,
-                                     num = LastNumRecs}} = Info ->
-            NextChId = LastChId + LastNumRecs,
-            case Offset >= FirstChId andalso Offset < NextChId of
-                true ->
-                    %% we found it
-                    {found, Info};
-                false ->
-                    not_found
-            end
-    end;
-find_segment_for_offset2(Offset, [IdxFile | Rem]) ->
-    #seg_info{first = #chunk_info{id = FirstChId},
-              last = #chunk_info{id = LastChId,
-                                 num = LastNumRecs}} = Info = index_file_to_seg_info(IdxFile),
-    NextChId = LastChId + LastNumRecs,
-    case Offset >= FirstChId andalso Offset < NextChId of
-        true ->
-            %% we found it
-            {found, Info};
-        false ->
-            find_segment_for_offset2(Offset, Rem)
-    end;
-find_segment_for_offset2(_Offset, _) ->
-    not_found.
+find_segment_for_offset2(Offset, IdxFiles) ->
+    %% we assume index files are in the default low-> high order here
+    case lists:search(
+           fun(IdxFile) ->
+                   FstOffset = list_to_integer(
+                                 filename:basename(IdxFile, ".index")),
+                   Offset >= FstOffset
+           end, lists:reverse(IdxFiles)) of
+        {value, File} ->
+            case index_file_to_seg_info(File) of
+                #seg_info{first = undefined,
+                          last = undefined} = Info ->
+                    {end_of_log, Info};
+                #seg_info{last =
+                          #chunk_info{id = LastChId,
+                                      num = LastNumRecs}} = Info
+                  when Offset == LastChId + LastNumRecs ->
+                    %% the last segment and offset is the next offset
+                    {end_of_log, Info};
+                #seg_info{first = #chunk_info{id = FirstChId},
+                          last = #chunk_info{id = LastChId,
+                                             num = LastNumRecs}} = Info ->
+                    NextChId = LastChId + LastNumRecs,
+                    case Offset >= FirstChId andalso Offset < NextChId of
+                        true ->
+                            %% we found it
+                            {found, Info};
+                        false ->
+                            not_found
+                    end
+            end;
+        false ->
+            not_found
+    end.

From 59d04455bc35cb14f2a00efbc74a8806714b8541 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Tue, 26 Apr 2022 17:02:12 +0100
Subject: [PATCH 06/19] Optimise osiris_log:overview/1 further

By not building a log overview at all and instead just scanning the
index files.
---
 src/osiris_log.erl        | 305 +++++++++++++++++++++++++------------
 test/osiris_log_SUITE.erl |  23 ++-
 2 files changed, 219 insertions(+), 109 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 3000ccb5f6a1..dfdcab91db08 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -338,7 +338,11 @@
       tracking_config => osiris_tracking:config(),
       counter_spec => counter_spec(),
       %% used when initialising a log from an offset other than 0
-      initial_offset => osiris:offset()}.
+      initial_offset => osiris:offset(),
+      %% a cached list of the index files for a given log
+      %% avoids scanning disk for files multiple times if they are
+      %% already known, e.g. in init_acceptor
+      index_files => [filename:filename()]}.
 -type record() :: {offset(), osiris:data()}.
 -type offset_spec() :: osiris:offset_spec().
 -type retention_spec() :: osiris:retention_spec().
@@ -432,8 +436,7 @@ init(Config) ->

 -spec init(config(), writer | acceptor) -> state().
init(#{dir := Dir, name := Name, - epoch := Epoch} = - Config, + epoch := Epoch} = Config, WriterType) -> %% scan directory for segments if in write mode MaxSizeBytes = @@ -469,7 +472,7 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), first_offset_fun = FirstOffsetFun}, - case build_log_overview_first_last(Dir) of + case build_log_overview_first_last(Config) of none -> NextOffset = case Config of #{initial_offset := IO} @@ -683,16 +686,17 @@ init_acceptor(Range, EpochOffsets0, lists:sort(EpochOffsets0)), %% then truncate to - SegInfos = build_log_overview(Dir), + IdxFiles = sorted_index_files(Dir), ?DEBUG("~s: ~s from epoch offsets: ~w range ~w", [?MODULE, ?FUNCTION_NAME, EpochOffsets, Range]), - ok = truncate_to(Name, Range, EpochOffsets, SegInfos), + RemIdxFiles = truncate_to(Name, Range, EpochOffsets, IdxFiles), %% after truncation we can do normal init InitOffset = case Range of empty -> 0; {O, _} -> O end, - init(Conf#{initial_offset => InitOffset}, acceptor). + init(Conf#{initial_offset => InitOffset, + index_files => RemIdxFiles}, acceptor). chunk_id_index_scan(IdxFile, ChunkId) when is_list(IdxFile) -> Fd = open_index_read(IdxFile), @@ -716,6 +720,14 @@ chunk_id_index_scan0(Fd, ChunkId) -> eof end. +delete_segment_from_index(Index) -> + File = segment_from_index_file(Index), + ?DEBUG("osiris_log: deleting segment ~s in ~s", + [filename:basename(File), filename:dirname(File)]), + ok = file:delete(File), + ok = file:delete(Index), + ok. + delete_segment(#seg_info{file = File, index = Index}) -> ?DEBUG("osiris_log: deleting segment ~s in ~s", [filename:basename(File), filename:dirname(File)]), @@ -725,15 +737,15 @@ delete_segment(#seg_info{file = File, index = Index}) -> truncate_to(_Name, _Range, _EpochOffsets, []) -> %% the target log is empty - ok; -truncate_to(_Name, _Range, [], SegInfos) -> + []; +truncate_to(_Name, _Range, [], IdxFiles) -> %% ????? 
this means the entire log is out - [begin ok = delete_segment(I) end || I <- SegInfos], - ok; -truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> - case find_segment_for_offset(ChId, SegInfos) of + [begin ok = delete_segment_from_index(I) end || I <- IdxFiles], + []; +truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> + case find_segment_for_offset2(ChId, IdxFiles) of not_found -> - case lists:last(SegInfos) of + case index_file_to_seg_info(lists:last(IdxFiles)) of #seg_info{last = #chunk_info{epoch = E, id = LastChId, num = Num}} @@ -741,7 +753,7 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> %% the last available local chunk id is smaller than the %% sources last chunk id but is in the same epoch %% check if there is any overlap - LastOffsLocal = case offset_range_from_segment_infos(SegInfos) of + LastOffsLocal = case offset_range_from_idx_files(IdxFiles) of empty -> 0; {_, L} -> L end, @@ -753,18 +765,19 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> true -> %% there is no overlap, need to delete all %% local segments - [begin ok = delete_segment(I) end || I <- SegInfos], - ok; + [begin ok = delete_segment_from_index(I) end + || I <- IdxFiles], + []; false -> %% there is overlap %% no truncation needed - ok + IdxFiles end; _ -> - truncate_to(Name, Range, NextEOs, SegInfos) + truncate_to(Name, Range, NextEOs, IdxFiles) end; {end_of_log, _Info} -> - ok; + IdxFiles; {found, #seg_info{file = File, index = Idx}} -> ?DEBUG("osiris_log: ~s on node ~s truncating to chunk " "id ~b in epoch ~b", @@ -793,11 +806,19 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], SegInfos) -> ok = file:close(Fd), ok = file:close(IdxFd), %% delete all segments with a first offset larger then ChId - [begin ok = delete_segment(I) end - || I <- SegInfos, I#seg_info.first#chunk_info.id > ChId], - ok; + %% and return the remainder + lists:filter( + fun (I) -> + case index_file_first_offset(I) > ChId of + true -> + ok = delete_segment_from_index(I), + false; + false -> + true + end + end, IdxFiles); _ -> - truncate_to(Name, Range, NextEOs, SegInfos) + truncate_to(Name, Range, NextEOs, IdxFiles) end end. @@ -925,7 +946,8 @@ init_offset_reader(OffsetSpec, Conf) -> end. 
init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> - Range = offset_range_from_segment_infos(build_log_overview(Dir)), + IdxFiles = sorted_index_files(Dir), + Range = offset_range_from_idx_files(IdxFiles), case Range of empty -> {error, {offset_out_of_range, Range}}; @@ -969,8 +991,8 @@ init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) -> init_offset_reader0(next, Conf) end end; -init_offset_reader0(first, #{dir := Dir} = Conf) -> - case build_log_overview_first_last(Dir) of +init_offset_reader0(first, #{} = Conf) -> + case build_log_overview_first_last(Conf) of {_, #seg_info{file = File, first = undefined}, _} -> %% empty log, attach at 0 @@ -982,8 +1004,8 @@ init_offset_reader0(first, #{dir := Dir} = Conf) -> _ -> exit(no_segments_found) end; -init_offset_reader0(next, #{dir := Dir} = Conf) -> - case build_log_overview_first_last(Dir) of +init_offset_reader0(next, #{} = Conf) -> + case build_log_overview_first_last(Conf) of {_, _, #seg_info{file = File, last = LastChunk}} -> {NextChunkId, FilePos} = next_location(LastChunk), @@ -1006,9 +1028,7 @@ init_offset_reader0(last, #{dir := Dir} = Conf) -> init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) when is_integer(OffsetSpec) -> IdxFiles = sorted_index_files(Dir), - {_, FstSI, LstSI} = build_log_overview_first_last0(IdxFiles), - ChunkRange = chunk_range_from_segment_infos(lists:usort([FstSI, LstSI])), - Range = offset_range_from_chunk_range(ChunkRange), + Range = offset_range_from_idx_files(IdxFiles), ?DEBUG("osiris_log:init_offset_reader0/2 spec ~w range ~w ", [OffsetSpec, Range]), try @@ -1031,9 +1051,7 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) %% find the first segment that may contain the start offset IdxFiles2 = case lists:search( fun(IdxFile) -> - FstOffset = list_to_integer( - filename:basename(IdxFile, ".index")), - StartOffset >= FstOffset + StartOffset >= index_file_first_offset(IdxFile) end, lists:reverse(IdxFiles)) of {value, File} -> [File]; @@ -1455,22 +1473,25 @@ parse_records(Offs, parse_records(Offs + NumRecs, Rem, [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]). -% sorted_index_files(Dir) when is_list(Dir) -> -% {ok, Files} = prim_file:list_dir(Dir), -% lists:sort([filename:join(Dir, F) || F <- Files, -% filename:extension(F) == ".index"]). +sorted_index_files(Dir) when is_list(Dir) -> + case prim_file:list_dir(Dir) of + {error, enoent} -> + []; + {ok, Files} -> + lists:sort([filename:join(Dir, F) + || F <- Files, filename:extension(F) == ".index"]) + end. % sorted_index_files(Dir) when is_list(Dir) -> % filelib:wildcard(filename:join(Dir, "*.index"), prim_file). -sorted_index_files(Dir) when is_list(Dir) -> - filelib:wildcard(filename:join(Dir, "*.index")). - index_file_to_seg_info(IdxFile) -> [SegInfo] = build_log_overview0([IdxFile], []), SegInfo. -build_log_overview_first_last(Dir) when is_list(Dir) -> +build_log_overview_first_last(#{index_files := IdxFiles}) -> + build_log_overview_first_last0(IdxFiles); +build_log_overview_first_last(#{dir := Dir}) -> build_log_overview_first_last0(sorted_index_files(Dir)). 
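sorted_index_files/1 now lists the directory once via prim_file and filters by extension, rather than going through filelib:wildcard, which takes a slower path through glob handling and the file server; the commented-out clauses above record the alternatives that were considered. If you want to compare the two on a populated directory, both expressions can be timed with timer:tc, as elsewhere in this series (a hedged micro-benchmark sketch):

compare_listing(Dir) ->
    {WildcardUs, _} =
        timer:tc(fun() ->
                         filelib:wildcard(filename:join(Dir, "*.index"))
                 end),
    {ListDirUs, _} =
        timer:tc(fun() ->
                         {ok, Files} = prim_file:list_dir(Dir),
                         lists:sort([filename:join(Dir, F)
                                     || F <- Files,
                                        filename:extension(F) == ".index"])
                 end),
    %% both results are in microseconds
    {WildcardUs, ListDirUs}.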
build_log_overview_first_last0([]) -> @@ -1504,7 +1525,16 @@ build_log_overview0([], Acc) -> build_log_overview0([IdxFile | IdxFiles], Acc0) -> %% do not nead read_ahead here {ok, IdxFd} = open(IdxFile, [read, raw, binary]), - case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of + case tail_index(IdxFd) of + {ok, <<_Offset:64/unsigned, + _Timestamp:64/signed, + _Epoch:64/unsigned, + LastChunkPos:32/unsigned, + _ChType:8/unsigned>>} -> + ok = file:close(IdxFd), + SegFile = segment_from_index_file(IdxFile), + Acc = build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0), + build_log_overview0(IdxFiles, Acc); {error, einval} when IdxFiles == [] andalso Acc0 == [] -> %% this would happen if the file only contained a header ok = file:close(IdxFd), @@ -1513,28 +1543,23 @@ build_log_overview0([IdxFile | IdxFiles], Acc0) -> {error, einval} -> ok = file:close(IdxFd), build_log_overview0(IdxFiles, Acc0); + {error, enoent} -> + %% The retention policy could have just been applied + ok = file:close(IdxFd), + build_log_overview0(IdxFiles, Acc0) + end. + +tail_index(IdxFd) -> + case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of {ok, _} -> - case file:read(IdxFd, ?INDEX_RECORD_SIZE_B) of - {ok, - <<_Offset:64/unsigned, - _Timestamp:64/signed, - _Epoch:64/unsigned, - LastChunkPos:32/unsigned, - _ChType:8/unsigned>>} -> - ok = file:close(IdxFd), - SegFile = segment_from_index_file(IdxFile), - Acc = build_segment_info(SegFile, - LastChunkPos, - IdxFile, - Acc0), - build_log_overview0(IdxFiles, Acc); - {error, enoent} -> - %% The retention policy could have just been applied - ok = file:close(IdxFd), - build_log_overview0(IdxFiles, Acc0) - end + file:read(IdxFd, ?INDEX_RECORD_SIZE_B); + Err -> + Err end. +top_index(IdxFd) -> + file:pread(IdxFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B). + %% Some file:position/2 operations are subject to race conditions. In particular, `eof` may position the Fd %% in the middle of a record being written concurrently. If that happens, we need to re-position at the nearest %% record boundry. See https://github.com/rabbitmq/osiris/issues/73 @@ -1617,14 +1642,23 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) -> -spec overview(term()) -> {range(), [{epoch(), offset()}]}. overview(Dir) -> - case build_log_overview(Dir) of + case sorted_index_files(Dir) of [] -> {empty, []}; - SegInfos -> - Range = offset_range_from_segment_infos(SegInfos), - EpochOffsets = last_epoch_offsets(SegInfos), + IdxFiles -> + Range = offset_range_from_idx_files(IdxFiles), + EpochOffsets = last_epoch_offsets2(IdxFiles), {Range, EpochOffsets} end. +% overview(Dir) -> +% case build_log_overview(Dir) of +% [] -> +% {empty, []}; +% SegInfos -> +% Range = offset_range_from_segment_infos(SegInfos), +% EpochOffsets = last_epoch_offsets(SegInfos), +% {Range, EpochOffsets} +% end. -spec format_status(state()) -> map(). format_status(#?MODULE{cfg = #cfg{directory = Dir, @@ -1692,11 +1726,7 @@ evaluate_retention0(Infos, [{max_age, Age} | Specs]) -> evaluate_retention0(RemSegs, Specs). eval_age([#seg_info{last = #chunk_info{timestamp = Ts}, - size = Size} = - Old - | Rem] = - Infos, - Age) -> + size = Size} = Old | Rem] = Infos, Age) -> Now = erlang:system_time(millisecond), case Ts < Now - Age andalso length(Rem) > 0 @@ -1735,46 +1765,103 @@ eval_max_bytes(SegInfos, MaxSize) -> end end. 
-%% returns a list of the last offset by epoch -last_epoch_offsets([#seg_info{first = undefined, - last = undefined}]) -> - []; -last_epoch_offsets([#seg_info{index = IdxFile, - first = #chunk_info{epoch = FstE, id = FstChId}} - | SegInfos]) -> - {Time, Result} = - timer:tc( - fun() -> - FstFd = open_index_read(IdxFile), - {LastE, LastO, Res} = - lists:foldl( - fun(#seg_info{index = I, - last = #chunk_info{epoch = LE}}, - {E, _, _} = Acc) when LE > E -> - Fd = open_index_read(I), - last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), - Fd, Acc); - (#seg_info{last = #chunk_info{id = LO, - epoch = LE}}, - {_, _, Acc}) -> - {LE, LO, Acc}; - (_, Acc) -> - %% not sure if this can ever happen - Acc - end, - last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B), - FstFd, {FstE, FstChId, []}), - SegInfos), - lists:reverse([{LastE, LastO} | Res]) - end), +last_epoch_offsets2([IdxFile]) -> + Fd = open_index_read(IdxFile), + case last_epoch_offset( + file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, undefined) of + undefined -> + []; + {LastE, LastO, Res} -> + lists:reverse([{LastE, LastO} | Res]) + end; +last_epoch_offsets2([FstIdxFile | IdxFiles]) -> + F = fun() -> + FstFd = open_index_read(FstIdxFile), + {ok, <>} = top_index(FstFd), + {ok, ?IDX_HEADER_SIZE} = file:position(FstFd, ?IDX_HEADER_SIZE), + {LastE, LastO, Res} = + lists:foldl( + fun(IdxFile, {E, _, EOs} = Acc) -> + Fd = open_index_read(IdxFile), + {ok, <>} = tail_index(Fd), + case Epoch > E of + true -> + %% we need to scan as the last index record + %% has a greater epoch + {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), + last_epoch_offset( + file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, + Acc); + false -> + ok = file:close(Fd), + {Epoch, Offset, EOs} + end + end, + last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B), + FstFd, {FstE, FstO, []}), IdxFiles), + lists:reverse([{LastE, LastO} | Res]) + end, + {Time, Result} = timer:tc(F), ?DEBUG("~s:~s/~b completed in ~bms", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]), Result. +%% returns a list of the last offset by epoch +% last_epoch_offsets([#seg_info{first = undefined, +% last = undefined}]) -> +% []; +% last_epoch_offsets([#seg_info{index = IdxFile, +% first = #chunk_info{epoch = FstE, id = FstChId}} +% | SegInfos]) -> +% {Time, Result} = +% timer:tc( +% fun() -> +% FstFd = open_index_read(IdxFile), +% {LastE, LastO, Res} = +% lists:foldl( +% fun(#seg_info{index = I, +% last = #chunk_info{epoch = LE}}, +% {E, _, _} = Acc) when LE > E -> +% Fd = open_index_read(I), +% last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), +% Fd, Acc); +% (#seg_info{last = #chunk_info{id = LO, +% epoch = LE}}, +% {_, _, Acc}) -> +% {LE, LO, Acc}; +% (_, Acc) -> +% %% not sure if this can ever happen +% Acc +% end, +% last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B), +% FstFd, {FstE, FstChId, []}), +% SegInfos), +% lists:reverse([{LastE, LastO} | Res]) +% end), +% ?DEBUG("~s:~s/~b completed in ~bms", +% [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]), +% Result. %% aggregates the chunk offsets for each epoch last_epoch_offset(eof, Fd, Acc) -> ok = file:close(Fd), Acc; +last_epoch_offset({ok, + <>}, + Fd, undefined) -> + last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, + {Epoch, Offset, []}); last_epoch_offset({ok, < Basename = filename:basename(IdxFile, ".index"), BaseDir = filename:dirname(IdxFile), @@ -1929,6 +2017,12 @@ first_timestamp_from_segment_infos( first_timestamp_from_segment_infos(_) -> 0. 
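The accumulation step inside last_epoch_offsets2/1 is worth stating on its own: for each subsequent index file only the tail record is inspected, and a full scan of that file is needed only when its last epoch is newer than the epoch accumulated so far, i.e. when at least one epoch boundary lies inside it. A simplified model of that decision (the real code threads file descriptors and positions through):

%% Decide what to do with one index file, given its tail record and
%% the {Epoch, Offset, EpochOffsets} accumulator.
tail_step({TailEpoch, _TailChId}, {Epoch, _, _}) when TailEpoch > Epoch ->
    %% an epoch boundary is inside this file: scan its records to
    %% find the last offset of each earlier epoch
    scan_file;
tail_step({_TailEpoch, TailChId}, {Epoch, _, EOs}) ->
    %% same epoch throughout: the tail record alone is enough
    {skip, {Epoch, TailChId, EOs}}.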
+ +offset_range_from_idx_files(IdxFiles) -> + {_, FstSI, LstSI} = build_log_overview_first_last0(IdxFiles), + ChunkRange = chunk_range_from_segment_infos(lists:usort([FstSI, LstSI])), + offset_range_from_chunk_range(ChunkRange). + offset_range_from_segment_infos(SegInfos) -> ChunkRange = chunk_range_from_segment_infos(SegInfos), offset_range_from_chunk_range(ChunkRange). @@ -2392,6 +2486,9 @@ next_location(#chunk_info{id = Id, size = Size}) -> {Id + Num, Pos + Size + ?HEADER_SIZE_B}. +index_file_first_offset(IdxFile) -> + list_to_integer(filename:basename(IdxFile, ".index")). + -ifdef(TEST). % -include_lib("eunit/include/eunit.hrl"). diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index d03dda574161..ff42631ba510 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1066,13 +1066,14 @@ many_segment_overview(Config) -> Conf = Conf0#{max_segment_size_bytes => 64000}, osiris_log:close(seed_log(Conf, EpochChunks, Config)), %% {40051,{{0,40959},[{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}} - {OverviewTaken, Res} = timer:tc(fun () -> - osiris_log:overview(maps:get(dir, Conf)) - end), + {OverviewTaken, LogOverview} = timer:tc(fun () -> + osiris_log:overview(maps:get(dir, Conf)) + end), ct:pal("OverviewTaken ~p", [OverviewTaken]), - ct:pal("~p", [Res]), + ct:pal("~p", [LogOverview]), + %% {{0,40959},[{-1,-1},{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]} ?assertEqual({{0,40959}, - [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, Res), + [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, LogOverview), {InitTaken, _} = timer:tc( fun () -> @@ -1122,6 +1123,18 @@ many_segment_overview(Config) -> osiris_log:close(L) end), ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), + + %% TODO: timestamp + + %% acceptor + {Range, EOffs} = LogOverview, + {InitAcceptorTaken, _} = + timer:tc(fun () -> + osiris_log:init_acceptor(Range, EOffs, Conf#{epoch => 6}) + end), + ct:pal("InitAcceptor took ~bus", [InitAcceptorTaken]), + + %% TODO: evaluate_retention ok. %% Utility From ebdf3951f3694d3ca0296d2884654ebaa53d38b2 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 27 Apr 2022 10:56:13 +0100 Subject: [PATCH 07/19] Optimise osiris_log:init_data_reader --- src/osiris_log.erl | 51 ++++++++------------------------------- test/osiris_log_SUITE.erl | 13 +++++++++- 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index dfdcab91db08..82b6c2bfc9d5 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -743,7 +743,7 @@ truncate_to(_Name, _Range, [], IdxFiles) -> [begin ok = delete_segment_from_index(I) end || I <- IdxFiles], []; truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> - case find_segment_for_offset2(ChId, IdxFiles) of + case find_segment_for_offset(ChId, IdxFiles) of not_found -> case index_file_to_seg_info(lists:last(IdxFiles)) of #seg_info{last = #chunk_info{epoch = E, @@ -830,8 +830,8 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> {error, {invalid_last_offset_epoch, epoch(), offset()}}. 
init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) -> - SegInfos = build_log_overview(Dir), - Range = offset_range_from_segment_infos(SegInfos), + IdxFiles = sorted_index_files(Dir), + Range = offset_range_from_idx_files(IdxFiles), ?DEBUG("osiris_segment:init_data_reader/2 at ~b prev " "~w local range: ~w", [StartChunkId, PrevEOT, Range]), @@ -848,23 +848,23 @@ init_data_reader({StartChunkId, PrevEOT}, #{dir := Dir} = Config) -> %% first we need to validate PrevEO {ok, init_data_reader_from( StartChunkId, - find_segment_for_offset(StartChunkId, SegInfos), + find_segment_for_offset(StartChunkId, IdxFiles), Config)}; _ -> {PrevEpoch, PrevChunkId, _PrevTs} = PrevEOT, - case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, SegInfos) of + case check_chunk_has_expected_epoch(PrevChunkId, PrevEpoch, IdxFiles) of ok -> {ok, init_data_reader_from( StartChunkId, - find_segment_for_offset(StartChunkId, SegInfos), + find_segment_for_offset(StartChunkId, IdxFiles), Config)}; {error, _} = Err -> Err end end. -check_chunk_has_expected_epoch(ChunkId, Epoch, SegInfos) -> - case find_segment_for_offset(ChunkId, SegInfos) of +check_chunk_has_expected_epoch(ChunkId, Epoch, IdxFiles) -> + case find_segment_for_offset(ChunkId, IdxFiles) of not_found -> %% this is unexpected and thus an error {error, @@ -1064,7 +1064,7 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) throw({retry_with, next, Conf}) end, - case find_segment_for_offset2(StartOffset, IdxFiles2) of + case find_segment_for_offset(StartOffset, IdxFiles2) of not_found -> {error, {offset_out_of_range, Range}}; {end_of_log, #seg_info{file = SegmentFile, @@ -2044,38 +2044,7 @@ offset_range_from_chunk_range({#chunk_info{id = FirstChId}, num = LastNumRecs}}) -> {FirstChId, LastChId + LastNumRecs - 1}. -%% find the segment the offset is in _or_ if the offset is the very next -%% chunk offset it will return the last segment -find_segment_for_offset(0, - [#seg_info{first = undefined, last = undefined} = - Info]) -> - {end_of_log, Info}; -find_segment_for_offset(Offset, - [#seg_info{last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = - Info]) - when Offset == LastChId + LastNumRecs -> - %% the last segment and offset is the next offset - {end_of_log, Info}; -find_segment_for_offset(Offset, - [#seg_info{first = #chunk_info{id = FirstChId}, - last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = - Info | Rem]) -> - NextChId = LastChId + LastNumRecs, - case Offset >= FirstChId andalso Offset < NextChId of - true -> - %% we found it - {found, Info}; - false -> - find_segment_for_offset(Offset, Rem) - end; -find_segment_for_offset(_Offset, _) -> - not_found. 
- -find_segment_for_offset2(Offset, IdxFiles) -> +find_segment_for_offset(Offset, IdxFiles) -> %% we assume index files are in the default low-> high order here case lists:search( fun(IdxFile) -> diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index ff42631ba510..35bb2d3804bb 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1125,16 +1125,27 @@ many_segment_overview(Config) -> ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), %% TODO: timestamp + %% %% acceptor {Range, EOffs} = LogOverview, - {InitAcceptorTaken, _} = + {InitAcceptorTaken, AcceptorLog} = timer:tc(fun () -> osiris_log:init_acceptor(Range, EOffs, Conf#{epoch => 6}) end), ct:pal("InitAcceptor took ~bus", [InitAcceptorTaken]), + {InitDataReaderTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_data_reader( + osiris_log:tail_info(AcceptorLog), + Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("InitDataReaderTaken ~p", [InitDataReaderTaken]), + %% TODO: evaluate_retention + %% TODO: init_data_reader ok. %% Utility From 79b9f2d676822a8ef62b807d813ffc215fb692b9 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 27 Apr 2022 17:01:14 +0100 Subject: [PATCH 08/19] Optimise osiris_log:evaluate_retention/2 --- src/osiris_log.erl | 196 ++++++++++++++++++-------------------- test/osiris_log_SUITE.erl | 18 +++- 2 files changed, 107 insertions(+), 107 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 82b6c2bfc9d5..c839d8393292 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -8,6 +8,7 @@ -module(osiris_log). -include("osiris.hrl"). +-include_lib("kernel/include/file.hrl"). -export([init/1, init/2, @@ -728,13 +729,6 @@ delete_segment_from_index(Index) -> ok = file:delete(Index), ok. -delete_segment(#seg_info{file = File, index = Index}) -> - ?DEBUG("osiris_log: deleting segment ~s in ~s", - [filename:basename(File), filename:dirname(File)]), - ok = file:delete(File), - ok = file:delete(Index), - ok. - truncate_to(_Name, _Range, _EpochOffsets, []) -> %% the target log is empty []; @@ -1650,15 +1644,6 @@ overview(Dir) -> EpochOffsets = last_epoch_offsets2(IdxFiles), {Range, EpochOffsets} end. -% overview(Dir) -> -% case build_log_overview(Dir) of -% [] -> -% {empty, []}; -% SegInfos -> -% Range = offset_range_from_segment_infos(SegInfos), -% EpochOffsets = last_epoch_offsets(SegInfos), -% {Range, EpochOffsets} -% end. -spec format_status(state()) -> map(). format_status(#?MODULE{cfg = #cfg{directory = Dir, @@ -1705,63 +1690,72 @@ update_retention(Retention, evaluate_retention(Dir, Specs) -> {Time, Result} = timer:tc( fun() -> - SegInfos0 = build_log_overview(Dir), - SegInfos = evaluate_retention0(SegInfos0, Specs), - OffsetRange = offset_range_from_segment_infos(SegInfos), - FirstTs = first_timestamp_from_segment_infos(SegInfos), - {OffsetRange, FirstTs, length(SegInfos)} + IdxFiles0 = sorted_index_files(Dir), + IdxFiles = evaluate_retention0(IdxFiles0, Specs), + OffsetRange = offset_range_from_idx_files(IdxFiles), + FirstTs = first_timestamp_from_index_files(IdxFiles), + {OffsetRange, FirstTs, length(IdxFiles)} end), ?DEBUG("~s:~s/~b (~w) completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Specs, Time/1_000_000]), Result. 
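A note on units for all of the timings in this series: timer:tc/1 returns {Time, Result} with Time in microseconds, so `Time div 1000` in the ?DEBUG lines above is milliseconds and `Time/1_000_000` is seconds. A minimal sketch of the recurring pattern (do_work/0 is a placeholder, not an osiris function):

    {TimeUs, Result} = timer:tc(fun() -> do_work() end),
    %% timer:tc/1 measures wall-clock time and reports microseconds
    ?DEBUG("completed in ~b ms", [TimeUs div 1000]),
    Result.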
-evaluate_retention0(Infos, []) -> - %% we should never hit empty infos as one should always be left - Infos; -evaluate_retention0(Infos, [{max_bytes, MaxSize} | Specs]) -> - RemSegs = eval_max_bytes(Infos, MaxSize), - evaluate_retention0(RemSegs, Specs); -evaluate_retention0(Infos, [{max_age, Age} | Specs]) -> - RemSegs = eval_age(Infos, Age), - evaluate_retention0(RemSegs, Specs). - -eval_age([#seg_info{last = #chunk_info{timestamp = Ts}, - size = Size} = Old | Rem] = Infos, Age) -> - Now = erlang:system_time(millisecond), - case Ts < Now - Age - andalso length(Rem) > 0 - andalso Size > ?LOG_HEADER_SIZE - of - true -> - %% the oldest timestamp is older than retention - %% and there are other segments available - %% we can delete - ok = delete_segment(Old), - eval_age(Rem, Age); - false -> - Infos - end; -eval_age(Infos, _Age) -> - Infos. - -eval_max_bytes(SegInfos, MaxSize) -> - TotalSize = - lists:foldl(fun(#seg_info{size = Size}, Acc) -> Acc + Size end, 0, - SegInfos), - case SegInfos of - _ when length(SegInfos) =< 1 -> - SegInfos; - [_, #seg_info{size = 0}] -> - SegInfos; +evaluate_retention0(IdxFiles, []) -> + IdxFiles; +evaluate_retention0(IdxFiles, [{max_bytes, MaxSize} | Specs]) -> + RemIdxFiles = eval_max_bytes(IdxFiles, MaxSize), + evaluate_retention0(RemIdxFiles, Specs); +evaluate_retention0(IdxFiles, [{max_age, Age} | Specs]) -> + RemIdxFiles = eval_age(IdxFiles, Age), + evaluate_retention0(RemIdxFiles, Specs). + +eval_age([_] = IdxFiles, _Age) -> + IdxFiles; +eval_age([IdxFile | IdxFiles] = AllIdxFiles, Age) -> + case last_timestamp_in_index_file(IdxFile) of + {ok, Ts} -> + Now = erlang:system_time(millisecond), + case Ts < Now - Age of + true -> + %% the oldest timestamp is older than retention + %% and there are other segments available + %% we can delete + ok = delete_segment_from_index(IdxFile), + eval_age(IdxFiles, Age); + false -> + AllIdxFiles + end; + _Err -> + AllIdxFiles + end. + +eval_max_bytes(IdxFiles, MaxSize) -> + case IdxFiles of + [] -> + []; + [_] -> + IdxFiles; _ -> + %% TODO: handle case where the second segment doesn't have any entries + %% on second though is there ever + TotalSize = + lists:foldl(fun(IdxFile, Acc) -> + SegFile = segment_from_index_file(IdxFile), + case prim_file:read_file_info(SegFile) of + {ok, #file_info{size = Size}} -> + Acc + Size; + _ -> + Acc + end + end, 0, IdxFiles), case TotalSize > MaxSize of true -> %% we can delete at least one segment - [Old | Rem] = SegInfos, - ok = delete_segment(Old), + [Old | Rem] = IdxFiles, + ok = delete_segment_from_index(Old), eval_max_bytes(Rem, MaxSize); false -> - SegInfos + IdxFiles end end. @@ -1813,41 +1807,6 @@ last_epoch_offsets2([FstIdxFile | IdxFiles]) -> ?DEBUG("~s:~s/~b completed in ~bms", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]), Result. 
-%% returns a list of the last offset by epoch -% last_epoch_offsets([#seg_info{first = undefined, -% last = undefined}]) -> -% []; -% last_epoch_offsets([#seg_info{index = IdxFile, -% first = #chunk_info{epoch = FstE, id = FstChId}} -% | SegInfos]) -> -% {Time, Result} = -% timer:tc( -% fun() -> -% FstFd = open_index_read(IdxFile), -% {LastE, LastO, Res} = -% lists:foldl( -% fun(#seg_info{index = I, -% last = #chunk_info{epoch = LE}}, -% {E, _, _} = Acc) when LE > E -> -% Fd = open_index_read(I), -% last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), -% Fd, Acc); -% (#seg_info{last = #chunk_info{id = LO, -% epoch = LE}}, -% {_, _, Acc}) -> -% {LE, LO, Acc}; -% (_, Acc) -> -% %% not sure if this can ever happen -% Acc -% end, -% last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B), -% FstFd, {FstE, FstChId, []}), -% SegInfos), -% lists:reverse([{LastE, LastO} | Res]) -% end), -% ?DEBUG("~s:~s/~b completed in ~bms", -% [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]), -% Result. %% aggregates the chunk offsets for each epoch last_epoch_offset(eof, Fd, Acc) -> @@ -2011,12 +1970,43 @@ sendfile(ssl, Fd, Sock, Pos, ToSend) -> Err end. -first_timestamp_from_segment_infos( - [#seg_info{first = #chunk_info{timestamp = Ts}} | _ ]) -> - Ts; -first_timestamp_from_segment_infos(_) -> - 0. +last_timestamp_in_index_file(IdxFile) -> + case file:open(IdxFile, [raw, binary, read]) of + {ok, IdxFd} -> + case tail_index(IdxFd) of + {ok, <<_O:64/unsigned, + LastTimestamp:64/signed, + _E:64/unsigned, + _ChunkPos:32/unsigned, + _ChType:8/unsigned>>} -> + _ = file:close(IdxFd), + {ok, LastTimestamp}; + Err -> + _ = file:close(IdxFd), + Err + end; + Err -> + Err + end. +first_timestamp_from_index_files([IdxFile | _]) -> + case file:open(IdxFile, [raw, binary, read]) of + {ok, IdxFd} -> + case top_index(IdxFd) of + {ok, <<_FstO:64/unsigned, + FstTimestamp:64/signed, + _FstE:64/unsigned, + _FstChunkPos:32/unsigned, + _FstChType:8/unsigned>>} -> + _ = file:close(IdxFd), + FstTimestamp; + _ -> + _ = file:close(IdxFd), + 0 + end; + _Err -> + 0 + end. offset_range_from_idx_files(IdxFiles) -> {_, FstSI, LstSI} = build_log_overview_first_last0(IdxFiles), @@ -2027,8 +2017,6 @@ offset_range_from_segment_infos(SegInfos) -> ChunkRange = chunk_range_from_segment_infos(SegInfos), offset_range_from_chunk_range(ChunkRange). -chunk_range_from_segment_infos([]) -> - empty; chunk_range_from_segment_infos([#seg_info{first = undefined, last = undefined}]) -> empty; diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 35bb2d3804bb..eb9ebc0ac08b 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -1125,7 +1125,14 @@ many_segment_overview(Config) -> ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), %% TODO: timestamp - %% + Ts = erlang:system_time(millisecond) - 10000, + {TimestampTaken, _} = + timer:tc(fun () -> + {ok, L} = osiris_log:init_offset_reader({timestamp, Ts}, + Conf#{epoch => 6}), + osiris_log:close(L) + end), + ct:pal("TimestampTaken ~p", [TimestampTaken]), %% acceptor {Range, EOffs} = LogOverview, @@ -1144,8 +1151,13 @@ many_segment_overview(Config) -> end), ct:pal("InitDataReaderTaken ~p", [InitDataReaderTaken]), - %% TODO: evaluate_retention - %% TODO: init_data_reader + %% evaluate_retention + Specs = [{max_age, 60000}, {max_bytes, 5_000_000_000}], + {RetentionTaken, _} = + timer:tc(fun () -> + osiris_log:evaluate_retention(maps:get(dir, Conf), Specs) + end), + ct:pal("RetentionTaken ~p", [RetentionTaken]), ok. 
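For context on the retention flow exercised above: evaluate_retention/2 applies each spec in turn, so a list such as [{max_age, 60000}, {max_bytes, 5_000_000_000}] first prunes segments whose newest chunk is older than the given age (in milliseconds) and then prunes by total on-disk size, and the newest segment always survives. A hedged usage sketch, with Dir as an illustrative binding:

    %% keep roughly one minute or ~5 GB of data, whichever deletes more
    Specs = [{max_age, 60_000}, {max_bytes, 5_000_000_000}],
    {_OffsetRange, _FirstTimestamp, _SegmentsLeft} =
        osiris_log:evaluate_retention(Dir, Specs).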
%% Utility From e7ae5ab97197c7aafd411f5ca8c8c9087962a1c7 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 27 Apr 2022 17:10:32 +0100 Subject: [PATCH 09/19] Move replica reader start code into osiris_replica_reader module --- src/osiris_replica.erl | 21 +-------------------- src/osiris_replica_reader.erl | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/src/osiris_replica.erl b/src/osiris_replica.erl index 153f1f1c39ac..d7eb7cb48a45 100644 --- a/src/osiris_replica.erl +++ b/src/osiris_replica.erl @@ -225,26 +225,7 @@ init(#{name := Name, start_offset => TailInfo, reference => ExtRef, connection_token => Token}, - RRPid = - case supervisor:start_child({osiris_replica_reader_sup, Node}, - #{id => make_ref(), - start => - {osiris_replica_reader, start_link, - [ReplicaReaderConf]}, - %% replica readers should never be - %% restarted by their sups - %% instead they need to be re-started - %% by their replica - restart => temporary, - shutdown => 5000, - type => worker, - modules => [osiris_replica_reader]}) - of - {ok, Pid} -> - Pid; - {ok, Pid, _} -> - Pid - end, + RRPid = osiris_replica_reader:start(Node, ReplicaReaderConf), true = link(RRPid), Interval = maps:get(replica_gc_interval, Config, 5000), erlang:send_after(Interval, self(), force_gc), diff --git a/src/osiris_replica_reader.erl b/src/osiris_replica_reader.erl index ebf90400687a..3cd2ee0769ce 100644 --- a/src/osiris_replica_reader.erl +++ b/src/osiris_replica_reader.erl @@ -17,6 +17,7 @@ %% API functions -export([start_link/1, + start/2, stop/1]). %% gen_server callbacks -export([init/1, @@ -102,6 +103,27 @@ start_link(Conf) -> stop(Pid) -> gen_server:cast(Pid, stop). +start(Node, ReplicaReaderConf) when is_map(ReplicaReaderConf) -> + case supervisor:start_child({osiris_replica_reader_sup, Node}, + #{id => make_ref(), + start => + {osiris_replica_reader, start_link, + [ReplicaReaderConf]}, + %% replica readers should never be + %% restarted by their sups + %% instead they need to be re-started + %% by their replica + restart => temporary, + shutdown => 5000, + type => worker, + modules => [osiris_replica_reader]}) + of + {ok, Pid} -> + Pid; + {ok, Pid, _} -> + Pid + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== From 6416e552347a93ad933fbaa59f5a35c9c52333cb Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 29 Apr 2022 16:13:42 +0100 Subject: [PATCH 10/19] Optimise offset reader timestamp log attachments We still do a linear scan but don't first build a log overview. Also scanning is done new -> old rather than from the start of the log as before. 
Also fix an issue where a timestamp that falls between two segments caused
the offset reader to attach at "next".
---
 src/osiris_log.erl        | 168 ++++++++++++++++++++++++++------------
 src/osiris_util.erl       |   3 +
 test/osiris_log_SUITE.erl |  97 ++++++++++++++++++----
 3 files changed, 204 insertions(+), 64 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index c839d8393292..85e622908862 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -442,10 +442,12 @@ init(#{dir := Dir,
     %% scan directory for segments if in write mode
     MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
                             ?DEFAULT_MAX_SEGMENT_SIZE_B),
-    MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks, ?DEFAULT_MAX_SEGMENT_SIZE_C),
+    MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
+                                        ?DEFAULT_MAX_SEGMENT_SIZE_C),
     Retention = maps:get(retention, Config, []),
     ?INFO("Will use ~s for osiris log data directory", [Dir]),
-    ?DEBUG("osiris_log:init/1 max_segment_size_bytes: ~b, max_segment_size_chunks ~b, retention ~w",
+    ?DEBUG("osiris_log:init/1 max_segment_size_bytes: ~b,
+           max_segment_size_chunks ~b, retention ~w",
            [MaxSizeBytes, MaxSizeChunks, Retention]),
     ok = filelib:ensure_dir(Dir),
     case file:make_dir(Dir) of
@@ -952,33 +954,28 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) ->
             init_offset_reader0(Offs, Conf)
     end;
 init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) ->
-    case build_log_overview(Dir) of
+    case sorted_index_files(Dir) of
         [] ->
             init_offset_reader0(next, Conf);
-        [#seg_info{file = SegmentFile,
-                   first = #chunk_info{timestamp = Fst,
-                                       pos = FilePos,
-                                       id = ChunkId}} | _]
-          when is_integer(Fst) andalso Fst > Ts ->
-            %% timestamp is lower than the first timestamp available
-            open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
-        SegInfos ->
-            case lists:search(fun (#seg_info{first = #chunk_info{timestamp = F},
-                                             last = #chunk_info{timestamp = L}})
-                                    when is_integer(F)
-                                         andalso is_integer(L) ->
-                                      Ts >= F andalso Ts =< L;
-                                  (_) ->
-                                      false
-                              end,
-                              SegInfos)
+        IdxFiles ->
+            case timestamp_idx_file_search(Ts, lists:reverse(IdxFiles))
             of
-                {value, #seg_info{file = SegmentFile} = Info} ->
+                {scan, IdxFile} ->
                     %% segment was found, now we need to scan index to
                     %% find nearest offset
-                    {ChunkId, FilePos} = chunk_location_for_timestamp(Info, Ts),
+                    {ChunkId, FilePos} = chunk_location_for_timestamp(IdxFile, Ts),
+                    SegmentFile = segment_from_index_file(IdxFile),
                     open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
-                false ->
+                {first_in, IdxFile} ->
+                    {ok, Fd} = file:open(IdxFile, [raw, binary, read]),
+                    {ok, <<ChunkId:64/unsigned,
+                           _Ts:64/signed,
+                           _:64/unsigned,
+                           FilePos:32/unsigned,
+                           _:8/unsigned>>} = top_index(Fd),
+                    SegmentFile = segment_from_index_file(IdxFile),
+                    open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
+                next ->
                     %% segment was not found, attach next
                     %% this should be rare so no need to call the more optimal
                     %% open_offset_reader_at/4 function
@@ -1499,21 +1496,6 @@ build_log_overview_first_last0([Fst | Rem]) ->
     [FstSegInfo, LastSegInfo] = build_log_overview0([Fst, lists:last(Rem)], []),
     {length(Rem) + 1, FstSegInfo, LastSegInfo}.
 
-build_log_overview(Dir) when is_list(Dir) ->
-    IdxFiles = sorted_index_files(Dir),
-    {Time, Result} = timer:tc(
-                         fun() ->
-                             try
-                                 build_log_overview0(IdxFiles, [])
-                             catch
-                                 missing_file ->
-                                     build_log_overview(Dir)
-                             end
-                         end),
-    ?DEBUG("~s:~s/~b completed in ~b ms",
-           [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time div 1000]),
-    Result.
- build_log_overview0([], Acc) -> lists:reverse(Acc); build_log_overview0([IdxFile | IdxFiles], Acc0) -> @@ -1736,18 +1718,16 @@ eval_max_bytes(IdxFiles, MaxSize) -> [_] -> IdxFiles; _ -> - %% TODO: handle case where the second segment doesn't have any entries - %% on second though is there ever TotalSize = - lists:foldl(fun(IdxFile, Acc) -> - SegFile = segment_from_index_file(IdxFile), - case prim_file:read_file_info(SegFile) of - {ok, #file_info{size = Size}} -> - Acc + Size; - _ -> - Acc - end - end, 0, IdxFiles), + lists:foldl(fun(IdxFile, Acc) -> + SegFile = segment_from_index_file(IdxFile), + case prim_file:read_file_info(SegFile) of + {ok, #file_info{size = Size}} -> + Acc + Size; + _ -> + Acc + end + end, 0, IdxFiles), case TotalSize > MaxSize of true -> %% we can delete at least one segment @@ -2199,11 +2179,16 @@ throw_missing(Any) -> open(SegFile, Options) -> throw_missing(file:open(SegFile, Options)). -chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) -> +chunk_location_for_timestamp(Idx, Ts) -> Fd = open_index_read(Idx), %% scan index file for nearest timestamp {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts), {ChunkId, FilePos}. +% chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) -> +% Fd = open_index_read(Idx), +% %% scan index file for nearest timestamp +% {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts), +% {ChunkId, FilePos}. timestamp_idx_scan(Fd, Ts) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of @@ -2446,6 +2431,89 @@ next_location(#chunk_info{id = Id, index_file_first_offset(IdxFile) -> list_to_integer(filename:basename(IdxFile, ".index")). +start_end_timestamp(IdxFile) -> + case file:open(IdxFile, [raw, read, binary]) of + {ok, Fd} -> + case top_index(Fd) of + {ok, <<_:64/unsigned, + TopTs:64/signed, + _:64/unsigned, + _:32/unsigned, + _:8/unsigned>>} -> + %% if we can top we can tail + {ok, <<_:64/unsigned, + TailTs:64/signed, + _:64/unsigned, + _:32/unsigned, + _:8/unsigned>>} = tail_index(Fd), + ok = file:close(Fd), + {TopTs, TailTs}; + {error, einval} -> + %% empty index + undefined; + eof -> + eof + end; + _Err -> + file_not_found + end. + + + +%% accepts a list of index files in reverse order +%% [{21, 30}, {12, 20}, {5, 10}] +%% 11 = {12, 20} +timestamp_idx_file_search(Ts, [FstIdxFile | Older]) -> + case start_end_timestamp(FstIdxFile) of + {_FstTs, EndTs} + when Ts > EndTs -> + %% timestamp greater than the newest timestamp in the stream + %% attach at 'next' + next; + {FstTs, _EndTs} + when Ts < FstTs -> + %% the requested timestamp is older than the first timestamp in + %% this segment, keep scanning + timestamp_idx_file_search0(Ts, Older, FstIdxFile); + {_, _} -> + %% else we must have found it! + {scan, FstIdxFile}; + file_not_found -> + %% the requested timestamp is older than the first timestamp in + %% this segment, keep scanning + timestamp_idx_file_search0(Ts, Older, FstIdxFile); + eof -> + %% empty log + next + end. 
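To make the reverse-ordered search concrete, take the (first, last) timestamp pairs from the comment above, newest file first, and assume start_end_timestamp/1 reads those pairs from the index files. The expected resolutions would then be roughly:

    %% illustrative only: IdxC, IdxB and IdxA stand for index files whose
    %% {FirstTs, LastTs} pairs are {21,30}, {12,20} and {5,10} respectively
    next             = timestamp_idx_file_search(31, [IdxC, IdxB, IdxA]),
    {scan, IdxC}     = timestamp_idx_file_search(25, [IdxC, IdxB, IdxA]),
    %% 11 falls in the gap between IdxA and IdxB: attach at IdxB's first chunk
    {first_in, IdxB} = timestamp_idx_file_search(11, [IdxC, IdxB, IdxA]),
    {first_in, IdxA} = timestamp_idx_file_search(1,  [IdxC, IdxB, IdxA]).

timestamp_idx_file_search0/3, defined next, carries the previous (newer) file along precisely so that the gap case can attach to it.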
+ +timestamp_idx_file_search0(Ts, [], IdxFile) -> + case start_end_timestamp(IdxFile) of + {FstTs, _LastTs} + when Ts < FstTs -> + {first_in, IdxFile}; + _ -> + {found, IdxFile} + end; +timestamp_idx_file_search0(Ts, [IdxFile | Older], Prev) -> + case start_end_timestamp(IdxFile) of + {_FstTs, EndTs} + when Ts > EndTs -> + %% we should attach the the first chunk in the previous segment + %% as the requested timestamp must fall in between the + %% current and previous segments + {first_in, Prev}; + {FstTs, _EndTs} + when Ts < FstTs -> + %% the requested timestamp is older than the first timestamp in + %% this segment, keep scanning + timestamp_idx_file_search0(Ts, Older, IdxFile); + _ -> + %% else we must have found it! + {scan, IdxFile} + end. + + -ifdef(TEST). % -include_lib("eunit/include/eunit.hrl"). diff --git a/src/osiris_util.erl b/src/osiris_util.erl index 6f9e87ab43d4..9ce8dd5b91b0 100644 --- a/src/osiris_util.erl +++ b/src/osiris_util.erl @@ -235,6 +235,9 @@ inet_tls_enabled([{proto_dist, ["inet_tls"]} | _]) -> inet_tls_enabled([_Opt | Tail]) -> inet_tls_enabled(Tail). +% -spec binary_search(Vjj +% binary_search(Fun, List) -> +% Fun(hd(List)). partition_parallel(F, Es, Timeout) -> Parent = self(), diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index eb9ebc0ac08b..cabe465e6e78 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -47,7 +47,9 @@ all_tests() -> init_offset_reader_last_chunk_is_not_user_chunk, init_offset_reader_no_user_chunk_in_last_segment, init_offset_reader_no_user_chunk_in_segments, + init_offset_reader_timestamp_empty, init_offset_reader_timestamp, + init_offset_reader_timestamp_multi_segment, init_offset_reader_truncated, init_data_reader_next, init_data_reader_empty_log, @@ -538,6 +540,25 @@ init_offset_reader_no_user_chunk_in_segments(Config) -> osiris_log:close(L2), ok. +init_offset_reader_timestamp_empty(Config) -> + ok = logger:set_primary_config(level, all), + Now = now_ms(), + EpochChunks = [], + LDir = ?config(leader_dir, Config), + Conf0 = ?config(osiris_conf, Config), + %% TODO: separate multi segment test + % application:set_env(osiris, max_segment_size_chunks, 1), + Conf = Conf0#{max_segment_size_chunks => 1}, + LLog0 = seed_log(LDir, EpochChunks, Config), + set_offset_ref(Conf, 3), + osiris_log:close(LLog0), + RConf = Conf#{dir => LDir}, + + {ok, L1} = osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf), + ?assertEqual(0, osiris_log:next_offset(L1)), + osiris_log:close(L1), + ok. + init_offset_reader_timestamp(Config) -> ok = logger:set_primary_config(level, all), Now = now_ms(), @@ -546,7 +567,8 @@ init_offset_reader_timestamp(Config) -> {1, Now - 8000, [<<"two">>]}, % 1 {1, Now - 5000, [<<"three">>, <<"four">>]}], % 2 LDir = ?config(leader_dir, Config), - Conf = ?config(osiris_conf, Config), + Conf0 = ?config(osiris_conf, Config), + Conf = Conf0#{max_segment_size_chunks => 1}, LLog0 = seed_log(LDir, EpochChunks, Config), set_offset_ref(Conf, 3), osiris_log:close(LLog0), @@ -568,8 +590,56 @@ init_offset_reader_timestamp(Config) -> osiris_log:init_offset_reader({timestamp, Now - 10000}, RConf), ?assertEqual(0, osiris_log:next_offset(L3)), osiris_log:close(L3), + + %% in between + {ok, L4} = + osiris_log:init_offset_reader({timestamp, Now - 6000}, RConf), + ?assertEqual(2, osiris_log:next_offset(L4)), + osiris_log:close(L4), ok. 
+init_offset_reader_timestamp_multi_segment(Config) -> + ok = logger:set_primary_config(level, all), + Now = now_ms(), + EpochChunks = + [{1, Now - 10000, [<<"one">>]}, % 0 + {1, Now - 8000, [<<"two">>]}, % 1 + {1, Now - 5000, [<<"three">>, <<"four">>]}], % 2 + LDir = ?config(leader_dir, Config), + Conf0 = ?config(osiris_conf, Config), + %% segment per chunk + application:set_env(osiris, max_segment_size_chunks, 1), + Conf = Conf0#{max_segment_size_chunks => 1}, + LLog0 = seed_log(LDir, EpochChunks, Config), + + set_offset_ref(Conf, 3), + osiris_log:close(LLog0), + application:unset_env(osiris, max_segment_size_chunks), + RConf = Conf#{dir => LDir}, + + {ok, L1} = + osiris_log:init_offset_reader({timestamp, Now - 8000}, RConf), + %% next offset is expected to be offset 1 + ?assertEqual(1, osiris_log:next_offset(L1)), + osiris_log:close(L1), + + %% future case + {ok, L2} = osiris_log:init_offset_reader({timestamp, Now}, RConf), + ?assertEqual(4, osiris_log:next_offset(L2)), + osiris_log:close(L2), + + %% past case + {ok, L3} = + osiris_log:init_offset_reader({timestamp, Now - 10000}, RConf), + ?assertEqual(0, osiris_log:next_offset(L3)), + osiris_log:close(L3), + + %% in between + {ok, L4} = + osiris_log:init_offset_reader({timestamp, Now - 6000}, RConf), + ?assertEqual(2, osiris_log:next_offset(L4)), + osiris_log:close(L4), + ok. init_offset_reader_truncated(Config) -> Data = crypto:strong_rand_bytes(1500), EpochChunks = @@ -1074,44 +1144,44 @@ many_segment_overview(Config) -> %% {{0,40959},[{-1,-1},{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]} ?assertEqual({{0,40959}, [{1,8184},{2,16376},{3,24568},{4,32760},{5,40952}]}, LogOverview), + Conf6 = Conf#{epoch => 6}, {InitTaken, _} = timer:tc( fun () -> - osiris_log:close( - osiris_log:init(Conf#{epoch => 6})) + osiris_log:close(osiris_log:init(Conf6)) end), ct:pal("InitTaken ~p", [InitTaken]), {OffsLastTaken, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(last, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(last, Conf6), osiris_log:close(L) end), ct:pal("OffsLastTaken ~p", [OffsLastTaken]), {OffsFirstTaken, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(first, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(first, Conf6), osiris_log:close(L) end), ct:pal("OffsFirstTaken ~p", [OffsFirstTaken]), {OffsNextTaken, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(next, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(next, Conf6), osiris_log:close(L) end), ct:pal("OffsNextTaken ~p", [OffsNextTaken]), {OffsOffsetTakenHi, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(40000, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(40000, Conf6), osiris_log:close(L) end), ct:pal("OffsOffsetTakenHi ~p", [OffsOffsetTakenHi]), {OffsOffsetTakenLow, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(400, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(400, Conf6), osiris_log:close(L) end), ct:pal("OffsOffsetTakenLow ~p", [OffsOffsetTakenLow]), @@ -1119,17 +1189,17 @@ many_segment_overview(Config) -> {OffsOffsetTakenMid, _} = timer:tc(fun () -> - {ok, L} = osiris_log:init_offset_reader(20000, Conf#{epoch => 6}), + {ok, L} = osiris_log:init_offset_reader(20000, Conf6), osiris_log:close(L) end), ct:pal("OffsOffsetTakenMid ~p", [OffsOffsetTakenMid]), %% TODO: timestamp - Ts = erlang:system_time(millisecond) - 10000, + Ts = erlang:system_time(millisecond) - 100, {TimestampTaken, _} = timer:tc(fun () -> {ok, 
L} = osiris_log:init_offset_reader({timestamp, Ts}, - Conf#{epoch => 6}), + Conf6), osiris_log:close(L) end), ct:pal("TimestampTaken ~p", [TimestampTaken]), @@ -1138,15 +1208,14 @@ many_segment_overview(Config) -> {Range, EOffs} = LogOverview, {InitAcceptorTaken, AcceptorLog} = timer:tc(fun () -> - osiris_log:init_acceptor(Range, EOffs, Conf#{epoch => 6}) + osiris_log:init_acceptor(Range, EOffs, Conf6) end), ct:pal("InitAcceptor took ~bus", [InitAcceptorTaken]), {InitDataReaderTaken, _} = timer:tc(fun () -> {ok, L} = osiris_log:init_data_reader( - osiris_log:tail_info(AcceptorLog), - Conf#{epoch => 6}), + osiris_log:tail_info(AcceptorLog), Conf6), osiris_log:close(L) end), ct:pal("InitDataReaderTaken ~p", [InitDataReaderTaken]), From 54e4f871aeb20b24b8f6d584f77482d83b8de30b Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 10 May 2022 16:15:12 +0100 Subject: [PATCH 11/19] osiris_log: optimisations and refactoring --- src/osiris_log.erl | 384 ++++++++++++++++++++------------------------- 1 file changed, 174 insertions(+), 210 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 85e622908862..d19273e4e1c4 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -475,7 +475,7 @@ init(#{dir := Dir, counter = Cnt, counter_id = counter_id(Config), first_offset_fun = FirstOffsetFun}, - case build_log_overview_first_last(Config) of + case first_and_last_seginfos(Config) of none -> NextOffset = case Config of #{initial_offset := IO} @@ -738,22 +738,19 @@ truncate_to(_Name, _Range, [], IdxFiles) -> %% ????? this means the entire log is out [begin ok = delete_segment_from_index(I) end || I <- IdxFiles], []; -truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> +truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> case find_segment_for_offset(ChId, IdxFiles) of not_found -> - case index_file_to_seg_info(lists:last(IdxFiles)) of - #seg_info{last = #chunk_info{epoch = E, - id = LastChId, - num = Num}} + case build_seg_info(lists:last(IdxFiles)) of + {ok, #seg_info{last = #chunk_info{epoch = E, + id = LastChId, + num = Num}}} when ChId > LastChId + Num -> %% the last available local chunk id is smaller than the %% sources last chunk id but is in the same epoch %% check if there is any overlap - LastOffsLocal = case offset_range_from_idx_files(IdxFiles) of - empty -> 0; - {_, L} -> L - end, - FstOffsetRemote = case Range of + LastOffsLocal = LastChId + Num, + FstOffsetRemote = case RemoteRange of empty -> 0; {F, _} -> F end, @@ -769,8 +766,10 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> %% no truncation needed IdxFiles end; - _ -> - truncate_to(Name, Range, NextEOs, IdxFiles) + {ok, _} -> + truncate_to(Name, RemoteRange, NextEOs, IdxFiles) + %% TODO: what to do if error is returned from + %% build_seg_info/1? end; {end_of_log, _Info} -> IdxFiles; @@ -814,7 +813,7 @@ truncate_to(Name, Range, [{E, ChId} | NextEOs], IdxFiles) -> end end, IdxFiles); _ -> - truncate_to(Name, Range, NextEOs, IdxFiles) + truncate_to(Name, RemoteRange, NextEOs, IdxFiles) end end. @@ -865,10 +864,10 @@ check_chunk_has_expected_epoch(ChunkId, Epoch, IdxFiles) -> %% this is unexpected and thus an error {error, {invalid_last_offset_epoch, Epoch, unknown}}; - {found, SegmentInfo = #seg_info{file = _PrevSeg}} -> + {found, #seg_info{} = SegmentInfo} -> %% prev segment exists, does it have the correct %% epoch? 
- case scan_idx(ChunkId, SegmentInfo) of + case offset_idx_scan(ChunkId, SegmentInfo) of {ChunkId, Epoch, _PrevPos} -> ok; {ChunkId, OtherEpoch, _} -> @@ -908,7 +907,7 @@ init_data_reader_from(ChunkId, init_data_reader_from(ChunkId, {found, #seg_info{file = File} = SegInfo}, Config) -> - {ChunkId, _Epoch, FilePos} = scan_idx(ChunkId, SegInfo), + {ChunkId, _Epoch, FilePos} = offset_idx_scan(ChunkId, SegInfo), init_data_reader_at(ChunkId, FilePos, File, Config). %% @doc Initialise a new offset reader @@ -953,13 +952,12 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> %% it is in range, convert to standard offset init_offset_reader0(Offs, Conf) end; -init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) -> - case sorted_index_files(Dir) of +init_offset_reader0({timestamp, Ts}, #{} = Conf) -> + case sorted_index_files_rev(Conf) of [] -> init_offset_reader0(next, Conf); - IdxFiles -> - case timestamp_idx_file_search(Ts, lists:reverse(IdxFiles)) - of + IdxFilesRev -> + case timestamp_idx_file_search(Ts, IdxFilesRev) of {scan, IdxFile} -> %% segment was found, now we need to scan index to %% find nearest offset @@ -972,7 +970,7 @@ init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) -> _Ts:64/signed, _:64/unsigned, FilePos:32/unsigned, - _:8/unsigned>>} = top_index(Fd), + _:8/unsigned>>} = first_idx_record(Fd), SegmentFile = segment_from_index_file(IdxFile), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); next -> @@ -983,29 +981,31 @@ init_offset_reader0({timestamp, Ts}, #{dir := Dir} = Conf) -> end end; init_offset_reader0(first, #{} = Conf) -> - case build_log_overview_first_last(Conf) of - {_, #seg_info{file = File, - first = undefined}, _} -> + [FstIdxFile | _ ] = sorted_index_files(Conf), + case build_seg_info(FstIdxFile) of + {ok, #seg_info{file = File, + first = undefined}} -> %% empty log, attach at 0 open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); - {_, #seg_info{file = File, - first = #chunk_info{id = FirstChunkId, - pos = FilePos}}, _} -> + {ok, #seg_info{file = File, + first = #chunk_info{id = FirstChunkId, + pos = FilePos}}} -> open_offset_reader_at(File, FirstChunkId, FilePos, Conf); - _ -> - exit(no_segments_found) + {error, _} = Err -> + exit(Err) end; init_offset_reader0(next, #{} = Conf) -> - case build_log_overview_first_last(Conf) of - {_, _, #seg_info{file = File, - last = LastChunk}} -> + [LastIdxFile | _ ] = sorted_index_files_rev(Conf), + case build_seg_info(LastIdxFile) of + {ok, #seg_info{file = File, + last = LastChunk}} -> {NextChunkId, FilePos} = next_location(LastChunk), open_offset_reader_at(File, NextChunkId, FilePos, Conf); - _ -> - exit(no_segments_found) + Err -> + exit(Err) end; -init_offset_reader0(last, #{dir := Dir} = Conf) -> - IdxFiles = lists:reverse(sorted_index_files(Dir)), +init_offset_reader0(last, #{} = Conf) -> + IdxFiles = sorted_index_files_rev(Conf), case last_user_chunk_location(IdxFiles) of not_found -> ?DEBUG("~s:~s use chunk not found, fall back to next", @@ -1016,13 +1016,14 @@ init_offset_reader0(last, #{dir := Dir} = Conf) -> File = segment_from_index_file(IdxFile), open_offset_reader_at(File, ChunkId, FilePos, Conf) end; -init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) +init_offset_reader0(OffsetSpec, #{} = Conf) when is_integer(OffsetSpec) -> - IdxFiles = sorted_index_files(Dir), + IdxFiles = sorted_index_files(Conf), Range = offset_range_from_idx_files(IdxFiles), ?DEBUG("osiris_log:init_offset_reader0/2 spec ~w range ~w ", [OffsetSpec, Range]), try + %% clamp start offset StartOffset = 
case {OffsetSpec, Range} of {_, empty} -> 0; @@ -1039,32 +1040,16 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) max(FirstOffs, Offset) end, - %% find the first segment that may contain the start offset - IdxFiles2 = case lists:search( - fun(IdxFile) -> - StartOffset >= index_file_first_offset(IdxFile) - end, lists:reverse(IdxFiles)) of - {value, File} -> - [File]; - false -> - %% as we clamp StartOffset to the range of the stream - %% we should never get here - ?INFO("~s: unexpected start offset ~b for range ~w, " - "retry with next strategy", - [?MODULE, StartOffset, Range]), - throw({retry_with, next, Conf}) - end, - - case find_segment_for_offset(StartOffset, IdxFiles2) of + case find_segment_for_offset(StartOffset, IdxFiles) of not_found -> {error, {offset_out_of_range, Range}}; {end_of_log, #seg_info{file = SegmentFile, last = LastChunk}} -> {ChunkId, FilePos} = next_location(LastChunk), open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); - {found, SegmentInfo = #seg_info{file = SegmentFile}} -> + {found, #seg_info{file = SegmentFile} = SegmentInfo} -> {ChunkId, _Epoch, FilePos} = - case scan_idx(StartOffset, SegmentInfo) of + case offset_idx_scan(StartOffset, SegmentInfo) of eof -> exit(offset_out_of_range); enoent -> @@ -1084,7 +1069,8 @@ init_offset_reader0(OffsetSpec, #{dir := Dir} = Conf) missing_file -> %% Retention policies are likely being applied, let's try again %% TODO: should we limit the number of retries? - init_offset_reader0(OffsetSpec, Conf); + %% Remove cached index_files from config + init_offset_reader0(OffsetSpec, maps:remove(index_files, Conf)); {retry_with, NewOffsSpec, NewConf} -> init_offset_reader0(NewOffsSpec, NewConf) end. @@ -1464,44 +1450,55 @@ parse_records(Offs, parse_records(Offs + NumRecs, Rem, [{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]). +sorted_index_files(#{index_files := IdxFiles}) -> + %% cached + IdxFiles; +sorted_index_files(#{dir := Dir}) -> + sorted_index_files(Dir); sorted_index_files(Dir) when is_list(Dir) -> + Files = index_files_unsorted(Dir), + lists:sort(Files). + +sorted_index_files_rev(#{index_files := IdxFiles}) -> + %% cached + lists:reverse(IdxFiles); +sorted_index_files_rev(#{dir := Dir}) -> + sorted_index_files_rev(Dir); +sorted_index_files_rev(Dir) -> + Files = index_files_unsorted(Dir), + lists:sort(fun erlang:'>'/2, Files). + +index_files_unsorted(Dir) -> case prim_file:list_dir(Dir) of {error, enoent} -> []; {ok, Files} -> - lists:sort([filename:join(Dir, F) - || F <- Files, filename:extension(F) == ".index"]) + [filename:join(Dir, F) + || F <- Files, filename:extension(F) == ".index"] end. -% sorted_index_files(Dir) when is_list(Dir) -> -% filelib:wildcard(filename:join(Dir, "*.index"), prim_file). - -index_file_to_seg_info(IdxFile) -> - [SegInfo] = build_log_overview0([IdxFile], []), - SegInfo. - -build_log_overview_first_last(#{index_files := IdxFiles}) -> - build_log_overview_first_last0(IdxFiles); -build_log_overview_first_last(#{dir := Dir}) -> - build_log_overview_first_last0(sorted_index_files(Dir)). +first_and_last_seginfos(#{index_files := IdxFiles}) -> + first_and_last_seginfos0(IdxFiles); +first_and_last_seginfos(#{dir := Dir}) -> + first_and_last_seginfos0(sorted_index_files(Dir)). 
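The index_files key matched above acts as a one-shot cache: when a config map already carries the sorted listing, sorted_index_files/1 and friends become map lookups instead of directory scans, and the retry path drops the key (init_offset_reader0 calls maps:remove(index_files, Conf)) so a listing made stale by retention is not reused. The internal pattern, with Conf0 and Dir as illustrative bindings:

    %% internal convention, not public API
    Conf1 = Conf0#{index_files => sorted_index_files(Dir)},
    %% ...subsequent lookups reuse Conf1 without re-listing the directory...
    %% after a missing-file race, drop the cache so it is rebuilt
    Conf2 = maps:remove(index_files, Conf1).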
-build_log_overview_first_last0([]) ->
+first_and_last_seginfos0([]) ->
     none;
-build_log_overview_first_last0([Fst]) ->
+first_and_last_seginfos0([FstIdxFile]) ->
     %% this function is only used by init
-    [SegInfo] = build_log_overview0([Fst], []),
+    {ok, SegInfo} = build_seg_info(FstIdxFile),
     {1, SegInfo, SegInfo};
-build_log_overview_first_last0([Fst | Rem]) ->
+first_and_last_seginfos0([FstIdxFile | Rem]) ->
     %% this function is only used by init
-    [FstSegInfo, LastSegInfo] = build_log_overview0([Fst, lists:last(Rem)], []),
+    {ok, FstSegInfo} = build_seg_info(FstIdxFile),
+    LastIdxFile = lists:last(Rem),
+    {ok, LastSegInfo} = build_seg_info(LastIdxFile),
+    {length(Rem) + 1, FstSegInfo, LastSegInfo}.
 
-build_log_overview0([], Acc) ->
-    lists:reverse(Acc);
-build_log_overview0([IdxFile | IdxFiles], Acc0) ->
+build_seg_info(IdxFile) ->
     %% do not need read_ahead here
     {ok, IdxFd} = open(IdxFile, [read, raw, binary]),
-    case tail_index(IdxFd) of
+    case last_idx_record(IdxFd) of
         {ok, <<_Offset:64/unsigned,
                _Timestamp:64/signed,
                _Epoch:64/unsigned,
                LastChunkPos:32/unsigned,
                _ChType:8/unsigned>>} ->
             ok = file:close(IdxFd),
             SegFile = segment_from_index_file(IdxFile),
-            Acc = build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0),
-            build_log_overview0(IdxFiles, Acc);
-        {error, einval} when IdxFiles == [] andalso Acc0 == [] ->
+            build_segment_info(SegFile, LastChunkPos, IdxFile);
+        {error, einval} ->
             %% this would happen if the file only contained a header
             ok = file:close(IdxFd),
             SegFile = segment_from_index_file(IdxFile),
-            [#seg_info{file = SegFile, index = IdxFile}];
-        {error, einval} ->
-            ok = file:close(IdxFd),
-            build_log_overview0(IdxFiles, Acc0);
-        {error, enoent} ->
-            %% The retention policy could have just been applied
+            {ok, #seg_info{file = SegFile, index = IdxFile}};
+        {error, _} = Err ->
             ok = file:close(IdxFd),
-            build_log_overview0(IdxFiles, Acc0)
+            Err
     end.
 
-tail_index(IdxFd) ->
+last_idx_record(IdxFd) ->
     case position_at_idx_record_boundary(IdxFd, {eof, -?INDEX_RECORD_SIZE_B}) of
         {ok, _} ->
             file:read(IdxFd, ?INDEX_RECORD_SIZE_B);
         Err ->
             Err
     end.
 
-top_index(IdxFd) ->
+first_idx_record(IdxFd) ->
     file:pread(IdxFd, ?IDX_HEADER_SIZE, ?INDEX_RECORD_SIZE_B).
 
 %% Some file:position/2 operations are subject to race conditions. In particular, `eof` may position the Fd
@@ -1549,13 +1541,13 @@ position_at_idx_record_boundary(IdxFd, At) ->
         Error ->
             Error
     end.
 
-build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
+build_segment_info(SegFile, LastChunkPos, IdxFile) ->
     try
         {ok, Fd} = open(SegFile, [read, binary, raw]),
         case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of
             eof ->
                 _ = file:close(Fd),
-                Acc0;
+                eof;
             {ok,
              <<?MAGIC:4/unsigned,
               ?VERSION:4/unsigned,
              FirstChType:8/unsigned,
              _NumEntries:16/unsigned,
              FirstNumRecords:32/unsigned,
              FirstTs:64/signed,
              FirstEpoch:64/unsigned,
              FirstChId:64/unsigned,
              _FirstCrc:32/integer,
              FirstSize:32/unsigned,
              FirstTSize:32/unsigned,
              _/binary>>} ->
                {ok,
                 <<?MAGIC:4/unsigned,
                  ?VERSION:4/unsigned,
                  LastChType:8/unsigned,
                  _LastNumEntries:16/unsigned,
                  LastNumRecords:32/unsigned,
                  LastTs:64/signed,
                  LastEpoch:64/unsigned,
                  LastChId:64/unsigned,
                  _LastCrc:32/integer,
                  LastSize:32/unsigned,
                  LastTSize:32/unsigned,
                  _Reserved:32>>} = file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B),
                Size = LastChunkPos + LastSize + LastTSize + ?HEADER_SIZE_B,
                {ok, Eof} = file:position(Fd, eof),
                ?DEBUG_IF("~s: segment ~s has trailing data ~w ~w",
                          [?MODULE, filename:basename(SegFile), Size, Eof],
                          Size =/= Eof),
                _ = file:close(Fd),
-                [#seg_info{file = SegFile,
+                {ok, #seg_info{file = SegFile,
                           index = IdxFile,
                           size = Size,
                           first =
@@ -1606,14 +1598,13 @@ build_segment_info(SegFile, LastChunkPos, IdxFile, Acc0) ->
                              num = LastNumRecords,
                              type = LastChType,
                              size = LastSize + LastTSize,
-                             pos = LastChunkPos}}
-                 | Acc0]
+                             pos = LastChunkPos}}}
         end
     catch
         missing_file ->
             %% Indexes and segments could be deleted by retention policies while
             %% the log overview is being built. Ignore those segments and keep going
-            Acc0
+            missing_file
     end.
 
 -spec overview(term()) -> {range(), [{epoch(), offset()}]}.
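To tie the spec above to concrete values: the range is the inclusive {FirstOffset, LastOffset} of the log and each {Epoch, Offset} pair records the last chunk id written in that epoch, which is what acceptors consult when deciding where to truncate. The many_segment_overview test earlier in this series pins the shape down; five epochs of 1024 segments, each holding 8 chunks of one record, give:

    %% each epoch spans 8192 offsets; its last 8-record chunk starts
    %% 7 offsets before the epoch's final offset (e.g. 8191 - 7 = 8184)
    {{0, 40959},
     [{1, 8184}, {2, 16376}, {3, 24568}, {4, 32760}, {5, 40952}]} =
        osiris_log:overview(Dir).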
@@ -1623,7 +1614,7 @@ overview(Dir) ->
             {empty, []};
         IdxFiles ->
             Range = offset_range_from_idx_files(IdxFiles),
-            EpochOffsets = last_epoch_offsets2(IdxFiles),
+            EpochOffsets = last_epoch_offsets(IdxFiles),
             {Range, EpochOffsets}
     end.
@@ -1739,48 +1730,45 @@ eval_max_bytes(IdxFiles, MaxSize) ->
         end
     end.
 
-last_epoch_offsets2([IdxFile]) ->
+last_epoch_offsets([IdxFile]) ->
     Fd = open_index_read(IdxFile),
-    case last_epoch_offset(
-           file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, undefined) of
+    case last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, undefined) of
         undefined ->
             [];
         {LastE, LastO, Res} ->
             lists:reverse([{LastE, LastO} | Res])
     end;
-last_epoch_offsets2([FstIdxFile | IdxFiles]) ->
+last_epoch_offsets([FstIdxFile | _] = IdxFiles) ->
     F = fun() ->
                 FstFd = open_index_read(FstIdxFile),
                 {ok, <<FstO:64/unsigned,
                        _FstTimestamp:64/signed,
                        FstE:64/unsigned,
                        _FstChunkPos:32/unsigned,
-                       _FstChType:8/unsigned>>} = top_index(FstFd),
+                       _FstChType:8/unsigned>>} = first_idx_record(FstFd),
                 {ok, ?IDX_HEADER_SIZE} = file:position(FstFd, ?IDX_HEADER_SIZE),
                 {LastE, LastO, Res} =
-                    lists:foldl(
-                        fun(IdxFile, {E, _, EOs} = Acc) ->
-                            Fd = open_index_read(IdxFile),
-                            {ok, <<Offset:64/unsigned,
-                                   _Timestamp:64/signed,
-                                   Epoch:64/unsigned,
-                                   _FilePos:32/unsigned,
-                                   _ChType:8/unsigned>>} = tail_index(Fd),
-                            case Epoch > E of
-                                true ->
-                                    %% we need to scan as the last index record
-                                    %% has a greater epoch
-                                    {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE),
-                                    last_epoch_offset(
-                                        file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd,
-                                        Acc);
-                                false ->
-                                    ok = file:close(Fd),
-                                    {Epoch, Offset, EOs}
-                            end
-                        end,
-                        last_epoch_offset(file:read(FstFd, ?INDEX_RECORD_SIZE_B),
-                                          FstFd, {FstE, FstO, []}), IdxFiles),
+                    lists:foldl(
+                      fun(IdxFile, {E, _, EOs} = Acc) ->
+                              Fd = open_index_read(IdxFile),
+                              {ok, <<Offset:64/unsigned,
+                                     _Timestamp:64/signed,
+                                     Epoch:64/unsigned,
+                                     _FilePos:32/unsigned,
+                                     _ChType:8/unsigned>>} = last_idx_record(Fd),
+                              case Epoch > E of
+                                  true ->
+                                      %% we need to scan as the last index record
+                                      %% has a greater epoch
+                                      {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE),
+                                      last_epoch_offset(
+                                        file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd,
+                                        Acc);
+                                  false ->
+                                      ok = file:close(Fd),
+                                      {Epoch, Offset, EOs}
+                              end
+                      end, {FstE, FstO, []}, IdxFiles),
                 lists:reverse([{LastE, LastO} | Res])
         end,
     {Time, Result} = timer:tc(F),
@@ -1953,7 +1941,7 @@ sendfile(ssl, Fd, Sock, Pos, ToSend) ->
 last_timestamp_in_index_file(IdxFile) ->
     case file:open(IdxFile, [raw, binary, read]) of
         {ok, IdxFd} ->
-            case tail_index(IdxFd) of
+            case last_idx_record(IdxFd) of
                 {ok, <<_O:64/unsigned,
                        LastTimestamp:64/signed,
                        _E:64/unsigned,
@@ -1972,7 +1960,7 @@ first_timestamp_from_index_files([IdxFile | _]) ->
     case file:open(IdxFile, [raw, binary, read]) of
         {ok, IdxFd} ->
-            case top_index(IdxFd) of
+            case first_idx_record(IdxFd) of
                 {ok, <<_FstO:64/unsigned,
                        FstTimestamp:64/signed,
                        _FstE:64/unsigned,
@@ -1988,9 +1976,13 @@ first_timestamp_from_index_files([IdxFile | _]) ->
             0
     end.
 
-offset_range_from_idx_files(IdxFiles) ->
-    {_, FstSI, LstSI} = build_log_overview_first_last0(IdxFiles),
-    ChunkRange = chunk_range_from_segment_infos(lists:usort([FstSI, LstSI])),
+offset_range_from_idx_files([IdxFile]) ->
+    {ok, #seg_info{first = First,
+                   last = Last}} = build_seg_info(IdxFile),
+    offset_range_from_chunk_range({First, Last});
+offset_range_from_idx_files(IdxFiles) when is_list(IdxFiles) ->
+    {_, FstSI, LstSI} = first_and_last_seginfos0(IdxFiles),
+    ChunkRange = chunk_range_from_segment_infos([FstSI, LstSI]),
     offset_range_from_chunk_range(ChunkRange).
offset_range_from_segment_infos(SegInfos) -> @@ -2007,6 +1999,8 @@ chunk_range_from_segment_infos(SegInfos) when is_list(SegInfos) -> offset_range_from_chunk_range(empty) -> empty; +offset_range_from_chunk_range({undefined, undefined}) -> + empty; offset_range_from_chunk_range({#chunk_info{id = FirstChId}, #chunk_info{id = LastChId, num = LastNumRecs}}) -> @@ -2016,24 +2010,22 @@ find_segment_for_offset(Offset, IdxFiles) -> %% we assume index files are in the default low-> high order here case lists:search( fun(IdxFile) -> - FstOffset = list_to_integer( - filename:basename(IdxFile, ".index")), - Offset >= FstOffset + Offset >= index_file_first_offset(IdxFile) end, lists:reverse(IdxFiles)) of {value, File} -> - case index_file_to_seg_info(File) of - #seg_info{first = undefined, - last = undefined} = Info -> + case build_seg_info(File) of + {ok, #seg_info{first = undefined, + last = undefined} = Info} -> {end_of_log, Info}; - #seg_info{last = - #chunk_info{id = LastChId, - num = LastNumRecs}} = Info + {ok, #seg_info{last = + #chunk_info{id = LastChId, + num = LastNumRecs}} = Info} when Offset == LastChId + LastNumRecs -> %% the last segment and offset is the next offset {end_of_log, Info}; - #seg_info{first = #chunk_info{id = FirstChId}, - last = #chunk_info{id = LastChId, - num = LastNumRecs}} = Info -> + {ok, #seg_info{first = #chunk_info{id = FirstChId}, + last = #chunk_info{id = LastChId, + num = LastNumRecs}} = Info} -> NextChId = LastChId + LastNumRecs, case Offset >= FirstChId andalso Offset < NextChId of true -> @@ -2041,7 +2033,9 @@ find_segment_for_offset(Offset, IdxFiles) -> {found, Info}; false -> not_found - end + end; + {error, _} = Err -> + Err end; false -> not_found @@ -2097,77 +2091,52 @@ open_index_read(File) -> _ = file:read(Fd, ?IDX_HEADER_SIZE), Fd. -scan_idx(Offset, #seg_info{index = IndexFile, - last = LastChunkInSegment} = SegmentInfo) -> - {Time, Result} = timer:tc( - fun() -> - case offset_range_from_segment_infos([SegmentInfo]) of - empty -> - eof; - {SegmentStartOffs, SegmentEndOffs} -> - case Offset < SegmentStartOffs orelse - Offset > SegmentEndOffs of - true -> - offset_out_of_range; - false -> - IndexFd = open_index_read(IndexFile), - Result = scan_idx(IndexFd, Offset, - LastChunkInSegment), - _ = file:close(IndexFd), - Result - end - end - end), +offset_idx_scan(Offset, #seg_info{index = IndexFile} = SegmentInfo) -> + {Time, Result} = + timer:tc( + fun() -> + case offset_range_from_segment_infos([SegmentInfo]) of + empty -> + eof; + {SegmentStartOffs, SegmentEndOffs} -> + case Offset < SegmentStartOffs orelse + Offset > SegmentEndOffs of + true -> + offset_out_of_range; + false -> + IndexFd = open_index_read(IndexFile), + offset_idx_scan0(IndexFd, Offset, not_found) + end + end + end), ?DEBUG("~s:~s/~b completed in ~fs", [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, Time/1000000]), Result. -scan_idx(Fd, Offset, #chunk_info{id = LastChunkInSegmentId, - num = LastChunkInSegmentNum}) -> +offset_idx_scan0(Fd, Offset, PreviousChunk) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, <>} -> - LastOffsetInSegment = LastChunkInSegmentId + LastChunkInSegmentNum - 1, - case Offset < ChunkId orelse Offset > LastOffsetInSegment of - true -> - %% shouldn't really happen as we check the range above - offset_out_of_range; - false -> - %% offset is in this segment - scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos}) - end; - eof -> - %% this should never happen - offset is in the range and we are reading the first record - eof; - {error, Posix} -> - Posix - end. 
- -scan_idx0(Fd, Offset, PreviousChunk) -> - case file:read(Fd, ?INDEX_RECORD_SIZE_B) of - {ok, - <>} -> case Offset < ChunkId of true -> %% offset we are looking for is higher or equal %% to the start of the previous chunk %% but lower than the start of the current chunk -> %% return the previous chunk + _ = file:close(Fd), PreviousChunk; false -> - scan_idx0(Fd, Offset, {ChunkId, Epoch, FilePos}) + offset_idx_scan0(Fd, Offset, {ChunkId, Epoch, FilePos}) end; eof -> + _ = file:close(Fd), %% Offset must be in the last chunk as there is no more data PreviousChunk; {error, Posix} -> + _ = file:close(Fd), Posix end. @@ -2184,11 +2153,6 @@ chunk_location_for_timestamp(Idx, Ts) -> %% scan index file for nearest timestamp {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts), {ChunkId, FilePos}. -% chunk_location_for_timestamp(#seg_info{index = Idx}, Ts) -> -% Fd = open_index_read(Idx), -% %% scan index file for nearest timestamp -% {ChunkId, _Timestamp, _Epoch, FilePos} = timestamp_idx_scan(Fd, Ts), -% {ChunkId, FilePos}. timestamp_idx_scan(Fd, Ts) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of @@ -2431,23 +2395,23 @@ next_location(#chunk_info{id = Id, index_file_first_offset(IdxFile) -> list_to_integer(filename:basename(IdxFile, ".index")). -start_end_timestamp(IdxFile) -> +first_last_timestamps(IdxFile) -> case file:open(IdxFile, [raw, read, binary]) of {ok, Fd} -> - case top_index(Fd) of + case first_idx_record(Fd) of {ok, <<_:64/unsigned, - TopTs:64/signed, + FirstTs:64/signed, _:64/unsigned, _:32/unsigned, _:8/unsigned>>} -> - %% if we can top we can tail + %% if we can get the first we can get the last {ok, <<_:64/unsigned, - TailTs:64/signed, + LastTs:64/signed, _:64/unsigned, _:32/unsigned, - _:8/unsigned>>} = tail_index(Fd), + _:8/unsigned>>} = last_idx_record(Fd), ok = file:close(Fd), - {TopTs, TailTs}; + {FirstTs, LastTs}; {error, einval} -> %% empty index undefined; @@ -2464,7 +2428,7 @@ start_end_timestamp(IdxFile) -> %% [{21, 30}, {12, 20}, {5, 10}] %% 11 = {12, 20} timestamp_idx_file_search(Ts, [FstIdxFile | Older]) -> - case start_end_timestamp(FstIdxFile) of + case first_last_timestamps(FstIdxFile) of {_FstTs, EndTs} when Ts > EndTs -> %% timestamp greater than the newest timestamp in the stream @@ -2488,7 +2452,7 @@ timestamp_idx_file_search(Ts, [FstIdxFile | Older]) -> end. timestamp_idx_file_search0(Ts, [], IdxFile) -> - case start_end_timestamp(IdxFile) of + case first_last_timestamps(IdxFile) of {FstTs, _LastTs} when Ts < FstTs -> {first_in, IdxFile}; @@ -2496,7 +2460,7 @@ timestamp_idx_file_search0(Ts, [], IdxFile) -> {found, IdxFile} end; timestamp_idx_file_search0(Ts, [IdxFile | Older], Prev) -> - case start_end_timestamp(IdxFile) of + case first_last_timestamps(IdxFile) of {_FstTs, EndTs} when Ts > EndTs -> %% we should attach the the first chunk in the previous segment From ad0644754bd53eb05f8b5f4cc852b6a250a0b88f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 11 May 2022 11:37:02 +0100 Subject: [PATCH 12/19] osiris_log use fadvise where it makes sense --- src/osiris_log.erl | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index d19273e4e1c4..b66ca9b673c1 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -706,7 +706,6 @@ chunk_id_index_scan(IdxFile, ChunkId) when is_list(IdxFile) -> chunk_id_index_scan0(Fd, ChunkId). 
chunk_id_index_scan0(Fd, ChunkId) -> - {ok, IdxPos} = file:position(Fd, cur), case file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, < Epoch:64/unsigned, FilePos:32/unsigned, _ChType:8/unsigned>>} -> + {ok, IdxPos} = file:position(Fd, cur), ok = file:close(Fd), - {ChunkId, Epoch, FilePos, IdxPos}; + {ChunkId, Epoch, FilePos, IdxPos - ?INDEX_RECORD_SIZE_B}; {ok, _} -> chunk_id_index_scan0(Fd, ChunkId); eof -> @@ -787,10 +787,10 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% lets truncate to this point %% FilePos could be eof here which means the next offset {ok, Fd} = file:open(File, [read, write, binary, raw]), + _ = file:advise(Fd, 0, 0, random), {ok, IdxFd} = file:open(Idx, [read, write, binary, raw]), - {_ChType, ChId, E, _Num, Size, TSize} = - header_info(Fd, Pos), + {_ChType, ChId, E, _Num, Size, TSize} = header_info(Fd, Pos), %% position at end of chunk {ok, _Pos} = file:position(Fd, {cur, Size + TSize}), ok = file:truncate(Fd), @@ -1330,6 +1330,8 @@ send_file(Sock, %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of true -> + %% TODO: use inets:setopts(Sock, {nopush, Boolean}) to avoid + %% sending a tiny package in the Callback _ = Callback(Header, ToSend), case sendfile(Transport, Fd, Sock, Pos, ToSend) of ok -> @@ -1544,6 +1546,8 @@ position_at_idx_record_boundary(IdxFd, At) -> build_segment_info(SegFile, LastChunkPos, IdxFile) -> try {ok, Fd} = open(SegFile, [read, binary, raw]), + %% we don't want to read blocks into page cache we are unlikely to need + _ = file:advise(Fd, 0, 0, random), case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of eof -> _ = file:close(Fd), @@ -1732,6 +1736,7 @@ eval_max_bytes(IdxFiles, MaxSize) -> last_epoch_offsets([IdxFile]) -> Fd = open_index_read(IdxFile), + _ = file:advise(Fd, 0, 0, sequential), case last_epoch_offset(file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, undefined) of undefined -> []; @@ -1740,17 +1745,22 @@ last_epoch_offsets([IdxFile]) -> end; last_epoch_offsets([FstIdxFile | _] = IdxFiles) -> F = fun() -> - FstFd = open_index_read(FstIdxFile), + {ok, FstFd} = open(FstIdxFile, [read, raw, binary]), + %% on linux this disables read-ahead so should only + %% bring a single block into memory + %% having the first block of index files in page cache + %% should generally be a good thing + _ = file:advise(FstFd, 0, 0, random), {ok, <>} = first_idx_record(FstFd), - {ok, ?IDX_HEADER_SIZE} = file:position(FstFd, ?IDX_HEADER_SIZE), + ok = file:close(FstFd), {LastE, LastO, Res} = lists:foldl( fun(IdxFile, {E, _, EOs} = Acc) -> - Fd = open_index_read(IdxFile), + Fd = open_index_read(IdxFile), {ok, < true -> %% we need to scan as the last index record %% has a greater epoch - {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), + _ = file:advise(Fd, 0, 0, sequential), + {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), last_epoch_offset( file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, Acc); false -> - ok = file:close(Fd), + ok = file:close(Fd), {Epoch, Offset, EOs} end end, {FstE, FstO, []}, IdxFiles), @@ -2087,8 +2098,7 @@ open_index_read(File) -> %% We can't use the assertion that index header is correct because of a %% race condition between opening the file and writing the header %% It seems to happen when retention policies are applied - %% {ok, ?IDX_HEADER} = file:read(Fd, ?IDX_HEADER_SIZE) - _ = file:read(Fd, ?IDX_HEADER_SIZE), + {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), Fd. 
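For readers unfamiliar with the call this patch leans on: file:advise/4 is a thin wrapper over posix_fadvise(2). An Offset and Length of 0 cover the whole file; `random` disables kernel read-ahead, which suits touching a single index record, while `sequential` widens it, which suits a full index scan. A minimal sketch outside osiris, with IdxFile, Pos and RecSize as placeholders:

    {ok, Fd} = file:open(IdxFile, [raw, binary, read]),
    %% whole file, no read-ahead: only one record will be read
    _ = file:advise(Fd, 0, 0, random),
    {ok, _Rec} = file:pread(Fd, Pos, RecSize),
    ok = file:close(Fd).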
offset_idx_scan(Offset, #seg_info{index = IndexFile} = SegmentInfo) -> @@ -2105,6 +2115,7 @@ offset_idx_scan(Offset, #seg_info{index = IndexFile} = SegmentInfo) -> offset_out_of_range; false -> IndexFd = open_index_read(IndexFile), + _ = file:advise(IndexFd, 0, 0, sequential), offset_idx_scan0(IndexFd, Offset, not_found) end end @@ -2398,6 +2409,7 @@ index_file_first_offset(IdxFile) -> first_last_timestamps(IdxFile) -> case file:open(IdxFile, [raw, read, binary]) of {ok, Fd} -> + _ = file:advise(Fd, 0, 0, random), case first_idx_record(Fd) of {ok, <<_:64/unsigned, FirstTs:64/signed, From cf0c7e7a584eb6778e28e730c8c781d88fd3fea6 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 13 May 2022 12:35:25 +0100 Subject: [PATCH 13/19] handle retention race condition --- src/osiris_log.erl | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index b66ca9b673c1..bbdcc2f0650e 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -773,7 +773,7 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> end; {end_of_log, _Info} -> IdxFiles; - {found, #seg_info{file = File, index = Idx}} -> + {found, #seg_info{file = File, index = IdxFile}} -> ?DEBUG("osiris_log: ~s on node ~s truncating to chunk " "id ~b in epoch ~b", [Name, node(), ChId, E]), @@ -781,22 +781,21 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) -> %% next offset needs to be a chunk offset %% if it is not found we know the offset requested isn't a chunk %% id and thus isn't valid - case chunk_id_index_scan(Idx, ChId) of + case chunk_id_index_scan(IdxFile, ChId) of {ChId, E, Pos, IdxPos} when is_integer(Pos) -> %% the Chunk id was found and has the right epoch %% lets truncate to this point %% FilePos could be eof here which means the next offset {ok, Fd} = file:open(File, [read, write, binary, raw]), - _ = file:advise(Fd, 0, 0, random), - {ok, IdxFd} = file:open(Idx, [read, write, binary, raw]), + _ = file:advise(Fd, 0, 0, random), + {ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]), {_ChType, ChId, E, _Num, Size, TSize} = header_info(Fd, Pos), %% position at end of chunk {ok, _Pos} = file:position(Fd, {cur, Size + TSize}), ok = file:truncate(Fd), - {ok, _} = - file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), + {ok, _} = file:position(IdxFd, IdxPos + ?INDEX_RECORD_SIZE_B), ok = file:truncate(IdxFd), ok = file:close(Fd), ok = file:close(IdxFd), @@ -1306,10 +1305,8 @@ send_file(Sock, num_records := NumRecords, data_size := DataSize, trailer_size := TrailerSize, - position := Pos} = - Header, - #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = - State1} -> + position := Pos} = Header, + #?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} -> %% read header %% used to write frame headers to socket %% and return the number of bytes to sendfile @@ -1330,8 +1327,8 @@ send_file(Sock, %% or the chunk is a user type (for offset readers) case needs_handling(RType, Selector, ChType) of true -> - %% TODO: use inets:setopts(Sock, {nopush, Boolean}) to avoid - %% sending a tiny package in the Callback + %% TODO: use inets:setopts(Sock, {nopush, Boolean}) to avoid + %% sending a tiny package in the Callback _ = Callback(Header, ToSend), case sendfile(Transport, Fd, Sock, Pos, ToSend) of ok -> @@ -1490,12 +1487,24 @@ first_and_last_seginfos0([FstIdxFile]) -> %% this function is only used by init {ok, SegInfo} = build_seg_info(FstIdxFile), {1, SegInfo, SegInfo}; 
-first_and_last_seginfos0([FstIdxFile | Rem]) -> +first_and_last_seginfos0([FstIdxFile | Rem] = IdxFiles) -> %% this function is only used by init {ok, FstSegInfo} = build_seg_info(FstIdxFile), LastIdxFile = lists:last(Rem), - {ok, LastSegInfo} = build_seg_info(LastIdxFile ), - {length(Rem) + 1, FstSegInfo, LastSegInfo}. + case build_seg_info(LastIdxFile) of + {ok, #seg_info{first = undefined, + last = undefined}} -> + %% the last index file doesn't have any index records yet + %% retry without it + [_ | RetryIndexFiles] = lists:reverse(IdxFiles), + first_and_last_seginfos0(lists:reverse(RetryIndexFiles)); + {ok, LastSegInfo} -> + {length(Rem) + 1, FstSegInfo, LastSegInfo}; + {error, Err} -> + ?ERROR("~s: failed to build seg_info from file ~s, error: ~w", + [?MODULE, LastIdxFile, Err]), + error(Err) + end. build_seg_info(IdxFile) -> %% do not nead read_ahead here From dbdc8c12f3fb6996356ff07bf69b74b4987e14ee Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 13 May 2022 14:30:14 +0100 Subject: [PATCH 14/19] formatting --- src/osiris_log.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index bbdcc2f0650e..d67a4f32125d 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1769,7 +1769,7 @@ last_epoch_offsets([FstIdxFile | _] = IdxFiles) -> {LastE, LastO, Res} = lists:foldl( fun(IdxFile, {E, _, EOs} = Acc) -> - Fd = open_index_read(IdxFile), + Fd = open_index_read(IdxFile), {ok, < true -> %% we need to scan as the last index record %% has a greater epoch - _ = file:advise(Fd, 0, 0, sequential), - {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), + _ = file:advise(Fd, 0, 0, sequential), + {ok, ?IDX_HEADER_SIZE} = file:position(Fd, ?IDX_HEADER_SIZE), last_epoch_offset( file:read(Fd, ?INDEX_RECORD_SIZE_B), Fd, Acc); false -> - ok = file:close(Fd), + ok = file:close(Fd), {Epoch, Offset, EOs} end end, {FstE, FstO, []}, IdxFiles), From 6d929a3e2e8cb7b77ee3cc532b93b1cb3b28ff54 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 13 May 2022 16:14:13 +0100 Subject: [PATCH 15/19] remove commented function --- src/osiris_util.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/osiris_util.erl b/src/osiris_util.erl index 9ce8dd5b91b0..6252355d82d0 100644 --- a/src/osiris_util.erl +++ b/src/osiris_util.erl @@ -235,10 +235,6 @@ inet_tls_enabled([{proto_dist, ["inet_tls"]} | _]) -> inet_tls_enabled([_Opt | Tail]) -> inet_tls_enabled(Tail). -% -spec binary_search(Vjj -% binary_search(Fun, List) -> -% Fun(hd(List)). - partition_parallel(F, Es, Timeout) -> Parent = self(), Running = [{spawn_monitor(fun() -> Parent ! 

From 1b28554a6a0e4fdc1c17b5144a842b5b5e650341 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Mon, 16 May 2022 11:40:33 +0100
Subject: [PATCH 16/19] use TCP_CORK when available

---
 src/osiris_log.erl | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index d67a4f32125d..e03238825ff9 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -1329,9 +1329,11 @@ send_file(Sock,
                 true ->
                     %% TODO: use inet:setopts(Sock, {nopush, Boolean}) to avoid
                     %% sending a tiny packet in the Callback
+                    ok = inet:setopts(Sock, [{nopush, true}]),
                     _ = Callback(Header, ToSend),
                     case sendfile(Transport, Fd, Sock, Pos, ToSend) of
                         ok ->
+                            ok = inet:setopts(Sock, [{nopush, false}]),
                             {ok, _} = file:position(Fd, NextFilePos),
                             {ok, State};
                         Err ->

From a717bb2b3ffca6e39ba7b1a0d42c70e9af2c74c4 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Mon, 16 May 2022 15:52:05 +0100
Subject: [PATCH 17/19] make setopts work with ssl

---
 src/osiris_log.erl            | 16 +++++++++++-----
 src/osiris_replica_reader.erl |  1 -
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index e03238825ff9..fdc7364f0b0f 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -1286,7 +1286,7 @@ read_chunk_parsed(#?MODULE{mode = #read{type = RType,
                            {error, term()} |
                            {end_of_stream, state()}.
 send_file(Sock, State) ->
-    send_file(Sock, State, fun(_, S) -> S end).
+    send_file(Sock, State, fun(_, _) -> ok end).
 
 -spec send_file(gen_tcp:socket(), state(),
                 fun((header_map(), non_neg_integer()) -> term())) ->
@@ -1327,13 +1327,13 @@ send_file(Sock,
             %% or the chunk is a user type (for offset readers)
             case needs_handling(RType, Selector, ChType) of
                 true ->
-                    %% TODO: use inet:setopts(Sock, {nopush, Boolean}) to avoid
-                    %% sending a tiny packet in the Callback
-                    ok = inet:setopts(Sock, [{nopush, true}]),
+                    %% this avoids any data sent in the Callback being dispatched
+                    %% in its own TCP frame
+                    ok = setopts(Transport, Sock, [{nopush, true}]),
                     _ = Callback(Header, ToSend),
                     case sendfile(Transport, Fd, Sock, Pos, ToSend) of
                         ok ->
-                            ok = inet:setopts(Sock, [{nopush, false}]),
+                            ok = setopts(Transport, Sock, [{nopush, false}]),
                             {ok, _} = file:position(Fd, NextFilePos),
                             {ok, State};
                         Err ->
@@ -1940,6 +1940,12 @@ max_segment_size_reached(SegFd, CurrentSizeChunks,
     CurrentSizeBytes >= MaxSizeBytes orelse
     CurrentSizeChunks >= MaxSizeChunks - 1.
 
+
+setopts(tcp, Sock, Opts) ->
+    ok = inet:setopts(Sock, Opts);
+setopts(ssl, Sock, Opts) ->
+    ok = ssl:setopts(Sock, Opts).
+
 sendfile(_Transport, _Fd, _Sock, _Pos, 0) ->
     ok;
 sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) ->
diff --git a/src/osiris_replica_reader.erl b/src/osiris_replica_reader.erl
index 3cd2ee0769ce..b8544f3b673e 100644
--- a/src/osiris_replica_reader.erl
+++ b/src/osiris_replica_reader.erl
@@ -225,7 +225,6 @@ handle_call(_Request, _From, State) ->
 %%--------------------------------------------------------------------
 handle_cast({more_data, _LastOffset},
             #state{leader_pid = LeaderPid} = State0) ->
-    % ?DEBUG("MORE DATA ~b", [_LastOffset]),
     #state{log = Log} = State = do_sendfile(State0),
     NextOffs = osiris_log:next_offset(Log),
     ok = osiris_writer:register_data_listener(LeaderPid, NextOffs),

From 8ff335414c3ea669f7d5027879043fc68300747d Mon Sep 17 00:00:00 2001
From: Michal Kuratczyk
Date: Tue, 17 May 2022 14:07:42 +0200
Subject: [PATCH 18/19] Fix injecting the osiris sha

---
 .github/workflows/rabbitmq-oci.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/rabbitmq-oci.yaml b/.github/workflows/rabbitmq-oci.yaml
index 6244e4e24d07..e035e4dbc367 100644
--- a/.github/workflows/rabbitmq-oci.yaml
+++ b/.github/workflows/rabbitmq-oci.yaml
@@ -28,7 +28,7 @@ jobs:
       - name: Inject the git sha as the osiris version
         working-directory: osiris
         run: |
-          sed -i"_orig" "/vsn,/ s/2\\.[0-9]\\.[0-9]/${{ github.event.pull_request.head.sha || github.sha }}/" src/osiris.app.src
+          sed -i"_orig" -E '/VERSION/ s/1\.[0-9]+\.[0-9]+/${{ github.event.pull_request.head.sha || github.sha }}/' BUILD.bazel
       - name: Checkout RabbitMQ
         uses: actions/checkout@v3

From dddad27863d4baab1f1ea50b3a265e7513c7a40c Mon Sep 17 00:00:00 2001
From: Michal Kuratczyk
Date: Tue, 17 May 2022 14:56:32 +0200
Subject: [PATCH 19/19] Rename "ra" to "osiris" (copy-paste mistake)

---
 .github/workflows/rabbitmq-oci.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/rabbitmq-oci.yaml b/.github/workflows/rabbitmq-oci.yaml
index e035e4dbc367..552b50312376 100644
--- a/.github/workflows/rabbitmq-oci.yaml
+++ b/.github/workflows/rabbitmq-oci.yaml
@@ -137,7 +137,7 @@ jobs:
           RABBIT_SHA=${{ steps.load-rabbitmq-info.outputs.RABBITMQ_SHA }}
           OSIRIS_SHA=${{ github.event.pull_request.head.sha || github.sha }}
-          OSIRIS_ABBREV=ra-${OSIRIS_SHA:0:7}
+          OSIRIS_ABBREV=osiris-${OSIRIS_SHA:0:7}
           TAG_1=rabbitmq-${RABBIT_REF}-${OSIRIS_ABBREV}-${{ steps.load-info.outputs.otp }}
           TAG_2=rabbitmq-${RABBIT_REF}-${OSIRIS_ABBREV}-${{ matrix.image_tag_suffix }}
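
A note on the file:advise/4 hints used throughout this series: the `sequential` hint declares that a file will be read front to back (the full index scans), while `random` disables kernel read-ahead for files that are only probed at a handful of positions (first/last records, the truncation path). The result is deliberately bound to `_` because the call is a best-effort hint that may not be honoured on every platform or file system. A minimal sketch of the idiom, assuming an illustrative index file name (not part of the patches):

    -module(advise_sketch).
    -export([read_index_header/1]).

    %% sketch only: open an index file for a single front-to-back scan
    read_index_header(IdxFile) ->
        {ok, Fd} = file:open(IdxFile, [read, raw, binary]),
        %% Offset = 0 and Length = 0 mean "the whole file" (posix_fadvise
        %% semantics); the hint is best-effort, so the result is ignored
        %% rather than matched against ok
        _ = file:advise(Fd, 0, 0, sequential),
        %% assumes the file holds at least an 8-byte header
        {ok, Header} = file:read(Fd, 8),
        ok = file:close(Fd),
        Header.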
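
Taken together, patches 16 and 17 make send_file/3 cork the socket (TCP_CORK on Linux, TCP_NOPUSH on BSD, via the `nopush` option) before invoking the header callback and uncork it after sendfile completes, with setopts/3 dispatching to inet:setopts/2 or ssl:setopts/2 depending on the transport. The small header written by the callback therefore leaves the socket together with the chunk payload rather than in its own TCP segment. A hedged sketch of how a caller might drive this: the sender loop and frame_header/1 below are hypothetical; only send_file/3's contract (callback arity and return values) is taken from the patches.

    %% sketch only: pump chunks from an osiris_log reader down a TCP socket
    send_all(Sock, Log0) ->
        Callback = fun(_Header, ToSendBytes) ->
                           %% runs while the socket is corked, so this small
                           %% write is coalesced with the sendfile payload
                           ok = gen_tcp:send(Sock, frame_header(ToSendBytes))
                   end,
        case osiris_log:send_file(Sock, Log0, Callback) of
            {ok, Log} -> send_all(Sock, Log);
            {end_of_stream, Log} -> {ok, Log};  %% wait to be notified of more data
            {error, _} = Err -> Err
        end.

    %% hypothetical framing: a 4-byte length prefix for the chunk that follows
    frame_header(ToSendBytes) ->
        <<ToSendBytes:32/unsigned>>.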