diff --git a/src/khepri.erl b/src/khepri.erl
index 46f17997..da06f274 100644
--- a/src/khepri.erl
+++ b/src/khepri.erl
@@ -290,27 +290,29 @@
%% except for R/W transactions.
%%
--type favor_option() :: consistency | compromise | low_latency.
+-type favor_option() :: consistency | low_latency.
%% Option to indicate where to put the cursor between freshness of the
%% returned data and low latency of queries.
%%
%% Values are:
%%
-%% - `consistent' means that a "consistent query" will be used in Ra. It
-%% will return the most up-to-date piece of data the cluster agreed on. Note
-%% that it could block and eventually time out if there is no quorum in the Ra
-%% cluster.
-%% - `compromise' performs "leader queries" most of the time to reduce
-%% latency, but uses "consistent queries" every 10 seconds to verify that the
-%% cluster is healthy on a regular basis. It should be faster but may block
-%% and time out like `consistent' and still return slightly out-of-date
-%% data.
-%% - `low_latency' means that "local queries" are used exclusively. They are
-%% the fastest and have the lowest latency. However, the returned data is
-%% whatever the local Ra server has. It could be out-of-date if it has
-%% troubles keeping up with the Ra cluster. The chance of blocking and timing
-%% out is very small.
+%% - `low_latency' means that a local query is used. It is the fastest and
+%% have the lowest latency. However, the returned data is whatever the local Ra
+%% server has. It could be out-of-date if the local Ra server did not get or
+%% applied the latest updates yet. The chance of blocking and timing out is
+%% very small.
+%% - `consistency' means that a local query is used. However the query uses
+%% the fence mechanism to ensure that the previous updates from the calling
+%% process are applied locally before the query is evaluated. It will return
+%% the most up-to-date piece of data the cluster agreed on. Note that it could
+%% block and eventually time out if there is no quorum in the Ra cluster.
%%
+%%
+%% As described above, queries are always evaluated locally by the cluster
+%% member that gets the call. The reason Ra's leader and consistent queries
+%% are not exposed is that the remote execution of the query function may fail
+%% in subtle on non-subtle ways. For instance, the remote node might run a
+%% different version of Erlang or Khepri.
-type query_options() :: #{condition => ra:query_condition(),
timeout => timeout(),
@@ -324,6 +326,9 @@
%% `favor' indicates where to put the cursor between freshness of the
%% returned data and low latency of queries; see {@link favor_option()}.
%%
+%%
+%% `favor' computes a `condition' internally. Therefore if both options are
+%% set, `condition' takes precedence and `favor' is ignored.
-type tree_options() :: #{expect_specific_node => boolean(),
props_to_return => [payload_version |
diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index f6338afc..bb9dde2c 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -105,8 +105,7 @@
-ifdef(TEST).
-export([make_virgin_state/1,
convert_state/3,
- set_tree/2,
- get_last_consistent_call_atomics/1]).
+ set_tree/2]).
-endif.
-compile({no_auto_import, [apply/3]}).
@@ -269,17 +268,12 @@ fold(StoreId, PathPattern, Fun, Acc, Options)
%% @returns `ok' or an `{error, Reason}' tuple.
fence(StoreId, Timeout) ->
- Options = #{timeout => Timeout},
- case add_applied_condition(StoreId, Options) of
- {ok, Options1} ->
- QueryFun = fun erlang:is_tuple/1,
- Options2 = Options1#{favor => low_latency},
- case process_query(StoreId, QueryFun, Options2) of
- true -> ok;
- Other when Other =/= false -> Other
- end;
- {error, _} = Error ->
- Error
+ QueryFun = fun erlang:is_tuple/1,
+ Options = #{favor => consistency,
+ timeout => Timeout},
+ case process_query(StoreId, QueryFun, Options) of
+ true -> ok;
+ Other when Other =/= false -> Other
end.
-spec put(StoreId, PathPattern, Payload, Options) -> Ret when
@@ -857,7 +851,6 @@ do_process_sync_command(StoreId, Command, Options) ->
{ok, Ret, NewLeaderId} ->
khepri_cluster:cache_leader_if_changed(
StoreId, LeaderId, NewLeaderId),
- just_did_consistent_call(StoreId),
?raise_exception_if_any(Ret);
{timeout, _LeaderId} ->
{error, timeout};
@@ -948,211 +941,38 @@ select_command_type(#{async := {Correlation, Priority}})
%%
%% @private
+process_query(StoreId, QueryFun, #{condition := _} = Options) ->
+ %% `condition' takes precedence over `favor'.
+ Options1 = maps:remove(favor, Options),
+ process_query1(StoreId, QueryFun, Options1);
process_query(StoreId, QueryFun, Options) ->
- QueryType = select_query_type(StoreId, Options),
+ Favor = maps:get(favor, Options, low_latency),
Timeout = get_timeout(Options),
- Options1 = Options#{timeout => Timeout},
- case QueryType of
- local ->
- process_local_query(StoreId, QueryFun, Options1);
- _ ->
- process_non_local_query(StoreId, QueryFun, QueryType, Options1)
+ Options1 = maps:remove(favor, Options),
+ Options2 = Options1#{timeout => Timeout},
+ case Favor of
+ low_latency ->
+ process_query1(StoreId, QueryFun, Options2);
+ consistency ->
+ case add_applied_condition(StoreId, Options2) of
+ {ok, Options3} ->
+ process_query1(StoreId, QueryFun, Options3);
+ {error, _} = Error ->
+ Error
+ end
end.
--spec process_local_query(StoreId, QueryFun, Options) -> Ret when
- StoreId :: khepri:store_id(),
- QueryFun :: query_fun(),
- Options :: khepri:query_options(),
- Ret :: any().
-
-process_local_query(StoreId, QueryFun, Options) ->
+process_query1(StoreId, QueryFun, Options) ->
LocalServerId = {StoreId, node()},
- Ret = ra:local_query(LocalServerId, QueryFun, Options),
- process_query_response(
- StoreId, LocalServerId, false, QueryFun, local, Options, Ret).
-
--spec process_non_local_query(StoreId, QueryFun, QueryType, Options) ->
- Ret when
- StoreId :: khepri:store_id(),
- QueryFun :: query_fun(),
- QueryType :: leader | consistent,
- Options :: khepri:query_options(),
- Ret :: any().
-
-process_non_local_query(
- StoreId, QueryFun, QueryType, #{timeout := Timeout} = Options)
- when QueryType =:= leader orelse
- QueryType =:= consistent ->
- T0 = khepri_utils:start_timeout_window(Timeout),
- LeaderId = khepri_cluster:get_cached_leader(StoreId),
- RaServer = use_leader_or_local_ra_server(StoreId, LeaderId),
- Ret = case QueryType of
- consistent -> ra:consistent_query(RaServer, QueryFun, Timeout);
- _ -> ra:leader_query(RaServer, QueryFun, Options)
- end,
- NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
- %% TODO: If the consistent query times out in the context of
- %% `QueryType=compromise`, should we retry with a local query to
- %% never block the query and let the caller continue?
- Options1 = Options#{timeout => NewTimeout},
- process_query_response(
- StoreId, RaServer, LeaderId =/= undefined, QueryFun, QueryType,
- Options1, Ret).
-
--spec process_query_response(
- StoreId, RaServer, IsLeader, QueryFun, QueryType, Options,
- Response) ->
- Ret when
- StoreId :: khepri:store_id(),
- RaServer :: ra:server_id(),
- IsLeader :: boolean(),
- QueryFun :: query_fun(),
- QueryType :: local | leader | consistent,
- Options :: khepri:query_options(),
- Response :: {ok, {RaIdxTerm, Ret}, NewLeaderId} |
- {ok, Ret, NewLeaderId} |
- {error, any()} |
- {timeout, ra:server_id()},
- RaIdxTerm :: ra:idxterm(),
- NewLeaderId :: ra:server_id(),
- Ret :: any() | khepri:error(any()).
-
-process_query_response(
- StoreId, RaServer, IsLeader, _QueryFun, consistent, _Options,
- {ok, Ret, NewLeaderId}) ->
- case IsLeader of
- true ->
- khepri_cluster:cache_leader_if_changed(
- StoreId, RaServer, NewLeaderId);
- false ->
- khepri_cluster:cache_leader(StoreId, NewLeaderId)
- end,
- just_did_consistent_call(StoreId),
- ?raise_exception_if_any(Ret);
-process_query_response(
- StoreId, RaServer, IsLeader, _QueryFun, _QueryType, _Options,
- {ok, {_RaIdxTerm, Ret}, NewLeaderId}) ->
- case IsLeader of
- true ->
- khepri_cluster:cache_leader_if_changed(
- StoreId, RaServer, NewLeaderId);
- false ->
- khepri_cluster:cache_leader(StoreId, NewLeaderId)
- end,
- ?raise_exception_if_any(Ret);
-process_query_response(
- _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
- {timeout, _LeaderId}) ->
- {error, timeout};
-process_query_response(
- StoreId, _RaServer, true = _IsLeader, QueryFun, QueryType,
- #{timeout := Timeout} = Options, {error, Reason})
- when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
- (Reason == noproc orelse Reason == nodedown orelse
- Reason == shutdown) ->
- %% The cached leader is no more. We simply clear the cache
- %% entry and retry. It may time out eventually.
- khepri_cluster:clear_cached_leader(StoreId),
- process_non_local_query(StoreId, QueryFun, QueryType, Options);
-process_query_response(
- StoreId, RaServer, false = _IsLeader, QueryFun, QueryType,
- #{timeout := Timeout} = Options,
- {error, Reason} = Error)
- when QueryType =/= local andalso ?HAS_TIME_LEFT(Timeout) andalso
- (Reason == noproc orelse Reason == nodedown orelse
- Reason == shutdown) ->
- case khepri_utils:is_ra_server_alive(RaServer) of
- true ->
- %% The follower doesn't know about the new leader yet. Retry again
- %% after waiting a bit.
- NewTimeout = khepri_utils:sleep(?NOPROC_RETRY_INTERVAL, Timeout),
- Options1 = Options#{timeout => NewTimeout},
- process_non_local_query(StoreId, QueryFun, QueryType, Options1);
- false ->
+ case ra:local_query(LocalServerId, QueryFun, Options) of
+ {ok, {_RaIdxTerm, Ret}, NewLeaderId} ->
+ khepri_cluster:cache_leader(StoreId, NewLeaderId),
+ ?raise_exception_if_any(Ret);
+ {timeout, _LeaderId} ->
+ {error, timeout};
+ {error, _} = Error ->
Error
- end;
-process_query_response(
- _StoreId, _RaServer, _IsLeader, _QueryFun, _QueryType, _Options,
- {error, _} = Error) ->
- Error.
-
--spec select_query_type(StoreId, Options) -> QueryType when
- StoreId :: khepri:store_id(),
- Options :: khepri:query_options(),
- QueryType :: local | leader | consistent.
-%% @doc Selects the query type depending on what the caller favors.
-%%
-%% @private
-
-select_query_type(StoreId, #{favor := Favor}) ->
- do_select_query_type(StoreId, Favor);
-select_query_type(StoreId, _Options) ->
- do_select_query_type(StoreId, compromise).
-
--define(
- LAST_CONSISTENT_CALL_TS_REF(StoreId),
- {khepri, last_consistent_call_ts_ref, StoreId}).
-
-do_select_query_type(StoreId, compromise) ->
- Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
- Idx = 1,
- case persistent_term:get(Key, undefined) of
- AtomicsRef when AtomicsRef =/= undefined ->
- %% We verify when was the last time we did a command or a
- %% consistent query (i.e. we made sure there was an active leader
- %% in a cluster with a quorum of active members).
- %%
- %% If the last one was more than 10 seconds ago, we force a
- %% consistent query to verify the cluster health at the same time.
- %% Otherwise, we select a leader query which is a good balance
- %% between freshness and latency.
- Last = atomics:get(AtomicsRef, Idx),
- Now = erlang:monotonic_time(),
- Elapsed = erlang:convert_time_unit(Now - Last, native, second),
- ConsistentAgainAfter = application:get_env(
- khepri,
- consistent_query_interval_in_compromise,
- 10),
- if
- Elapsed < ConsistentAgainAfter -> leader;
- true -> consistent
- end;
- undefined ->
- consistent
- end;
-do_select_query_type(_StoreId, consistency) ->
- consistent;
-do_select_query_type(_StoreId, low_latency) ->
- local.
-
-just_did_consistent_call(StoreId) ->
- %% We record the timestamp of the successful command or consistent query
- %% which just returned. This timestamp is used in the `compromise' query
- %% strategy to perform a consistent query from time to time, and leader
- %% queries the rest of the time.
- %%
- %% We store the system time as seconds in an `atomics' structure. The
- %% reference of that structure is stored in a persistent term. We don't
- %% store the timestamp directly in a persistent term because it is not
- %% suited for frequent writes. This way, we store the `atomics' reference
- %% once and update the `atomics' afterwards.
- Idx = 1,
- AtomicsRef = case get_last_consistent_call_atomics(StoreId) of
- Ref when Ref =/= undefined ->
- Ref;
- undefined ->
- Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
- Ref = atomics:new(1, []),
- persistent_term:put(Key, Ref),
- Ref
- end,
- Now = erlang:monotonic_time(),
- ok = atomics:put(AtomicsRef, Idx, Now),
- ok.
-
-get_last_consistent_call_atomics(StoreId) ->
- Key = ?LAST_CONSISTENT_CALL_TS_REF(StoreId),
- persistent_term:get(Key, undefined).
+ end.
-spec add_applied_condition(StoreId, Options) -> NewOptions when
StoreId :: khepri:store_id(),
@@ -1261,8 +1081,7 @@ use_leader_or_local_ra_server(StoreId, undefined) ->
%%
%% @private
-clear_cache(StoreId) ->
- _ = persistent_term:erase(?LAST_CONSISTENT_CALL_TS_REF(StoreId)),
+clear_cache(_StoreId) ->
ok.
%% -------------------------------------------------------------------
diff --git a/test/favor_option.erl b/test/favor_option.erl
index e1899294..6d67c79b 100644
--- a/test/favor_option.erl
+++ b/test/favor_option.erl
@@ -14,104 +14,43 @@
-include("src/khepri_error.hrl").
-include("test/helpers.hrl").
-favor_compromise_in_get_test_() ->
- {setup,
- fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
- fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
- [?_test(
- begin
- ?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- Ref = khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME),
- TS1 = atomics:get(Ref, 1),
- ?assertNotEqual(0, TS1),
-
- timer:sleep(1000),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS2 = atomics:get(Ref, 1),
- ?assertEqual(TS1, TS2),
-
- timer:sleep(2000),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS3 = atomics:get(Ref, 1),
- ?assertNotEqual(TS1, TS3),
-
- ok
- end)
- ]}.
-
favor_consistency_in_get_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_test(
begin
- ?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => consistency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- Ref = khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME),
- TS1 = atomics:get(Ref, 1),
- ?assertNotEqual(0, TS1),
-
- timer:sleep(1000),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
+ List = [a, b, c, d, e, f, g, h, i, j],
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {error,
+ ?khepri_error(
+ node_not_found, #{node_name => I,
+ node_path => [I],
+ node_is_target => true})},
+ khepri:get(
+ ?FUNCTION_NAME, [I], #{favor => consistency}))
+ end, List),
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ ok,
+ khepri:put(
+ ?FUNCTION_NAME, [I], I, #{async => true}))
+ end, List),
+
+ ?assertEqual(
+ {ok, hd(List)},
khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => consistency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS2 = atomics:get(Ref, 1),
- ?assertNotEqual(TS1, TS2),
-
- ok
+ ?FUNCTION_NAME, [hd(List)], #{favor => consistency})),
+
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {ok, I},
+ khepri:get(?FUNCTION_NAME, [I]))
+ end, tl(List))
end)
]}.
@@ -121,191 +60,158 @@ favor_low_latency_in_get_test_() ->
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_test(
begin
- ?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => low_latency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {error, ?khepri_error(node_not_found, _)},
- khepri:get(
- ?FUNCTION_NAME, [foo], #{favor => low_latency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ok
+ List = [a, b, c, d, e, f, g, h, i, j],
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {error,
+ ?khepri_error(
+ node_not_found, #{node_name => I,
+ node_path => [I],
+ node_is_target => true})},
+ khepri:get(
+ ?FUNCTION_NAME, [I], #{favor => low_latency}))
+ end, List),
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ ok,
+ khepri:put(
+ ?FUNCTION_NAME, [I], I, #{async => true}))
+ end, List),
+
+ lists:foreach(
+ fun(I) ->
+ try
+ ?assertEqual(
+ {ok, I},
+ khepri:get(?FUNCTION_NAME, [I]))
+ catch
+ error:{assertEqual, Props} ->
+ ?assertEqual(
+ {error,
+ ?khepri_error(
+ node_not_found,
+ #{node_name => I,
+ node_path => [I],
+ node_is_target => true})},
+ proplists:get_value(value, Props))
+ end
+ end, List)
end)
]}.
-favor_compromise_in_transaction_test_() ->
+favor_consistency_in_transaction_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_test(
begin
- Fun = fun() -> khepri_tx:get([foo]) end,
-
- ?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- Ref = khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME),
- TS1 = atomics:get(Ref, 1),
- ?assertNotEqual(0, TS1),
-
- timer:sleep(1000),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS2 = atomics:get(Ref, 1),
- ?assertEqual(TS1, TS2),
-
- timer:sleep(2000),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
+ List = [a, b, c, d, e, f, g, h, i, j],
+ Fun = fun(I) -> khepri_tx:get([I]) end,
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {ok,
+ {error,
+ ?khepri_error(
+ node_not_found, #{node_name => I,
+ node_path => [I],
+ node_is_target => true})}},
+ khepri:transaction(
+ ?FUNCTION_NAME, Fun, [I], #{favor => consistency}))
+ end, List),
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ ok,
+ khepri:put(
+ ?FUNCTION_NAME, [I], I, #{async => true}))
+ end, List),
+
+ ?assertEqual(
+ {ok, {ok, hd(List)}},
khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => compromise})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS3 = atomics:get(Ref, 1),
- ?assertNotEqual(TS1, TS3),
-
- ok
+ ?FUNCTION_NAME, Fun, [hd(List)], #{favor => consistency})),
+
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {ok, {ok, I}},
+ khepri:transaction(
+ ?FUNCTION_NAME, Fun, [I], #{favor => consistency}))
+ end, tl(List))
end)
]}.
-favor_consistency_in_transaction_test_() ->
+favor_low_latency_in_transaction_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_test(
begin
- Fun = fun() -> khepri_tx:get([foo]) end,
-
- ?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => consistency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- Ref = khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME),
- TS1 = atomics:get(Ref, 1),
- ?assertNotEqual(0, TS1),
-
- timer:sleep(1000),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => consistency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- TS2 = atomics:get(Ref, 1),
- ?assertNotEqual(TS1, TS2),
-
- ok
+ List = [a, b, c, d, e, f, g, h, i, j],
+ Fun = fun(I) -> khepri_tx:get([I]) end,
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ {ok,
+ {error,
+ ?khepri_error(
+ node_not_found, #{node_name => I,
+ node_path => [I],
+ node_is_target => true})}},
+ khepri:transaction(
+ ?FUNCTION_NAME, Fun, [I], #{favor => low_latency}))
+ end, List),
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual(
+ ok,
+ khepri:put(
+ ?FUNCTION_NAME, [I], I, #{async => true}))
+ end, List),
+
+ lists:foreach(
+ fun(I) ->
+ try
+ ?assertEqual(
+ {ok, {ok, I}},
+ khepri:transaction(
+ ?FUNCTION_NAME, Fun, [I],
+ #{favor => low_latency}))
+ catch
+ error:{assertEqual, Props} ->
+ ?assertEqual(
+ {ok,
+ {error,
+ ?khepri_error(
+ node_not_found,
+ #{node_name => I,
+ node_path => [I],
+ node_is_target => true})}},
+ proplists:get_value(value, Props))
+ end
+ end, List)
end)
]}.
-favor_low_latency_in_transaction_test_() ->
+condition_option_takes_precedence_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_test(
begin
- Fun = fun() -> khepri_tx:get([foo]) end,
-
?assertEqual(
- undefined,
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => low_latency})),
+ ok,
+ khepri:put(?FUNCTION_NAME, [foo], value)),
?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ?assertMatch(
- {ok, {error, ?khepri_error(node_not_found, _)}},
- khepri:transaction(
- ?FUNCTION_NAME, Fun, #{favor => low_latency})),
-
- ?assertEqual(
- {?FUNCTION_NAME, node()},
- khepri_cluster:get_cached_leader(?FUNCTION_NAME)),
- ?assertEqual(
- undefined,
- khepri_machine:get_last_consistent_call_atomics(
- ?FUNCTION_NAME)),
-
- ok
+ {error, timeout},
+ khepri:get(
+ ?FUNCTION_NAME, [foo],
+ #{condition => {applied, {10000, 5}},
+ favor => consistency,
+ timeout => 1000}))
end)
]}.