Skip to content

Commit

Permalink
Remove the leader ID caching
Browse files Browse the repository at this point in the history
[Why]
Ra exposes the leader ID through `ra_leaderboard:lookup_leader/1`.
There is no need to duplicate that feature anymore.
  • Loading branch information
dumbbell committed Jul 15, 2024
1 parent 7954bd3 commit a379a73
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 154 deletions.
3 changes: 1 addition & 2 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3567,10 +3567,9 @@ handle_async_ret(
end, Correlations0);
handle_async_ret(
StoreId,
{ra_event, RaServer,
{ra_event, _RaServer,
{rejected, {not_leader, LeaderId, CorrelationId}}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok = khepri_cluster:cache_leader_if_changed(StoreId, RaServer, LeaderId),
[{CorrelationId, {error, {not_leader, LeaderId}}}].

%% -------------------------------------------------------------------
Expand Down
55 changes: 1 addition & 54 deletions src/khepri_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@
%% Internal.
-export([node_to_member/2,
this_member/1,
wait_for_cluster_readiness/2,
get_cached_leader/1,
cache_leader/2,
cache_leader_if_changed/3,
clear_cached_leader/1]).
wait_for_cluster_readiness/2]).

-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
Expand Down Expand Up @@ -1480,7 +1476,6 @@ get_store_prop(StoreId, PropName) ->

forget_store(StoreId) ->
ok = khepri_machine:clear_cache(StoreId),
ok = clear_cached_leader(StoreId),
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
StoreIds1 = maps:remove(StoreId, StoreIds),
case maps:size(StoreIds1) of
Expand Down Expand Up @@ -1513,51 +1508,3 @@ is_store_running(StoreId) ->
Known = maps:is_key(StoreId, StoreIds),
?assertEqual(Known, Runs),
Runs.

%% Cache the Ra leader ID to avoid command/query redirections from a follower
%% to the leader. The leader ID is returned after each command or query. If we
%% don't know it yet, wait for a leader election using khepri_event_handler.

-define(RA_LEADER_CACHE_KEY(StoreId), {khepri, ra_leader_cache, StoreId}).

-spec get_cached_leader(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: LeaderId | undefined,
LeaderId :: ra:server_id().

get_cached_leader(StoreId) ->
Key = ?RA_LEADER_CACHE_KEY(StoreId),
persistent_term:get(Key, undefined).

-spec cache_leader(StoreId, LeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | not_known.

cache_leader(StoreId, LeaderId) when ?IS_RA_SERVER(LeaderId) ->
ok = persistent_term:put(?RA_LEADER_CACHE_KEY(StoreId), LeaderId);
cache_leader(_StoreId, not_known) ->
ok.

-spec cache_leader_if_changed(StoreId, LeaderId, NewLeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | undefined,
NewLeaderId :: ra:server_id() | not_known.

cache_leader_if_changed(_StoreId, LeaderId, LeaderId) ->
ok;
cache_leader_if_changed(StoreId, undefined, NewLeaderId) ->
case get_cached_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
cache_leader_if_changed(StoreId, LeaderId, NewLeaderId);
undefined ->
cache_leader(StoreId, NewLeaderId)
end;
cache_leader_if_changed(StoreId, _OldLeaderId, NewLeaderId) ->
cache_leader(StoreId, NewLeaderId).

-spec clear_cached_leader(StoreId) -> ok when
StoreId :: khepri:store_id().

clear_cached_leader(StoreId) ->
_ = persistent_term:erase(?RA_LEADER_CACHE_KEY(StoreId)),
ok.
14 changes: 5 additions & 9 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -853,15 +853,12 @@ do_process_sync_command(StoreId, Command, Options) ->
ReplyFrom = maps:get(reply_from, Options, {member, RaServer}),
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
LeaderId = ra_leaderboard:lookup_leader(StoreId),
Dest = case LeaderId of
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined -> LeaderId;
undefined -> RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, NewLeaderId} ->
khepri_cluster:cache_leader_if_changed(
StoreId, LeaderId, NewLeaderId),
{ok, Ret, _LeaderId} ->
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
Expand Down Expand Up @@ -898,7 +895,7 @@ process_async_command(
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
case khepri_cluster:get_cached_leader(StoreId) of
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
undefined ->
Expand Down Expand Up @@ -977,8 +974,7 @@ process_query(StoreId, QueryFun, Options) ->
process_query1(StoreId, QueryFun, Options) ->
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, NewLeaderId} ->
khepri_cluster:cache_leader(StoreId, NewLeaderId),
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
Expand Down Expand Up @@ -1028,7 +1024,7 @@ add_applied_condition1(StoreId, Options, Timeout) ->
add_applied_condition2(StoreId, Options, Timeout) ->
%% After the previous local query, there is a great chance that the leader
%% was cached, though not 100% guarantied.
case khepri_cluster:get_cached_leader(StoreId) of
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
Expand Down
55 changes: 0 additions & 55 deletions test/cached_leader.erl

This file was deleted.

39 changes: 5 additions & 34 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1949,12 +1949,8 @@ async_command_leader_change_in_three_node_cluster(Config) ->
ok = erpc:call(
LeaderNode,
fun() ->
%% This member hasn't sent any commands so the leader isn't
%% cached yet. The async call will succeed though because this
%% member is the leader.
?assertEqual(
undefined,
khepri_cluster:get_cached_leader(StoreId)),
%% The async call will succeed because this member is the
%% leader.
CorrelationId = 1,
Extra = #{async => CorrelationId},
ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra),
Expand All @@ -1975,33 +1971,8 @@ async_command_leader_change_in_three_node_cluster(Config) ->
ok = erpc:call(
FollowerNode,
fun() ->
%% This member hasn't sent any commands so the leader isn't
%% cached yet. This member is not the leader so the async
%% command will fail.
?assertEqual(
undefined,
khepri_cluster:get_cached_leader(StoreId)),
CorrelationId1 = 1,
Extra1 = #{async => CorrelationId1},
ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra1),
RaEvent1 = receive
{ra_event, _, _} = Event1 ->
Event1
after
1_000 ->
throw(timeout)
end,
?assertEqual(
[{CorrelationId1, {error, {not_leader, LeaderId}}}],
khepri:handle_async_ret(StoreId, RaEvent1)),

%% `khepri:handle_async_ret/2' updated the cached leader so
%% the async call will now send the command to the leader.
?assertNotEqual(
undefined,
khepri_cluster:get_cached_leader(StoreId)),
CorrelationId2 = 2,
Extra2 = #{async => CorrelationId2},
CorrelationId = 1,
Extra2 = #{async => CorrelationId},
ok = khepri:put(StoreId, [foo], ?NO_PAYLOAD, Extra2),
RaEvent2 = receive
{ra_event, _, _} = Event2 ->
Expand All @@ -2011,7 +1982,7 @@ async_command_leader_change_in_three_node_cluster(Config) ->
throw(timeout)
end,
?assertMatch(
[{CorrelationId2, {ok, _}}],
[{CorrelationId, {ok, _}}],
khepri:handle_async_ret(StoreId, RaEvent2))
end),
ok.
Expand Down

0 comments on commit a379a73

Please sign in to comment.