Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Add reward type to rewards schema #289

Merged
merged 1 commit into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions migrations/1637295492-rewards_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- migrations/1637295492-rewards_type.sql
-- :up

DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'reward_type') THEN
create type reward_type as ENUM (
'poc_challenger',
'poc_challengee',
'poc_witness',
'dc_rewards',
'consensus_rewards',
'securities_reward'
);
END IF;
END
$$;

alter table rewards add column type reward_type;

alter table rewards drop constraint if exists rewards_pkey;

-- :down

alter table rewards drop column type;


129 changes: 97 additions & 32 deletions src/be_db_reward.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,34 @@
-record(state, {}).

-define(S_INSERT_REWARD, "insert_reward").
-define(S_INSERT_REWARD_10, "insert_reward_10").
-define(S_INSERT_REWARD_100, "insert_reward_100").

%%
%% be_db_worker
%%

prepare_conn(Conn) ->
{ok, S1} = epgsql:parse(
Conn,
?S_INSERT_REWARD,
[
"insert into rewards (block, transaction_hash, time, account, gateway, amount) ",
"values ($1, $2, $3, $4, $5, $6) "
],
[]
),
#{?S_INSERT_REWARD => S1}.
MkQueryFun = fun(Rows) ->
epgsql:parse(
Conn,
?S_INSERT_REWARD ++ "_" ++ integer_to_list(Rows),
[
"insert into rewards (block, transaction_hash, time, account, gateway, amount, type) ",
"values ",
be_utils:make_values_list(7, Rows)
],
[]
)
end,
{ok, S1} = MkQueryFun(1),
{ok, S10} = MkQueryFun(10),
{ok, S100} = MkQueryFun(100),
#{
?S_INSERT_REWARD => S1,
?S_INSERT_REWARD_10 => S10,
?S_INSERT_REWARD_100 => S100
}.

%%
%% be_block_handler
Expand All @@ -50,37 +62,76 @@ load_block(Conn, _Hash, Block, _Sync, _Ledger, State = #state{}) ->
end,
blockchain_block_v1:transactions(Block)
),
Queries = lists:foldl(
fun(T, TAcc) ->

StartMkQuery = erlang:monotonic_time(millisecond),
Queries = lists:flatmap(
fun(T) ->
TxnHash = ?BIN_TO_B64(blockchain_txn:hash(T)),
RewardMap = collect_rewards(
blockchain_txn:type(T),
blockchain_worker:blockchain(),
T,
#{}
),
lists:foldl(
fun(Entry, RAcc) ->
lists:map(
fun(Entry) ->
q_insert_reward(
BlockHeight,
TxnHash,
BlockTime,
Entry,
RAcc
Entry
)
end,
TAcc,
maps:to_list(RewardMap)
)
end,
[],
Txns
),
ok = ?BATCH_QUERY(Conn, Queries),
be_db_follower:maybe_log_duration(db_reward_query_make, StartMkQuery),

StartQuery = erlang:monotonic_time(millisecond),
execute_queries(Conn, Queries),
be_db_follower:maybe_log_duration(db_reward_query_exec, StartQuery),

{ok, State}.

execute_queries(Conn, Queries) when length(Queries) > 100 ->
lists:foreach(
fun
(Q) when length(Q) == 100 ->
{ok, 100} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_100, lists:flatten(Q));
(Q) ->
execute_queries(Conn, Q)
end,
be_utils:split_list(Queries, 100)
);
execute_queries(Conn, Queries) when length(Queries) > 10 ->
lists:foreach(
fun
(Q) when length(Q) == 10 ->
{ok, 10} = ?PREPARED_QUERY(Conn, ?S_INSERT_REWARD_10, lists:flatten(Q));
(Q) ->
execute_queries(Conn, Q)
end,
be_utils:split_list(Queries, 10)
);
execute_queries(Conn, Queries) ->
ok = ?BATCH_QUERY(Conn, [{?S_INSERT_REWARD, I} || I <- Queries]).

-type reward_type() ::
poc_challenger
| poc_challengee
| poc_witness
| dc_rewards
| consensus_rewards
| securities_reward.

-type reward_map() :: #{
{Account :: libp2p_crypto:pubkey_bin(), Gateway :: libp2p_crypto:pubkey_bin() | undefined} =>
{
Account :: libp2p_crypto:pubkey_bin(),
Gateway :: libp2p_crypto:pubkey_bin() | undefined,
Type :: reward_type()
} =>
Reward :: pos_integer()
}.

Expand Down Expand Up @@ -130,32 +181,32 @@ collect_rewards(blockchain_txn_rewards_v2, Chain, Txn, RewardMap) ->
collect_v2_rewards(Rewards, Ledger, RewardMap) ->
maps:fold(
fun
({owner, _Type, O}, Amt, Acc) ->
({owner, Type, O}, Amt, Acc) ->
maps:update_with(
{O, <<>>},
{O, <<>>, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
);
({gateway, _Type, G}, Amt, Acc) ->
({gateway, Type, G}, Amt, Acc) ->
case blockchain_ledger_v1:find_gateway_owner(G, Ledger) of
{error, _Error} ->
Acc;
{ok, GwOwner} ->
maps:update_with(
{GwOwner, G},
{GwOwner, G, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
)
end;
({validator, _Type, V}, Amt, Acc) ->
({validator, Type, V}, Amt, Acc) ->
case blockchain_ledger_v1:get_validator(V, Ledger) of
{error, _Error} ->
Acc;
{ok, Validator} ->
maps:update_with(
{blockchain_ledger_validator_v1:owner_address(Validator), V},
{blockchain_ledger_validator_v1:owner_address(Validator), V, Type},
fun(Balance) -> Balance + Amt end,
Amt,
Acc
Expand All @@ -170,20 +221,34 @@ collect_v2_rewards(Rewards, Ledger, RewardMap) ->
collect_v1_rewards([], RewardMap) ->
RewardMap;
collect_v1_rewards([Reward | Rest], RewardMap) ->
Key = {blockchain_txn_reward_v1:account(Reward), blockchain_txn_reward_v1:gateway(Reward)},
Key = {
blockchain_txn_reward_v1:account(Reward),
blockchain_txn_reward_v1:gateway(Reward),
blockchain_txn_reward_v1:type(Reward)
},
Amount = blockchain_txn_reward_v1:amount(Reward),
collect_v1_rewards(
Rest,
maps:update_with(Key, fun(Balance) -> Balance + Amount end, Amount, RewardMap)
).

q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway}, Amount}, Queries) ->
Params = [
%% ranslate from a chain reward type to a database reward. As of this writing the
%% set of names for the database reward_type matches the rewards_v2 rewards types.
-spec to_reward_type(atom()) -> reward_type().
to_reward_type(securities) -> securities_reward;
to_reward_type(data_credits) -> dc_rewards;
to_reward_type(poc_challengees) -> poc_challengee;
to_reward_type(poc_challengers) -> poc_challenger;
to_reward_type(poc_witnesses) -> poc_witness;
to_reward_type(consensus) -> consensus_rewards.

q_insert_reward(BlockHeight, TxnHash, BlockTime, {{Account, Gateway, RewardType}, Amount}) ->
[
BlockHeight,
TxnHash,
BlockTime,
?BIN_TO_B58(Account),
?BIN_TO_B58(Gateway),
Amount
],
[{?S_INSERT_REWARD, Params} | Queries].
Amount,
to_reward_type(RewardType)
].