Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
Add reward type to schema
Browse files Browse the repository at this point in the history
This removes the rewards_pkey constraint since reward_type adds duplicate account/gateway/block constraint violations. There are other indices for gateway/account so this should hopefully not affect rewards query performance
  • Loading branch information
madninja committed Jan 12, 2022
1 parent c043f4f commit 4de5ce3
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 32 deletions.
27 changes: 27 additions & 0 deletions migrations/1637295492-rewards_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- migrations/1637295492-rewards_type.sql
-- :up

DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'reward_type') THEN
create type reward_type as ENUM (
'poc_challenger',
'poc_challengee',
'poc_witness',
'dc_rewards',
'consensus_rewards',
'securities_reward'
);
END IF;
END
$$;

alter table rewards add column type reward_type;

alter table rewards drop constraint if exists rewards_pkey;

-- :down

alter table rewards drop column type;


129 changes: 97 additions & 32 deletions src/be_db_reward.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,34 @@
-record(state, {}).

-define(S_INSERT_REWARD, "insert_reward").
-define(S_INSERT_REWARD_10, "insert_reward_10").
-define(S_INSERT_REWARD_100, "insert_reward_100").

%%
%% be_db_worker
%%

prepare_conn(Conn) ->
{ok, S1} = epgsql:parse(
Conn,
?S_INSERT_REWARD,
[
"insert into rewards (block, transaction_hash, time, account, gateway, amount) ",
"values ($1, $2, $3, $4, $5, $6) "
],
[]
),
#{?S_INSERT_REWARD => S1}.
MkQueryFun = fun(Rows) ->
epgsql:parse(
Conn,
?S_INSERT_REWARD ++ "_" ++ integer_to_list(Rows),
[
"insert into rewards (block, transaction_hash, time, account, gateway, amount, type) ",
"values ",
be_utils:make_values_list(7, Rows)
],
[]
)
end,
{ok, S1} = MkQueryFun(1),
{ok, S10} = MkQueryFun(10),
{ok, S100} = MkQueryFun(100),
#{
?S_INSERT_REWARD => S1,
?S_INSERT_REWARD_10 => S10,
?S_INSERT_REWARD_100 => S100
}.

%%
%% be_block_handler
Expand All @@ -50,37 +62,76 @@ load_block(Conn, _Hash, Block, _Sync, _Ledger, State = #state{}) ->
end,
blockchain_block_v1:transactions(Block)
),
Queries = lists:foldl(
fun(T, TAcc) ->

StartMkQuery = erlang:monotonic_time(millisecond),
Queries = lists:flatmap(
fun(T) ->
TxnHash = ?BIN_TO_B64(blockchain_txn:hash(T)),
RewardMap = collect_rewards(
blockchain_txn:type(T),
blockchain_worker:blockchain(),
T,
#{}
),
lists:foldl(
fun(Entry, RAcc) ->
lists:map(
fun(Entry) ->
q_insert_reward(
BlockHeight,
TxnHash,
BlockTime,
Entry,
RAcc
Entry
)
end,
TAcc,
maps:to_list(RewardMap)
)
end,
[],
Txns
),
ok = ?BATCH_QUERY(Conn, Queries),
be_db_follower:maybe_log_duration(db_reward_query_make, StartMkQuery),

StartQuery = erlang:monotonic_time(millisecond),
execute_queries(Conn, Queries),
be_db_follower:maybe_log_duration(db_reward_query_exec, StartQuery),

{ok, State}.

execute_queries(Conn, Queries) when length(Queries) > 100 ->
lists:foreach(
fun
(Q) when length(Q) == 100 ->
{ok, 100} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_100, lists:flatten(Q));
(Q) ->
execute_queries(Conn, Q)
end,
be_utils:split_list(Queries, 100)
);
execute_queries(Conn, Queries) when length(Queries) > 10 ->
lists:foreach(
fun
(Q) when length(Q) == 10 ->
{ok, 10} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_10, lists:flatten(Q));
(Q) ->
execute_queries(Conn, Q)
end,
be_utils:split_list(Queries, 10)
);
execute_queries(Conn, Queries) ->
ok = ?BATCH_QUERY(Conn, [{?S_INSERT_REWARD, I} || I <- Queries]).

-type reward_type() ::
poc_challenger
| poc_challengee
| poc_witness
| dc_rewards
| consensus_rewards
| securities_reward.

-type reward_map() :: #{
{Account :: libp2p_crypto:pubkey_bin(), Gateway :: libp2p_crypto:pubkey_bin() | undefined} =>
{
Account :: libp2p_crypto:pubkey_bin(),
Gateway :: libp2p_crypto:pubkey_bin() | undefined,
Type :: reward_type()
} =>
Reward :: pos_integer()
}.

Expand Down Expand Up @@ -130,32 +181,32 @@ collect_rewards(blockchain_txn_rewards_v2, Chain, Txn, RewardMap) ->
collect_v2_rewards(Rewards, Ledger, RewardMap) ->
maps:fold(
fun
({owner, _Type, O}, Amt, Acc) ->
({owner, Type, O}, Amt, Acc) ->
maps:update_with(
{O, <<>>},
{O, <<>>, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
);
({gateway, _Type, G}, Amt, Acc) ->
({gateway, Type, G}, Amt, Acc) ->
case blockchain_ledger_v1:find_gateway_owner(G, Ledger) of
{error, _Error} ->
Acc;
{ok, GwOwner} ->
maps:update_with(
{GwOwner, G},
{GwOwner, G, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
)
end;
({validator, _Type, V}, Amt, Acc) ->
({validator, Type, V}, Amt, Acc) ->
case blockchain_ledger_v1:get_validator(V, Ledger) of
{error, _Error} ->
Acc;
{ok, Validator} ->
maps:update_with(
{blockchain_ledger_validator_v1:owner_address(Validator), V},
{blockchain_ledger_validator_v1:owner_address(Validator), V, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
Expand All @@ -170,20 +221,34 @@ collect_v2_rewards(Rewards, Ledger, RewardMap) ->
collect_v1_rewards([], RewardMap) ->
RewardMap;
collect_v1_rewards([Reward | Rest], RewardMap) ->
Key = {blockchain_txn_reward_v1:account(Reward), blockchain_txn_reward_v1:gateway(Reward)},
Key = {
blockchain_txn_reward_v1:account(Reward),
blockchain_txn_reward_v1:gateway(Reward),
blockchain_txn_reward_v1:type(Reward)
},
Amount = blockchain_txn_reward_v1:amount(Reward),
collect_v1_rewards(
Rest,
maps:update_with(Key, fun(Balance) -> Balance + Amount end, Amount, RewardMap)
).

q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway}, Amount}, Queries) ->
Params = [
%% ranslate from a chain reward type to a database reward. As of this writing the
%% set of names for the database reward_type matches the rewards_v2 rewards types.
-spec to_reward_type(atom()) -> reward_type().
to_reward_type(securities) -> securities_reward;
to_reward_type(data_credits) -> dc_rewards;
to_reward_type(poc_challengees) -> poc_challengee;
to_reward_type(poc_challengers) -> poc_challenger;
to_reward_type(poc_witnesses) -> poc_witness;
to_reward_type(consensus) -> consensus_rewards.

q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway, RewardType}, Amount}) ->
[
BlockHeight,
TxnHash,
BlockTime,
?BIN_TO_B58(Account),
?BIN_TO_B58(Gateway),
Amount
],
[{?S_INSERT_REWARD, Params} | Queries].
Amount,
to_reward_type(RewardType)
].

0 comments on commit 4de5ce3

Please sign in to comment.