Improvements around queries #260

Merged
merged 8 commits on Jul 16, 2024
129 changes: 105 additions & 24 deletions src/khepri.erl
@@ -127,6 +127,8 @@
transaction/1, transaction/2, transaction/3, transaction/4,
transaction/5,

fence/0, fence/1, fence/2,

handle_async_ret/1, handle_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
@@ -233,8 +235,8 @@
%%
%% Values are:
%% <ul>
%% <li>`true' to perform an asynchronous low-priority command without a
%% correlation ID.</li>
%% <li>`true' to perform an asynchronous command without a correlation
%% ID.</li>
%% <li>`false' to perform a synchronous command.</li>
%% <li>A correlation ID to perform an asynchronous low-priority command with
%% that correlation ID.</li>
@@ -288,37 +290,45 @@
%% except for R/W transactions.</li>
%% </ul>

-type favor_option() :: consistency | compromise | low_latency.
-type favor_option() :: consistency | low_latency.
%% Option to indicate the tradeoff between freshness of the returned data and
%% low latency of queries.
%%
%% Values are:
%% <ul>
%% <li>`consistent' means that a "consistent query" will be used in Ra. It
%% will return the most up-to-date piece of data the cluster agreed on. Note
%% that it could block and eventually time out if there is no quorum in the Ra
%% cluster.</li>
%% <li>`compromise' performs "leader queries" most of the time to reduce
%% latency, but uses "consistent queries" every 10 seconds to verify that the
%% cluster is healthy on a regular basis. It should be faster but may block
%% and time out like `consistent' and still return slightly out-of-date
%% data.</li>
%% <li>`low_latency' means that "local queries" are used exclusively. They are
%% the fastest and have the lowest latency. However, the returned data is
%% whatever the local Ra server has. It could be out-of-date if it has
%% troubles keeping up with the Ra cluster. The chance of blocking and timing
%% out is very small.</li>
%% <li>`low_latency' means that a local query is used. It is the fastest and
%% has the lowest latency. However, the returned data is whatever the local Ra
%% server has. It could be out-of-date if the local Ra server has not received
%% or applied the latest updates yet. The chance of blocking and timing out is
%% very small.</li>
%% <li>`consistency' means that a local query is used. However, the query uses
%% the fence mechanism to ensure that previous updates from the calling
%% process are applied locally before the query is evaluated. It will return
%% the most up-to-date piece of data the cluster agreed on. Note that it could
%% block and eventually time out if there is no quorum in the Ra cluster.</li>
%% </ul>
%%
%% As described above, queries are always evaluated locally by the cluster
%% member that gets the call. The reason Ra's leader and consistent queries
%% are not exposed is that the remote execution of the query function may fail
%% in subtle or non-subtle ways. For instance, the remote node might run a
%% different version of Erlang or Khepri.
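%%
%% For example, the favor can be chosen per query (a minimal sketch; the store
%% ID and the path below are placeholders):
%% ```
%% %% Fastest read; may return slightly out-of-date data.
%% {ok, _} = khepri:get(StoreId, [stock, wood, <<"oak">>], #{favor => low_latency}),
%%
%% %% Read that waits for this process' previous updates to be applied locally
%% %% before it is evaluated; may block if there is no quorum.
%% {ok, _} = khepri:get(StoreId, [stock, wood, <<"oak">>], #{favor => consistency}).
%% '''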

-type query_options() :: #{timeout => timeout(),
-type query_options() :: #{condition => ra:query_condition(),
timeout => timeout(),
favor => favor_option()}.
%% Options used in queries.
%%
%% <ul>
%% <li>`condition' indicates the condition the Ra server should wait for
%% before it executes the query.</li>
%% <li>`timeout' is passed to Ra query processing function.</li>
%% <li>`favor' indicates the tradeoff between freshness of the returned data
%% and low latency of queries; see {@link favor_option()}.</li>
%% </ul>
%%
%% Internally, `favor' is turned into a `condition'. Therefore, if both
%% options are set, `condition' takes precedence and `favor' is ignored.
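%%
%% For instance (a sketch; `RaIndex', `RaTerm' and `StoreId' are placeholders,
%% and the `{applied, {Index, Term}}' shape of {@link ra:query_condition()} is
%% an assumption about Ra's API):
%% ```
%% %% `favor' is ignored here because an explicit `condition' is given.
%% Options = #{condition => {applied, {RaIndex, RaTerm}},
%%             timeout => 5000,
%%             favor => low_latency},
%% {ok, _} = khepri:get(StoreId, [stock, wood, <<"oak">>], Options).
%% '''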

-type tree_options() :: #{expect_specific_node => boolean(),
props_to_return => [payload_version |
@@ -423,7 +433,7 @@
-type async_ret() :: khepri_adv:single_result() |
khepri_adv:many_results() |
khepri_tx:tx_fun_result() |
khepri:error(not_leader).
khepri:error({not_leader, ra:server_id()}).
%% The value returned from a command function which was executed
%% asynchronously.
%%
@@ -433,8 +443,8 @@
%% `ra_event' message. Handling the notification with {@link
%% khepri:handle_async_ret/2} will return a list of pairs of correlation IDs
%% ({@link ra_server:command_correlation()}) and the return values of the
%% commands which were applied, or `{error, not_leader}' if the commands could
%% not be applied since they were sent to a non-leader member.
%% commands which were applied, or `{error, {not_leader, LeaderId}}' if the
%% commands could not be applied since they were sent to a non-leader member.
%%
%% Note that when commands are successfully applied, the return values are in
%% the {@link khepri_adv} formats - {@link khepri_adv:single_result()} or
@@ -3403,6 +3413,77 @@ transaction(FunOrPath, Args, ReadWrite, Options)
transaction(StoreId, FunOrPath, Args, ReadWrite, Options) ->
khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options).

%% -------------------------------------------------------------------
%% fence().
%% -------------------------------------------------------------------

-spec fence() -> Ret when
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% Calling this function is the same as calling `fence(StoreId)' with the
%% default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see fence/1.
%% @see fence/2.

fence() ->
StoreId = khepri_cluster:get_default_store_id(),
fence(StoreId).

-spec fence(StoreId | Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% This function accepts the following two forms:
%% <ul>
%% <li>`fence(StoreId)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).</li>
%% <li>`fence(Timeout)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).</li>
%% </ul>
%%
%% @see fence/2.

fence(Timeout) when Timeout =:= infinity orelse is_integer(Timeout) ->
StoreId = khepri_cluster:get_default_store_id(),
fence(StoreId, Timeout);
fence(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
fence(StoreId, Timeout).
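
%% For example, both forms below end up calling `fence(StoreId, Timeout)' with
%% the missing argument set to its default (a sketch; `my_store' is a
%% placeholder store ID):
%% ```
%% ok = khepri:fence(my_store), %% default timeout
%% ok = khepri:fence(5000).     %% default store ID, 5-second timeout
%% '''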

-spec fence(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%%
%% This ensures that a subsequent query will see the result of synchronous and
%% asynchronous updates.
%%
%% This cannot work, however, if:
%% <ul>
%% <li>Asynchronous updates have a correlation ID, in which case the caller is
%% responsible for waiting for the replies.</li>
%% <li>The default `reply_from => local' command option is overridden by
%% something else.</li>
%% </ul>
%%
%% @param StoreId the name of the Khepri store.
%% @param Timeout the time limit after which the call returns with an error.
%%
%% @returns `ok' or an `{error, Reason}' tuple.

fence(StoreId, Timeout) ->
khepri_machine:fence(StoreId, Timeout).
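
%% For example, a fence makes fire-and-forget asynchronous updates visible to
%% a later query (a minimal sketch; the store ID, path and value are
%% placeholders):
%% ```
%% ok = khepri:put(StoreId, [stock, wood, <<"oak">>], 150, #{async => true}),
%% ok = khepri:fence(StoreId),
%% %% The asynchronous put above is now applied locally.
%% {ok, 150} = khepri:get(StoreId, [stock, wood, <<"oak">>]).
%% '''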

%% -------------------------------------------------------------------
%% handle_async_ret().
%% -------------------------------------------------------------------
@@ -3486,10 +3567,10 @@ handle_async_ret(
end, Correlations0);
handle_async_ret(
StoreId,
{ra_event, FromId, {rejected, {not_leader, MaybeLeader, CorrelationId}}})
{ra_event, _RaServer,
{rejected, {not_leader, LeaderId, CorrelationId}}})
when ?IS_KHEPRI_STORE_ID(StoreId) ->
ok = khepri_cluster:cache_leader_if_changed(StoreId, FromId, MaybeLeader),
[{CorrelationId, {error, not_leader}}].
[{CorrelationId, {error, {not_leader, LeaderId}}}].
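
%% An asynchronous command submitted with a correlation ID can be finalized
%% like this (a sketch; the store ID, path and correlation ID are
%% placeholders):
%% ```
%% CorrId = 1,
%% ok = khepri:put(StoreId, [stock, wood, <<"oak">>], 150, #{async => CorrId}),
%% receive
%%     {ra_event, _, _} = RaEvent ->
%%         case khepri:handle_async_ret(StoreId, RaEvent) of
%%             [{CorrId, {ok, _}}] ->
%%                 ok;
%%             [{CorrId, {error, {not_leader, _LeaderId}}}] ->
%%                 %% The command was rejected; resubmit it, e.g. to the
%%                 %% reported leader.
%%                 ok
%%         end
%% end.
%% '''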

%% -------------------------------------------------------------------
%% Bang functions.
55 changes: 1 addition & 54 deletions src/khepri_cluster.erl
@@ -123,11 +123,7 @@
%% Internal.
-export([node_to_member/2,
this_member/1,
wait_for_cluster_readiness/2,
get_cached_leader/1,
cache_leader/2,
cache_leader_if_changed/3,
clear_cached_leader/1]).
wait_for_cluster_readiness/2]).

-ifdef(TEST).
-export([wait_for_ra_server_exit/1,
@@ -1480,7 +1476,6 @@ get_store_prop(StoreId, PropName) ->

forget_store(StoreId) ->
ok = khepri_machine:clear_cache(StoreId),
ok = clear_cached_leader(StoreId),
StoreIds = persistent_term:get(?PT_STORE_IDS, #{}),
StoreIds1 = maps:remove(StoreId, StoreIds),
case maps:size(StoreIds1) of
@@ -1513,51 +1508,3 @@ is_store_running(StoreId) ->
Known = maps:is_key(StoreId, StoreIds),
?assertEqual(Known, Runs),
Runs.

%% Cache the Ra leader ID to avoid command/query redirections from a follower
%% to the leader. The leader ID is returned after each command or query. If we
%% don't know it yet, wait for a leader election using khepri_event_handler.

-define(RA_LEADER_CACHE_KEY(StoreId), {khepri, ra_leader_cache, StoreId}).

-spec get_cached_leader(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: LeaderId | undefined,
LeaderId :: ra:server_id().

get_cached_leader(StoreId) ->
Key = ?RA_LEADER_CACHE_KEY(StoreId),
persistent_term:get(Key, undefined).

-spec cache_leader(StoreId, LeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | not_known.

cache_leader(StoreId, LeaderId) when ?IS_RA_SERVER(LeaderId) ->
ok = persistent_term:put(?RA_LEADER_CACHE_KEY(StoreId), LeaderId);
cache_leader(_StoreId, not_known) ->
ok.

-spec cache_leader_if_changed(StoreId, LeaderId, NewLeaderId) -> ok when
StoreId :: khepri:store_id(),
LeaderId :: ra:server_id() | undefined,
NewLeaderId :: ra:server_id() | not_known.

cache_leader_if_changed(_StoreId, LeaderId, LeaderId) ->
ok;
cache_leader_if_changed(StoreId, undefined, NewLeaderId) ->
case get_cached_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
cache_leader_if_changed(StoreId, LeaderId, NewLeaderId);
undefined ->
cache_leader(StoreId, NewLeaderId)
end;
cache_leader_if_changed(StoreId, _OldLeaderId, NewLeaderId) ->
cache_leader(StoreId, NewLeaderId).

-spec clear_cached_leader(StoreId) -> ok when
StoreId :: khepri:store_id().

clear_cached_leader(StoreId) ->
_ = persistent_term:erase(?RA_LEADER_CACHE_KEY(StoreId)),
ok.