Skip to content

Commit

Permalink
Merge pull request #173 from rabbitmq/gh_172
Browse files Browse the repository at this point in the history
Fix bug that would delay offset listener notifications.
  • Loading branch information
acogoluegnes authored Nov 22, 2024
2 parents 36e055a + 69a7e2e commit 1fc98f1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 52 deletions.
14 changes: 13 additions & 1 deletion src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
read_chunk_parsed/1,
read_chunk_parsed/2,
committed_offset/1,
committed_chunk_id/1,
set_committed_chunk_id/2,
last_chunk_id/1,
get_current_epoch/1,
get_directory/1,
get_name/1,
Expand Down Expand Up @@ -538,6 +540,7 @@ init(#{dir := Dir,
case first_and_last_seginfos(Config) of
none ->
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1),
open_new_segment(#?MODULE{cfg = Cfg,
mode =
#write{type = WriterType,
Expand Down Expand Up @@ -607,6 +610,7 @@ init(#{dir := Dir,
ok = file:truncate(SegFd),
{ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1),
#?MODULE{cfg = Cfg,
mode =
#write{type = WriterType,
Expand Down Expand Up @@ -1333,7 +1337,11 @@ last_user_chunk_id_in_index(NextPos, IdxFd) ->
end.

%% Returns the committed chunk id of the log by delegating to
%% committed_chunk_id/1.
%% NOTE(review): presumably retained under the older name for existing
%% callers -- confirm before removing.
-spec committed_offset(state()) -> integer().
committed_offset(State) ->
    committed_chunk_id(State).

%% Reads the committed chunk id from the shared log reference held in
%% the log configuration.
-spec committed_chunk_id(state()) -> integer().
committed_chunk_id(#?MODULE{cfg = #cfg{shared = Shared}}) ->
    osiris_log_shared:committed_chunk_id(Shared).

-spec set_committed_chunk_id(state(), offset()) -> ok.
Expand All @@ -1342,6 +1350,10 @@ set_committed_chunk_id(#?MODULE{mode = #write{},
when is_integer(ChunkId) ->
osiris_log_shared:set_committed_chunk_id(Ref, ChunkId).

%% Reads the last chunk id from the shared log reference held in the
%% log configuration.
-spec last_chunk_id(state()) -> integer().
last_chunk_id(#?MODULE{cfg = #cfg{shared = Shared}}) ->
    osiris_log_shared:last_chunk_id(Shared).

%% Returns the current epoch of a log opened in write mode. Only matches
%% when the mode is a #write{} record; any other mode fails with a
%% function_clause error.
-spec get_current_epoch(state()) -> non_neg_integer().
get_current_epoch(#?MODULE{mode = #write{} = WriteMode}) ->
    WriteMode#write.current_epoch.
Expand Down
111 changes: 60 additions & 51 deletions src/osiris_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
{cfg :: #cfg{},
parse_state :: parse_state(),
log :: osiris_log:state(),
committed_offset = -1 :: -1 | osiris:offset(),
committed_chunk_id = -1 :: -1 | osiris:offset(),
offset_listeners = [] ::
[{pid(), osiris:offset(), mfa() | undefined}]
}).
Expand Down Expand Up @@ -364,7 +364,7 @@ handle_call(get_reader_context, _From,
directory = Dir,
reference = Ref,
counter = CntRef},
committed_offset = COffs,
committed_chunk_id = COffs,
log = Log} =
State) ->
Shared = osiris_log:get_shared(Log),
Expand Down Expand Up @@ -397,28 +397,43 @@ handle_call(Unknown, _From,
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
%% {committed_offset, CommittedChId}: records a newly committed chunk id
%% in the counter, the shared log state and the process state, then
%% notifies offset listeners. Updates that do not advance the committed
%% chunk id are ignored.
handle_cast({committed_offset, CommittedChId},
            #?MODULE{cfg = #cfg{counter = Cnt},
                     log = Log,
                     committed_chunk_id = LastCommittedChId} =
                State) ->
    case CommittedChId > LastCommittedChId of
        true ->
            counters:put(Cnt, ?C_COMMITTED_OFFSET, CommittedChId),
            ok = osiris_log:set_committed_chunk_id(Log, CommittedChId),
            %% notify offset listeners
            {noreply,
             notify_offset_listeners(
               State#?MODULE{committed_chunk_id = CommittedChId})};
        false ->
            %% stale or duplicate update: nothing to record. Every other
            %% branch returns a {noreply, State} tuple; a bare state is
            %% not a valid handle_cast result (assuming the gen_server
            %% behaviour) and would stop the process.
            {noreply, State}
    end;
%% {register_offset_listener, Pid, EvtFormatter, Offset}: answers right
%% away when Offset is already readable, otherwise queues the listener.
handle_cast({register_offset_listener, Pid, EvtFormatter, Offset},
            #?MODULE{cfg = #cfg{reference = Ref,
                                event_formatter = DefaultFmt},
                     log = Log,
                     offset_listeners = Listeners} = State) ->
    Max = max_readable_chunk_id(Log),
    case Offset =< Max of
        true ->
            %% only evaluate the request, the rest will be evaluated
            %% when data is written or committed
            Evt = wrap_osiris_event(
                    select_formatter(EvtFormatter, DefaultFmt),
                    {osiris_offset, Ref, Max}),
            Pid ! Evt,
            {noreply, State};
        false ->
            %% queue the offset listener for later
            {noreply,
             State#?MODULE{offset_listeners = [{Pid, Offset, EvtFormatter} |
                                               Listeners]}}
    end;
%% Anything else is logged at debug level and dropped.
handle_cast(Msg, #?MODULE{cfg = #cfg{name = Name}} = State) ->
    ?DEBUG_(Name, "osiris_replica unhandled cast ~w", [Msg]),
    {noreply, State}.
Expand Down Expand Up @@ -521,14 +536,14 @@ handle_info({'EXIT', Ref, Info},
{stop, unexpected_exit, State}.

handle_incoming_data(Socket, Bin,
#?MODULE{cfg =
#cfg{socket = Socket,
leader_pid = LeaderPid,
transport = Transport,
counter = Cnt},
parse_state = ParseState0,
log = Log0} =
State0) ->
#?MODULE{cfg =
#cfg{socket = Socket,
leader_pid = LeaderPid,
transport = Transport,
counter = Cnt},
parse_state = ParseState0,
log = Log0} =
State0) ->
counters:add(Cnt, ?C_PACKETS, 1),
%% deliberately ignoring return value here as it would fail if the
%% tcp connection has been closed and we still want to try to process
Expand All @@ -547,8 +562,8 @@ handle_incoming_data(Socket, Bin,
undefined ->
{noreply, State1};
_ ->
State = notify_offset_listeners(State1),
ok = osiris_writer:ack(LeaderPid, OffsetTimestamp),
State = notify_offset_listeners(State1),
{noreply, State}
end.

Expand Down Expand Up @@ -594,7 +609,7 @@ format_status(#{state := #?MODULE{cfg = #cfg{name = Name,
log = Log,
parse_state = ParseState,
offset_listeners = OffsetListeners,
committed_offset = CommittedOffset}} = Status) ->
committed_chunk_id = CommittedOffset}} = Status) ->
maps:update(state,
#{name => Name,
external_reference => ExtRef,
Expand Down Expand Up @@ -670,34 +685,28 @@ parse_chunk(Bin, {FirstOffsetTs, IOData, RemSize}, Acc) ->
{{FirstOffsetTs, [Bin | IOData], RemSize - byte_size(Bin)},
lists:reverse(Acc)}.

%% Sends an osiris_offset event to every registered offset listener whose
%% requested offset is readable, i.e. not greater than
%% max_readable_chunk_id(Log), and returns the state holding only the
%% listeners that are still pending.
%%
%% Listeners are selected with lists:partition/2 rather than
%% lists:splitwith/2: splitwith stops at the first element that does not
%% satisfy the predicate, so a listener waiting on a higher offset at the
%% head of the list would hold back listeners on lower offsets behind it.
notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref,
                                            event_formatter = EvtFmt},
                                 committed_chunk_id = CommittedChId,
                                 log = Log,
                                 offset_listeners = L0} = State) ->
    Max = max_readable_chunk_id(Log),
    {Notify, L} =
        lists:partition(fun({_Pid, O, _}) -> O =< Max end, L0),
    lists:foreach(
      fun({P, _, Fmt}) ->
              %% the per offset listener event formatter takes precedence
              %% over the process scoped one
              Evt = wrap_osiris_event(
                      select_formatter(Fmt, EvtFmt),
                      {osiris_offset, Ref, CommittedChId}),
              P ! Evt
      end,
      Notify),
    State#?MODULE{offset_listeners = L}.

%% Returns the smaller of the log's committed offset and its last chunk
%% id, which callers treat as the highest chunk id listeners may be
%% notified about.
max_readable_chunk_id(Log) ->
    Committed = osiris_log:committed_offset(Log),
    LastChunkId = osiris_log:last_chunk_id(Log),
    erlang:min(Committed, LastChunkId).

%% INTERNAL

Expand Down
5 changes: 5 additions & 0 deletions test/osiris_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ cluster_offset_listener(Config) ->
replica_nodes => Replicas},
{ok, #{leader_pid := Leader}} = osiris:start_cluster(Conf0),
{ok, Log0} = osiris:init_reader(Leader, 0, {test, []}),
0 = osiris_log:next_offset(Log0),
osiris:register_offset_listener(Leader, 0),
ok = osiris:write(Leader, undefined, 42, <<"mah-data">>),
receive
Expand Down Expand Up @@ -572,6 +573,9 @@ replica_offset_listener(Config) ->
fun() ->
{ok, Log0} = osiris:init_reader(R, 0, {test, []}),
osiris:register_offset_listener(R, 0),
%% gh 172 - ensure a higher offset listener does not block
%% a lower one
osiris:register_offset_listener(R, 1),
receive
{osiris_offset, _Name, O} when O > -1 ->
ct:pal("got offset ~w", [O]),
Expand All @@ -585,6 +589,7 @@ replica_offset_listener(Config) ->
exit(osiris_offset_timeout)
end
end),
timer:sleep(100),
ok = osiris:write(Leader, undefined, 42, <<"mah-data">>),

receive
Expand Down

0 comments on commit 1fc98f1

Please sign in to comment.