Skip to content

Commit

Permalink
Merge pull request #127 from rabbitmq/error-handling
Browse files: browse the repository at this point in the history
Handle errors on replica and replica reader
  • Loading branch information
kjnilsson authored Jun 20, 2023
2 parents 32b7f82 + 1f175ea commit 4aaaa9a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
6 changes: 5 additions & 1 deletion src/osiris_replica.erl
Original file line number | Diff line number | Diff line change
Expand Up @@ -578,7 +578,11 @@ terminate(Reason, #?MODULE{cfg = #cfg{name = Name,
?DEBUG_(Name, "terminating with ~w ", [Reason]),
_ = ets:delete(osiris_reader_context_cache, self()),
ok = osiris_log:close(Log),
ok = gen_tcp:close(Sock),
case Sock of
undefined -> ok;
_ ->
ok = gen_tcp:close(Sock)
end,
ok.

%%--------------------------------------------------------------------
Expand Down
55 changes: 30 additions & 25 deletions src/osiris_replica_reader.erl
Original file line number | Diff line number | Diff line change
Expand Up @@ -162,32 +162,37 @@ init(#{hosts := Hosts,
CntSpec = {CntId, ?COUNTER_FIELDS},
Config = #{counter_spec => CntSpec, transport => Transport},
%% TODO: handle errors
try
{ok, Log} =
osiris_writer:init_data_reader(LeaderPid, TailInfo, Config),
CntRef = osiris_log:counters_ref(Log),
?INFO("~ts: starting osiris replica reader at offset ~b",
[Name, osiris_log:next_offset(Log)]),

ok = send(Transport, Sock, Token),
%% register data listener with osiris_proc
ok = osiris_writer:register_data_listener(LeaderPid, StartOffset),
MRef = monitor(process, LeaderPid),
State =
maybe_send_committed_offset(#state{log = Log,
name = Name,
transport = Transport,
socket = Sock,
replica_pid = ReplicaPid,
leader_pid = LeaderPid,
leader_monitor_ref = MRef,
counter = CntRef,
counter_id = CntId}),
{ok, State}
catch
exit:{noproc, _} ->
Ret = osiris_writer:init_data_reader(LeaderPid, TailInfo, Config),
case Ret of
{ok, Log} ->
CntRef = osiris_log:counters_ref(Log),
?INFO("~ts: starting osiris replica reader at offset ~b",
[Name, osiris_log:next_offset(Log)]),

ok = send(Transport, Sock, Token),
%% register data listener with osiris_proc
ok = osiris_writer:register_data_listener(LeaderPid, StartOffset),
MRef = monitor(process, LeaderPid),
State =
maybe_send_committed_offset(#state{log = Log,
name = Name,
transport = Transport,
socket = Sock,
replica_pid = ReplicaPid,
leader_pid = LeaderPid,
leader_monitor_ref = MRef,
counter = CntRef,
counter_id = CntId}),
{ok, State};
{error, no_process} ->
?WARN("osiris writer for ~p is down, replica reader will not start", [ExtRef]),
{stop, writer_unavailable}
{stop, writer_unavailable};
{error, {offset_out_of_range, Range} = Reason} ->
?WARN("data reader found an offset out of range: ~p, replica reader will not start", [Range]),
{stop, Reason};
{error, {invalid_last_offset_epoch, Epoch, Offset} = Reason} ->
?WARN("data reader found an invalid last offset epoch: epoch ~p offset ~p, replica reader will not start", [Epoch, Offset]),
{stop, Reason}
end;
{error, Reason} ->
?WARN("could not connect osiris to replica at ~p", [Hosts]),
Expand Down

0 comments on commit 4aaaa9a

Please sign in to comment.