diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index b37ebfb2e..0426c1f1d 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -119,9 +119,48 @@ %% conflicting leaves will be compared on each exchange. If there are issues %% with query timeouts this may be halved. Large backlogs may be reduced %% faster by doubling. There are 1M segments in a standard tree overall. +%% Performance tuning can also be made by adjusting the `tictacaae_repairloops` +%% and `tictacaae_rangeboost` - but `tictacaae_maxresults` is the simplest +%% factor that is likely to result in a relatively predictable (and linear) +%% outcome in terms of both CPU cost and repair speed. {mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [ {datatype, integer}, - {default, 256}, + {default, 64} ]}. + +%% @doc Max number of repair loops per exchange +%% Each exchange will attempt a repair of tictacaae_maxresults, and will +%% analyse those repairs to see if there exists the potential for more repairs. +%% If there exists this potential, then repair loops will be run, but in these +%% repair loops a more efficient repair will be run: +%% - with a bucket, key_range or last_modified range (at least) to restrict the +%% scope of the fetch_clocks queries +%% - without rebuilding the segments in the AAE hash tree +%% - without checking for presence of the object in the journal +%% Problems related to invalid hash trees will need to be detected through the +%% initial loop of the exchange, not repair loops +{mapping, "tictacaae_repairloops", "riak_kv.tictacaae_repairloops", [ + {datatype, integer}, + {default, 4}, + hidden +]}. + +%% @doc Multiplier to the `tictacaae_maxresults` when following an initial AAE +%% exchange with a range-limited exchange. +%% After each exchange, where sufficient deltas are discovered there will be a +%% `tictacaae_repairloops` number of range-limited queries (assuming +%% sufficient results continue to be found). 
Each of these may have +%% the number of max results boosted by this integer factor. +%% For example, if `tictacaae_maxresults` is set to 64, and +%% `tictacaae_repairloops` is set to 4, and the `tictacaae_rangeboost` is set +%% to 2 - the initial loop will use `tictacaae_maxresults` of 64, but any +%% AAE exchanges on loops 1 to 4 will use 128. +%% Exchanges with range-limited queries are more efficient, and so more tree +%% segments can be fetched without creating significant CPU overheads, hence +%% the use of this boost to maxresults. +{mapping, "tictacaae_rangeboost", "riak_kv.tictacaae_rangeboost", [ + {datatype, integer}, + {default, 2}, hidden ]}. diff --git a/rebar.config b/rebar.config index ea95d2f16..e23be6d4a 100644 --- a/rebar.config +++ b/rebar.config @@ -57,6 +57,6 @@ {riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}}, {riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.9"}}}, {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}}, - {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.1"}}}, + {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}}, {riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.8"}}} ]}. diff --git a/src/riak_kv.app.src b/src/riak_kv.app.src index 318e36df2..36deaa90a 100644 --- a/src/riak_kv.app.src +++ b/src/riak_kv.app.src @@ -72,7 +72,7 @@ % generate a warning in the logs {max_siblings, 100}, - %% Object hash version should be 0 by default. Without the + %% @doc Object hash version should be 0 by default. 
Without the %% environment variable being set at startup, it could by default %% revert to being considered as legacy even when the whole cluster %% has support for version 0 - Github Issue 1656 diff --git a/src/riak_kv_app.erl b/src/riak_kv_app.erl index cea34b291..58b4b48a6 100644 --- a/src/riak_kv_app.erl +++ b/src/riak_kv_app.erl @@ -233,6 +233,10 @@ start(_Type, _StartArgs) -> riak_core_capability:register({riak_kv, put_soft_limit}, [true, false], false), + + riak_core_capability:register({riak_kv, tictacaae_prompted_repairs}, + [true, false], + false), HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false), %% Go ahead and mark the riak_kv service as up in the node watcher. diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index a5456abb2..c5e18225d 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -37,8 +37,6 @@ waiting_vnode_r/2, waiting_read_repair/2]). --export([prompt_readrepair/1]). - -type detail() :: timing | vnodes. -type details() :: [detail()]. @@ -48,8 +46,6 @@ {basic_quorum, boolean()} | %% Whether to use basic quorum (return early %% in some failure cases. {notfound_ok, boolean()} | %% Count notfound responses as successful. - {force_aae, boolean()} | %% Force there to be be an AAE exchange for the - %% preflist after the GEt has been completed {timeout, pos_integer() | infinity} | %% Timeout for vnode responses {details, details()} | %% Return extra details as a 3rd element {details, true} | @@ -64,8 +60,6 @@ -export_type([options/0, option/0]). 
- - -record(state, {from :: {raw, req_id(), pid()}, options=[] :: options(), n :: pos_integer() | undefined, @@ -87,7 +81,6 @@ [{StateName::atom(), TimeUSecs::non_neg_integer()}]} | undefined, crdt_op :: undefined | true, request_type :: undefined | request_type(), - force_aae = false :: boolean(), override_vnodes = [] :: list(), return_tombstone = false :: boolean(), expected_fetchclock = false :: false | vclock:vclock() @@ -101,8 +94,6 @@ -define(DEFAULT_RT, head). -define(DEFAULT_NC, 0). -define(QUEUE_EMPTY_LOOPS, 8). --define(MIN_REPAIRTIME_MS, 10000). --define(MIN_REPAIRPAUSE_MS, 10). %% =================================================================== %% Public API @@ -271,7 +262,6 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key}, DocIdx = riak_core_util:chash_key(BKey, BucketProps), Bucket_N = get_option(n_val, BucketProps), CrdtOp = get_option(crdt_op, Options), - ForceAAE = get_option(force_aae, Options, false), N = case get_option(n_val, Options) of undefined -> Bucket_N; @@ -305,8 +295,7 @@ prepare(timeout, StateData=#state{bkey=BKey={Bucket,_Key}, preflist2 = Preflist2, tracked_bucket = StatTracked, crdt_op = CrdtOp, - request_type=RequestType, - force_aae = ForceAAE}) + request_type=RequestType}) end. %% @private @@ -605,9 +594,7 @@ maybe_finalize(StateData=#state{get_core = GetCore}) -> false -> {next_state,waiting_read_repair,StateData} end. 
-finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID, - preflist2 = PL, bkey = {B, K}, - force_aae = ForceAAE}) -> +finalize(StateData=#state{get_core = GetCore, trace = Trace}) -> {Action, UpdGetCore} = riak_kv_get_core:final_action(GetCore), UpdStateData = StateData#state{get_core = UpdGetCore}, case Action of @@ -619,86 +606,8 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID, ?DTRACE(Trace, ?C_GET_FSM_FINALIZE, [], ["finalize"]), ok end, - case ForceAAE of - true -> - Primaries = - [{I, Node} || {{I, Node}, primary} <- PL], - case length(Primaries) of - L when L < 2 -> - lager:info("Insufficient Primaries to support force AAE request", []), - ok; - L -> - BlueP = lists:nth((ReqID rem L) + 1, Primaries), - PinkP = lists:nth(((ReqID + 1) rem L) + 1, Primaries), - IndexN = riak_kv_util:get_index_n({B, K}), - BlueList = [{riak_kv_vnode:aae_send(BlueP), [IndexN]}], - PinkList = [{riak_kv_vnode:aae_send(PinkP), [IndexN]}], - aae_exchange:start(BlueList, - PinkList, - prompt_readrepair([BlueP, PinkP]), - fun reply_fun/1) - end; - false -> - ok - end, {stop,normal,StateData}. --spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) -> - fun((list({{riak_object:bucket(), riak_object:key()}, - {vclock:vclock(), vclock:vclock()}})) -> ok). -prompt_readrepair(VnodeList) -> - prompt_readrepair(VnodeList, - app_helper:get_env(riak_kv, log_readrepair, false)). 
- -prompt_readrepair(VnodeList, LogRepair) -> - {ok, C} = riak:local_client(), - FetchFun = - fun({{B, K}, {_BlueClock, _PinkClock}}) -> - case riak_kv_util:consistent_object(B) of - true -> - riak_kv_exchange_fsm:repair_consistent({B, K}); - false -> - riak_client:get(B, K, C) - end - end, - LogFun = - fun({{B, K}, {BlueClock, PinkClock}}) -> - lager:info( - "Prompted read repair Bucket=~p Key=~p Clocks ~w ~w", - [B, K, BlueClock, PinkClock]) - end, - fun(RepairList) -> - SW = os:timestamp(), - RepairCount = length(RepairList), - lager:info("Repairing key_count=~w between ~w", - [RepairCount, VnodeList]), - Pause = - max(?MIN_REPAIRPAUSE_MS, - ?MIN_REPAIRTIME_MS div max(1, RepairCount)), - RehashFun = - fun({{B, K}, {_BlueClock, _PinkClock}}) -> - timer:sleep(Pause), - riak_kv_vnode:rehash(VnodeList, B, K) - end, - lists:foreach(FetchFun, RepairList), - lists:foreach(RehashFun, RepairList), - case LogRepair of - true -> - lists:foreach(LogFun, RepairList); - false -> - ok - end, - lager:info("Repaired key_count=~w " ++ - "in repair_time=~w ms with pause_time=~w ms", - [RepairCount, - timer:now_diff(os:timestamp(), SW) div 1000, - RepairCount * Pause]) - end. - -reply_fun({EndStateName, DeltaCount}) -> - lager:info("AAE Reached end_state=~w with delta_count=~w", - [EndStateName, DeltaCount]). - %% Maybe issue deletes if all primary nodes are available. 
%% Get core will only requestion deletion if all vnodes diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index bac09d2e4..fdc1834f1 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -271,6 +271,10 @@ do_update({index_fsm_time, Microsecs, ResultCount}) -> do_update({read_repairs, Indices, Preflist}) -> ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1), do_repairs(Indices, Preflist); +do_update({tictac_aae, ExchangeState}) -> + ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1); +do_update({tictac_aae, ExchangeType, RepairCount}) -> + ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeType], RepairCount); do_update(ngrfetch_nofetch) -> ok = exometer:update([?PFX, ?APP, node, gets, ngrfetch_nofetch], 1); do_update(ngrfetch_prefetch) -> @@ -582,6 +586,24 @@ stats() -> {count, read_repairs_total}]}, {[node, gets, skipped_read_repairs], spiral, [], [{one, skipped_read_repairs}, {count, skipped_read_repairs_total}]}, + {[node, tictacaae, root_compare], spiral, [], + [{one, tictacaae_root_compare}, {count, tictacaae_root_compare_total}]}, + {[node, tictacaae, branch_compare], spiral, [], + [{one, tictacaae_branch_compare}, {count, tictacaae_branch_compare_total}]}, + {[node, tictacaae, clock_compare], spiral, [], + [{one, tictacaae_clock_compare}, {count, tictacaae_clock_compare_total}]}, + {[node, tictacaae, not_supported], spiral, [], + [{one, tictacaae_not_supported}, {count, tictacaae_not_supported_total}]}, + {[node, tictacaae, error], spiral, [], + [{one, tictacaae_error}, {count, tictacaae_error_total}]}, + {[node, tictacaae, timeout], spiral, [], + [{one, tictacaae_timeout}, {count, tictacaae_timeout_total}]}, + {[node, tictacaae, bucket], spiral, [], + [{one, tictacaae_bucket}, {count, tictacaae_bucket_total}]}, + {[node, tictacaae, modtime], spiral, [], + [{one, tictacaae_modtime}, {count, tictacaae_modtime_total}]}, + {[node, tictacaae, exchange], spiral, [], + [{one, tictacaae_exchange}, {count, 
tictacaae_exchange_total}]}, {[node, gets, ngrfetch_nofetch], spiral, [], [{one, ngrfetch_nofetch}, {count, ngrfetch_nofetch_total}]}, {[node, gets, ngrfetch_prefetch], spiral, [], [{one, ngrfetch_prefetch}, diff --git a/src/riak_kv_tictacaae_repairs.erl b/src/riak_kv_tictacaae_repairs.erl new file mode 100644 index 000000000..98669e3b3 --- /dev/null +++ b/src/riak_kv_tictacaae_repairs.erl @@ -0,0 +1,370 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_tictacaae_repairs: functions for tictac aae prompted repairs +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + + +%% @doc Various functions that are useful for repairing entropy via tictac aae +-module(riak_kv_tictacaae_repairs). + +-export([prompt_tictac_exchange/7, log_tictac_result/4]). + +-define(EXCHANGE_PAUSE_MS, 1000). +-define(AAE_MAX_RESULTS, 128). +-define(AAE_RANGE_BOOST, 2). +-define(MEGA, 1000000). +-define(SCAN_TIMEOUT_MS, 120000). +-define(MIN_REPAIRTIME_MS, 10000). +-define(MIN_REPAIRPAUSE_MS, 10). + +-type repair_list() :: + list({riak_object:bucket(), + pos_integer(), + list(riak_object:key()), + list(erlang:timestamp())}). + +-type keyclock_list() :: + list({{riak_object:bucket(), riak_object:key()}, + {vclock:vclock(), vclock:vclock()}}). 
+ + +%% =================================================================== +%% Public API +%% =================================================================== + + +-spec prompt_tictac_exchange({riak_core_ring:partition_id(), node()}, + {riak_core_ring:partition_id(), node()}, + {non_neg_integer(), pos_integer()}, + pos_integer(), pos_integer(), + fun((term()) -> ok), + aae_exchange:filters()) -> ok. +prompt_tictac_exchange(LocalVnode, RemoteVnode, IndexN, + ScanTimeout, LoopCount, + ReplyFun, Filter) -> + ExchangePause = + app_helper:get_env(riak_kv, + tictacaae_exchangepause, + ?EXCHANGE_PAUSE_MS), + RangeBoost = + case Filter of + none -> + 1; + _ -> + app_helper:get_env(riak_kv, + tictacaae_rangeboost, + ?AAE_RANGE_BOOST) + end, + MaxResults = + case app_helper:get_env(riak_kv, tictacaae_maxresults) of + MR when is_integer(MR) -> + MR * RangeBoost; + _ -> + ?AAE_MAX_RESULTS * RangeBoost + end, + ExchangeOptions = + [{scan_timeout, ScanTimeout}, + {transition_pause_ms, ExchangePause}, + {purpose, kv_aae}, + {max_results, MaxResults}], + + BlueList = + [{riak_kv_vnode:aae_send(LocalVnode), [IndexN]}], + PinkList = + [{riak_kv_vnode:aae_send(RemoteVnode), [IndexN]}], + PromptRehash = Filter == none, + RepairFun = + prompt_readrepair([LocalVnode, RemoteVnode], + IndexN, + MaxResults, + LoopCount, + PromptRehash, + os:timestamp()), + {ok, _AAEPid, AAExid} = + aae_exchange:start(full, + BlueList, + PinkList, + RepairFun, + ReplyFun, + Filter, + ExchangeOptions), + _ = + lager:debug("Exchange prompted with exchange_id=~s between ~w and ~w", + [AAExid, LocalVnode, RemoteVnode]), + ok. + + +-spec log_tictac_result( + {root_compare|branch_compare|clock_compare| + error|timeout|not_supported, non_neg_integer()}, + bucket|modtime|exchange, + non_neg_integer()|initial, + riak_core_ring:partition_id()) -> ok. 
+log_tictac_result(ExchangeResult, FilterType, LoopCount, Index) -> + PotentialRepairs = + case element(2, ExchangeResult) of + N when is_integer(N), N > 0 -> + riak_kv_stat:update({tictac_aae, FilterType, N}), + N; + _ -> + 0 + end, + ExchangeState = + element(1, ExchangeResult), + case expected_aae_state(ExchangeState) of + true -> + riak_kv_stat:update({tictac_aae, ExchangeState}); + _ -> + ok + end, + case ExchangeState of + PositiveState + when PositiveState == root_compare; + PositiveState == branch_compare -> + ok; + ExchangeState -> + lager:info("Tictac AAE exchange for partition=~w " ++ + "pending_state=~w filter_type=~w loop_count=~w " ++ + "potential_repairs=~w", + [Index, + ExchangeState, FilterType, LoopCount, + PotentialRepairs]) + end, + ok. + + +%% =================================================================== +%% Utility functions +%% =================================================================== + +expected_aae_state(ExchangeState) -> + lists:member(ExchangeState, + [root_compare, branch_compare, clock_compare, + error, timeout, not_supported]). + + +-spec prompt_readrepair( + [{riak_core_ring:partition_id(), node()}], + {non_neg_integer(), pos_integer()}, + non_neg_integer(), + non_neg_integer(), + boolean(), + erlang:timestamp()) -> + fun((keyclock_list()) -> ok). +prompt_readrepair(VnodeList, IndexN, MaxResults, + LoopCount, Rehash, StartTime) -> + prompt_readrepair(VnodeList, + IndexN, + MaxResults, + LoopCount, + StartTime, + Rehash, + app_helper:get_env(riak_kv, log_readrepair, false)). 
+ +prompt_readrepair(VnodeList, IndexN, MaxResults, + LoopCount, StartTime, Rehash, LogRepair) -> + {ok, C} = riak:local_client(), + FetchFun = + fun({{B, K}, {_BlueClock, _PinkClock}}) -> + case riak_kv_util:consistent_object(B) of + true -> + riak_kv_exchange_fsm:repair_consistent({B, K}); + false -> + riak_client:get(B, K, C) + end + end, + LogFun = + fun({{B, K}, {BlueClock, PinkClock}}) -> + lager:info( + "Prompted read repair Bucket=~p Key=~p Clocks ~w ~w", + [B, K, BlueClock, PinkClock]) + end, + fun(RepairList) -> + SW = os:timestamp(), + RepairCount = length(RepairList), + lager:info("Repairing key_count=~w between ~w", + [RepairCount, VnodeList]), + Pause = + max(?MIN_REPAIRPAUSE_MS, + ?MIN_REPAIRTIME_MS div max(1, RepairCount)), + RehashFun = + fun({{B, K}, {_BlueClock, _PinkClock}}) -> + timer:sleep(Pause), + riak_kv_vnode:rehash(VnodeList, B, K) + end, + lists:foreach(FetchFun, RepairList), + case Rehash of + true -> + lists:foreach(RehashFun, RepairList); + _ -> + ok + end, + case LogRepair of + true -> + lists:foreach(LogFun, RepairList); + false -> + ok + end, + EndTime = os:timestamp(), + lager:info("Repaired key_count=~w " ++ + "in repair_time=~w ms with pause_time=~w ms " ++ + "total process_time=~w ms", + [RepairCount, + timer:now_diff(EndTime, SW) div 1000, + RepairCount * Pause, + timer:now_diff(EndTime, StartTime) div 1000]), + case LoopCount of + LoopCount when LoopCount > 0 -> + case analyse_repairs(RepairList, MaxResults) of + {false, none} -> + lager:info("Repair cycle type=false at LoopCount=~w", + [LoopCount]); + {FilterType, Filter} -> + lager:info("Repair cycle type=~p at LoopCount=~w", + [FilterType, LoopCount]), + [LocalVnode, RemoteVnode] = VnodeList, + ReplyFun = + fun(ExchangeResult) -> + log_tictac_result( + ExchangeResult, + FilterType, + LoopCount, + element(1, IndexN)) + end, + prompt_tictac_exchange( + LocalVnode, RemoteVnode, IndexN, + ?SCAN_TIMEOUT_MS, LoopCount - 1, + ReplyFun, Filter) + end; + LoopCount -> + 
lager:info("Repair cycle type=complete at LoopCount=~w", + [LoopCount]) + end + end. + +%% @doc +%% Is there a majority of repairs which belong to a particular bucket. +%% In which case we should re-run the aae_exchange, but this time filter +%% the fetch clocks to look only in the relevant Bucket and KeyRange and +%% Last Modified range discovered for this bucket. +%% If there is no majority bucket, but there are more than half MaxResults +%% returned, then re-run with only a last modified range as a filter. +%% With the filters applied, one can reasonable expect the resulting +%% fetch_clock queries to be substantially faster (unless there happens to +%% be no majority bucket, and a low last_modified time) +-spec analyse_repairs(keyclock_list(), non_neg_integer()) -> + {false|bucket|modtime, aae_exchange:filters()}. +analyse_repairs(KeyClockList, MaxRepairs) -> + RepairList = lists:foldl(fun analyse_repair/2, [], KeyClockList), + EnableKeyRange = + app_helper:get_env(riak_kv, tictacaae_enablekeyrange, false), + ClusterCapable = + riak_core_capability:get({riak_kv, tictacaae_prompted_repairs}, false), + analyse_repairs(RepairList, MaxRepairs, EnableKeyRange, ClusterCapable). + +-spec analyse_repairs(repair_list(), non_neg_integer(), boolean(), boolean()) + -> {false|bucket|modtime, aae_exchange:filters()}. 
+analyse_repairs(_RepairList, _MaxRepairs, _EnableKeyRange, false) -> + {false, none}; +analyse_repairs(RepairList, MaxRepairs, EnableKeyRange, _) -> + case lists:reverse(lists:keysort(2, RepairList)) of + [Candidate|_Rest] -> + Threshold = MaxRepairs div 3, + case Candidate of + {B, CandCount, KL, MTL} when CandCount > (2 * Threshold) -> + [FirstKey|RestKeys] = lists:sort(KL), + LastKey = lists:last(RestKeys), + KeyRange = + case EnableKeyRange of + true -> {FirstKey, LastKey}; + false -> all + end, + {bucket, + {filter, + B, KeyRange, large, all, + get_modified_range(MTL), + pre_hash}}; + _ -> + FoldFun = fun({_B, _C, _KL, MTL}, Acc) -> Acc ++ MTL end, + MTL = lists:sort(lists:foldl(FoldFun, [], RepairList)), + case length(MTL) of + CandCount when CandCount > Threshold -> + {modtime, + {filter, + all, all, large, all, + get_modified_range(MTL), + pre_hash}}; + _ -> + {false, none} + end + end; + [] -> + {false, none} + end. + + +-spec get_modified_range(list(calendar:datetime())) + -> {pos_integer(), pos_integer()}. +get_modified_range(ModifiedDateTimeList) -> + [FirstDate|RestDates] = lists:sort(ModifiedDateTimeList), + HighDate = lists:last(RestDates), + EpochTime = + calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}), + LowTS = + calendar:datetime_to_gregorian_seconds(FirstDate) - EpochTime, + HighTS = + calendar:datetime_to_gregorian_seconds(HighDate) - EpochTime, + {LowTS - 1, HighTS + 1}. + +%% @doc Look at the last modified time on each clock +%% Further repairs will only on those prompted in the clean case +%% when the last modified time is different between the clocks - i.e. +%% one clock has an identifiably higher last modified time which can be +%% considered a likely timestamp of an incident +-spec analyse_repair({{riak_object:bucket(), riak_object:key()}, + {vclock:vclock(), vclock:vclock()}}, + repair_list()) -> repair_list(). 
+analyse_repair({{B, K}, {BlueClock, PinkClock}}, ByBucketAcc) -> + BlueTime = last_modified(BlueClock), + PinkTime = last_modified(PinkClock), + ModTime = + if + BlueTime > PinkTime -> + BlueTime; + PinkTime > BlueTime -> + PinkTime; + true -> + false + end, + case ModTime of + false -> + ByBucketAcc; + ModTime -> + case lists:keytake(B, 1, ByBucketAcc) of + false -> + [{B, 1, [K], [ModTime]}|ByBucketAcc]; + {value, {B, C, KL, TL}, RemBucketAcc} -> + [{B, C + 1, [K|KL], [ModTime|TL]}|RemBucketAcc] + end + end. + +last_modified(none) -> + none; +last_modified(VC) -> + vclock:last_modified(VC). + diff --git a/src/riak_kv_util.erl b/src/riak_kv_util.erl index de903430e..fbe201d49 100644 --- a/src/riak_kv_util.erl +++ b/src/riak_kv_util.erl @@ -26,30 +26,31 @@ -export([is_x_deleted/1, - obj_not_deleted/1, - try_cast/3, - fallback/4, - expand_value/3, - expand_rw_value/4, - expand_sync_on_write/2, - normalize_rw_value/2, - make_request/2, - get_index_n/1, - preflist_siblings/1, - fix_incorrect_index_entries/1, - fix_incorrect_index_entries/0, - responsible_preflists/1, - responsible_preflists/2, - responsible_preflists/3, - make_vtag/1, - puts_active/0, - exact_puts_active/0, - gets_active/0, - consistent_object/1, - get_write_once/1, - overload_reply/1, - get_backend_config/3, - is_modfun_allowed/2]). + obj_not_deleted/1, + try_cast/3, + fallback/4, + expand_value/3, + expand_rw_value/4, + expand_sync_on_write/2, + normalize_rw_value/2, + make_request/2, + get_index_n/1, + preflist_siblings/1, + fix_incorrect_index_entries/1, + fix_incorrect_index_entries/0, + responsible_preflists/1, + responsible_preflists/2, + responsible_preflists/3, + make_vtag/1, + puts_active/0, + exact_puts_active/0, + gets_active/0, + consistent_object/1, + get_write_once/1, + overload_reply/1, + get_backend_config/3, + is_modfun_allowed/2, + shuffle_list/1]). -export([report_hashtree_tokens/0, reset_hashtree_tokens/2]). -include_lib("riak_kv_vnode.hrl"). 
@@ -499,6 +500,12 @@ is_modfun_allowed(Mod, _Fun) -> end. +-spec shuffle_list(list()) -> list(). +shuffle_list(L) -> + lists:map(fun({_R, X0}) -> X0 end, + lists:keysort(1, lists:map(fun(X) -> {rand:uniform(), X} end, L))). + + %% =================================================================== %% EUnit tests %% =================================================================== diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 7d680e1d6..7bf14ab46 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -214,7 +214,7 @@ %% Queue time in ms to prompt a sync ping. -define(AAE_SKIP_COUNT, 10). -define(AAE_LOADING_WAIT, 5000). --define(EXCHANGE_PAUSE_MS, 1000). +-define(AAE_REPAIR_LOOPS, 8). -define(AF1_QUEUE, riak_core_node_worker_pool:af1()). %% Assured Forwarding - pool 1 @@ -385,7 +385,8 @@ determine_aaedata_root(Partition) -> preflistfun(Bucket, Key) -> riak_kv_util:get_index_n({Bucket, Key}). --spec tictac_returnfun(partition(), store|trees|exchange) -> fun(). +-spec tictac_returnfun(partition(), store|trees|exchange) -> + fun((term()) -> ok). 
%% @doc %% Function to be passed to return a response once an operation is complete tictac_returnfun(Partition, exchange) -> @@ -932,7 +933,32 @@ handle_command({aae, AAERequest, IndexNs, Colour}, Sender, State) -> IndexNs, SegmentIDs, ReturnFun, + IndexNFun); + {fetch_clocks, SegmentIDs, MR} -> + IndexNFun = + fun(B, K) -> riak_kv_util:get_index_n({B, K}) end, + ModifiedLimiter = aaefold_setmodifiedlimiter(MR), + aae_controller:aae_fetchclocks(Cntrl, + IndexNs, + all, + SegmentIDs, + ModifiedLimiter, + ReturnFun, + IndexNFun); + {fetch_clocks_range, Bucket, KR, SF, MR} -> + IndexNFun = + fun(B, K) -> riak_kv_util:get_index_n({B, K}) end, + RangeLimiter = aaefold_setrangelimiter(Bucket, KR), + ModifiedLimiter = aaefold_setmodifiedlimiter(MR), + SegmentIDs = element(2, SF), + aae_controller:aae_fetchclocks(Cntrl, + IndexNs, + RangeLimiter, + SegmentIDs, + ModifiedLimiter, + ReturnFun, IndexNFun) + end end, {noreply, State}; @@ -1113,23 +1139,18 @@ handle_command({rebuild_complete, trees, _ST}, _Sender, State) -> {noreply, State#state{tictac_rebuilding = false}} end; -handle_command({exchange_complete, {EndState, DeltaCount}, ST}, +handle_command({exchange_complete, ExchangeResult, ST}, _Sender, State) -> %% Record how many deltas were seen in the exchange %% Revert the skip_count to 0 so that exchanges can be made at the next %% prompt. 
XC = State#state.tictac_exchangecount + 1, - DC = State#state.tictac_deltacount + DeltaCount, + DC = State#state.tictac_deltacount + element(2, ExchangeResult), XT = State#state.tictac_exchangetime + timer:now_diff(os:timestamp(), ST), - case EndState of - PositiveState - when PositiveState == root_compare; - PositiveState == branch_compare -> - ok; - UnwelcomeState -> - lager:info("Tictac AAE exchange for partition=~w pending_state=~w", - [State#state.idx, UnwelcomeState]) - end, + riak_kv_tictacaae_repairs:log_tictac_result(ExchangeResult, + exchange, + initial, + State#state.idx), {noreply, State#state{tictac_exchangecount = XC, tictac_deltacount = DC, tictac_exchangetime = XT, @@ -1197,7 +1218,8 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> State#state.tictac_deltacount, State#state.tictac_exchangetime div (1000 * 1000), LoopDuration div (1000 * 1000)]), - {noreply, State#state{tictac_exchangequeue = Exchanges, + {noreply, State#state{tictac_exchangequeue = + riak_kv_util:shuffle_list(Exchanges), tictac_exchangecount = 0, tictac_deltacount = 0, tictac_exchangetime = 0, @@ -1224,38 +1246,20 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> lists:keyfind(Remote, 1, PL)} of {{Local, LN}, {Remote, RN}} -> IndexN = {DocIdx, N}, - BlueList = - [{riak_kv_vnode:aae_send({Local, LN}), [IndexN]}], - PinkList = - [{riak_kv_vnode:aae_send({Remote, RN}), [IndexN]}], - RepairFun = - riak_kv_get_fsm:prompt_readrepair([{Local, LN}, - {Remote, RN}]), - ReplyFun = tictac_returnfun(Idx, exchange), ScanTimeout = ?AAE_SKIP_COUNT * XTick, - ExchangePause = - app_helper:get_env(riak_kv, - tictacaae_exchangepause, - ?EXCHANGE_PAUSE_MS), - BaseOptions = - [{scan_timeout, ScanTimeout}, - {transition_pause_ms, ExchangePause}, - {purpose, kv_aae}], - ExchangeOptions = + LoopCount = case app_helper:get_env(riak_kv, - tictacaae_maxresults) of - MR when is_integer(MR) -> - BaseOptions ++ [{max_results, MR}]; + tictacaae_repairloops) of + LC when is_integer(LC) -> + 
LC; _ -> - BaseOptions + ?AAE_REPAIR_LOOPS end, - aae_exchange:start(full, - BlueList, - PinkList, - RepairFun, - ReplyFun, - none, - ExchangeOptions), + ReplyFun = tictac_returnfun(Idx, exchange), + riak_kv_tictacaae_repairs:prompt_tictac_exchange( + {Local, LN}, {Remote, RN}, IndexN, + ScanTimeout, LoopCount, ReplyFun, none), + ?AAE_SKIP_COUNT; _ -> lager:warning("Proposed exchange between ~w and ~w " ++ @@ -2130,13 +2134,19 @@ aaefold_setrangelimiter(Bucket, all) -> aaefold_setrangelimiter(Bucket, {StartKey, EndKey}) -> {key_range, Bucket, StartKey, EndKey}. --spec aaefold_setmodifiedlimiter({date, pos_integer(), pos_integer()} | all) +-spec aaefold_setmodifiedlimiter( + {date, pos_integer(), pos_integer()} | + all | + aae_keystore:modified_limiter()) -> aae_keystore:modified_limiter(). %% @doc %% Convert the format of the date limiter to one compatible with the aae store aaefold_setmodifiedlimiter({date, LowModDate, HighModDate}) when is_integer(LowModDate), is_integer(HighModDate) -> {LowModDate, HighModDate}; +aaefold_setmodifiedlimiter({LowModDate, HighModDate}) + when is_integer(LowModDate), is_integer(HighModDate) -> + {LowModDate, HighModDate}; aaefold_setmodifiedlimiter(_) -> all.