From 8bb85acb661ed8fe8280069a3a428403516ad648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 21 Jun 2024 10:25:36 +0200 Subject: [PATCH] khepri_machine: Queries default to local ones [Why] Before, Khepri tried something smart by mixing consistent and leader queries by default. It exposed the `favor` option to let the caller influence the type of query. Leader queries are a good compromise in terms of latency and freshness of the queried data. However, they come with a big caveat: the query function is executed on the leader, not on the local node (if the local node is a follower). There are various potential issues with the remote execution of query functions: * The function reference might be invalid on the remote node because of a different in the version of Erlang/OTP. * The function reference might be invalid on the remote node because the module hosting the function is not the same. * The function reference might be invalid on the remote node because the module hosting the function is missing. * Using an exported MFA instead of a function reference doesn't solve the problem because the remote copy of that MFA may still be missing or simply be incompatible (different code/behavior). This might be even more difficult to debug. The problem is the same with consistent queries. [How] The only way to be sure that the query function behaves as the caller expects is to run the local copy of that function. Therefore, we must always use a local query. The `favor` option is changed to accept `low_latency` or `consistency`. A local query will always be used behind the scene. However if the caller favors consistency, the query will leverage the fence mechanism introduced in a previous commit. If the caller favors the low latency, there is a risk that the query runs against out-of-date data. That is why a previous commit changed the default behavior of synchronous updates so that the call returns only when it was applied locally. This should increaset the chance that the query works on fresh data. Therefore, with the new default beraviors and options in this commit and the previous ones, we ensure that a query will work with the local query function and that it will be executed against the local up-to-date state. --- src/khepri.erl | 35 ++-- src/khepri_machine.erl | 249 ++++--------------------- test/favor_option.erl | 404 ++++++++++++++++------------------------- 3 files changed, 209 insertions(+), 479 deletions(-) diff --git a/src/khepri.erl b/src/khepri.erl index 46f17997..da06f274 100644 --- a/src/khepri.erl +++ b/src/khepri.erl @@ -290,27 +290,29 @@ %% except for R/W transactions. %% --type favor_option() :: consistency | compromise | low_latency. +-type favor_option() :: consistency | low_latency. %% Option to indicate where to put the cursor between freshness of the %% returned data and low latency of queries. %% %% Values are: %% +%% +%% As described above, queries are always evaluated locally by the cluster +%% member that gets the call. The reason Ra's leader and consistent queries +%% are not exposed is that the remote execution of the query function may fail +%% in subtle on non-subtle ways. For instance, the remote node might run a +%% different version of Erlang or Khepri. -type query_options() :: #{condition => ra:query_condition(), timeout => timeout(), @@ -324,6 +326,9 @@ %%
  • `favor' indicates where to put the cursor between freshness of the %% returned data and low latency of queries; see {@link favor_option()}.
  • %% +%% +%% `favor' computes a `condition' internally. Therefore if both options are +%% set, `condition' takes precedence and `favor' is ignored. -type tree_options() :: #{expect_specific_node => boolean(), props_to_return => [payload_version | diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index f6338afc..bb9dde2c 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -105,8 +105,7 @@ -ifdef(TEST). -export([make_virgin_state/1, convert_state/3, - set_tree/2, - get_last_consistent_call_atomics/1]). + set_tree/2]). -endif. -compile({no_auto_import, [apply/3]}). @@ -269,17 +268,12 @@ fold(StoreId, PathPattern, Fun, Acc, Options) %% @returns `ok' or an `{error, Reason}' tuple. fence(StoreId, Timeout) -> - Options = #{timeout => Timeout}, - case add_applied_condition(StoreId, Options) of - {ok, Options1} -> - QueryFun = fun erlang:is_tuple/1, - Options2 = Options1#{favor => low_latency}, - case process_query(StoreId, QueryFun, Options2) of - true -> ok; - Other when Other =/= false -> Other - end; - {error, _} = Error -> - Error + QueryFun = fun erlang:is_tuple/1, + Options = #{favor => consistency, + timeout => Timeout}, + case process_query(StoreId, QueryFun, Options) of + true -> ok; + Other when Other =/= false -> Other end. -spec put(StoreId, PathPattern, Payload, Options) -> Ret when @@ -857,7 +851,6 @@ do_process_sync_command(StoreId, Command, Options) -> {ok, Ret, NewLeaderId} -> khepri_cluster:cache_leader_if_changed( StoreId, LeaderId, NewLeaderId), - just_did_consistent_call(StoreId), ?raise_exception_if_any(Ret); {timeout, _LeaderId} -> {error, timeout}; @@ -948,211 +941,38 @@ select_command_type(#{async := {Correlation, Priority}}) %% %% @private +process_query(StoreId, QueryFun, #{condition := _} = Options) -> + %% `condition' takes precedence over `favor'. + Options1 = maps:remove(favor, Options), + process_query1(StoreId, QueryFun, Options1); process_query(StoreId, QueryFun, Options) -> - QueryType = select_query_type(StoreId, Options), + Favor = maps:get(favor, Options, low_latency), Timeout = get_timeout(Options), - Options1 = Options#{timeout => Timeout}, - case QueryType of - local -> - process_local_query(StoreId, QueryFun, Options1); - _ -> - process_non_local_query(StoreId, QueryFun, QueryType, Options1) + Options1 = maps:remove(favor, Options), + Options2 = Options1#{timeout => Timeout}, + case Favor of + low_latency -> + process_query1(StoreId, QueryFun, Options2); + consistency -> + case add_applied_condition(StoreId, Options2) of + {ok, Options3} -> + process_query1(StoreId, QueryFun, Options3); + {error, _} = Error -> + Error + end end. --spec process_local_query(StoreId, QueryFun, Options) -> Ret when - StoreId :: khepri:store_id(), - QueryFun :: query_fun(), - Options :: khepri:query_options(), - Ret :: any(). - -process_local_query(StoreId, QueryFun, Options) -> +process_query1(StoreId, QueryFun, Options) -> LocalServerId = {StoreId, node()}, - Ret = ra:local_query(LocalServerId, QueryFun, Options), - process_query_response( - StoreId, LocalServerId, false, QueryFun, local, Options, Ret). - --spec process_non_local_query(StoreId, QueryFun, QueryType, Options) -> - Ret when - StoreId :: khepri:store_id(), - QueryFun :: query_fun(), - QueryType :: leader | consistent, - Options :: khepri:query_options(), - Ret :: any(). - -process_non_local_query( - StoreId, QueryFun, QueryType, #{timeout := Timeout} = Options) - when QueryType =:= leader orelse - QueryType =:= consistent -> - T0 = khepri_utils:start_timeout_window(Timeout), - LeaderId = khepri_cluster:get_cached_leader(StoreId), - RaServer = use_leader_or_local_ra_server(StoreId, LeaderId), - Ret = case QueryType of - consistent -> ra:consistent_query(RaServer, QueryFun, Timeout); - _ -> ra:leader_query(RaServer, QueryFun, Options) - end, - NewTimeout = khepri_utils:end_timeout_window(Timeout, T0), - %% TODO: If the consistent query times out in the context of - %% `QueryType=compromise`, should we retry with a local query to - %% never block the query and let the caller continue? - Options1 = Options#{timeout => NewTimeout}, - process_query_response( - StoreId, RaServer, LeaderId =/= undefined, QueryFun, QueryType, - Options1, Ret). - --spec process_query_response( - StoreId, RaServer, IsLeader, QueryFun, QueryType, Options, - Response) -> - Ret when - StoreId :: khepri:store_id(), - RaServer :: ra:server_id(), - IsLeader :: boolean(), - QueryFun :: query_fun(), - QueryType :: local | leader | consistent, - Options :: khepri:query_options(), - Response :: {ok, {RaIdxTerm, Ret}, NewLeaderId} | - {ok, Ret, NewLeaderId} | - {error, any()} | - {timeout, ra:server_id()}, - RaIdxTerm :: ra:idxterm(), - NewLeaderId :: ra:server_id(), - Ret :: any() | khepri:error(any()). - -process_query_response( - StoreId, RaServer, IsLeader, _QueryFun, consistent, _Options, - {ok, Ret, NewLeaderId}) -> - case IsLeader of - true -> - khepri_cluster:cache_leader_if_changed( - StoreId, RaServer, NewLeaderId); - false -> - khepri_cluster:cache_leader(StoreId, NewLeaderId) - end, - just_did_consistent_call(StoreId), - ?raise_exception_if_any(Ret); -process_query_response( - StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Options, - {ok, {_RaIdxTerm, Ret}, NewLeaderId}) -> - case IsLeader of - true -> - khepri_cluster:cache_leader_if_changed( - StoreId, RaServer, NewLeaderId); - false -> - khepri_cluster:cache_leader(StoreId, NewLeaderId) - end, - ?raise_exception_if_any(Ret); -process_query_response( - _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options, - {timeout, _LeaderId}) -> - {error, timeout}; -process_query_response( - StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType, - #{timeout := Timeout} = Options, {error, Reason}) - when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso - (Reason == noproc orelse Reason == nodedown orelse - Reason == shutdown) -> - %% The cached leader is no more. We simply clear the cache - %% entry and retry. It may time out eventually. - khepri_cluster:clear_cached_leader(StoreId), - process_non_local_query(StoreId, QueryFun, QueryType, Options); -process_query_response( - StoreId, RaServer, false = _IsLeader, QueryFun, QueryType, - #{timeout := Timeout} = Options, - {error, Reason} = Error) - when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso - (Reason == noproc orelse Reason == nodedown orelse - Reason == shutdown) -> - case khepri_utils:is_ra_server_alive(RaServer) of - true -> - %% The follower doesn't know about the new leader yet. Retry again - %% after waiting a bit. - NewTimeout = khepri_utils:sleep(?NOPROC_RETRY_INTERVAL, Timeout), - Options1 = Options#{timeout => NewTimeout}, - process_non_local_query(StoreId, QueryFun, QueryType, Options1); - false -> + case ra:local_query(LocalServerId, QueryFun, Options) of + {ok, {_RaIdxTerm, Ret}, NewLeaderId} -> + khepri_cluster:cache_leader(StoreId, NewLeaderId), + ?raise_exception_if_any(Ret); + {timeout, _LeaderId} -> + {error, timeout}; + {error, _} = Error -> Error - end; -process_query_response( - _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options, - {error, _} = Error) -> - Error. - --spec select_query_type(StoreId, Options) -> QueryType when - StoreId :: khepri:store_id(), - Options :: khepri:query_options(), - QueryType :: local | leader | consistent. -%% @doc Selects the query type depending on what the caller favors. -%% -%% @private - -select_query_type(StoreId, #{favor := Favor}) -> - do_select_query_type(StoreId, Favor); -select_query_type(StoreId, _Options) -> - do_select_query_type(StoreId, compromise). - --define( - LAST_CONSISTENT_CALL_TS_REF(StoreId), - {khepri, last_consistent_call_ts_ref, StoreId}). - -do_select_query_type(StoreId, compromise) -> - Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId), - Idx = 1, - case persistent_term:get(Key, undefined) of - AtomicsRef when AtomicsRef =/= undefined -> - %% We verify when was the last time we did a command or a - %% consistent query (i.e. we made sure there was an active leader - %% in a cluster with a quorum of active members). - %% - %% If the last one was more than 10 seconds ago, we force a - %% consistent query to verify the cluster health at the same time. - %% Otherwise, we select a leader query which is a good balance - %% between freshness and latency. - Last = atomics:get(AtomicsRef, Idx), - Now = erlang:monotonic_time(), - Elapsed = erlang:convert_time_unit(Now - Last, native, second), - ConsistentAgainAfter = application:get_env( - khepri, - consistent_query_interval_in_compromise, - 10), - if - Elapsed < ConsistentAgainAfter -> leader; - true -> consistent - end; - undefined -> - consistent - end; -do_select_query_type(_StoreId, consistency) -> - consistent; -do_select_query_type(_StoreId, low_latency) -> - local. - -just_did_consistent_call(StoreId) -> - %% We record the timestamp of the successful command or consistent query - %% which just returned. This timestamp is used in the `compromise' query - %% strategy to perform a consistent query from time to time, and leader - %% queries the rest of the time. - %% - %% We store the system time as seconds in an `atomics' structure. The - %% reference of that structure is stored in a persistent term. We don't - %% store the timestamp directly in a persistent term because it is not - %% suited for frequent writes. This way, we store the `atomics' reference - %% once and update the `atomics' afterwards. - Idx = 1, - AtomicsRef = case get_last_consistent_call_atomics(StoreId) of - Ref when Ref =/= undefined -> - Ref; - undefined -> - Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId), - Ref = atomics:new(1, []), - persistent_term:put(Key, Ref), - Ref - end, - Now = erlang:monotonic_time(), - ok = atomics:put(AtomicsRef, Idx, Now), - ok. - -get_last_consistent_call_atomics(StoreId) -> - Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId), - persistent_term:get(Key, undefined). + end. -spec add_applied_condition(StoreId, Options) -> NewOptions when StoreId :: khepri:store_id(), @@ -1261,8 +1081,7 @@ use_leader_or_local_ra_server(StoreId, undefined) -> %% %% @private -clear_cache(StoreId) -> - _ = persistent_term:erase(?LAST_CONSISTENT_CALL_TS_REF(StoreId)), +clear_cache(_StoreId) -> ok. %% ------------------------------------------------------------------- diff --git a/test/favor_option.erl b/test/favor_option.erl index e1899294..6d67c79b 100644 --- a/test/favor_option.erl +++ b/test/favor_option.erl @@ -14,104 +14,43 @@ -include("src/khepri_error.hrl"). -include("test/helpers.hrl"). -favor_compromise_in_get_test_() -> - {setup, - fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end, - fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, - [?_test( - begin - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - Ref = khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME), - TS1 = atomics:get(Ref, 1), - ?assertNotEqual(0, TS1), - - timer:sleep(1000), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS2 = atomics:get(Ref, 1), - ?assertEqual(TS1, TS2), - - timer:sleep(2000), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS3 = atomics:get(Ref, 1), - ?assertNotEqual(TS1, TS3), - - ok - end) - ]}. - favor_consistency_in_get_test_() -> {setup, fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end, fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, [?_test( begin - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => consistency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - Ref = khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME), - TS1 = atomics:get(Ref, 1), - ?assertNotEqual(0, TS1), - - timer:sleep(1000), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, + List = [a, b, c, d, e, f, g, h, i, j], + lists:foreach( + fun(I) -> + ?assertEqual( + {error, + ?khepri_error( + node_not_found, #{node_name => I, + node_path => [I], + node_is_target => true})}, + khepri:get( + ?FUNCTION_NAME, [I], #{favor => consistency})) + end, List), + lists:foreach( + fun(I) -> + ?assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, [I], I, #{async => true})) + end, List), + + ?assertEqual( + {ok, hd(List)}, khepri:get( - ?FUNCTION_NAME, [foo], #{favor => consistency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS2 = atomics:get(Ref, 1), - ?assertNotEqual(TS1, TS2), - - ok + ?FUNCTION_NAME, [hd(List)], #{favor => consistency})), + + lists:foreach( + fun(I) -> + ?assertEqual( + {ok, I}, + khepri:get(?FUNCTION_NAME, [I])) + end, tl(List)) end) ]}. @@ -121,191 +60,158 @@ favor_low_latency_in_get_test_() -> fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, [?_test( begin - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => low_latency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {error, ?khepri_error(node_not_found, _)}, - khepri:get( - ?FUNCTION_NAME, [foo], #{favor => low_latency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ok + List = [a, b, c, d, e, f, g, h, i, j], + lists:foreach( + fun(I) -> + ?assertEqual( + {error, + ?khepri_error( + node_not_found, #{node_name => I, + node_path => [I], + node_is_target => true})}, + khepri:get( + ?FUNCTION_NAME, [I], #{favor => low_latency})) + end, List), + lists:foreach( + fun(I) -> + ?assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, [I], I, #{async => true})) + end, List), + + lists:foreach( + fun(I) -> + try + ?assertEqual( + {ok, I}, + khepri:get(?FUNCTION_NAME, [I])) + catch + error:{assertEqual, Props} -> + ?assertEqual( + {error, + ?khepri_error( + node_not_found, + #{node_name => I, + node_path => [I], + node_is_target => true})}, + proplists:get_value(value, Props)) + end + end, List) end) ]}. -favor_compromise_in_transaction_test_() -> +favor_consistency_in_transaction_test_() -> {setup, fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end, fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, [?_test( begin - Fun = fun() -> khepri_tx:get([foo]) end, - - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - Ref = khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME), - TS1 = atomics:get(Ref, 1), - ?assertNotEqual(0, TS1), - - timer:sleep(1000), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS2 = atomics:get(Ref, 1), - ?assertEqual(TS1, TS2), - - timer:sleep(2000), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, + List = [a, b, c, d, e, f, g, h, i, j], + Fun = fun(I) -> khepri_tx:get([I]) end, + lists:foreach( + fun(I) -> + ?assertEqual( + {ok, + {error, + ?khepri_error( + node_not_found, #{node_name => I, + node_path => [I], + node_is_target => true})}}, + khepri:transaction( + ?FUNCTION_NAME, Fun, [I], #{favor => consistency})) + end, List), + lists:foreach( + fun(I) -> + ?assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, [I], I, #{async => true})) + end, List), + + ?assertEqual( + {ok, {ok, hd(List)}}, khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => compromise})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS3 = atomics:get(Ref, 1), - ?assertNotEqual(TS1, TS3), - - ok + ?FUNCTION_NAME, Fun, [hd(List)], #{favor => consistency})), + + lists:foreach( + fun(I) -> + ?assertEqual( + {ok, {ok, I}}, + khepri:transaction( + ?FUNCTION_NAME, Fun, [I], #{favor => consistency})) + end, tl(List)) end) ]}. -favor_consistency_in_transaction_test_() -> +favor_low_latency_in_transaction_test_() -> {setup, fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end, fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, [?_test( begin - Fun = fun() -> khepri_tx:get([foo]) end, - - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => consistency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - Ref = khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME), - TS1 = atomics:get(Ref, 1), - ?assertNotEqual(0, TS1), - - timer:sleep(1000), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => consistency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - TS2 = atomics:get(Ref, 1), - ?assertNotEqual(TS1, TS2), - - ok + List = [a, b, c, d, e, f, g, h, i, j], + Fun = fun(I) -> khepri_tx:get([I]) end, + lists:foreach( + fun(I) -> + ?assertEqual( + {ok, + {error, + ?khepri_error( + node_not_found, #{node_name => I, + node_path => [I], + node_is_target => true})}}, + khepri:transaction( + ?FUNCTION_NAME, Fun, [I], #{favor => low_latency})) + end, List), + lists:foreach( + fun(I) -> + ?assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, [I], I, #{async => true})) + end, List), + + lists:foreach( + fun(I) -> + try + ?assertEqual( + {ok, {ok, I}}, + khepri:transaction( + ?FUNCTION_NAME, Fun, [I], + #{favor => low_latency})) + catch + error:{assertEqual, Props} -> + ?assertEqual( + {ok, + {error, + ?khepri_error( + node_not_found, + #{node_name => I, + node_path => [I], + node_is_target => true})}}, + proplists:get_value(value, Props)) + end + end, List) end) ]}. -favor_low_latency_in_transaction_test_() -> +condition_option_takes_precedence_test_() -> {setup, fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end, fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, [?_test( begin - Fun = fun() -> khepri_tx:get([foo]) end, - ?assertEqual( - undefined, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => low_latency})), + ok, + khepri:put(?FUNCTION_NAME, [foo], value)), ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ?assertMatch( - {ok, {error, ?khepri_error(node_not_found, _)}}, - khepri:transaction( - ?FUNCTION_NAME, Fun, #{favor => low_latency})), - - ?assertEqual( - {?FUNCTION_NAME, node()}, - khepri_cluster:get_cached_leader(?FUNCTION_NAME)), - ?assertEqual( - undefined, - khepri_machine:get_last_consistent_call_atomics( - ?FUNCTION_NAME)), - - ok + {error, timeout}, + khepri:get( + ?FUNCTION_NAME, [foo], + #{condition => {applied, {10000, 5}}, + favor => consistency, + timeout => 1000})) end) ]}.