From 4de5ce3021cbba3a38fac5d920e3aa546e42c284 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Sun, 21 Nov 2021 06:08:31 -0700 Subject: [PATCH] Add reward type to schema This removes the rewards_pkey constraint since reward_type adds duplicate account/gateway/block constraint violations. There are other indices for gateway/account so this should hopefully not affect rewards query performance --- migrations/1637295492-rewards_type.sql | 27 ++++++ src/be_db_reward.erl | 129 +++++++++++++++++++------ 2 files changed, 124 insertions(+), 32 deletions(-) create mode 100644 migrations/1637295492-rewards_type.sql diff --git a/migrations/1637295492-rewards_type.sql b/migrations/1637295492-rewards_type.sql new file mode 100644 index 00000000..99923b67 --- /dev/null +++ b/migrations/1637295492-rewards_type.sql @@ -0,0 +1,27 @@ +-- migrations/1637295492-rewards_type.sql +-- :up + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'reward_type') THEN + create type reward_type as ENUM ( + 'poc_challenger', + 'poc_challengee', + 'poc_witness', + 'dc_rewards', + 'consensus_rewards', + 'securities_reward' + ); + END IF; +END +$$; + +alter table rewards add column type reward_type; + +alter table rewards drop constraint if exists rewards_pkey; + +-- :down + +alter table rewards drop column type; + + diff --git a/src/be_db_reward.erl b/src/be_db_reward.erl index 13d567e0..b648bc1e 100644 --- a/src/be_db_reward.erl +++ b/src/be_db_reward.erl @@ -15,22 +15,34 @@ -record(state, {}). -define(S_INSERT_REWARD, "insert_reward"). +-define(S_INSERT_REWARD_10, "insert_reward_10"). +-define(S_INSERT_REWARD_100, "insert_reward_100"). %% %% be_db_worker %% prepare_conn(Conn) -> - {ok, S1} = epgsql:parse( - Conn, - ?S_INSERT_REWARD, - [ - "insert into rewards (block, transaction_hash, time, account, gateway, amount) ", - "values ($1, $2, $3, $4, $5, $6) " - ], - [] - ), - #{?S_INSERT_REWARD => S1}. + MkQueryFun = fun(Rows) -> + epgsql:parse( + Conn, + ?S_INSERT_REWARD ++ "_" ++ integer_to_list(Rows), + [ + "insert into rewards (block, transaction_hash, time, account, gateway, amount, type) ", + "values ", + be_utils:make_values_list(7, Rows) + ], + [] + ) + end, + {ok, S1} = MkQueryFun(1), + {ok, S10} = MkQueryFun(10), + {ok, S100} = MkQueryFun(100), + #{ + ?S_INSERT_REWARD => S1, + ?S_INSERT_REWARD_10 => S10, + ?S_INSERT_REWARD_100 => S100 + }. %% %% be_block_handler @@ -50,8 +62,10 @@ load_block(Conn, _Hash, Block, _Sync, _Ledger, State = #state{}) -> end, blockchain_block_v1:transactions(Block) ), - Queries = lists:foldl( - fun(T, TAcc) -> + + StartMkQuery = erlang:monotonic_time(millisecond), + Queries = lists:flatmap( + fun(T) -> TxnHash = ?BIN_TO_B64(blockchain_txn:hash(T)), RewardMap = collect_rewards( blockchain_txn:type(T), @@ -59,28 +73,65 @@ load_block(Conn, _Hash, Block, _Sync, _Ledger, State = #state{}) -> T, #{} ), - lists:foldl( - fun(Entry, RAcc) -> + lists:map( + fun(Entry) -> q_insert_reward( BlockHeight, TxnHash, BlockTime, - Entry, - RAcc + Entry ) end, - TAcc, maps:to_list(RewardMap) ) end, - [], Txns ), - ok = ?BATCH_QUERY(Conn, Queries), + be_db_follower:maybe_log_duration(db_reward_query_make, StartMkQuery), + + StartQuery = erlang:monotonic_time(millisecond), + execute_queries(Conn, Queries), + be_db_follower:maybe_log_duration(db_reward_query_exec, StartQuery), + {ok, State}. +execute_queries(Conn, Queries) when length(Queries) > 100 -> + lists:foreach( + fun + (Q) when length(Q) == 100 -> + {ok, 100} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_100, lists:flatten(Q)); + (Q) -> + execute_queries(Conn, Q) + end, + be_utils:split_list(Queries, 100) + ); +execute_queries(Conn, Queries) when length(Queries) > 10 -> + lists:foreach( + fun + (Q) when length(Q) == 10 -> + {ok, 10} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_10, lists:flatten(Q)); + (Q) -> + execute_queries(Conn, Q) + end, + be_utils:split_list(Queries, 10) + ); +execute_queries(Conn, Queries) -> + ok = ?BATCH_QUERY(Conn, [{?S_INSERT_REWARD, I} || I <- Queries]). + +-type reward_type() :: + poc_challenger + | poc_challengee + | poc_witness + | dc_rewards + | consensus_rewards + | securities_reward. + -type reward_map() :: #{ - {Account :: libp2p_crypto:pubkey_bin(), Gateway :: libp2p_crypto:pubkey_bin() | undefined} => + { + Account :: libp2p_crypto:pubkey_bin(), + Gateway :: libp2p_crypto:pubkey_bin() | undefined, + Type :: reward_type() + } => Reward :: pos_integer() }. @@ -130,32 +181,32 @@ collect_rewards(blockchain_txn_rewards_v2, Chain, Txn, RewardMap) -> collect_v2_rewards(Rewards, Ledger, RewardMap) -> maps:fold( fun - ({owner, _Type, O}, Amt, Acc) -> + ({owner, Type, O}, Amt, Acc) -> maps:update_with( - {O, <<>>}, + {O, <<>>, Type}, fun(Balance) -> Balance + Amt end, Amt, Acc ); - ({gateway, _Type, G}, Amt, Acc) -> + ({gateway, Type, G}, Amt, Acc) -> case blockchain_ledger_v1:find_gateway_owner(G, Ledger) of {error, _Error} -> Acc; {ok, GwOwner} -> maps:update_with( - {GwOwner, G}, + {GwOwner, G, Type}, fun(Balance) -> Balance + Amt end, Amt, Acc ) end; - ({validator, _Type, V}, Amt, Acc) -> + ({validator, Type, V}, Amt, Acc) -> case blockchain_ledger_v1:get_validator(V, Ledger) of {error, _Error} -> Acc; {ok, Validator} -> maps:update_with( - {blockchain_ledger_validator_v1:owner_address(Validator), V}, + {blockchain_ledger_validator_v1:owner_address(Validator), V, Type}, fun(Balance) -> Balance + Amt end, Amt, Acc @@ -170,20 +221,34 @@ collect_v2_rewards(Rewards, Ledger, RewardMap) -> collect_v1_rewards([], RewardMap) -> RewardMap; collect_v1_rewards([Reward | Rest], RewardMap) -> - Key = {blockchain_txn_reward_v1:account(Reward), blockchain_txn_reward_v1:gateway(Reward)}, + Key = { + blockchain_txn_reward_v1:account(Reward), + blockchain_txn_reward_v1:gateway(Reward), + blockchain_txn_reward_v1:type(Reward) + }, Amount = blockchain_txn_reward_v1:amount(Reward), collect_v1_rewards( Rest, maps:update_with(Key, fun(Balance) -> Balance + Amount end, Amount, RewardMap) ). -q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway}, Amount}, Queries) -> - Params = [ +%% ranslate from a chain reward type to a database reward. As of this writing the +%% set of names for the database reward_type matches the rewards_v2 rewards types. +-spec to_reward_type(atom()) -> reward_type(). +to_reward_type(securities) -> securities_reward; +to_reward_type(data_credits) -> dc_rewards; +to_reward_type(poc_challengees) -> poc_challengee; +to_reward_type(poc_challengers) -> poc_challenger; +to_reward_type(poc_witnesses) -> poc_witness; +to_reward_type(consensus) -> consensus_rewards. + +q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway, RewardType}, Amount}) -> + [ BlockHeight, TxnHash, BlockTime, ?BIN_TO_B58(Account), ?BIN_TO_B58(Gateway), - Amount - ], - [{?S_INSERT_REWARD, Params} | Queries]. + Amount, + to_reward_type(RewardType) + ].