diff --git a/.pylintrc b/.pylintrc index 166be96..6061a6d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -169,7 +169,8 @@ disable=raw-checker-failed, too-few-public-methods, too-many-instance-attributes, logging-fstring-interpolation, - inconsistent-return-statements + inconsistent-return-statements, + duplicate-code # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/abi/ProofChainContractABI b/abi/ProofChainContractABI index 17ca36c..2bfa290 100644 --- a/abi/ProofChainContractABI +++ b/abi/ProofChainContractABI @@ -1,4 +1,42 @@ [ + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "indexed": false, + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + } + ], + "name": "BRPQuorumNotReached", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "indexed": false, + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + } + ], + "name": "BSPQuorumNotReached", + "type": "event" + }, { "anonymous": false, "inputs": [ @@ -18,6 +56,106 @@ "name": "BlockHeightSubmissionThresholdChanged", "type": "event" }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "indexed": false, + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + }, + { + "indexed": false, + "internalType": "bytes32", + "name": "specimenHash", + "type": "bytes32" + }, + { + "indexed": false, + "internalType": "bytes32", + "name": "resultHash", + "type": "bytes32" + }, + { + "indexed": false, + "internalType": "string", + "name": "storageURL", + "type": "string" + }, + { + "indexed": false, + "internalType": "uint128", + "name": "submittedStake", + "type": "uint128" + } + ], + "name": "BlockResultProductionProofSubmitted", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "indexed": true, + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + }, + { + "indexed": true, + "internalType": "bytes32", + "name": "specimenHash", + "type": "bytes32" + }, + { + "indexed": false, + "internalType": "bytes32", + "name": "blockResultHash", + "type": "bytes32" + } + ], + "name": "BlockResultRewardAwarded", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint128", + "name": "newBlockSpecimenRewardAllocation", + "type": "uint128" + } + ], + "name": "BlockResultRewardChanged", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint256", + "name": "newQuorumThreshold", + "type": "uint256" + } + ], + "name": "BlockResultSessionQuorumChanged", + "type": "event" + }, { "anonymous": false, "inputs": [ @@ -61,6 +199,19 @@ "name": "BlockSpecimenProductionProofSubmitted", "type": "event" }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint128", + "name": "newBlockResultRewardAllocation", + "type": "uint128" + } + ], + "name": "BlockSpecimenResultChanged", + "type": "event" + }, { "anonymous": false, "inputs": [ @@ -162,6 +313,19 @@ "name": "MaxSubmissionsPerBlockHeightChanged", "type": "event" }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint128", + "name": "newStakeRequirement", + "type": "uint128" + } + ], + "name": "MinimumRequiredBlockResultStakeChanged", + "type": "event" + }, { "anonymous": false, "inputs": [ @@ -277,25 +441,6 @@ "name": "OwnershipTransferred", "type": "event" }, - { - "anonymous": false, - "inputs": [ - { - "indexed": true, - "internalType": "uint64", - "name": "chainId", - "type": "uint64" - }, - { - "indexed": false, - "internalType": "uint64", - "name": "blockHeight", - "type": "uint64" - } - ], - "name": "QuorumNotReached", - "type": "event" - }, { "anonymous": false, "inputs": [ @@ -399,6 +544,19 @@ "stateMutability": "view", "type": "function" }, + { + "inputs": [], + "name": "BLOCK_RESULT_PRODUCER_ROLE", + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "stateMutability": "view", + "type": "function" + }, { "inputs": [], "name": "BLOCK_SPECIMEN_PRODUCER_ROLE", @@ -438,6 +596,24 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "address", + "name": "operator", + "type": "address" + }, + { + "internalType": "uint128", + "name": "validatorId", + "type": "uint128" + } + ], + "name": "addBRPOperator", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -487,6 +663,34 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + }, + { + "internalType": "bytes32", + "name": "blockHash", + "type": "bytes32" + }, + { + "internalType": "bytes32", + "name": "definitiveSpecimenHash", + "type": "bytes32" + } + ], + "name": "arbitrateBlockResultSession", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -515,6 +719,19 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "address", + "name": "operator", + "type": "address" + } + ], + "name": "disableBRPOperator", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -546,6 +763,19 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "address", + "name": "operator", + "type": "address" + } + ], + "name": "enableBRPOperator", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -559,6 +789,24 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + } + ], + "name": "finalizeAndRewardBlockResultSession", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -595,6 +843,29 @@ "internalType": "address[]", "name": "__auditors", "type": "address[]" + }, + { + "internalType": "address[]", + "name": "_brps", + "type": "address[]" + } + ], + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [], + "name": "getBRPRoleData", + "outputs": [ + { + "internalType": "uint128", + "name": "requiredStake", + "type": "uint128" + }, + { + "internalType": "address[]", + "name": "activeMembers", + "type": "address[]" } ], "stateMutability": "view", @@ -676,6 +947,11 @@ "name": "blockSpecimenRewardAllocation", "type": "uint128" }, + { + "internalType": "uint128", + "name": "blockResultRewardAllocation", + "type": "uint128" + }, { "internalType": "uint64", "name": "blockSpecimenSessionDuration", @@ -719,6 +995,25 @@ "stateMutability": "view", "type": "function" }, + { + "inputs": [ + { + "internalType": "bytes32", + "name": "specimenhash", + "type": "bytes32" + } + ], + "name": "getURLS", + "outputs": [ + { + "internalType": "string[]", + "name": "", + "type": "string[]" + } + ], + "stateMutability": "view", + "type": "function" + }, { "inputs": [ { @@ -756,6 +1051,40 @@ "stateMutability": "view", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + }, + { + "internalType": "bool", + "name": "bspSession", + "type": "bool" + }, + { + "internalType": "address", + "name": "operator", + "type": "address" + } + ], + "name": "isSessionOpen", + "outputs": [ + { + "internalType": "bool", + "name": "", + "type": "bool" + } + ], + "stateMutability": "view", + "type": "function" + }, { "inputs": [ { @@ -801,6 +1130,19 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "address", + "name": "operator", + "type": "address" + } + ], + "name": "removeBRPOperator", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -834,6 +1176,19 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint128", + "name": "newStakeAmount", + "type": "uint128" + } + ], + "name": "setBRPRequiredStake", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -869,11 +1224,11 @@ "inputs": [ { "internalType": "uint128", - "name": "newBlockSpecimenReward", + "name": "newBlockResultReward", "type": "uint128" } ], - "name": "setBlockSpecimenReward", + "name": "setBlockResultReward", "outputs": [], "stateMutability": "nonpayable", "type": "function" @@ -881,12 +1236,12 @@ { "inputs": [ { - "internalType": "uint64", - "name": "newSessionDuration", - "type": "uint64" + "internalType": "uint128", + "name": "newBlockSpecimenReward", + "type": "uint128" } ], - "name": "setBlockSpecimenSessionDuration", + "name": "setBlockSpecimenReward", "outputs": [], "stateMutability": "nonpayable", "type": "function" @@ -994,6 +1349,19 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint64", + "name": "newSessionDuration", + "type": "uint64" + } + ], + "name": "setSessionDuration", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { @@ -1007,6 +1375,39 @@ "stateMutability": "nonpayable", "type": "function" }, + { + "inputs": [ + { + "internalType": "uint64", + "name": "chainId", + "type": "uint64" + }, + { + "internalType": "uint64", + "name": "blockHeight", + "type": "uint64" + }, + { + "internalType": "bytes32", + "name": "blockHash", + "type": "bytes32" + }, + { + "internalType": "bytes32", + "name": "specimenHash", + "type": "bytes32" + }, + { + "internalType": "string", + "name": "storageURL", + "type": "string" + } + ], + "name": "submitBlockResultProof", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + }, { "inputs": [ { diff --git a/sql/proof_chain_mbase_result.sql b/sql/proof_chain_mbase_result.sql new file mode 100644 index 0000000..805e237 --- /dev/null +++ b/sql/proof_chain_mbase_result.sql @@ -0,0 +1,72 @@ +CREATE OR REPLACE VIEW chain_moonbeam_moonbase_alpha._proof_chain_result_events AS +WITH +session_started_events AS ( + SELECT session_started.tx_hash AS observer_chain_tx_hash, + session_started.block_id AS observer_chain_block_id, + session_started.tx_offset AS observer_chain_tx_offset, + session_started.topics[2]::numeric AS origin_chain_id, + session_started.topics[3]::numeric AS origin_chain_block_height, + abi_field(session_started.data, 0)::numeric AS result_session_deadline + FROM chain_moonbeam_moonbase_alpha.block_log_events session_started + JOIN chain_moonbeam_moonbase_alpha.block_transactions trx + ON (trx.block_id = session_started.block_id AND trx.tx_offset = session_started.tx_offset) + WHERE + session_started.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea + AND session_started.topics @> ARRAY[ + '\x06a773d98907981dde2b75694bea53d9542cb1434717f5c66e699dee821a7324'::bytea + ] + AND trx.successful = TRUE + AND session_started.block_id >= '1910104892088990000'::bigint + ORDER BY session_started.block_id ASC, session_started.log_offset ASC +), +result_reward_awarded_events AS ( + SELECT + fin.tx_hash AS observer_chain_tx_hash, + fin.topics[2]::numeric AS origin_chain_id, + fin.topics[3]::numeric AS origin_chain_block_height + FROM chain_moonbeam_moonbase_alpha.block_log_events fin + JOIN chain_moonbeam_moonbase_alpha.block_transactions trx_1 + ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) + WHERE + fin.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea + AND fin.topics @> ARRAY['\x93dcf9329a330cb95723152c05719560f2fbd50e215c542854b27acc80c9108d'::bytea] + AND trx_1.successful = TRUE + AND fin.block_id >= '1910104892088990000'::bigint + ORDER BY fin.block_id ASC, fin.log_offset ASC +), +result_quorum_not_reached_events AS ( + SELECT + fin.tx_hash AS observer_chain_tx_hash, + fin.topics[2]::numeric AS origin_chain_id, + public.abi_field(fin.data, 0)::numeric AS origin_chain_block_height + FROM chain_moonbeam_moonbase_alpha.block_log_events fin + JOIN chain_moonbeam_moonbase_alpha.block_transactions trx_1 + ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) + WHERE + fin.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea + AND fin.topics @> ARRAY['\x31d16d882c6405d327fa305ecf0d52b45154868e0828822533fd2547f4b21a75'::bytea] + AND trx_1.successful = TRUE + AND fin.block_id >= '1910104892088990000'::bigint + ORDER BY fin.block_id ASC, fin.log_offset ASC +), +all_finalization_events AS ( + SELECT * FROM result_reward_awarded_events + UNION ALL + SELECT * FROM result_quorum_not_reached_events +) +SELECT + sse.observer_chain_tx_hash AS observer_chain_session_start_tx_hash, + sse.observer_chain_block_id AS observer_chain_session_start_block_id, + sse.observer_chain_tx_offset AS observer_chain_session_start_tx_offset, + sse.origin_chain_id, + sse.origin_chain_block_height, + sse.result_session_deadline, + afe.observer_chain_tx_hash AS observer_chain_finalization_tx_hash +FROM session_started_events sse +LEFT JOIN all_finalization_events afe ON ( + sse.origin_chain_id = afe.origin_chain_id + AND sse.origin_chain_block_height = afe.origin_chain_block_height +) +WHERE sse.origin_chain_block_height > 16568764::numeric +ORDER BY sse.observer_chain_block_id ASC, sse.observer_chain_tx_offset ASC +; diff --git a/sql/proof_chain_moonbase_alpha.sql b/sql/proof_chain_mbase_specimen.sql similarity index 81% rename from sql/proof_chain_moonbase_alpha.sql rename to sql/proof_chain_mbase_specimen.sql index 45cba0a..e2707a1 100644 --- a/sql/proof_chain_moonbase_alpha.sql +++ b/sql/proof_chain_mbase_specimen.sql @@ -1,4 +1,4 @@ -CREATE OR REPLACE VIEW chain_moonbeam_moonbase_alpha._proof_chain_events AS +CREATE OR REPLACE VIEW chain_moonbeam_moonbase_alpha._proof_chain_specimen_events AS WITH session_started_events AS ( SELECT session_started.tx_hash AS observer_chain_tx_hash, @@ -13,12 +13,13 @@ session_started_events AS ( WHERE session_started.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea AND session_started.topics @> ARRAY[ - '\x8b1f889addbfa41db5227bae3b091bd5c8b9a9122f874dfe54ba2f75aabe1f4c'::bytea + '\x49caa59dfff8e73f72d249149e72487a67c49cf76549aed997c63963b436c3c2'::bytea ] AND trx.successful = TRUE + AND session_started.block_id >= '1910104892088990000'::bigint ORDER BY session_started.block_id ASC, session_started.log_offset ASC ), -block_specimen_reward_awarded_events AS ( +specimen_reward_awarded_events AS ( SELECT fin.tx_hash AS observer_chain_tx_hash, fin.topics[2]::numeric AS origin_chain_id, @@ -30,9 +31,10 @@ block_specimen_reward_awarded_events AS ( fin.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea AND fin.topics @> ARRAY['\xf05ac779af1ec75a7b2fbe9415b33a67c00294a121786f7ce2eb3f92e4a6424a'::bytea] AND trx_1.successful = TRUE + AND fin.block_id >= '1910104892088990000'::bigint ORDER BY fin.block_id ASC, fin.log_offset ASC ), -quorum_not_reached_events AS ( +specimen_quorum_not_reached_events AS ( SELECT fin.tx_hash AS observer_chain_tx_hash, fin.topics[2]::numeric AS origin_chain_id, @@ -42,14 +44,15 @@ quorum_not_reached_events AS ( ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) WHERE fin.sender = '\x19492a5019B30471aA8fa2c6D9d39c99b5Cda20C'::bytea - AND fin.topics @> ARRAY['\x398fd8f638a7242217f011fd0720a06747f7a85b7d28d7276684b841baea4021'::bytea] + AND fin.topics @> ARRAY['\x8340aa7a5b37153230f8b64fa66f25c843e5002c60e63a25db6a9195005ccabd'::bytea] AND trx_1.successful = TRUE + AND fin.block_id >= '1910104892088990000'::bigint ORDER BY fin.block_id ASC, fin.log_offset ASC ), all_finalization_events AS ( - SELECT * FROM block_specimen_reward_awarded_events + SELECT * FROM specimen_reward_awarded_events UNION ALL - SELECT * FROM quorum_not_reached_events + SELECT * FROM specimen_quorum_not_reached_events ) SELECT sse.observer_chain_tx_hash AS observer_chain_session_start_tx_hash, @@ -64,5 +67,6 @@ LEFT JOIN all_finalization_events afe ON ( sse.origin_chain_id = afe.origin_chain_id AND sse.origin_chain_block_height = afe.origin_chain_block_height ) +WHERE sse.origin_chain_block_height > 16568764::numeric ORDER BY sse.observer_chain_block_id ASC, sse.observer_chain_tx_offset ASC ; diff --git a/sql/proof_chain_mbeam_result.sql b/sql/proof_chain_mbeam_result.sql new file mode 100644 index 0000000..377b461 --- /dev/null +++ b/sql/proof_chain_mbeam_result.sql @@ -0,0 +1,68 @@ +CREATE OR REPLACE VIEW chain_moonbeam_mainnet._proof_chain_result_events AS +WITH +session_started_events AS ( + SELECT session_started.tx_hash AS observer_chain_tx_hash, + session_started.block_id AS observer_chain_block_id, + session_started.tx_offset AS observer_chain_tx_offset, + session_started.topics[2]::numeric AS origin_chain_id, + session_started.topics[3]::numeric AS origin_chain_block_height, + abi_field(session_started.data, 0)::numeric AS proof_session_deadline + FROM chain_moonbeam_mainnet.block_log_events session_started + JOIN chain_moonbeam_mainnet.block_transactions trx + ON (trx.block_id = session_started.block_id AND trx.tx_offset = session_started.tx_offset) + WHERE + session_started.sender = '\x4f2e285227d43d9eb52799d0a28299540452446e'::bytea + AND session_started.topics @> ARRAY[ + '\x06a773d98907981dde2b75694bea53d9542cb1434717f5c66e699dee821a7324'::bytea + ] + AND trx.successful = TRUE + ORDER BY session_started.block_id ASC, session_started.log_offset ASC +), +result_reward_awarded_events AS ( + SELECT + fin.tx_hash AS observer_chain_tx_hash, + fin.topics[2]::numeric AS origin_chain_id, + fin.topics[3]::numeric AS origin_chain_block_height + FROM chain_moonbeam_mainnet.block_log_events fin + JOIN chain_moonbeam_mainnet.block_transactions trx_1 + ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) + WHERE + fin.sender = '\x4f2e285227d43d9eb52799d0a28299540452446e'::bytea + AND fin.topics @> ARRAY['\x93dcf9329a330cb95723152c05719560f2fbd50e215c542854b27acc80c9108d'::bytea] + AND trx_1.successful = TRUE + ORDER BY fin.block_id ASC, fin.log_offset ASC +), +result_quorum_not_reached_events AS ( + SELECT + fin.tx_hash AS observer_chain_tx_hash, + fin.topics[2]::numeric AS origin_chain_id, + public.abi_field(fin.data, 0)::numeric AS origin_chain_block_height + FROM chain_moonbeam_mainnet.block_log_events fin + JOIN chain_moonbeam_mainnet.block_transactions trx_1 + ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) + WHERE + fin.sender = '\x4f2e285227d43d9eb52799d0a28299540452446e'::bytea + AND fin.topics @> ARRAY['\x31d16d882c6405d327fa305ecf0d52b45154868e0828822533fd2547f4b21a75'::bytea] + AND trx_1.successful = TRUE + ORDER BY fin.block_id ASC, fin.log_offset ASC +), +all_finalization_events AS ( + SELECT * FROM result_reward_awarded_events + UNION ALL + SELECT * FROM result_quorum_not_reached_events +) +SELECT + sse.observer_chain_tx_hash AS observer_chain_session_start_tx_hash, + sse.observer_chain_block_id AS observer_chain_session_start_block_id, + sse.observer_chain_tx_offset AS observer_chain_session_start_tx_offset, + sse.origin_chain_id, + sse.origin_chain_block_height, + sse.result_session_deadline, + afe.observer_chain_tx_hash AS observer_chain_finalization_tx_hash +FROM session_started_events sse +LEFT JOIN all_finalization_events afe ON ( + sse.origin_chain_id = afe.origin_chain_id + AND sse.origin_chain_block_height = afe.origin_chain_block_height +) +ORDER BY sse.observer_chain_block_id ASC, sse.observer_chain_tx_offset ASC +; diff --git a/sql/proof_chain_moonbeam_view.sql b/sql/proof_chain_mbeam_specimen.sql similarity index 85% rename from sql/proof_chain_moonbeam_view.sql rename to sql/proof_chain_mbeam_specimen.sql index f01c890..7ce71dd 100644 --- a/sql/proof_chain_moonbeam_view.sql +++ b/sql/proof_chain_mbeam_specimen.sql @@ -1,4 +1,4 @@ -CREATE OR REPLACE VIEW chain_moonbeam_mainnet._proof_chain_events AS +CREATE OR REPLACE VIEW chain_moonbeam_mainnet._proof_chain_specimen_events AS WITH session_started_events AS ( SELECT session_started.tx_hash AS observer_chain_tx_hash, @@ -13,12 +13,12 @@ session_started_events AS ( WHERE session_started.sender = '\x4f2e285227d43d9eb52799d0a28299540452446e'::bytea AND session_started.topics @> ARRAY[ - '\x8b1f889addbfa41db5227bae3b091bd5c8b9a9122f874dfe54ba2f75aabe1f4c'::bytea + '\x49caa59dfff8e73f72d249149e72487a67c49cf76549aed997c63963b436c3c2'::bytea ] AND trx.successful = TRUE ORDER BY session_started.block_id ASC, session_started.log_offset ASC ), -block_specimen_reward_awarded_events AS ( +specimen_reward_awarded_events AS ( SELECT fin.tx_hash AS observer_chain_tx_hash, fin.topics[2]::numeric AS origin_chain_id, @@ -32,7 +32,7 @@ block_specimen_reward_awarded_events AS ( AND trx_1.successful = TRUE ORDER BY fin.block_id ASC, fin.log_offset ASC ), -quorum_not_reached_events AS ( +specimen_quorum_not_reached_events AS ( SELECT fin.tx_hash AS observer_chain_tx_hash, fin.topics[2]::numeric AS origin_chain_id, @@ -42,14 +42,14 @@ quorum_not_reached_events AS ( ON (trx_1.block_id = fin.block_id AND trx_1.tx_offset = fin.tx_offset) WHERE fin.sender = '\x4f2e285227d43d9eb52799d0a28299540452446e'::bytea - AND fin.topics @> ARRAY['\x398fd8f638a7242217f011fd0720a06747f7a85b7d28d7276684b841baea4021'::bytea] + AND fin.topics @> ARRAY['\x8340aa7a5b37153230f8b64fa66f25c843e5002c60e63a25db6a9195005ccabd'::bytea] AND trx_1.successful = TRUE ORDER BY fin.block_id ASC, fin.log_offset ASC ), all_finalization_events AS ( - SELECT * FROM block_specimen_reward_awarded_events + SELECT * FROM specimen_reward_awarded_events UNION ALL - SELECT * FROM quorum_not_reached_events + SELECT * FROM specimen_quorum_not_reached_events ) SELECT sse.observer_chain_tx_hash AS observer_chain_session_start_tx_hash, diff --git a/src/contract.py b/src/contract.py index 54e455b..2dc1cbe 100644 --- a/src/contract.py +++ b/src/contract.py @@ -107,14 +107,19 @@ def _retry_with_backoff(self, fn, retries=2, backoff_in_seconds=1, **kwargs): retries_left -= 1 exp += 1 - def send_finalize(self, **kwargs): - return self._retry_with_backoff(self._attempt_send_finalize, **kwargs) + def send_specimen_finalize(self, **kwargs): + return self._retry_with_backoff(self._attempt_send_specimen_finalize, **kwargs) - def _attempt_send_finalize(self, chainId, blockHeight, timeout): + def send_result_finalize(self, **kwargs): + return self._retry_with_backoff(self._attempt_send_result_finalize, **kwargs) + + def _attempt_send_specimen_finalize(self, chainId, blockHeight, timeout): if self.nonce is None: self._refresh_nonce() self.gasPrice = self.w3.eth.gasPrice - self.logger.info(f"TX dynamic gas price is {self.gasPrice}") + self.logger.info( + f"TX dynamic gas price for specimen finalization is {self.gasPrice}" + ) transaction = self.contract.functions.finalizeAndRewardSpecimenSession( chainId, blockHeight ).buildTransaction( @@ -137,7 +142,7 @@ def _attempt_send_finalize(self, chainId, blockHeight, timeout): predicted_tx_hash = eth_hash.auto.keccak(signed_txn.rawTransaction) self.logger.info( - f"Sending finalization tx {chainId}/{blockHeight}" + f"Sending Specimen finalization tx {chainId}/{blockHeight}" f" senderBalance={balance_before_send_glmr}GLMR" f" senderNonce={self.nonce}" f" txHash=0x{predicted_tx_hash.hex()}" @@ -170,11 +175,90 @@ def _attempt_send_finalize(self, chainId, blockHeight, timeout): # retry immediately (we already waited) return (False, 0) - case (-32603, "Session cannot be finalized"): - self.logger.info("Skipping session that cannot be finalized...") + case (-32603, "Specimen Session cannot be finalized"): + self.logger.info( + "Skipping specimen session that cannot be finalized..." + ) return (True, None) case (-32603, "already known"): - self.logger.info("Skipping finalization tx that's already known...") + self.logger.info( + "Skipping specimen finalization tx that's already known..." + ) + return (True, None) + case _: + raise + + def _attempt_send_result_finalize(self, chainId, blockHeight, timeout): + if self.nonce is None: + self._refresh_nonce() + self.gasPrice = self.w3.eth.gasPrice + self.logger.info( + f"TX dynamic gas price for result finalization is {self.gasPrice}" + ) + transaction = self.contract.functions.finalizeAndRewardBlockResultSession( + chainId, blockHeight + ).buildTransaction( + { + "gas": self.gas, + "gasPrice": self.gasPrice, + "from": self.finalizer_address, + "nonce": self.nonce, + } + ) + signed_txn = self.w3.eth.account.signTransaction( + transaction, private_key=self.finalizer_prvkey + ) + + balance_before_send_wei = self.w3.eth.get_balance(self.finalizer_address) + balance_before_send_glmr = web3.auto.w3.fromWei( + balance_before_send_wei, "ether" + ) + + predicted_tx_hash = eth_hash.auto.keccak(signed_txn.rawTransaction) + + self.logger.info( + f"Sending Result finalization tx {chainId}/{blockHeight}" + f" senderBalance={balance_before_send_glmr}GLMR" + f" senderNonce={self.nonce}" + f" txHash=0x{predicted_tx_hash.hex()}" + ) + + tx_hash = None + try: + tx_hash = self.w3.eth.sendRawTransaction(signed_txn.rawTransaction) + return self.report_transaction_receipt(tx_hash, timeout) + except ValueError as ex: + if len(ex.args) != 1 or type(ex.args[0]) != dict: + raise + + jsonrpc_err = ex.args[0] + if "code" not in jsonrpc_err or "message" not in jsonrpc_err: + raise + + match (jsonrpc_err["code"], jsonrpc_err["message"]): + case (-32603, "nonce too low"): + self.report_transaction_bounce( + predicted_tx_hash, + err="nonce too low", + details={"txNonce": self.nonce}, + ) + self.logger.info( + "Pausing to allow pending txs to clear, then refreshing nonce..." + ) + time.sleep(60) + self._refresh_nonce() + + # retry immediately (we already waited) + return (False, 0) + case (-32603, "Result Session cannot be finalized"): + self.logger.info( + "Skipping Result session that cannot be finalized..." + ) + return (True, None) + case (-32603, "already known"): + self.logger.info( + "Skipping Result finalization tx that's already known..." + ) return (True, None) case _: raise @@ -210,7 +294,7 @@ def report_transaction_receipt(self, tx_hash, timeout, **kwargs): def _refresh_nonce(self): self.nonce = self.w3.eth.get_transaction_count(self.finalizer_address) - self.logger.info(f"Refreshed nonce newNonce={self.nonce}") + self.logger.info(f"Refreshed nonce {self.nonce}") def block_number(self): return self._retry_with_backoff(self._attempt_block_number) diff --git a/src/dbmanager.py b/src/dbmanresult.py similarity index 80% rename from src/dbmanager.py rename to src/dbmanresult.py index 2ad042a..f6fbbbe 100644 --- a/src/dbmanager.py +++ b/src/dbmanresult.py @@ -5,10 +5,10 @@ import psycopg2 import logformat -from finalizationrequest import FinalizationRequest +from finalizationresultrequest import FinalizationResultRequest -class DBManager(threading.Thread): +class DBManagerResult(threading.Thread): caught_up: bool = False last_block_id: int starting_point: int @@ -36,7 +36,7 @@ def _process_outputs(self, outputs): chainId = output[3] deadline = output[5] finalizationHash = output[6] - fr = FinalizationRequest( + fr = FinalizationResultRequest( chainId=chainId, blockHeight=blockHeight, deadline=deadline, @@ -53,9 +53,9 @@ def _process_outputs(self, outputs): self._update_cursor(fr.session_started_block_id) c += 1 if fl > 0: - self.logger.info(f"Queued {fl} proof-sessions for finalization") + self.logger.info(f"Queued {fl} result proof-sessions for finalization") if c > 0: - self.logger.info(f"Confirmed {c} proof-sessions") + self.logger.info(f"Confirmed {c} result proof-sessions") if self.last_block_id > prev_last_block_id: self.logger.info(f"Updated cursor position block_id={self.last_block_id}") @@ -80,17 +80,19 @@ def __main_loop(self): # we are catching up. So we only need to grab what we need to attempt for finalizing if self.chain_table == "chain_moonbeam_moonbase_alpha": cur.execute( - r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL AND origin_chain_block_height > 16864740;', + r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_result_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL AND origin_chain_block_height > 16927620;', (self.last_block_id,), ) else: cur.execute( - r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL;', + r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_result_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL;', (self.last_block_id,), ) outputs = cur.fetchall() - self.logger.info(f"Processing {len(outputs)} proof-session records...") + self.logger.info( + f"Processing {len(outputs)} result proof-session records..." + ) self._process_outputs(outputs) self.caught_up = True @@ -105,19 +107,19 @@ def __main_loop(self): # we need everything after last max block number if self.chain_table == "chain_moonbeam_moonbase_alpha": cur.execute( - r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_events" WHERE observer_chain_session_start_block_id > %s AND origin_chain_block_height > 16864740;', + r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_result_events" WHERE observer_chain_session_start_block_id > %s AND origin_chain_block_height > 16927620;', (self.last_block_id,), ) else: cur.execute( - r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_events" WHERE observer_chain_session_start_block_id > %s;', + r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_result_events" WHERE observer_chain_session_start_block_id > %s;', (self.last_block_id,), ) outputs = cur.fetchall() if self._process_outputs(outputs) == 0: - self.logger.info("No new proof-session records discovered") + self.logger.info("No new result proof-session records discovered") time.sleep(40) @@ -146,11 +148,11 @@ def __fetch_last_block(self): with conn.cursor() as cur: if self.chain_table == "chain_moonbeam_moonbase_alpha": cur.execute( - r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_moonbase_alpha."_proof_chain_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' + r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_moonbase_alpha."_proof_chain_result_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' ) else: cur.execute( - r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_mainnet."_proof_chain_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' + r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_mainnet."_proof_chain_result_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' ) block_id = cur.fetchone() if block_id is not None: @@ -161,10 +163,10 @@ def __fetch_last_block(self): self.logger.warning("".join(traceback.format_exception(ex))) def _update_cursor(self, block_id): - for fr in FinalizationRequest.get_requests_to_be_confirmed(): + for fr in FinalizationResultRequest.get_result_requests_to_be_confirmed(): if fr.session_started_block_id < block_id: return - for fr in FinalizationRequest.get_requests_to_be_finalized(): + for fr in FinalizationResultRequest.get_result_requests_to_be_finalized(): if fr.session_started_block_id < block_id: return self.last_block_id = block_id diff --git a/src/dbmanspecimen.py b/src/dbmanspecimen.py new file mode 100644 index 0000000..57efafc --- /dev/null +++ b/src/dbmanspecimen.py @@ -0,0 +1,172 @@ +import logging +import threading +import time +import traceback +import psycopg2 +import logformat + +from finalizationspecimenrequest import FinalizationSpecimenRequest + + +class DBManagerSpecimen(threading.Thread): + caught_up: bool = False + last_block_id: int + starting_point: int + logger: logging.Logger + + def __init__(self, user, password, database, host, starting_point, chain_table): + super().__init__() + self.host = host + self.database = database + self.password = password + self.user = user + self.last_block_id = None + self.chain_table = chain_table + + self.logger = logformat.get_logger("DB") + self.starting_point = starting_point + + def _process_outputs(self, outputs): + fl = 0 + c = 0 + prev_last_block_id = self.last_block_id + for output in outputs: + block_id = output[1] + blockHeight = output[4] + chainId = output[3] + deadline = output[5] + finalizationHash = output[6] + fr = FinalizationSpecimenRequest( + chainId=chainId, + blockHeight=blockHeight, + deadline=deadline, + block_id=block_id, + ) + + if finalizationHash is None: + if not fr.waiting_for_confirm() and not fr.waiting_for_finalize(): + if fr.finalize_later(): + fl += 1 + else: + if fr.waiting_for_confirm(): + if fr.confirm_request(): + self._update_cursor(fr.session_started_block_id) + c += 1 + if fl > 0: + self.logger.info(f"Queued {fl} specimen proof-sessions for finalization") + if c > 0: + self.logger.info(f"Confirmed {c} specimen proof-sessions") + if self.last_block_id > prev_last_block_id: + self.logger.info(f"Updated cursor position block_id={self.last_block_id}") + + return fl + c + + def __connect(self): + return psycopg2.connect( + host=self.host, + database=self.database, + user=self.user, + password=self.password, + ) + + def __main_loop(self): + try: + self.logger.info("Connecting to the database...") + if not self.caught_up: + self.logger.info(f"Initial scan block_id={self.last_block_id}") + + with self.__connect() as conn: + with conn.cursor() as cur: + # we are catching up. So we only need to grab what we need to attempt for finalizing + if self.chain_table == "chain_moonbeam_moonbase_alpha": + cur.execute( + r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_specimen_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL AND origin_chain_block_height > 16928490;', + (self.last_block_id,), + ) + else: + cur.execute( + r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_specimen_events" WHERE observer_chain_session_start_block_id > %s AND observer_chain_finalization_tx_hash IS NULL;', + (self.last_block_id,), + ) + outputs = cur.fetchall() + + self.logger.info( + f"Processing {len(outputs)} specimen proof-session records..." + ) + self._process_outputs(outputs) + + self.caught_up = True + self.logger.info(f"Caught up with db block_id={self.last_block_id}") + + while True: + with self.__connect() as conn: + with conn.cursor() as cur: + self.logger.info( + f"Incremental scan block_id={self.last_block_id}" + ) + # we need everything after last max block number + if self.chain_table == "chain_moonbeam_moonbase_alpha": + cur.execute( + r'SELECT * FROM chain_moonbeam_moonbase_alpha."_proof_chain_specimen_events" WHERE observer_chain_session_start_block_id > %s AND origin_chain_block_height > 16928490;', + (self.last_block_id,), + ) + else: + cur.execute( + r'SELECT * FROM chain_moonbeam_mainnet."_proof_chain_specimen_events" WHERE observer_chain_session_start_block_id > %s;', + (self.last_block_id,), + ) + + outputs = cur.fetchall() + + if self._process_outputs(outputs) == 0: + self.logger.info("No new specimen proof-session records discovered") + + time.sleep(40) + + except (Exception, psycopg2.DatabaseError) as ex: + self.logger.critical("".join(traceback.format_exception(ex))) + + def run(self): + # we need to avoid recursion in order to avoid stack depth exceeded exception + if self.starting_point != -1: + self.last_block_id = self.starting_point + else: + self.__fetch_last_block() + while True: + try: + self.__main_loop() + time.sleep(60) + except (Exception, psycopg2.DatabaseError) as ex: + self.logger.warning("".join(traceback.format_exception(ex))) + # this should never happen + self.__main_loop() + + def __fetch_last_block(self): + try: + self.logger.info("Determining initial cursor position...") + with self.__connect() as conn: + with conn.cursor() as cur: + if self.chain_table == "chain_moonbeam_moonbase_alpha": + cur.execute( + r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_moonbase_alpha."_proof_chain_specimen_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' + ) + else: + cur.execute( + r'SELECT observer_chain_session_start_block_id FROM chain_moonbeam_mainnet."_proof_chain_specimen_events" WHERE observer_chain_finalization_tx_hash IS NULL LIMIT 1' + ) + block_id = cur.fetchone() + if block_id is not None: + self.last_block_id = block_id[0] - 1 + else: + self.last_block_id = 1 + except Exception as ex: + self.logger.warning("".join(traceback.format_exception(ex))) + + def _update_cursor(self, block_id): + for fr in FinalizationSpecimenRequest.get_requests_to_be_confirmed(): + if fr.session_started_block_id < block_id: + return + for fr in FinalizationSpecimenRequest.get_requests_to_be_finalized(): + if fr.session_started_block_id < block_id: + return + self.last_block_id = block_id diff --git a/src/finalizationresultrequest.py b/src/finalizationresultrequest.py new file mode 100644 index 0000000..6de961e --- /dev/null +++ b/src/finalizationresultrequest.py @@ -0,0 +1,110 @@ +import time + + +class FinalizationResultRequest: + result_requests_to_be_finalized = {} + result_requests_to_be_confirmed = {} + + @staticmethod + def get_result_requests_to_be_finalized() -> []: + values = list( + FinalizationResultRequest.result_requests_to_be_finalized.values() + ) + frs = [] + for v in values: + for fr in v.values(): + frs.append(fr) + return frs + + @staticmethod + def get_result_requests_to_be_confirmed() -> []: + values = list( + FinalizationResultRequest.result_requests_to_be_confirmed.values() + ) + frs = [] + for v in values: + for fr in v.values(): + frs.append(fr) + return frs + + def __init__(self, chainId, blockHeight, deadline, block_id): + self.deadline = deadline + self.chainId = chainId + self.blockHeight = blockHeight + self.block_id = block_id + self.finalized_time = None + + def update_block_id(self, bid): + self.block_id = bid + + def confirm_request(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_confirmed + ): + return None + FinalizationResultRequest.result_requests_to_be_confirmed[self.chainId].pop( + self.blockHeight, None + ) + + def finalize_request(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_finalized + ): + return None + FinalizationResultRequest.result_requests_to_be_finalized[self.chainId].pop( + self.blockHeight, None + ) + + self.finalized_time = time.time() + + def finalize_later(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_finalized + ): + FinalizationResultRequest.result_requests_to_be_finalized[self.chainId] = {} + reqs_for_chain = FinalizationResultRequest.result_requests_to_be_finalized[ + self.chainId + ] + if self.blockHeight in reqs_for_chain: + return False + reqs_for_chain[self.blockHeight] = self + return True + + def confirm_later(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_confirmed + ): + FinalizationResultRequest.result_requests_to_be_confirmed[self.chainId] = {} + reqs_for_chain = FinalizationResultRequest.result_requests_to_be_confirmed[ + self.chainId + ] + if self.blockHeight in reqs_for_chain: + return False + reqs_for_chain[self.blockHeight] = self + return True + + def waiting_for_confirm(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_confirmed + ): + return False + return ( + self.blockHeight + in FinalizationResultRequest.result_requests_to_be_confirmed[self.chainId] + ) + + def waiting_for_finalize(self): + if ( + self.chainId + not in FinalizationResultRequest.result_requests_to_be_finalized + ): + return False + return ( + self.blockHeight + in FinalizationResultRequest.result_requests_to_be_finalized[self.chainId] + ) diff --git a/src/finalizationrequest.py b/src/finalizationspecimenrequest.py similarity index 52% rename from src/finalizationrequest.py rename to src/finalizationspecimenrequest.py index 536f610..94478e4 100644 --- a/src/finalizationrequest.py +++ b/src/finalizationspecimenrequest.py @@ -1,13 +1,13 @@ import time -class FinalizationRequest: +class FinalizationSpecimenRequest: requests_to_be_finalized = {} requests_to_be_confirmed = {} @staticmethod def get_requests_to_be_finalized() -> []: - values = list(FinalizationRequest.requests_to_be_finalized.values()) + values = list(FinalizationSpecimenRequest.requests_to_be_finalized.values()) frs = [] for v in values: for fr in v.values(): @@ -16,7 +16,7 @@ def get_requests_to_be_finalized() -> []: @staticmethod def get_requests_to_be_confirmed() -> []: - values = list(FinalizationRequest.requests_to_be_confirmed.values()) + values = list(FinalizationSpecimenRequest.requests_to_be_confirmed.values()) frs = [] for v in values: for fr in v.values(): @@ -34,51 +34,55 @@ def update_block_id(self, bid): self.block_id = bid def confirm_request(self): - if self.chainId not in FinalizationRequest.requests_to_be_confirmed: + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_confirmed: return None - FinalizationRequest.requests_to_be_confirmed[self.chainId].pop( + FinalizationSpecimenRequest.requests_to_be_confirmed[self.chainId].pop( self.blockHeight, None ) def finalize_request(self): - if self.chainId not in FinalizationRequest.requests_to_be_finalized: + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_finalized: return None - FinalizationRequest.requests_to_be_finalized[self.chainId].pop( + FinalizationSpecimenRequest.requests_to_be_finalized[self.chainId].pop( self.blockHeight, None ) self.finalized_time = time.time() def finalize_later(self): - if self.chainId not in FinalizationRequest.requests_to_be_finalized: - FinalizationRequest.requests_to_be_finalized[self.chainId] = {} - reqs_for_chain = FinalizationRequest.requests_to_be_finalized[self.chainId] + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_finalized: + FinalizationSpecimenRequest.requests_to_be_finalized[self.chainId] = {} + reqs_for_chain = FinalizationSpecimenRequest.requests_to_be_finalized[ + self.chainId + ] if self.blockHeight in reqs_for_chain: return False reqs_for_chain[self.blockHeight] = self return True def confirm_later(self): - if self.chainId not in FinalizationRequest.requests_to_be_confirmed: - FinalizationRequest.requests_to_be_confirmed[self.chainId] = {} - reqs_for_chain = FinalizationRequest.requests_to_be_confirmed[self.chainId] + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_confirmed: + FinalizationSpecimenRequest.requests_to_be_confirmed[self.chainId] = {} + reqs_for_chain = FinalizationSpecimenRequest.requests_to_be_confirmed[ + self.chainId + ] if self.blockHeight in reqs_for_chain: return False reqs_for_chain[self.blockHeight] = self return True def waiting_for_confirm(self): - if self.chainId not in FinalizationRequest.requests_to_be_confirmed: + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_confirmed: return False return ( self.blockHeight - in FinalizationRequest.requests_to_be_confirmed[self.chainId] + in FinalizationSpecimenRequest.requests_to_be_confirmed[self.chainId] ) def waiting_for_finalize(self): - if self.chainId not in FinalizationRequest.requests_to_be_finalized: + if self.chainId not in FinalizationSpecimenRequest.requests_to_be_finalized: return False return ( self.blockHeight - in FinalizationRequest.requests_to_be_finalized[self.chainId] + in FinalizationSpecimenRequest.requests_to_be_finalized[self.chainId] ) diff --git a/src/finalizer.py b/src/finalizer.py index edcff01..6ec3313 100644 --- a/src/finalizer.py +++ b/src/finalizer.py @@ -3,8 +3,10 @@ import traceback import logformat + +from finalizationspecimenrequest import FinalizationSpecimenRequest +from finalizationresultrequest import FinalizationResultRequest from contract import ProofChainContract -from finalizationrequest import FinalizationRequest class Finalizer(threading.Thread): @@ -28,27 +30,55 @@ def wait_for_next_observer_chain_block(self): def __main_loop(self): self.wait_for_next_observer_chain_block() - self.refinalize_rejected_requests() + # self.refinalize_rejected_specimen_requests() + # self.refinalize_rejected_result_requests() + + ready_to_specimen_finalize = [] + open_specimen_session_count = 0 - ready_to_finalize = [] - open_session_count = 0 + ready_to_result_finalize = [] + open_result_session_count = 0 - for fr in FinalizationRequest.get_requests_to_be_finalized(): - if fr.deadline < self.observer_chain_block_height: - ready_to_finalize.append(fr) + for frs in FinalizationSpecimenRequest.get_requests_to_be_finalized(): + if frs.deadline < self.observer_chain_block_height: + ready_to_specimen_finalize.append(frs) else: - open_session_count += 1 + open_specimen_session_count += 1 + + self.logger.info( + f"Finalizing {len(ready_to_specimen_finalize)} specimen proof-sessions..." + ) + for frs in ready_to_specimen_finalize: + self._attempt_to_finalize_specimen(frs) + self.logger.info( + f"Finalized {len(ready_to_specimen_finalize)} specimen proof-sessions" + ) - if len(ready_to_finalize) == 0: + if len(ready_to_specimen_finalize) == 0: self.logger.debug( - f"Nothing ready to finalize height={self.observer_chain_block_height} openSessions={open_session_count}" + f"Nothing ready to finalize height={self.observer_chain_block_height} specimen openSessions={open_specimen_session_count}" ) - return - self.logger.info(f"Finalizing {len(ready_to_finalize)} proof-sessions...") - for fr in ready_to_finalize: - self._attempt_to_finalize(fr) - self.logger.info(f"Finalized {len(ready_to_finalize)} proof-sessions") + for frr in FinalizationResultRequest.get_result_requests_to_be_finalized(): + if frr.deadline < self.observer_chain_block_height: + ready_to_result_finalize.append(frr) + else: + open_result_session_count += 1 + + self.logger.info( + f"Finalizing {len(ready_to_result_finalize)} result proof-sessions..." + ) + for frr in ready_to_result_finalize: + self._attempt_to_finalize_result(frr) + self.logger.info( + f"Finalized {len(ready_to_result_finalize)} result proof-sessions" + ) + + if len(ready_to_result_finalize) == 0: + self.logger.debug( + f"Nothing ready to finalize height={self.observer_chain_block_height} result openSessions={open_result_session_count}" + ) + return def run(self) -> None: # we need to avoid recursion in order to avoid stack depth exceeded exception @@ -59,29 +89,58 @@ def run(self) -> None: # this should never happen pass - def refinalize_rejected_requests(self): + def refinalize_rejected_specimen_requests(self): + to_send = [] + for frs in FinalizationSpecimenRequest.get_requests_to_be_confirmed(): + if frs.finalized_time < time.time() - 600: + to_send.append(frs) + num_to_send = len(to_send) + if num_to_send == 0: + return + self.logger.info(f"Refinalizing {num_to_send} specimen proof-sessions...") + while len(to_send) > 0: + i = 0 + for frs in to_send[:1000]: + self._attempt_to_finalize_specimen(frs) + i += 1 + to_send = to_send[1000:] + refinalized = num_to_send - len(to_send) + self.logger.info(f"Refinalized {refinalized} specimen proof-sessions") + + def refinalize_rejected_result_requests(self): to_send = [] - for fr in FinalizationRequest.get_requests_to_be_confirmed(): - if fr.finalized_time < time.time() - 600: - to_send.append(fr) + for frr in FinalizationResultRequest.get_result_requests_to_be_confirmed(): + if frr.finalized_time < time.time() - 600: + to_send.append(frr) num_to_send = len(to_send) if num_to_send == 0: return - self.logger.info(f"Refinalizing {num_to_send} proof-sessions...") + self.logger.info(f"Refinalizing {num_to_send} result proof-sessions...") while len(to_send) > 0: i = 0 - for fr in to_send[:1000]: - self._attempt_to_finalize(fr) + for frr in to_send[:1000]: + self._attempt_to_finalize_specimen(frr) i += 1 to_send = to_send[1000:] - self.logger.info("Refinalized {num_to_send - len(to_send)} proof-sessions") + refinalized = num_to_send - len(to_send) + self.logger.info(f"Refinalized {refinalized} result proof-sessions") + + def _attempt_to_finalize_specimen(self, frs): + try: + self.contract.send_specimen_finalize( + chainId=int(frs.chainId), blockHeight=int(frs.blockHeight), timeout=360 + ) + frs.finalize_request() + frs.confirm_later() + except Exception as ex: + self.logger.critical("".join(traceback.format_exception(ex))) - def _attempt_to_finalize(self, fr): + def _attempt_to_finalize_result(self, frr): try: - self.contract.send_finalize( - chainId=int(fr.chainId), blockHeight=int(fr.blockHeight), timeout=300 + self.contract.send_result_finalize( + chainId=int(frr.chainId), blockHeight=int(frr.blockHeight), timeout=360 ) - fr.finalize_request() - fr.confirm_later() + frr.finalize_request() + frr.confirm_later() except Exception as ex: self.logger.critical("".join(traceback.format_exception(ex))) diff --git a/src/main.py b/src/main.py index 558407c..9dcf309 100644 --- a/src/main.py +++ b/src/main.py @@ -4,8 +4,9 @@ import os from dotenv import load_dotenv +from dbmanspecimen import DBManagerSpecimen +from dbmanresult import DBManagerResult from contract import ProofChainContract -from dbmanager import DBManager from finalizer import Finalizer @@ -38,7 +39,7 @@ def is_any_thread_alive(threads): finalizer_prvkey=FINALIZER_PRIVATE_KEY, finalizer_address=FINALIZER_ADDRESS, ) - dbm = DBManager( + dbms = DBManagerSpecimen( starting_point=int(BLOCK_ID_START), user=DB_USER, password=DB_PASSWORD, @@ -46,20 +47,26 @@ def is_any_thread_alive(threads): host=DB_HOST, chain_table=CHAIN_TABLE_NAME, ) - dbm.daemon = True + + dbms.daemon = True + + dbmr = DBManagerResult( + starting_point=int(BLOCK_ID_START), + user=DB_USER, + password=DB_PASSWORD, + database=DB_DATABASE, + host=DB_HOST, + chain_table=CHAIN_TABLE_NAME, + ) + + dbmr.daemon = True finalizer = Finalizer(contract) finalizer.daemon = True - dbm.start() + dbms.start() + dbmr.start() finalizer.start() - while is_any_thread_alive([finalizer, dbm]): + while is_any_thread_alive([finalizer, dbmr, dbms]): time.sleep(0.3) - - # - # contract.send_finalize(4, 10430382) - # contract.subscribe_on_event(handle_event) - - # dbm.join() - # finalizer.join()