Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tictac_aae test for prompted repairs #1360

Merged
merged 5 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 132 additions & 27 deletions tests/verify_tictac_aae.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@

% I would hope this would come from the testing framework some day
% to use the test in small and large scenarios.
-define(DEFAULT_RING_SIZE, 8).
-define(DEFAULT_RING_SIZE, 16).
-define(AAE_THROTTLE_LIMITS, [{-1, 0}, {100, 10}]).
-define(CFG_NOREBUILD(PrimaryOnly, InitialSkip),
-define(CFG_NOREBUILD(PrimaryOnly, InitialSkip, MaxResults, ExTick, KR),
[{riak_kv,
[
% Speedy AAE configuration
Expand All @@ -56,10 +56,13 @@
% store
{tictacaae_rebuildwait, 4},
{tictacaae_rebuilddelay, 3600},
{tictacaae_exchangetick, 5 * 1000}, % 5 seconds
{tictacaae_exchangetick, ExTick * 1000},
{tictacaae_rebuildtick, 3600000}, % don't tick for an hour!
{tictacaae_primaryonly, PrimaryOnly},
{tictacaae_stepinitialtick, InitialSkip}
{tictacaae_stepinitialtick, InitialSkip},
{tictacaae_maxresults, MaxResults},
{tictacaae_repairloops, 4},
{tictacaae_enablekeyrange, KR}
]},
{riak_core,
[
Expand All @@ -81,28 +84,34 @@
{tictacaae_rebuildtick, 15 * 1000}, % Check for rebuilds!
{max_aae_queue_time, 0},
{tictacaae_stepinitialtick, false},
{log_readrepair, true}
{log_readrepair, true},
{tictacaae_enablekeyrange, true}
]},
{riak_core,
[
{ring_creation_size, ?DEFAULT_RING_SIZE}
]}]
).
-define(NUM_NODES, 3).
-define(NUM_KEYS, 1000).
-define(NUM_NODES, 4).
-define(NUM_KEYS, 2000).
-define(BUCKET, <<"test_bucket">>).
-define(ALT_BUCKET1, <<"alt_bucket1">>).
-define(ALT_BUCKET2, <<"alt_bucket2">>).
-define(ALT_BUCKET3, <<"alt_bucket3">>).
-define(ALT_BUCKET4, <<"alt_bucket4">>).
-define(N_VAL, 3).
-define(STATS_DELAY, 1000).

confirm() ->

lager:info("Test with no rebuilds - and no startup skip"),
Nodes1 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(true, false)),
ok = verify_aae_norebuild(Nodes1),
lager:info("Test with no rebuilds - and no startup skip and no key ranges"),
Nodes1 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(true, false, 64, 15, false)),
ok = verify_aae_norebuild(Nodes1, true),
rt:clean_cluster(Nodes1),

lager:info("Test with no rebuilds - but with startup skip"),
Nodes2 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(true, true)),
ok = verify_aae_norebuild(Nodes2),
lager:info("Test with no rebuilds - but with startup skip and key ranges"),
Nodes2 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(true, true, 64, 10, true)),
ok = verify_aae_norebuild(Nodes2, false),
rt:clean_cluster(Nodes2),

lager:info("Test with rebuilds"),
Expand All @@ -111,48 +120,138 @@ confirm() ->
rt:clean_cluster(Nodes3),

lager:info("Test with no rebuilds - and AAE on fallbacks"),
Nodes4 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(false, false)),
ok = verify_aae_norebuild(Nodes4),
pass.

Nodes4 = rt:build_cluster(?NUM_NODES, ?CFG_NOREBUILD(false, false, 128, 10, false)),
ok = verify_aae_norebuild(Nodes4, false),
rt:clean_cluster(Nodes4),

OldVsn = previous,
lager:info("Building previous version cluster ~p", [OldVsn]),
[Nodes5] =
rt:build_clusters([{?NUM_NODES, OldVsn, ?CFG_NOREBUILD(true, false, 64, 15, false)}]),

[NodeToUpgrade|_Rest] = Nodes5,

{riak_kv, _, RiakVer} =
lists:keyfind(riak_kv,
1,
rpc:call(NodeToUpgrade, application, loaded_applications, [])),

case RiakVer of
RiakVer when RiakVer >= "riak_kv-3.0.9" ->
lager:info("Skipping upgrade test - previous ~s > 3.0.8", [RiakVer]),
pass;
RiakVer ->
lager:info("Running upgrade test with previous version ~s", [RiakVer]),
rt:upgrade(NodeToUpgrade, current),
rt:wait_for_service(NodeToUpgrade, riak_kv),

?assertNot(check_capability(NodeToUpgrade)),

ok = verify_aae_norebuild(Nodes5, false),

CheckFun =
fun(StatName) ->
proplists:get_value(StatName,
verify_riak_stats:get_stats(NodeToUpgrade, ?STATS_DELAY))
end,
?assertEqual(0, CheckFun(<<"tictacaae_bucket_total">>)),
?assertEqual(0, CheckFun(<<"tictacaae_modtime_total">>)),
?assertNotEqual(0, CheckFun(<<"tictacaae_exchange_total">>)),

pass
end.


check_capability(Node) ->
rpc:call(Node,
riak_core_capability,
get,
[{riak_kv, tictacaae_prompted_repairs}, false]).

verify_aae_norebuild(Nodes) ->
verify_aae_norebuild(Nodes, false).

verify_aae_norebuild(Nodes, CheckTypeStats) ->
lager:info("Tictac AAE tests without rebuilding trees"),
Node1 = hd(Nodes),

% Recovery without tree rebuilds

% Test recovery from too few replicas written
KV1 = test_data(1, 1000),
KV1 = test_data(1, ?NUM_KEYS),
test_less_than_n_writes(Node1, KV1),

% Test recovery when replicas are different
RepSN = <<"read_repairs_total">>,
Repairs =
lists:sum(
lists:map(
fun(N) ->
proplists:get_value(RepSN,
verify_riak_stats:get_stats(N, ?STATS_DELAY))
end,
Nodes)),
?assertMatch(?NUM_KEYS, Repairs),

KV2 = [{K, <<V/binary, "a">>} || {K, V} <- KV1],
lager:info("Writing additional n=1 data to require more repairs"),
write_data(Node1, KV2, [{n_val, 1}], ?ALT_BUCKET1),
write_data(Node1, KV2, [{n_val, 1}], ?ALT_BUCKET2),
write_data(Node1, KV2, [{n_val, 1}], ?ALT_BUCKET3),
write_data(Node1, KV2, [{n_val, 1}], ?ALT_BUCKET4),
lager:info("Updating data on n=1"),
test_less_than_n_mods(Node1, KV2),
lager:info("Verifying alternative bucket data"),
verify_data(Node1, KV2, ?ALT_BUCKET1),
verify_data(Node1, KV2, ?ALT_BUCKET2),
verify_data(Node1, KV2, ?ALT_BUCKET3),
verify_data(Node1, KV2, ?ALT_BUCKET4),

case CheckTypeStats of
true ->
B_SN = <<"tictacaae_bucket_total">>,
MT_SN = <<"tictacaae_modtime_total">>,
E_SN = <<"tictacaae_exchange_total">>,
VerifyFun =
fun(StatName) ->
fun(Node) ->
V = proplists:get_value(StatName,
verify_riak_stats:get_stats(Node, ?STATS_DELAY)),
?assertNotEqual(0, V)
end
end,

lists:foreach(VerifyFun(B_SN), Nodes),
lists:foreach(VerifyFun(MT_SN), Nodes),
lists:foreach(VerifyFun(E_SN), Nodes),
ok;
false ->
ok
end,

ok.

verify_aae_rebuild(Nodes) ->
lager:info("Tictac AAE tests with rebuilding trees"),
Node1 = hd(Nodes),

% Test recovery from too few replicas written
KV1 = test_data(1, 1000),
KV1 = test_data(1, ?NUM_KEYS),
test_less_than_n_writes(Node1, KV1),

% Test recovery when replicas are different
KV2 = [{K, <<V/binary, "a">>} || {K, V} <- KV1],
test_less_than_n_mods(Node1, KV2),

% Test recovery from too few replicas written
KV3 = test_data(1001, 2000),
KV3 = test_data(?NUM_KEYS + 1, 2 * ?NUM_KEYS),
test_less_than_n_writes(Node1, KV3),

% Test recovery when replicas are different
KV4 = [{K, <<V/binary, "a">>} || {K, V} <- KV3],
test_less_than_n_mods(Node1, KV4),

lager:info("Writing 1000 objects"),
KV5 = test_data(2001, 3000),
lager:info("Writing ~w objects", [?NUM_KEYS]),
KV5 = test_data(1 + 2 * ?NUM_KEYS, 3 * ?NUM_KEYS),
write_data(Node1, KV5),

% Test recovery from single partition loss.
Expand Down Expand Up @@ -188,7 +287,7 @@ get_preflist(Node, B, K) ->
Pl.

to_key(N) ->
list_to_binary(io_lib:format("K~4..0B", [N])).
list_to_binary(io_lib:format("K~8..0B", [N])).

test_data(Start, End) ->
Keys = [to_key(N) || N <- lists:seq(Start, End)],
Expand All @@ -198,14 +297,17 @@ write_data(Node, KVs) ->
write_data(Node, KVs, []).

write_data(Node, KVs, Opts) ->
write_data(Node, KVs, Opts, ?BUCKET).

write_data(Node, KVs, Opts, Bucket) ->
PB = rt:pbc(Node),
[begin
O =
case riakc_pb_socket:get(PB, ?BUCKET, K) of
case riakc_pb_socket:get(PB, Bucket, K) of
{ok, Prev} ->
riakc_obj:update_value(Prev, V);
_ ->
riakc_obj:new(?BUCKET, K, V)
riakc_obj:new(Bucket, K, V)
end,
?assertMatch(ok, riakc_pb_socket:put(PB, O, Opts))
end || {K, V} <- KVs],
Expand All @@ -214,11 +316,14 @@ write_data(Node, KVs, Opts) ->

% @doc Verifies that the data is eventually restored to the expected set.
verify_data(Node, KeyValues) ->
verify_data(Node, KeyValues, ?BUCKET).

verify_data(Node, KeyValues, Bucket) ->
lager:info("Verify all replicas are eventually correct"),
PB = rt:pbc(Node),
CheckFun =
fun() ->
Matches = [verify_replicas(Node, ?BUCKET, K, V, ?N_VAL)
Matches = [verify_replicas(Node, Bucket, K, V, ?N_VAL)
|| {K, V} <- KeyValues],
CountTrues = fun(true, G) -> G+1; (false, G) -> G end,
NumGood = lists:foldl(CountTrues, 0, Matches),
Expand Down
Loading