From 69a7e2e175cebdb33457534d202c4613d2580a2d Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 22 Nov 2024 12:11:53 +0000 Subject: [PATCH] Fix bug that would delay offset listener notifications. If an offset listener with a higher offset was registered after one with a lower one it would previously block the lower one until the higher one was committed. Some refactoring and minor optimisations. --- src/osiris_log.erl | 14 +++++- src/osiris_replica.erl | 111 ++++++++++++++++++++++------------------- test/osiris_SUITE.erl | 5 ++ 3 files changed, 78 insertions(+), 52 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index fc2a816..8e80578 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -36,7 +36,9 @@ read_chunk_parsed/1, read_chunk_parsed/2, committed_offset/1, + committed_chunk_id/1, set_committed_chunk_id/2, + last_chunk_id/1, get_current_epoch/1, get_directory/1, get_name/1, @@ -538,6 +540,7 @@ init(#{dir := Dir, case first_and_last_seginfos(Config) of none -> osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), + osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), open_new_segment(#?MODULE{cfg = Cfg, mode = #write{type = WriterType, @@ -607,6 +610,7 @@ init(#{dir := Dir, ok = file:truncate(SegFd), {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE), osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1), + osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1), #?MODULE{cfg = Cfg, mode = #write{type = WriterType, @@ -1333,7 +1337,11 @@ last_user_chunk_id_in_index(NextPos, IdxFd) -> end. -spec committed_offset(state()) -> integer(). -committed_offset(#?MODULE{cfg = #cfg{shared = Ref}}) -> +committed_offset(State) -> + committed_chunk_id(State). + +-spec committed_chunk_id(state()) -> integer(). +committed_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) -> osiris_log_shared:committed_chunk_id(Ref). -spec set_committed_chunk_id(state(), offset()) -> ok. @@ -1342,6 +1350,10 @@ set_committed_chunk_id(#?MODULE{mode = #write{}, when is_integer(ChunkId) -> osiris_log_shared:set_committed_chunk_id(Ref, ChunkId). +-spec last_chunk_id(state()) -> integer(). +last_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) -> + osiris_log_shared:last_chunk_id(Ref). + -spec get_current_epoch(state()) -> non_neg_integer(). get_current_epoch(#?MODULE{mode = #write{current_epoch = Epoch}}) -> Epoch. diff --git a/src/osiris_replica.erl b/src/osiris_replica.erl index 1fd2075..25bd162 100644 --- a/src/osiris_replica.erl +++ b/src/osiris_replica.erl @@ -59,7 +59,7 @@ {cfg :: #cfg{}, parse_state :: parse_state(), log :: osiris_log:state(), - committed_offset = -1 :: -1 | osiris:offset(), + committed_chunk_id = -1 :: -1 | osiris:offset(), offset_listeners = [] :: [{pid(), osiris:offset(), mfa() | undefined}] }). @@ -364,7 +364,7 @@ handle_call(get_reader_context, _From, directory = Dir, reference = Ref, counter = CntRef}, - committed_offset = COffs, + committed_chunk_id = COffs, log = Log} = State) -> Shared = osiris_log:get_shared(Log), @@ -397,28 +397,43 @@ handle_call(Unknown, _From, %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({committed_offset, Offs}, +handle_cast({committed_offset, CommittedChId}, #?MODULE{cfg = #cfg{counter = Cnt}, log = Log, - committed_offset = Last} = + committed_chunk_id = LastCommittedChId} = State) -> - case Offs > Last of + case CommittedChId > LastCommittedChId of true -> %% notify offset listeners - counters:put(Cnt, ?C_COMMITTED_OFFSET, Offs), - ok = osiris_log:set_committed_chunk_id(Log, Offs), + counters:put(Cnt, ?C_COMMITTED_OFFSET, CommittedChId), + ok = osiris_log:set_committed_chunk_id(Log, CommittedChId), {noreply, - notify_offset_listeners(State#?MODULE{committed_offset = Offs})}; + notify_offset_listeners( + State#?MODULE{committed_chunk_id = CommittedChId})}; false -> State end; handle_cast({register_offset_listener, Pid, EvtFormatter, Offset}, - #?MODULE{offset_listeners = Listeners} = State0) -> - State1 = - State0#?MODULE{offset_listeners = - [{Pid, Offset, EvtFormatter} | Listeners]}, - State = notify_offset_listeners(State1), - {noreply, State}; + #?MODULE{cfg = #cfg{reference = Ref, + event_formatter = DefaultFmt}, + log = Log, + offset_listeners = Listeners} = State) -> + Max = max_readable_chunk_id(Log), + case Offset =< Max of + true -> + %% only evaluate the request, the rest will be evaluated + %% when data is written or committed + Evt = wrap_osiris_event( + select_formatter(EvtFormatter, DefaultFmt), + {osiris_offset, Ref, Max}), + Pid ! Evt, + {noreply, State}; + false -> + %% queue the offset listener for later + {noreply, + State#?MODULE{offset_listeners = [{Pid, Offset, EvtFormatter} | + Listeners]}} + end; handle_cast(Msg, #?MODULE{cfg = #cfg{name = Name}} = State) -> ?DEBUG_(Name, "osiris_replica unhandled cast ~w", [Msg]), {noreply, State}. @@ -521,14 +536,14 @@ handle_info({'EXIT', Ref, Info}, {stop, unexpected_exit, State}. handle_incoming_data(Socket, Bin, - #?MODULE{cfg = - #cfg{socket = Socket, - leader_pid = LeaderPid, - transport = Transport, - counter = Cnt}, - parse_state = ParseState0, - log = Log0} = - State0) -> + #?MODULE{cfg = + #cfg{socket = Socket, + leader_pid = LeaderPid, + transport = Transport, + counter = Cnt}, + parse_state = ParseState0, + log = Log0} = + State0) -> counters:add(Cnt, ?C_PACKETS, 1), %% deliberately ignoring return value here as it would fail if the %% tcp connection has been closed and we still want to try to process @@ -547,8 +562,8 @@ handle_incoming_data(Socket, Bin, undefined -> {noreply, State1}; _ -> - State = notify_offset_listeners(State1), ok = osiris_writer:ack(LeaderPid, OffsetTimestamp), + State = notify_offset_listeners(State1), {noreply, State} end. @@ -594,7 +609,7 @@ format_status(#{state := #?MODULE{cfg = #cfg{name = Name, log = Log, parse_state = ParseState, offset_listeners = OffsetListeners, - committed_offset = CommittedOffset}} = Status) -> + committed_chunk_id = CommittedOffset}} = Status) -> maps:update(state, #{name => Name, external_reference => ExtRef, @@ -670,34 +685,28 @@ parse_chunk(Bin, {FirstOffsetTs, IOData, RemSize}, Acc) -> {{FirstOffsetTs, [Bin | IOData], RemSize - byte_size(Bin)}, lists:reverse(Acc)}. -notify_offset_listeners(#?MODULE{cfg = - #cfg{reference = Ref, - event_formatter = EvtFmt}, - committed_offset = COffs, +notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref, + event_formatter = EvtFmt}, + committed_chunk_id = CommittedChId, log = Log, - offset_listeners = L0} = - State) -> - case osiris_log:tail_info(Log) of - {_NextOffs, {_, LastChId, _LastTs}} -> - Max = min(COffs, LastChId), - %% do not notify offset listeners if the committed offset isn't - %% available locally yet - {Notify, L} = - lists:splitwith(fun({_Pid, O, _}) -> O =< Max end, L0), - _ = [begin - Evt = - %% the per offset listener event formatter takes precedence of - %% the process scoped one - wrap_osiris_event( - select_formatter(Fmt, EvtFmt), - {osiris_offset, Ref, COffs}), - P ! Evt - end - || {P, _, Fmt} <- Notify], - State#?MODULE{offset_listeners = L}; - _ -> - State - end. + offset_listeners = L0} = State) -> + Max = max_readable_chunk_id(Log), + {Notify, L} = + lists:partition(fun({_Pid, O, _}) -> O =< Max end, L0), + _ = [begin + Evt = + %% the per offset listener event formatter takes precedence of + %% the process scoped one + wrap_osiris_event( + select_formatter(Fmt, EvtFmt), + {osiris_offset, Ref, CommittedChId}), + P ! Evt + end + || {P, _, Fmt} <- Notify], + State#?MODULE{offset_listeners = L}. + +max_readable_chunk_id(Log) -> + min(osiris_log:committed_offset(Log), osiris_log:last_chunk_id(Log)). %% INTERNAL diff --git a/test/osiris_SUITE.erl b/test/osiris_SUITE.erl index 914b2f0..cd178b2 100644 --- a/test/osiris_SUITE.erl +++ b/test/osiris_SUITE.erl @@ -535,6 +535,7 @@ cluster_offset_listener(Config) -> replica_nodes => Replicas}, {ok, #{leader_pid := Leader}} = osiris:start_cluster(Conf0), {ok, Log0} = osiris:init_reader(Leader, 0, {test, []}), + 0 = osiris_log:next_offset(Log0), osiris:register_offset_listener(Leader, 0), ok = osiris:write(Leader, undefined, 42, <<"mah-data">>), receive @@ -572,6 +573,9 @@ replica_offset_listener(Config) -> fun() -> {ok, Log0} = osiris:init_reader(R, 0, {test, []}), osiris:register_offset_listener(R, 0), + %% gh 172 - ensure a higher offset listener does not block + %% a lower one + osiris:register_offset_listener(R, 1), receive {osiris_offset, _Name, O} when O > -1 -> ct:pal("got offset ~w", [O]), @@ -585,6 +589,7 @@ replica_offset_listener(Config) -> exit(osiris_offset_timeout) end end), + timer:sleep(100), ok = osiris:write(Leader, undefined, 42, <<"mah-data">>), receive