Mas i1805 hintedrepair #1806

Merged: 8 commits, Nov 29, 2021
41 changes: 40 additions & 1 deletion priv/riak_kv.schema
@@ -119,9 +119,48 @@
%% conflicting leaves will be compared on each exchange. If there are issues
%% with query timeouts this may be halved. Large backlogs may be reduced
%% faster by doubling. There are 1M segments in a standard tree overall.
%% Performance can also be tuned by adjusting `tictacaae_repairloops` and
%% `tictacaae_rangeboost` - but `tictacaae_maxresults` is the simplest
%% factor, and the one most likely to give a relatively predictable (and
%% linear) outcome in terms of both CPU cost and repair speed.
{mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [
{datatype, integer},
{default, 256},
{default, 64}
]}.
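A note for operators: this cuttlefish mapping surfaces as a flat riak.conf setting, and the diff above replaces the old default of 256 with 64. A sketch of how the tuning guidance reads in riak.conf (illustrative values, not recommendations):

    ## Default: compare 64 of the ~1M segments per exchange
    tictacaae_maxresults = 64
    ## Halve if exchanges are hitting query timeouts:
    ## tictacaae_maxresults = 32
    ## Double to reduce a large backlog faster:
    ## tictacaae_maxresults = 128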

%% @doc Max number of repair loops per exchange.
%% Each exchange will attempt a repair of up to tictacaae_maxresults keys,
%% and will analyse those repairs to see whether there is potential for
%% further repairs. If such potential exists, repair loops will be run, and
%% in these loops a more efficient repair will be used:
%% - with (at least) a bucket, key_range or last_modified range to restrict
%% the scope of the fetch_clocks queries;
%% - without rebuilding the segments in the AAE hash tree;
%% - without checking for presence of the object in the journal.
%% Problems related to invalid hash trees will need to be detected by the
%% initial loop of the exchange, not by the repair loops.
{mapping, "tictacaae_repairloops", "riak_kv.tictacaae_repairloops", [
{datatype, integer},
{default, 4},
hidden
]}.

%% @doc Multiplier to `tictacaae_maxresults` when following an initial AAE
%% exchange with a range-limited exchange.
%% After each exchange where sufficient deltas are discovered, up to
%% `tictacaae_repairloops` range-limited queries will be run (assuming
%% sufficient results continue to be found). Each of these may have its
%% number of max results boosted by this integer factor.
%% For example, if `tictacaae_maxresults` is set to 64,
%% `tictacaae_repairloops` is set to 4, and `tictacaae_rangeboost` is set
%% to 2, the initial loop will use a `tictacaae_maxresults` of 64, but any
%% AAE exchanges on loops 1 to 4 will use 128.
%% Exchanges with range-limited queries are more efficient, so more tree
%% segments can be fetched without creating significant CPU overheads; hence
%% the use of this boost to maxresults.
{mapping, "tictacaae_rangeboost", "riak_kv.tictacaae_rangeboost", [
{datatype, integer},
{default, 2},
hidden
]}.

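Taking the worked example in the @doc above, the per-exchange repair budget implied by the three settings can be sketched in Erlang (a hypothetical helper, not code from this PR):

    %% The initial loop repairs up to MaxResults keys; each of the
    %% RepairLoops range-limited follow-up loops may repair up to
    %% MaxResults * RangeBoost keys.
    max_keys_per_exchange(MaxResults, RepairLoops, RangeBoost) ->
        MaxResults + (RepairLoops * MaxResults * RangeBoost).

    %% With the defaults above: 64 + (4 * 64 * 2) = 576 keys per exchange.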
2 changes: 1 addition & 1 deletion rebar.config
@@ -57,6 +57,6 @@
{riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
{riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.9"}}},
{hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.1"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}},
{riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.8"}}}
]}.
2 changes: 1 addition & 1 deletion src/riak_kv.app.src
@@ -72,7 +72,7 @@
% generate a warning in the logs
{max_siblings, 100},

%% Object hash version should be 0 by default. Without the
%% @doc Object hash version should be 0 by default. Without the
%% environment variable being set at startup, it could by default
%% revert to being considered as legacy even when the whole cluster
%% has support for version 0 - Github Issue 1656
4 changes: 4 additions & 0 deletions src/riak_kv_app.erl
@@ -233,6 +233,10 @@ start(_Type, _StartArgs) ->
riak_core_capability:register({riak_kv, put_soft_limit},
[true, false],
false),

riak_core_capability:register({riak_kv, tictacaae_prompted_repairs},
[true, false],
false),

HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false),
%% Go ahead and mark the riak_kv service as up in the node watcher.
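Once registered, the capability can be checked at runtime so that repairs are only prompted when the whole cluster supports them. A minimal sketch using the standard riak_core_capability API (the calling context is an assumption, not shown in this diff):

    %% Returns true only once all nodes advertise support; until then
    %% the registered default (false) is returned.
    case riak_core_capability:get({riak_kv, tictacaae_prompted_repairs}, false) of
        true -> prompt_repairs();  %% hypothetical repair entry point
        false -> ok
    end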
95 changes: 2 additions & 93 deletions src/riak_kv_get_fsm.erl
@@ -37,8 +37,6 @@
waiting_vnode_r/2,
waiting_read_repair/2]).

-export([prompt_readrepair/1]).

-type detail() :: timing |
vnodes.
-type details() :: [detail()].
@@ -48,8 +46,6 @@
{basic_quorum, boolean()} | %% Whether to use basic quorum (return early
%% in some failure cases.
{notfound_ok, boolean()} | %% Count notfound responses as successful.
{force_aae, boolean()} | %% Force there to be an AAE exchange for the
%% preflist after the GET has been completed
{timeout, pos_integer() | infinity} | %% Timeout for vnode responses
{details, details()} | %% Return extra details as a 3rd element
{details, true} |
@@ -64,8 +60,6 @@

-export_type([options/0, option/0]).



-record(state, {from :: {raw, req_id(), pid()},
options=[] :: options(),
n :: pos_integer() | undefined,
@@ -87,7 +81,6 @@
[{StateName::atom(), TimeUSecs::non_neg_integer()}]} | undefined,
crdt_op :: undefined | true,
request_type :: undefined | request_type(),
force_aae = false :: boolean(),
override_vnodes = [] :: list(),
return_tombstone = false :: boolean(),
expected_fetchclock = false :: false | vclock:vclock()
@@ -101,8 +94,6 @@
-define(DEFAULT_RT, head).
-define(DEFAULT_NC, 0).
-define(QUEUE_EMPTY_LOOPS, 8).
-define(MIN_REPAIRTIME_MS, 10000).
-define(MIN_REPAIRPAUSE_MS, 10).

%% ===================================================================
%% Public API
@@ -271,7 +262,6 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key},
DocIdx = riak_core_util:chash_key(BKey, BucketProps),
Bucket_N = get_option(n_val, BucketProps),
CrdtOp = get_option(crdt_op, Options),
ForceAAE = get_option(force_aae, Options, false),
N = case get_option(n_val, Options) of
undefined ->
Bucket_N;
@@ -305,8 +295,7 @@
preflist2 = Preflist2,
tracked_bucket = StatTracked,
crdt_op = CrdtOp,
request_type=RequestType,
force_aae = ForceAAE})
request_type=RequestType})
end.

%% @private
@@ -605,9 +594,7 @@ maybe_finalize(StateData=#state{get_core = GetCore}) ->
false -> {next_state,waiting_read_repair,StateData}
end.

finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID,
preflist2 = PL, bkey = {B, K},
force_aae = ForceAAE}) ->
finalize(StateData=#state{get_core = GetCore, trace = Trace}) ->
{Action, UpdGetCore} = riak_kv_get_core:final_action(GetCore),
UpdStateData = StateData#state{get_core = UpdGetCore},
case Action of
@@ -619,86 +606,8 @@
?DTRACE(Trace, ?C_GET_FSM_FINALIZE, [], ["finalize"]),
ok
end,
case ForceAAE of
true ->
Primaries =
[{I, Node} || {{I, Node}, primary} <- PL],
case length(Primaries) of
L when L < 2 ->
lager:info("Insufficient Primaries to support force AAE request", []),
ok;
L ->
BlueP = lists:nth((ReqID rem L) + 1, Primaries),
PinkP = lists:nth(((ReqID + 1) rem L) + 1, Primaries),
IndexN = riak_kv_util:get_index_n({B, K}),
BlueList = [{riak_kv_vnode:aae_send(BlueP), [IndexN]}],
PinkList = [{riak_kv_vnode:aae_send(PinkP), [IndexN]}],
aae_exchange:start(BlueList,
PinkList,
prompt_readrepair([BlueP, PinkP]),
fun reply_fun/1)
end;
false ->
ok
end,
{stop,normal,StateData}.

-spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) ->
fun((list({{riak_object:bucket(), riak_object:key()},
{vclock:vclock(), vclock:vclock()}})) -> ok).
prompt_readrepair(VnodeList) ->
prompt_readrepair(VnodeList,
app_helper:get_env(riak_kv, log_readrepair, false)).

prompt_readrepair(VnodeList, LogRepair) ->
{ok, C} = riak:local_client(),
FetchFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
case riak_kv_util:consistent_object(B) of
true ->
riak_kv_exchange_fsm:repair_consistent({B, K});
false ->
riak_client:get(B, K, C)
end
end,
LogFun =
fun({{B, K}, {BlueClock, PinkClock}}) ->
lager:info(
"Prompted read repair Bucket=~p Key=~p Clocks ~w ~w",
[B, K, BlueClock, PinkClock])
end,
fun(RepairList) ->
SW = os:timestamp(),
RepairCount = length(RepairList),
lager:info("Repairing key_count=~w between ~w",
[RepairCount, VnodeList]),
Pause =
max(?MIN_REPAIRPAUSE_MS,
?MIN_REPAIRTIME_MS div max(1, RepairCount)),
RehashFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
timer:sleep(Pause),
riak_kv_vnode:rehash(VnodeList, B, K)
end,
lists:foreach(FetchFun, RepairList),
lists:foreach(RehashFun, RepairList),
case LogRepair of
true ->
lists:foreach(LogFun, RepairList);
false ->
ok
end,
lager:info("Repaired key_count=~w " ++
"in repair_time=~w ms with pause_time=~w ms",
[RepairCount,
timer:now_diff(os:timestamp(), SW) div 1000,
RepairCount * Pause])
end.

reply_fun({EndStateName, DeltaCount}) ->
lager:info("AAE Reached end_state=~w with delta_count=~w",
[EndStateName, DeltaCount]).


%% Maybe issue deletes if all primary nodes are available.
%% Get core will only request deletion if all vnodes
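For reviewers tracing where this logic went: the deleted prompt_readrepair/2 fun paced repairs so that a batch took at least ?MIN_REPAIRTIME_MS overall, with at least ?MIN_REPAIRPAUSE_MS between keys. A standalone restatement of that pacing arithmetic (hypothetical function name, mirroring the deleted defines):

    -define(MIN_REPAIRTIME_MS, 10000).
    -define(MIN_REPAIRPAUSE_MS, 10).

    %% Pause (in ms) between individual key repairs, given the batch size.
    repair_pause(RepairCount) ->
        max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div max(1, RepairCount)).

    %% repair_pause(100) =:= 100, spreading 100 repairs over ~10s;
    %% repair_pause(5000) =:= 10, as the 10ms floor applies.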
22 changes: 22 additions & 0 deletions src/riak_kv_stat.erl
@@ -271,6 +271,10 @@ do_update({index_fsm_time, Microsecs, ResultCount}) ->
do_update({read_repairs, Indices, Preflist}) ->
ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1),
do_repairs(Indices, Preflist);
do_update({tictac_aae, ExchangeState}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1);
do_update({tictac_aae, ExchangeType, RepairCount}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeType], RepairCount);
do_update(ngrfetch_nofetch) ->
ok = exometer:update([?PFX, ?APP, node, gets, ngrfetch_nofetch], 1);
do_update(ngrfetch_prefetch) ->
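The two new do_update/1 clauses above are reached via riak_kv_stat:update/1. A sketch of the calls an exchange might make (call sites assumed, not shown in this diff):

    %% An exchange completed at the clock_compare state: bumps the
    %% [node, tictacaae, clock_compare] spiral by 1.
    riak_kv_stat:update({tictac_aae, clock_compare}),

    %% A bucket-limited repair loop prompted 20 repairs: bumps the
    %% [node, tictacaae, bucket] spiral by 20.
    riak_kv_stat:update({tictac_aae, bucket, 20})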
@@ -582,6 +586,24 @@ stats() ->
{count, read_repairs_total}]},
{[node, gets, skipped_read_repairs], spiral, [], [{one, skipped_read_repairs},
{count, skipped_read_repairs_total}]},
{[node, tictacaae, root_compare], spiral, [],
[{one, tictacaae_root_compare}, {count, tictacaae_root_compare_total}]},
{[node, tictacaae, branch_compare], spiral, [],
[{one, tictacaae_branch_compare}, {count, tictacaae_branch_compare_total}]},
{[node, tictacaae, clock_compare], spiral, [],
[{one, tictacaae_clock_compare}, {count, tictacaae_clock_compare_total}]},
{[node, tictacaae, not_supported], spiral, [],
[{one, tictacaae_not_supported}, {count, tictacaae_not_supported_total}]},
{[node, tictacaae, error], spiral, [],
[{one, tictacaae_error}, {count, tictacaae_error_total}]},
{[node, tictacaae, timeout], spiral, [],
[{one, tictacaae_timeout}, {count, tictacaae_timeout_total}]},
{[node, tictacaae, bucket], spiral, [],
[{one, tictacaae_bucket}, {count, tictacaae_bucket_total}]},
{[node, tictacaae, modtime], spiral, [],
[{one, tictacaae_modtime}, {count, tictacaae_modtime_total}]},
{[node, tictacaae, exchange], spiral, [],
[{one, tictacaae_exchange}, {count, tictacaae_exchange_total}]},
{[node, gets, ngrfetch_nofetch], spiral, [], [{one, ngrfetch_nofetch},
{count, ngrfetch_nofetch_total}]},
{[node, gets, ngrfetch_prefetch], spiral, [], [{one, ngrfetch_prefetch},
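Each spiral registered above can be read back through exometer. A hedged sketch, assuming the usual riak stat prefix (i.e. ?PFX resolves to riak):

    %% Returns e.g. {ok, [{one, 3}, {count, 127}]}: 3 clock_compare
    %% exchanges in the last minute, 127 since the node started.
    exometer:get_value([riak, riak_kv, node, tictacaae, clock_compare],
                       [one, count])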