Skip to content

Commit

Permalink
Merge pull request #260 from rabbitmq/default-to-local-queries-plus-r…
Browse files Browse the repository at this point in the history
…elated-improvements

Improvements around queries
  • Loading branch information
dumbbell authored Jul 16, 2024
2 parents 0f56ea9 + a379a73 commit 25b405f
Show file tree
Hide file tree
Showing 7 changed files with 592 additions and 648 deletions.
129 changes: 105 additions & 24 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
transaction/1, transaction/2, transaction/3, transaction/4,
transaction/5,

fence/0, fence/1, fence/2,

handle_async_ret/1, handle_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
Expand Down Expand Up @@ -233,8 +235,8 @@
%%
%% Values are:
%% <ul>
%% <li>`true' to perform an asynchronous low-priority command without a
%% correlation ID.</li>
%% <li>`true' to perform an asynchronous command without a correlation
%% ID.</li>
%% <li>`false' to perform a synchronous command.</li>
%% <li>A correlation ID to perform an asynchronous low-priority command with
%% that correlation ID.</li>
Expand Down Expand Up @@ -288,37 +290,45 @@
%% except for R/W transactions.</li>
%% </ul>

-type favor_option() :: consistency | compromise | low_latency.
-type favor_option() :: consistency | low_latency.
%% Option to indicate where to put the cursor between freshness of the
%% returned data and low latency of queries.
%%
%% Values are:
%% <ul>
%% <li>`consistent' means that a "consistent query" will be used in Ra. It
%% will return the most up-to-date piece of data the cluster agreed on. Note
%% that it could block and eventually time out if there is no quorum in the Ra
%% cluster.</li>
%% <li>`compromise' performs "leader queries" most of the time to reduce
%% latency, but uses "consistent queries" every 10 seconds to verify that the
%% cluster is healthy on a regular basis. It should be faster but may block
%% and time out like `consistent' and still return slightly out-of-date
%% data.</li>
%% <li>`low_latency' means that "local queries" are used exclusively. They are
%% the fastest and have the lowest latency. However, the returned data is
%% whatever the local Ra server has. It could be out-of-date if it has
%% troubles keeping up with the Ra cluster. The chance of blocking and timing
%% out is very small.</li>
%% <li>`low_latency' means that a local query is used. It is the fastest and
%% have the lowest latency. However, the returned data is whatever the local Ra
%% server has. It could be out-of-date if the local Ra server did not get or
%% applied the latest updates yet. The chance of blocking and timing out is
%% very small.</li>
%% <li>`consistency' means that a local query is used. However the query uses
%% the fence mechanism to ensure that the previous updates from the calling
%% process are applied locally before the query is evaluated. It will return
%% the most up-to-date piece of data the cluster agreed on. Note that it could
%% block and eventually time out if there is no quorum in the Ra cluster.</li>
%% </ul>
%%
%% As described above, queries are always evaluated locally by the cluster
%% member that gets the call. The reason Ra's leader and consistent queries
%% are not exposed is that the remote execution of the query function may fail
%% in subtle on non-subtle ways. For instance, the remote node might run a
%% different version of Erlang or Khepri.

-type query_options() :: #{timeout => timeout(),
-type query_options() :: #{condition => ra:query_condition(),
timeout => timeout(),
favor => favor_option()}.
%% Options used in queries.
%%
%% <ul>
%% <li>`condition' indicates the condition on which the Ra server should wait
%% for before it executes the query.</li>
%% <li>`timeout' is passed to Ra query processing function.</li>
%% <li>`favor' indicates where to put the cursor between freshness of the
%% returned data and low latency of queries; see {@link favor_option()}.</li>
%% </ul>
%%
%% `favor' computes a `condition' internally. Therefore if both options are
%% set, `condition' takes precedence and `favor' is ignored.

-type tree_options() :: #{expect_specific_node => boolean(),
props_to_return => [payload_version |
Expand Down Expand Up @@ -423,7 +433,7 @@
-type async_ret() :: khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_tx:tx_fun_result() |
khepri:error(not_leader).
khepri:error({not_leader, ra:server_id()}).
%% The value returned from of a command function which was executed
%% asynchronously.
%%
Expand All @@ -433,8 +443,8 @@
%% `ra_event' message. Handling the notification with {@link
%% khepri:handle_async_ret/2} will return a list of pairs of correlation IDs
%% ({@link ra_server:command_correlation()}) and the return values of the
%% commands which were applied, or `{error, not_leader}' if the commands could
%% not be applied since they were sent to a non-leader member.
%% commands which were applied, or `{error, {not_leader, LeaderId}}' if the
%% commands could not be applied since they were sent to a non-leader member.
%%
%% Note that when commands are successfully applied, the return values are in
%% the {@link khepri_adv} formats - {@link khepri_adv:single_result()} or
Expand Down Expand Up @@ -3403,6 +3413,77 @@ transaction(FunOrPath, Args, ReadWrite, Options)
transaction(StoreId, FunOrPath, Args, ReadWrite, Options) ->
khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options).

%% -------------------------------------------------------------------
%% fence().
%% -------------------------------------------------------------------

-spec fence() -> Ret when
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% Calling this function is the same as calling `fence(StoreId)' with the
%% default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see fence/1.
%% @see fence/2.

fence() ->
StoreId = khepri_cluster:get_default_store_id(),
fence(StoreId).

-spec fence(StoreId | Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% This function accepts the following two forms:
%% <ul>
%% <li>`fence(StoreId)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).</li>
%% <li>`fence(Timeout)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).</li>
%% </ul>
%%
%% @see fence/2.

fence(Timeout) when Timeout =:= infinity orelse is_integer(Timeout) ->
StoreId = khepri_cluster:get_default_store_id(),
fence(StoreId, Timeout);
fence(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
fence(StoreId, Timeout).

-spec fence(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% This ensures that a subsequent query will see the result of synchronous and
%% asynchronous updates.
%%
%% This can't work however if:
%% <ul>
%% <li>Asynchronous updates have a correlation ID, in which case the caller is
%% responsible for waiting for the replies.</li>
%% <li>The default `reply_from => local' command option is overridden by
%% something else.</li>
%% </ul>
%%
%% @param StoreId the name of the Khepri store.
%% @param Timeout the time limit after which the call returns with an error.
%%
%% @returns `ok' or an `{error, Reason}' tuple.

fence(StoreId, Timeout) ->
khepri_machine:fence(StoreId, Timeout).

%% -------------------------------------------------------------------
%% handle_async_ret().
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -3486,10 +3567,10 @@ handle_async_ret(
end, Correlations0);
handle_async_ret(
StoreId,
{ra_event, FromId, {rejected, {not_leader, MaybeLeader, CorrelationId}}})
{ra_event, _RaServer,
{rejected, {not_leader, LeaderId, CorrelationId}}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader),
[{CorrelationId, {error, not_leader}}].
[{CorrelationId, {error, {not_leader, LeaderId}}}].

%% -------------------------------------------------------------------
%% Bang functions.
Expand Down
55 changes: 1 addition & 54 deletions src/khepri_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@
%% Internal.
-export([node_to_member/2,
this_member/1,
wait_for_cluster_readiness/2,
get_cached_leader/1,
cache_leader/2,
cache_leader_if_changed/3,
clear_cached_leader/1]).
wait_for_cluster_readiness/2]).

-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
Expand Down Expand Up @@ -1480,7 +1476,6 @@ get_store_prop(StoreId, PropName) ->

forget_store(StoreId) ->
ok = khepri_machine:clear_cache(StoreId),
ok = clear_cached_leader(StoreId),
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
StoreIds1 = maps:remove(StoreId, StoreIds),
case maps:size(StoreIds1) of
Expand Down Expand Up @@ -1513,51 +1508,3 @@ is_store_running(StoreId) ->
Known = maps:is_key(StoreId, StoreIds),
?assertEqual(Known, Runs),
Runs.

%% Cache the Ra leader ID to avoid command/query redirections from a follower
%% to the leader. The leader ID is returned after each command or query. If we
%% don't know it yet, wait for a leader election using khepri_event_handler.

-define(RA_LEADER_CACHE_KEY(StoreId), {khepri, ra_leader_cache, StoreId}).

-spec get_cached_leader(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: LeaderId | undefined,
LeaderId :: ra:server_id().

get_cached_leader(StoreId) ->
Key = ?RA_LEADER_CACHE_KEY(StoreId),
persistent_term:get(Key, undefined).

-spec cache_leader(StoreId, LeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | not_known.

cache_leader(StoreId, LeaderId) when ?IS_RA_SERVER(LeaderId) ->
ok = persistent_term:put(?RA_LEADER_CACHE_KEY(StoreId), LeaderId);
cache_leader(_StoreId, not_known) ->
ok.

-spec cache_leader_if_changed(StoreId, LeaderId, NewLeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | undefined,
NewLeaderId :: ra:server_id() | not_known.

cache_leader_if_changed(_StoreId, LeaderId, LeaderId) ->
ok;
cache_leader_if_changed(StoreId, undefined, NewLeaderId) ->
case get_cached_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
cache_leader_if_changed(StoreId, LeaderId, NewLeaderId);
undefined ->
cache_leader(StoreId, NewLeaderId)
end;
cache_leader_if_changed(StoreId, _OldLeaderId, NewLeaderId) ->
cache_leader(StoreId, NewLeaderId).

-spec clear_cached_leader(StoreId) -> ok when
StoreId :: khepri:store_id().

clear_cached_leader(StoreId) ->
_ = persistent_term:erase(?RA_LEADER_CACHE_KEY(StoreId)),
ok.
Loading

0 comments on commit 25b405f

Please sign in to comment.