Skip to content


khepri_machine: Introduce a fence mechanism
Browse files Browse the repository at this point in the history
with the previous commits, we ensured that:
1. when a synchronous update returns, the local machine state is
2. Khepri would default to local queries, to eliminate the risks linked
   to remote executions and possible incompatible code.

Therefore, a query ais local to make sure its execution is local and
that it works on an up-to-date state.

There is still the following scenario where this isn't enough:
1. The caller makes several asynchronous updates without a correlation
   ID, or overrides the default of `reply_from => local`.
2. The caller then performs a query.

In this case, the query is unlikely to be executed after the
asynchronous commands were applied. This is not a bug, the caller
explicitly asked for asynchronous updates.

The caller could use correlation IDs and wait for the replies. But
without correlation IDs, it's not possible.

To help the caller in this case, this patch introduces
`khepri:fence/{0,1,2}`. It is a blocking call that queries the Ra leader
to learn its last index (the number of the last command it received),
then performs an arbitrary local query, passing that index so that the
query execution waits for that index to be committed locally.

This way, by putting a call to `khepri:fence()` between asynchronous
updates and a query, the caller ensures that the query will see the
result of those asynchronous updates.
  • Loading branch information
dumbbell committed Jul 13, 2024
1 parent 2c1ee4d commit 653ec8e
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 31 deletions.
78 changes: 77 additions & 1 deletion src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
transaction/1, transaction/2, transaction/3, transaction/4,

fence/0, fence/1, fence/2,

handle_async_ret/1, handle_async_ret/2,

%% Bang functions: they return the value directly or throw an error.
Expand Down Expand Up @@ -310,11 +312,14 @@
%% out is very small.</li>
%% </ul>

-type query_options() :: #{timeout => timeout(),
-type query_options() :: #{condition => ra:query_condition(),
timeout => timeout(),
favor => favor_option()}.
%% Options used in queries.
%% <ul>
%% <li>`condition' indicates the condition on which the Ra server should wait
%% for before it executes the query.</li>
%% <li>`timeout' is passed to Ra query processing function.</li>
%% <li>`favor' indicates where to put the cursor between freshness of the
%% returned data and low latency of queries; see {@link favor_option()}.</li>
Expand Down Expand Up @@ -3403,6 +3408,77 @@ transaction(FunOrPath, Args, ReadWrite, Options)
transaction(StoreId, FunOrPath, Args, ReadWrite, Options) ->
khepri_machine:transaction(StoreId, FunOrPath, Args, ReadWrite, Options).

%% -------------------------------------------------------------------
%% fence().
%% -------------------------------------------------------------------

-spec fence() -> Ret when
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%% Calling this function is the same as calling `fence(StoreId)' with the
%% default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%% @see fence/1.
%% @see fence/2.

fence() ->
StoreId = khepri_cluster:get_default_store_id(),

-spec fence(StoreId | Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%% This function accepts the following two forms:
%% <ul>
%% <li>`fence(StoreId)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default timeout (see {@link
%% khepri_app:get_default_timeout/0}).</li>
%% <li>`fence(Timeout)'. Calling it is the same as calling `fence(StoreId,
%% Timeout)' with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).</li>
%% </ul>
%% @see fence/2.

fence(Timeout) when Timeout =:= infinity orelse is_integer(Timeout) ->
StoreId = khepri_cluster:get_default_store_id(),
fence(StoreId, Timeout);
fence(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
fence(StoreId, Timeout).

-spec fence(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%% This ensures that a subsequent query will see the result of synchronous and
%% asynchronous updates.
%% This can't work however if:
%% <ul>
%% <li>Asynchronous updates have a correlation ID, in which case the caller is
%% responsible for waiting for the replies.</li>
%% <li>The default `reply_from => local' command option is overridden by
%% something else.</li>
%% </ul>
%% @param StoreId the name of the Khepri store.
%% @param Timeout the time limit after which the call returns with an error.
%% @returns `ok' or an `{error, Reason}' tuple.

fence(StoreId, Timeout) ->
khepri_machine:fence(StoreId, Timeout).

%% -------------------------------------------------------------------
%% handle_async_ret().
%% -------------------------------------------------------------------
Expand Down
179 changes: 150 additions & 29 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

Expand Down Expand Up @@ -255,6 +256,32 @@ fold(StoreId, PathPattern, Fun, Acc, Options)
Ret -> Ret

-spec fence(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Blocks until all updates received by the cluster leader are applied
%% locally.
%% @param StoreId the name of the Ra cluster
%% @param Timeout the time limit after which the call returns with an error.
%% @returns `ok' or an `{error, Reason}' tuple.

fence(StoreId, Timeout) ->
Options = #{timeout => Timeout},
case add_applied_condition(StoreId, Options) of
{ok, Options1} ->
QueryFun = fun erlang:is_tuple/1,
Options2 = Options1#{favor => low_latency},
case process_query(StoreId, QueryFun, Options2) of
true -> ok;
Other when Other =/= false -> Other
{error, _} = Error ->

-spec put(StoreId, PathPattern, Payload, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Expand Down Expand Up @@ -662,6 +689,7 @@ split_query_options(Options) ->
(Option, Value, {Q, T}) when
Option =:= condition orelse
Option =:= timeout orelse
Option =:= favor ->
Q1 = Q#{Option => Value},
Expand Down Expand Up @@ -923,69 +951,74 @@ select_command_type(#{async := {Correlation, Priority}})
process_query(StoreId, QueryFun, Options) ->
QueryType = select_query_type(StoreId, Options),
Timeout = get_timeout(Options),
Options1 = Options#{timeout => Timeout},
case QueryType of
local -> process_local_query(StoreId, QueryFun, Timeout);
_ -> process_non_local_query(StoreId, QueryFun, QueryType, Timeout)
local ->
process_local_query(StoreId, QueryFun, Options1);
_ ->
process_non_local_query(StoreId, QueryFun, QueryType, Options1)

-spec process_local_query(StoreId, QueryFun, Timeout) -> Ret when
-spec process_local_query(StoreId, QueryFun, Options) -> Ret when
StoreId :: khepri:store_id(),
QueryFun :: query_fun(),
Timeout :: timeout(),
Options :: khepri:query_options(),
Ret :: any().

process_local_query(StoreId, QueryFun, Timeout) ->
process_local_query(StoreId, QueryFun, Options) ->
LocalServerId = {StoreId, node()},
Ret = ra:local_query(LocalServerId, QueryFun, Timeout),
Ret = ra:local_query(LocalServerId, QueryFun, Options),
StoreId, LocalServerId, false, QueryFun, local, Timeout, Ret).
StoreId, LocalServerId, false, QueryFun, local, Options, Ret).

-spec process_non_local_query(StoreId, QueryFun, QueryType, Timeout) ->
-spec process_non_local_query(StoreId, QueryFun, QueryType, Options) ->
Ret when
StoreId :: khepri:store_id(),
QueryFun :: query_fun(),
QueryType :: leader | consistent,
Timeout :: timeout(),
Options :: khepri:query_options(),
Ret :: any().

process_non_local_query(StoreId, QueryFun, QueryType, Timeout)
StoreId, QueryFun, QueryType, #{timeout := Timeout} = Options)
when QueryType =:= leader orelse
QueryType =:= consistent ->
T0 = khepri_utils:start_timeout_window(Timeout),
LeaderId = khepri_cluster:get_cached_leader(StoreId),
RaServer = use_leader_or_local_ra_server(StoreId, LeaderId),
Ret = case QueryType of
leader -> ra:leader_query(RaServer, QueryFun, Timeout);
consistent -> ra:consistent_query(RaServer, QueryFun, Timeout)
consistent -> ra:consistent_query(RaServer, QueryFun, Timeout);
_ -> ra:leader_query(RaServer, QueryFun, Options)
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
%% TODO: If the consistent query times out in the context of
%% `QueryType=compromise`, should we retry with a local query to
%% never block the query and let the caller continue?
Options1 = Options#{timeout => NewTimeout},
StoreId, RaServer, LeaderId =/= undefined, QueryFun, QueryType,
NewTimeout, Ret).
Options1, Ret).

-spec process_query_response(
StoreId, RaServer, IsLeader, QueryFun, QueryType, Timeout,
StoreId, RaServer, IsLeader, QueryFun, QueryType, Options,
Response) ->
Ret when
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
IsLeader :: boolean(),
QueryFun :: query_fun(),
QueryType :: local | leader | consistent,
Timeout :: timeout(),
Response :: {ok, {RaIndex, any()}, NewLeaderId} |
{ok, any(), NewLeaderId} |
Options :: khepri:query_options(),
Response :: {ok, {RaIdxTerm, Ret}, NewLeaderId} |
{ok, Ret, NewLeaderId} |
{error, any()} |
{timeout, ra:server_id()},
RaIndex :: ra:index(),
RaIdxTerm :: ra:idxterm(),
NewLeaderId :: ra:server_id(),
Ret :: any().
Ret :: any() | khepri:error(any()).

StoreId, RaServer, IsLeader, _QueryFun, consistent, _Timeout,
StoreId, RaServer, IsLeader, _QueryFun, consistent, _Options,
{ok, Ret, NewLeaderId}) ->
case IsLeader of
true ->
Expand All @@ -997,8 +1030,8 @@ process_query_response(
StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Timeout,
{ok, {_RaIndex, Ret}, NewLeaderId}) ->
StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Options,
{ok, {_RaIdxTerm, Ret}, NewLeaderId}) ->
case IsLeader of
true ->
Expand All @@ -1008,21 +1041,22 @@ process_query_response(
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Timeout,
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
{timeout, _LeaderId}) ->
{error, timeout};
StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType, Timeout,
{error, Reason})
StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType,
#{timeout := Timeout} = Options, {error, Reason})
when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
%% The cached leader is no more. We simply clear the cache
%% entry and retry. It may time out eventually.
process_non_local_query(StoreId, QueryFun, QueryType, Timeout);
process_non_local_query(StoreId, QueryFun, QueryType, Options);
StoreId, RaServer, false = _IsLeader, QueryFun, QueryType, Timeout,
StoreId, RaServer, false = _IsLeader, QueryFun, QueryType,
#{timeout := Timeout} = Options,
{error, Reason} = Error)
when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Expand All @@ -1032,12 +1066,13 @@ process_query_response(
%% The follower doesn't know about the new leader yet. Retry again
%% after waiting a bit.
NewTimeout = khepri_utils:sleep(?NOPROC_RETRY_INTERVAL, Timeout),
process_non_local_query(StoreId, QueryFun, QueryType, NewTimeout);
Options1 = Options#{timeout => NewTimeout},
process_non_local_query(StoreId, QueryFun, QueryType, Options1);
false ->
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Timeout,
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
{error, _} = Error) ->

Expand Down Expand Up @@ -1119,6 +1154,92 @@ get_last_consistent_call_atomics(StoreId) ->
persistent_term:get(Key, undefined).

-spec add_applied_condition(StoreId, Options) -> NewOptions when
StoreId :: khepri:store_id(),
Options :: khepri:query_options(),
NewOptions :: khepri:ok(khepri:query_options()) | khepri:error().
%% @private

add_applied_condition(StoreId, Options) ->
Timeout = get_timeout(Options),
add_applied_condition1(StoreId, Options, Timeout).

add_applied_condition1(StoreId, Options, Timeout) ->
%% The `applied' condition permits that a query is only evaluated after
%% the given index is applied on the local node. This is useful to enforce
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%% We first send an arbitrary query to the local Ra server. This is to
%% make sure that previously submitted pipelined commands were processed
%% by that server.
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
%% synchronous command without the `reply_from => local' option.
%% We can't have this guaranty for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handle the redirect to the leader.
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
InternalOptions = #{favor => low_latency,
timeout => Timeout},
case process_query(StoreId, QueryFun, InternalOptions) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->

add_applied_condition2(StoreId, Options, Timeout) ->
%% After the previous local query, there is a great chance that the leader
%% was cached, though not 100% guarantied.
case khepri_cluster:get_cached_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
add_applied_condition1(StoreId, Options, Timeout)

add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed. We also
%% double-check it is still the leader; if it is not, we recurse.
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:member_overview(LeaderId, Timeout) of
{ok, Overview, LeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
%% wait for that same index to be applied locally before it is
%% executed.
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
#{log := #{last_index := LastIndex},
current_term := CurrentTerm} = Overview,
Condition = {applied, {LastIndex, CurrentTerm}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout},
{ok, Options1};
{ok, _Overview, NewLeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
{timeout, _LeaderId} ->
{error, timeout};
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout);
Error ->

-spec get_timeout(Options) -> Timeout when
Options :: khepri:command_options() | khepri:query_options(),
Timeout :: timeout().
Expand Down

0 comments on commit 653ec8e

Please sign in to comment.