Skip to content

Commit

Permalink
Mas i1805 hintedrepair (#1806)
Browse files Browse the repository at this point in the history
* Range-based AAE

Allow for range-based exchanges to be used when standard exchanges identify a demand for further AAE, and a localised AAE issue.

* Add stats and configuration

Should be able to track what is happening via both stats and logs.

* Add capability check

If all nodes in the cluster are not yet at this version, then they may not support the necessary aae_folds required by the range-based repairs.

* tictacaae_maxresults

Following performance testing of new feature, AAE throughput can be improved at this level on current settings, without increasing CPU load.

Configuration option no longer hidden, as it may require turning by users who wish for AAE deltas to be closed more rapidly

* Fix typos

* Update following review

* Clear-up schema comments after review

* Branch switch
  • Loading branch information
martinsumner authored Nov 29, 2021
1 parent 0af857d commit 58b3fc2
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 163 deletions.
41 changes: 40 additions & 1 deletion priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,48 @@
%% conflicting leaves will be compared on each exchange. If there are issues
%% with query timeouts this may be halved. Large backlogs may be reduced
%% faster by doubling. There are 1M segments in a standard tree overall.
%% Performance tuning can also be made by adjusting the `tictacaae_repairloops`
%% and `tictacaae_rangeboost` - but `tictacaae_maxresults` is the simplest
%% factor that is likely to result in a relatively predictable (and linear)
%% outcome in terms of both CPU cost and repair speed.
{mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [
{datatype, integer},
{default, 256},
{default, 64}
]}.

%% @doc Max number of repair loops per exchange
%% Each exchange will attempt a repair of tictacaae_maxresults, and will
%% analyse those repairs to see if there exists the potential for more repairs.
%% If there exists this potential, then repair loops will be run, but in these
%% repair loops a more efficient repair will be run:
%% - with a bucket, key_range or last_modified range (at least) to restrict the
%% scope of the fetch_clocks queries
%% - without rebuilding the segments in the AAE hash tree
%% - without checking for presence of the object in the journal
%% Problems related to invalid hash trees will need to be detected through the
%% initial loop of the exchange, not repair loops
{mapping, "tictacaae_repairloops", "riak_kv.tictacaae_repairloops", [
{datatype, integer},
{default, 4},
hidden
]}.

%% @doc Multiplier to the `tictcaaae_maxresults` when following an initial AAE
%% exchange with a range-limited exchange.
%% After each exchange, where sufficient deltas are discovered there will be a
%% `tictacaae_repairloops` number of range-limited queries (assuming
%% sufficient results continue to be found). Each of these may have the
%% the number of max results boosted by this integer factor.
%% For example, if `tictacaae_maxresuts` is set to 64, and
%% `tictacaae_repairloops` is set to 4, and the `tictacaae_rangeboost` is set
%% to 2 - the initial loop will use `tictacaae_maxresuts` of 64, but any
%% AAE exchanges on loops 1 to 4 will use 128.
%% Exchanges with range-limited queries are more efficient, and so more tree
%% segments can be fetched without creating significant CPU overheads, hence
%% the use of this boost to maxresults.
{mapping, "tictacaae_rangeboost", "riak_kv.tictacaae_rangeboost", [
{datatype, integer},
{default, 2},
hidden
]}.

Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@
{riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
{riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.9"}}},
{hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.1"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}},
{riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.8"}}}
]}.
2 changes: 1 addition & 1 deletion src/riak_kv.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
% generate a warning in the logs
{max_siblings, 100},

%% Object hash version should be 0 by default. Without the
%% @doc Object hash version should be 0 by default. Without the
%% environment variable being set at startup, it could by default
%% revert to being considered as legacy even when the whole cluster
%% has support for version 0 - Github Issue 1656
Expand Down
4 changes: 4 additions & 0 deletions src/riak_kv_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ start(_Type, _StartArgs) ->
riak_core_capability:register({riak_kv, put_soft_limit},
[true, false],
false),

riak_core_capability:register({riak_kv, tictacaae_prompted_repairs},
[true, false],
false),

HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false),
%% Go ahead and mark the riak_kv service as up in the node watcher.
Expand Down
95 changes: 2 additions & 93 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
waiting_vnode_r/2,
waiting_read_repair/2]).

-export([prompt_readrepair/1]).

-type detail() :: timing |
vnodes.
-type details() :: [detail()].
Expand All @@ -48,8 +46,6 @@
{basic_quorum, boolean()} | %% Whether to use basic quorum (return early
%% in some failure cases.
{notfound_ok, boolean()} | %% Count notfound responses as successful.
{force_aae, boolean()} | %% Force there to be be an AAE exchange for the
%% preflist after the GEt has been completed
{timeout, pos_integer() | infinity} | %% Timeout for vnode responses
{details, details()} | %% Return extra details as a 3rd element
{details, true} |
Expand All @@ -64,8 +60,6 @@

-export_type([options/0, option/0]).



-record(state, {from :: {raw, req_id(), pid()},
options=[] :: options(),
n :: pos_integer() | undefined,
Expand All @@ -87,7 +81,6 @@
[{StateName::atom(), TimeUSecs::non_neg_integer()}]} | undefined,
crdt_op :: undefined | true,
request_type :: undefined | request_type(),
force_aae = false :: boolean(),
override_vnodes = [] :: list(),
return_tombstone = false :: boolean(),
expected_fetchclock = false :: false | vclock:vclock()
Expand All @@ -101,8 +94,6 @@
-define(DEFAULT_RT, head).
-define(DEFAULT_NC, 0).
-define(QUEUE_EMPTY_LOOPS, 8).
-define(MIN_REPAIRTIME_MS, 10000).
-define(MIN_REPAIRPAUSE_MS, 10).

%% ===================================================================
%% Public API
Expand Down Expand Up @@ -271,7 +262,6 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key},
DocIdx = riak_core_util:chash_key(BKey, BucketProps),
Bucket_N = get_option(n_val, BucketProps),
CrdtOp = get_option(crdt_op, Options),
ForceAAE = get_option(force_aae, Options, false),
N = case get_option(n_val, Options) of
undefined ->
Bucket_N;
Expand Down Expand Up @@ -305,8 +295,7 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key},
preflist2 = Preflist2,
tracked_bucket = StatTracked,
crdt_op = CrdtOp,
request_type=RequestType,
force_aae = ForceAAE})
request_type=RequestType})
end.

%% @private
Expand Down Expand Up @@ -605,9 +594,7 @@ maybe_finalize(StateData=#state{get_core = GetCore}) ->
false -> {next_state,waiting_read_repair,StateData}
end.

finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID,
preflist2 = PL, bkey = {B, K},
force_aae = ForceAAE}) ->
finalize(StateData=#state{get_core = GetCore, trace = Trace}) ->
{Action, UpdGetCore} = riak_kv_get_core:final_action(GetCore),
UpdStateData = StateData#state{get_core = UpdGetCore},
case Action of
Expand All @@ -619,86 +606,8 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID,
?DTRACE(Trace, ?C_GET_FSM_FINALIZE, [], ["finalize"]),
ok
end,
case ForceAAE of
true ->
Primaries =
[{I, Node} || {{I, Node}, primary} <- PL],
case length(Primaries) of
L when L < 2 ->
lager:info("Insufficient Primaries to support force AAE request", []),
ok;
L ->
BlueP = lists:nth((ReqID rem L) + 1, Primaries),
PinkP = lists:nth(((ReqID + 1) rem L) + 1, Primaries),
IndexN = riak_kv_util:get_index_n({B, K}),
BlueList = [{riak_kv_vnode:aae_send(BlueP), [IndexN]}],
PinkList = [{riak_kv_vnode:aae_send(PinkP), [IndexN]}],
aae_exchange:start(BlueList,
PinkList,
prompt_readrepair([BlueP, PinkP]),
fun reply_fun/1)
end;
false ->
ok
end,
{stop,normal,StateData}.

-spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) ->
fun((list({{riak_object:bucket(), riak_object:key()},
{vclock:vclock(), vclock:vclock()}})) -> ok).
prompt_readrepair(VnodeList) ->
prompt_readrepair(VnodeList,
app_helper:get_env(riak_kv, log_readrepair, false)).

prompt_readrepair(VnodeList, LogRepair) ->
{ok, C} = riak:local_client(),
FetchFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
case riak_kv_util:consistent_object(B) of
true ->
riak_kv_exchange_fsm:repair_consistent({B, K});
false ->
riak_client:get(B, K, C)
end
end,
LogFun =
fun({{B, K}, {BlueClock, PinkClock}}) ->
lager:info(
"Prompted read repair Bucket=~p Key=~p Clocks ~w ~w",
[B, K, BlueClock, PinkClock])
end,
fun(RepairList) ->
SW = os:timestamp(),
RepairCount = length(RepairList),
lager:info("Repairing key_count=~w between ~w",
[RepairCount, VnodeList]),
Pause =
max(?MIN_REPAIRPAUSE_MS,
?MIN_REPAIRTIME_MS div max(1, RepairCount)),
RehashFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
timer:sleep(Pause),
riak_kv_vnode:rehash(VnodeList, B, K)
end,
lists:foreach(FetchFun, RepairList),
lists:foreach(RehashFun, RepairList),
case LogRepair of
true ->
lists:foreach(LogFun, RepairList);
false ->
ok
end,
lager:info("Repaired key_count=~w " ++
"in repair_time=~w ms with pause_time=~w ms",
[RepairCount,
timer:now_diff(os:timestamp(), SW) div 1000,
RepairCount * Pause])
end.

reply_fun({EndStateName, DeltaCount}) ->
lager:info("AAE Reached end_state=~w with delta_count=~w",
[EndStateName, DeltaCount]).


%% Maybe issue deletes if all primary nodes are available.
%% Get core will only requestion deletion if all vnodes
Expand Down
22 changes: 22 additions & 0 deletions src/riak_kv_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ do_update({index_fsm_time, Microsecs, ResultCount}) ->
do_update({read_repairs, Indices, Preflist}) ->
ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1),
do_repairs(Indices, Preflist);
do_update({tictac_aae, ExchangeState}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1);
do_update({tictac_aae, ExchangeType, RepairCount}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeType], RepairCount);
do_update(ngrfetch_nofetch) ->
ok = exometer:update([?PFX, ?APP, node, gets, ngrfetch_nofetch], 1);
do_update(ngrfetch_prefetch) ->
Expand Down Expand Up @@ -582,6 +586,24 @@ stats() ->
{count, read_repairs_total}]},
{[node, gets, skipped_read_repairs], spiral, [], [{one, skipped_read_repairs},
{count, skipped_read_repairs_total}]},
{[node, tictacaae, root_compare], spiral, [],
[{one, tictacaae_root_compare}, {count, tictacaae_root_compare_total}]},
{[node, tictacaae, branch_compare], spiral, [],
[{one, tictacaae_branch_compare}, {count, tictacaae_branch_compare_total}]},
{[node, tictacaae, clock_compare], spiral, [],
[{one, tictacaae_clock_compare}, {count, tictacaae_clock_compare_total}]},
{[node, tictacaae, not_supported], spiral, [],
[{one, tictacaae_not_supported}, {count, tictacaae_not_supported_total}]},
{[node, tictacaae, error], spiral, [],
[{one, tictacaae_error}, {count, tictacaae_error_total}]},
{[node, tictacaae, timeout], spiral, [],
[{one, tictacaae_timeout}, {count, tictacaae_timeout_total}]},
{[node, tictacaae, bucket], spiral, [],
[{one, tictacaae_bucket}, {count, tictacaae_bucket_total}]},
{[node, tictacaae, modtime], spiral, [],
[{one, tictacaae_modtime}, {count, tictacaae_modtime_total}]},
{[node, tictacaae, exchange], spiral, [],
[{one, tictacaae_exchange}, {count, tictacaae_exchange_total}]},
{[node, gets, ngrfetch_nofetch], spiral, [], [{one, ngrfetch_nofetch},
{count, ngrfetch_nofetch_total}]},
{[node, gets, ngrfetch_prefetch], spiral, [], [{one, ngrfetch_prefetch},
Expand Down
Loading

0 comments on commit 58b3fc2

Please sign in to comment.