Skip to content

Commit

Permalink
khepri_machine: Queries default to local ones
Browse files Browse the repository at this point in the history
[Why]
Before, Khepri tried something smart by mixing consistent and leader
queries by default. It exposed the `favor` option to let the caller
influence the type of query.

Leader queries are a good compromise in terms of latency and freshness
of the queried data. However, they come with a big caveat: the query
function is executed on the leader, not on the local node (if the local
node is a follower).

There are various potential issues with the remote execution of query
functions:
* The function reference might be invalid on the remote node because of
  a difference in the version of Erlang/OTP.
* The function reference might be invalid on the remote node because the
  module hosting the function is not the same.
* The function reference might be invalid on the remote node because the
  module hosting the function is missing.
* Using an exported MFA instead of a function reference doesn't solve
  the problem because the remote copy of that MFA may still be missing
  or simply be incompatible (different code/behavior). This might be
  even more difficult to debug.

The problem is the same with consistent queries.

[How]
The only way to be sure that the query function behaves as the caller
expects is to run the local copy of that function. Therefore, we must
always use a local query.

The `favor` option is changed to accept `low_latency` or `consistency`.
A local query will always be used behind the scenes. However, if the
caller favors consistency, the query will leverage the fence mechanism
introduced in a previous commit.

If the caller favors low latency, there is a risk that the query
runs against out-of-date data. That is why a previous commit changed the
default behavior of synchronous updates so that the call returns only
when it was applied locally. This should increase the chance that the
query works on fresh data.

Therefore, with the new default behaviors and options in this commit and
the previous ones, we ensure that a query will work with the local query
function and that it will be executed against the local up-to-date
state.
  • Loading branch information
dumbbell committed Jul 13, 2024
1 parent 653ec8e commit b66fc4f
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 479 deletions.
35 changes: 20 additions & 15 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,27 +290,29 @@
%% except for R/W transactions.</li>
%% </ul>

-type favor_option() :: consistency | compromise | low_latency.
-type favor_option() :: consistency | low_latency.
%% Option to indicate where to put the cursor between freshness of the
%% returned data and low latency of queries.
%%
%% Values are:
%% <ul>
%% <li>`consistent' means that a "consistent query" will be used in Ra. It
%% will return the most up-to-date piece of data the cluster agreed on. Note
%% that it could block and eventually time out if there is no quorum in the Ra
%% cluster.</li>
%% <li>`compromise' performs "leader queries" most of the time to reduce
%% latency, but uses "consistent queries" every 10 seconds to verify that the
%% cluster is healthy on a regular basis. It should be faster but may block
%% and time out like `consistent' and still return slightly out-of-date
%% data.</li>
%% <li>`low_latency' means that "local queries" are used exclusively. They are
%% the fastest and have the lowest latency. However, the returned data is
%% whatever the local Ra server has. It could be out-of-date if it has
%% troubles keeping up with the Ra cluster. The chance of blocking and timing
%% out is very small.</li>
%% <li>`low_latency' means that a local query is used. It is the fastest and
%% has the lowest latency. However, the returned data is whatever the local Ra
%% server has. It could be out-of-date if the local Ra server has not received
%% or applied the latest updates yet. The chance of blocking and timing out is
%% very small.</li>
%% <li>`consistency' means that a local query is used. However the query uses
%% the fence mechanism to ensure that the previous updates from the calling
%% process are applied locally before the query is evaluated. It will return
%% the most up-to-date piece of data the cluster agreed on. Note that it could
%% block and eventually time out if there is no quorum in the Ra cluster.</li>
%% </ul>
%%
%% As described above, queries are always evaluated locally by the cluster
%% member that gets the call. The reason Ra's leader and consistent queries
%% are not exposed is that the remote execution of the query function may fail
%% in subtle or non-subtle ways. For instance, the remote node might run a
%% different version of Erlang or Khepri.

-type query_options() :: #{condition => ra:query_condition(),
timeout => timeout(),
Expand All @@ -324,6 +326,9 @@
%% <li>`favor' indicates where to put the cursor between freshness of the
%% returned data and low latency of queries; see {@link favor_option()}.</li>
%% </ul>
%%
%% `favor' computes a `condition' internally. Therefore if both options are
%% set, `condition' takes precedence and `favor' is ignored.

-type tree_options() :: #{expect_specific_node => boolean(),
props_to_return => [payload_version |
Expand Down
249 changes: 34 additions & 215 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@
-ifdef(TEST).
-export([make_virgin_state/1,
convert_state/3,
set_tree/2,
get_last_consistent_call_atomics/1]).
set_tree/2]).
-endif.

-compile({no_auto_import, [apply/3]}).
Expand Down Expand Up @@ -269,17 +268,12 @@ fold(StoreId, PathPattern, Fun, Acc, Options)
%% @returns `ok' or an `{error, Reason}' tuple.

fence(StoreId, Timeout) ->
Options = #{timeout => Timeout},
case add_applied_condition(StoreId, Options) of
{ok, Options1} ->
QueryFun = fun erlang:is_tuple/1,
Options2 = Options1#{favor => low_latency},
case process_query(StoreId, QueryFun, Options2) of
true -> ok;
Other when Other =/= false -> Other
end;
{error, _} = Error ->
Error
QueryFun = fun erlang:is_tuple/1,
Options = #{favor => consistency,
timeout => Timeout},
case process_query(StoreId, QueryFun, Options) of
true -> ok;
Other when Other =/= false -> Other
end.

-spec put(StoreId, PathPattern, Payload, Options) -> Ret when
Expand Down Expand Up @@ -857,7 +851,6 @@ do_process_sync_command(StoreId, Command, Options) ->
{ok, Ret, NewLeaderId} ->
khepri_cluster:cache_leader_if_changed(
StoreId, LeaderId, NewLeaderId),
just_did_consistent_call(StoreId),
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
Expand Down Expand Up @@ -948,211 +941,38 @@ select_command_type(#{async := {Correlation, Priority}})
%%
%% @private

process_query(StoreId, QueryFun, #{condition := _} = Options) ->
%% `condition' takes precedence over `favor'.
Options1 = maps:remove(favor, Options),
process_query1(StoreId, QueryFun, Options1);
process_query(StoreId, QueryFun, Options) ->
QueryType = select_query_type(StoreId, Options),
Favor = maps:get(favor, Options, low_latency),
Timeout = get_timeout(Options),
Options1 = Options#{timeout => Timeout},
case QueryType of
local ->
process_local_query(StoreId, QueryFun, Options1);
_ ->
process_non_local_query(StoreId, QueryFun, QueryType, Options1)
Options1 = maps:remove(favor, Options),
Options2 = Options1#{timeout => Timeout},
case Favor of
low_latency ->
process_query1(StoreId, QueryFun, Options2);
consistency ->
case add_applied_condition(StoreId, Options2) of
{ok, Options3} ->
process_query1(StoreId, QueryFun, Options3);
{error, _} = Error ->
Error
end
end.

-spec process_local_query(StoreId, QueryFun, Options) -> Ret when
StoreId :: khepri:store_id(),
QueryFun :: query_fun(),
Options :: khepri:query_options(),
Ret :: any().

process_local_query(StoreId, QueryFun, Options) ->
process_query1(StoreId, QueryFun, Options) ->
LocalServerId = {StoreId, node()},
Ret = ra:local_query(LocalServerId, QueryFun, Options),
process_query_response(
StoreId, LocalServerId, false, QueryFun, local, Options, Ret).

-spec process_non_local_query(StoreId, QueryFun, QueryType, Options) ->
Ret when
StoreId :: khepri:store_id(),
QueryFun :: query_fun(),
QueryType :: leader | consistent,
Options :: khepri:query_options(),
Ret :: any().

process_non_local_query(
StoreId, QueryFun, QueryType, #{timeout := Timeout} = Options)
when QueryType =:= leader orelse
QueryType =:= consistent ->
T0 = khepri_utils:start_timeout_window(Timeout),
LeaderId = khepri_cluster:get_cached_leader(StoreId),
RaServer = use_leader_or_local_ra_server(StoreId, LeaderId),
Ret = case QueryType of
consistent -> ra:consistent_query(RaServer, QueryFun, Timeout);
_ -> ra:leader_query(RaServer, QueryFun, Options)
end,
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
%% TODO: If the consistent query times out in the context of
%% `QueryType=compromise`, should we retry with a local query to
%% never block the query and let the caller continue?
Options1 = Options#{timeout => NewTimeout},
process_query_response(
StoreId, RaServer, LeaderId =/= undefined, QueryFun, QueryType,
Options1, Ret).

-spec process_query_response(
StoreId, RaServer, IsLeader, QueryFun, QueryType, Options,
Response) ->
Ret when
StoreId :: khepri:store_id(),
RaServer :: ra:server_id(),
IsLeader :: boolean(),
QueryFun :: query_fun(),
QueryType :: local | leader | consistent,
Options :: khepri:query_options(),
Response :: {ok, {RaIdxTerm, Ret}, NewLeaderId} |
{ok, Ret, NewLeaderId} |
{error, any()} |
{timeout, ra:server_id()},
RaIdxTerm :: ra:idxterm(),
NewLeaderId :: ra:server_id(),
Ret :: any() | khepri:error(any()).

process_query_response(
StoreId, RaServer, IsLeader, _QueryFun, consistent, _Options,
{ok, Ret, NewLeaderId}) ->
case IsLeader of
true ->
khepri_cluster:cache_leader_if_changed(
StoreId, RaServer, NewLeaderId);
false ->
khepri_cluster:cache_leader(StoreId, NewLeaderId)
end,
just_did_consistent_call(StoreId),
?raise_exception_if_any(Ret);
process_query_response(
StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Options,
{ok, {_RaIdxTerm, Ret}, NewLeaderId}) ->
case IsLeader of
true ->
khepri_cluster:cache_leader_if_changed(
StoreId, RaServer, NewLeaderId);
false ->
khepri_cluster:cache_leader(StoreId, NewLeaderId)
end,
?raise_exception_if_any(Ret);
process_query_response(
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
{timeout, _LeaderId}) ->
{error, timeout};
process_query_response(
StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType,
#{timeout := Timeout} = Options, {error, Reason})
when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
%% The cached leader is no more. We simply clear the cache
%% entry and retry. It may time out eventually.
khepri_cluster:clear_cached_leader(StoreId),
process_non_local_query(StoreId, QueryFun, QueryType, Options);
process_query_response(
StoreId, RaServer, false = _IsLeader, QueryFun, QueryType,
#{timeout := Timeout} = Options,
{error, Reason} = Error)
when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
case khepri_utils:is_ra_server_alive(RaServer) of
true ->
%% The follower doesn't know about the new leader yet. Retry again
%% after waiting a bit.
NewTimeout = khepri_utils:sleep(?NOPROC_RETRY_INTERVAL, Timeout),
Options1 = Options#{timeout => NewTimeout},
process_non_local_query(StoreId, QueryFun, QueryType, Options1);
false ->
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, NewLeaderId} ->
khepri_cluster:cache_leader(StoreId, NewLeaderId),
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
{error, _} = Error ->
Error
end;
process_query_response(
_StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
{error, _} = Error) ->
Error.

-spec select_query_type(StoreId, Options) -> QueryType when
StoreId :: khepri:store_id(),
Options :: khepri:query_options(),
QueryType :: local | leader | consistent.
%% @doc Selects the query type depending on what the caller favors.
%%
%% @private

select_query_type(StoreId, #{favor := Favor}) ->
do_select_query_type(StoreId, Favor);
select_query_type(StoreId, _Options) ->
do_select_query_type(StoreId, compromise).

-define(
LAST_CONSISTENT_CALL_TS_REF(StoreId),
{khepri, last_consistent_call_ts_ref, StoreId}).

do_select_query_type(StoreId, compromise) ->
Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
Idx = 1,
case persistent_term:get(Key, undefined) of
AtomicsRef when AtomicsRef =/= undefined ->
%% We verify when was the last time we did a command or a
%% consistent query (i.e. we made sure there was an active leader
%% in a cluster with a quorum of active members).
%%
%% If the last one was more than 10 seconds ago, we force a
%% consistent query to verify the cluster health at the same time.
%% Otherwise, we select a leader query which is a good balance
%% between freshness and latency.
Last = atomics:get(AtomicsRef, Idx),
Now = erlang:monotonic_time(),
Elapsed = erlang:convert_time_unit(Now - Last, native, second),
ConsistentAgainAfter = application:get_env(
khepri,
consistent_query_interval_in_compromise,
10),
if
Elapsed < ConsistentAgainAfter -> leader;
true -> consistent
end;
undefined ->
consistent
end;
do_select_query_type(_StoreId, consistency) ->
consistent;
do_select_query_type(_StoreId, low_latency) ->
local.

just_did_consistent_call(StoreId) ->
%% We record the timestamp of the successful command or consistent query
%% which just returned. This timestamp is used in the `compromise' query
%% strategy to perform a consistent query from time to time, and leader
%% queries the rest of the time.
%%
%% We store the system time as seconds in an `atomics' structure. The
%% reference of that structure is stored in a persistent term. We don't
%% store the timestamp directly in a persistent term because it is not
%% suited for frequent writes. This way, we store the `atomics' reference
%% once and update the `atomics' afterwards.
Idx = 1,
AtomicsRef = case get_last_consistent_call_atomics(StoreId) of
Ref when Ref =/= undefined ->
Ref;
undefined ->
Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
Ref = atomics:new(1, []),
persistent_term:put(Key, Ref),
Ref
end,
Now = erlang:monotonic_time(),
ok = atomics:put(AtomicsRef, Idx, Now),
ok.

get_last_consistent_call_atomics(StoreId) ->
Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
persistent_term:get(Key, undefined).
end.

-spec add_applied_condition(StoreId, Options) -> NewOptions when
StoreId :: khepri:store_id(),
Expand Down Expand Up @@ -1261,8 +1081,7 @@ use_leader_or_local_ra_server(StoreId, undefined) ->
%%
%% @private

clear_cache(StoreId) ->
_ = persistent_term:erase(?LAST_CONSISTENT_CALL_TS_REF(StoreId)),
clear_cache(_StoreId) ->
ok.

%% -------------------------------------------------------------------
Expand Down
Loading

0 comments on commit b66fc4f

Please sign in to comment.