diff --git a/crates/subspace-malicious-operator/src/malicious_bundle_producer.rs b/crates/subspace-malicious-operator/src/malicious_bundle_producer.rs index 177021ddd8..b76142fb1a 100644 --- a/crates/subspace-malicious-operator/src/malicious_bundle_producer.rs +++ b/crates/subspace-malicious-operator/src/malicious_bundle_producer.rs @@ -62,12 +62,12 @@ impl MaliciousOperatorStatus { *self = MaliciousOperatorStatus::NoStatus } - fn registered_operator(&self) -> Option<(&OperatorId, &OperatorPublicKey)> { + fn registered_operator(&self) -> Option<(OperatorId, OperatorPublicKey)> { match self { MaliciousOperatorStatus::Registered { operator_id, signing_key, - } => Some((operator_id, signing_key)), + } => Some((*operator_id, signing_key.clone())), _ => None, } } @@ -106,7 +106,10 @@ where CClient::Api: DomainsApi::Header> + BundleProducerElectionApi + AccountNonceApi, - TransactionPool: sc_transaction_pool_api::TransactionPool + 'static, + TransactionPool: sc_transaction_pool_api::TransactionPool< + Block = DomainBlock, + Hash = ::Hash, + > + 'static, { pub fn new( domain_id: DomainId, @@ -161,24 +164,15 @@ where } async fn handle_new_slot( - &self, + &mut self, operator_id: OperatorId, new_slot_info: OperatorSlotInfo, ) -> Option> { let slot = new_slot_info.slot; - let consensus_block_info = { - let info = self.consensus_client.info(); - sp_blockchain::HashAndNumber { - number: info.best_number, - hash: info.best_hash, - } - }; self.bundle_producer - .clone() - .produce_bundle(operator_id, consensus_block_info.clone(), new_slot_info) + .produce_bundle(operator_id, new_slot_info) .unwrap_or_else(move |error| { tracing::error!( - ?consensus_block_info, ?slot, ?operator_id, ?error, @@ -200,7 +194,7 @@ where { let maybe_opaque_bundle = self .handle_new_slot( - *operator_id, + operator_id, OperatorSlotInfo { slot, global_randomness, @@ -211,7 +205,7 @@ where if let Some(mut opaque_bundle) = maybe_opaque_bundle { if let Err(err) = self .malicious_bundle_tamper - .maybe_tamper_bundle(&mut opaque_bundle, signing_key) + .maybe_tamper_bundle(&mut opaque_bundle, &signing_key) { tracing::error!(?err, "Got error when try to tamper bundle"); } @@ -245,7 +239,7 @@ where if let Some((malicious_operator_id, _)) = self.malicious_operator_status.registered_operator() { - if next_operators.contains(malicious_operator_id) { + if next_operators.contains(&malicious_operator_id) { return Ok(()); } else { tracing::info!( @@ -255,7 +249,7 @@ where // Remove the current malicious operator to not account its stake toward // `current_total_stake` otherwise the next malicious operator will stake // more and more fund - current_operators.remove(malicious_operator_id); + current_operators.remove(&malicious_operator_id); self.malicious_operator_status.no_status(); } } diff --git a/domains/client/domain-operator/src/domain_bundle_producer.rs b/domains/client/domain-operator/src/domain_bundle_producer.rs index 6cfc64a385..20b9b8b59a 100644 --- a/domains/client/domain-operator/src/domain_bundle_producer.rs +++ b/domains/client/domain-operator/src/domain_bundle_producer.rs @@ -7,7 +7,7 @@ use domain_runtime_primitives::DomainCoreApi; use sc_client_api::{AuxStore, BlockBackend}; use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder; -use sp_blockchain::{HashAndNumber, HeaderBackend}; +use sp_blockchain::HeaderBackend; use sp_domains::{ Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey, OperatorSignature, SealedBundleHeader, @@ -74,7 +74,8 @@ where Client::Api: BlockBuilder + DomainCoreApi + TaggedTransactionQueue, CClient: HeaderBackend + ProvideRuntimeApi, CClient::Api: DomainsApi + BundleProducerElectionApi, - TransactionPool: sc_transaction_pool_api::TransactionPool, + TransactionPool: + sc_transaction_pool_api::TransactionPool::Hash>, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -109,9 +110,8 @@ where } pub async fn produce_bundle( - self, + &mut self, operator_id: OperatorId, - consensus_block_info: HashAndNumber, slot_info: OperatorSlotInfo, ) -> sp_blockchain::Result>> { let OperatorSlotInfo { @@ -145,7 +145,7 @@ where if let Some((proof_of_election, operator_signing_key)) = self.bundle_producer_election_solver.solve_challenge( slot, - consensus_block_info.hash, + consensus_chain_best_hash, self.domain_id, operator_id, global_randomness, @@ -156,7 +156,7 @@ where let tx_range = self .consensus_client .runtime_api() - .domain_tx_range(consensus_block_info.hash, self.domain_id) + .domain_tx_range(consensus_chain_best_hash, self.domain_id) .map_err(|error| { sp_blockchain::Error::Application(Box::from(format!( "Error getting tx range: {error}" diff --git a/domains/client/domain-operator/src/domain_bundle_proposer.rs b/domains/client/domain-operator/src/domain_bundle_proposer.rs index 35f1c3536b..c2e26e5cf6 100644 --- a/domains/client/domain-operator/src/domain_bundle_proposer.rs +++ b/domains/client/domain-operator/src/domain_bundle_proposer.rs @@ -11,23 +11,63 @@ use sp_domains::{ BundleHeader, DomainId, DomainsApi, ExecutionReceipt, HeaderHashingFor, ProofOfElection, }; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor, One, Zero}; +use sp_runtime::Percent; use sp_transaction_pool::runtime_api::TaggedTransactionQueue; use sp_weights::Weight; +use std::collections::HashSet; use std::marker::PhantomData; use std::sync::Arc; use std::time; use subspace_core_primitives::U256; use subspace_runtime_primitives::Balance; -pub struct DomainBundleProposer { +/// If the bundle utilization is below `BUNDLE_UTILIZATION_THRESHOLD` we will attempt to push +/// at most `MAX_SKIPPED_TRANSACTIONS` number of transactions before quitting for real. +const MAX_SKIPPED_TRANSACTIONS: usize = 8; + +const BUNDLE_UTILIZATION_THRESHOLD: Percent = Percent::from_percent(95); + +// `PreviousBundledTx` used to keep track of tx that have included in previous bundle and avoid +// to re-include the these tx in the following bundle to reduce deplicated tx. +struct PreviousBundledTx { + bundled_at: ::Hash, + tx_hashes: HashSet<::Hash>, +} + +impl PreviousBundledTx { + fn new() -> Self { + PreviousBundledTx { + bundled_at: Default::default(), + tx_hashes: HashSet::new(), + } + } + + fn already_bundled(&self, tx_hash: &::Hash) -> bool { + self.tx_hashes.contains(tx_hash) + } + + fn maybe_clear(&mut self, consensus_hash: ::Hash) { + if self.bundled_at != consensus_hash { + self.bundled_at = consensus_hash; + self.tx_hashes.clear(); + } + } + + fn add_bundled(&mut self, tx_hash: ::Hash) { + self.tx_hashes.insert(tx_hash); + } +} + +pub struct DomainBundleProposer { domain_id: DomainId, client: Arc, consensus_client: Arc, transaction_pool: Arc, + previous_bundled_tx: PreviousBundledTx, _phantom_data: PhantomData<(Block, CBlock)>, } -impl Clone +impl Clone for DomainBundleProposer { fn clone(&self) -> Self { @@ -36,6 +76,7 @@ impl Clone client: self.client.clone(), consensus_client: self.consensus_client.clone(), transaction_pool: self.transaction_pool.clone(), + previous_bundled_tx: PreviousBundledTx::new(), _phantom_data: self._phantom_data, } } @@ -56,7 +97,8 @@ where Client::Api: BlockBuilder + DomainCoreApi + TaggedTransactionQueue, CClient: HeaderBackend + ProvideRuntimeApi, CClient::Api: DomainsApi, - TransactionPool: sc_transaction_pool_api::TransactionPool, + TransactionPool: + sc_transaction_pool_api::TransactionPool::Hash>, { pub fn new( domain_id: DomainId, @@ -69,12 +111,13 @@ where client, consensus_client, transaction_pool, + previous_bundled_tx: PreviousBundledTx::new(), _phantom_data: PhantomData, } } pub(crate) async fn propose_bundle_at( - &self, + &mut self, proof_of_election: ProofOfElection, tx_range: U256, ) -> sp_blockchain::Result> { @@ -96,6 +139,12 @@ where } }; + // Clear the previous bundled tx info whenever the consensus chain tip is changed, + // this allow the operator to retry for the previous bundled tx in case the previous + // bundle fail to submit to the consensus chain due to any reason. + self.previous_bundled_tx + .maybe_clear(self.consensus_client.info().best_hash); + let bundle_vrf_hash = U256::from_be_bytes(proof_of_election.vrf_hash()); let domain_block_limit = self .consensus_client @@ -109,6 +158,7 @@ where let mut extrinsics = Vec::new(); let mut estimated_bundle_weight = Weight::default(); let mut bundle_size = 0u32; + let mut skipped = 0; // Seperate code block to make sure that runtime api instance is dropped after validation is done. { @@ -132,6 +182,14 @@ where continue; } + // Skip the tx if is is already bundled by a recent bundle + if self + .previous_bundled_tx + .already_bundled(&self.transaction_pool.hash_of(pending_tx_data)) + { + continue; + } + let tx_weight = runtime_api_instance .extrinsic_weight(parent_hash, pending_tx_data) .map_err(|error| { @@ -142,17 +200,30 @@ where let next_estimated_bundle_weight = estimated_bundle_weight.saturating_add(tx_weight); if next_estimated_bundle_weight.any_gt(domain_block_limit.max_block_weight) { - break; + if skipped < MAX_SKIPPED_TRANSACTIONS + && Percent::from_rational( + estimated_bundle_weight.ref_time(), + domain_block_limit.max_block_weight.ref_time(), + ) < BUNDLE_UTILIZATION_THRESHOLD + { + skipped += 1; + } else { + break; + } } let next_bundle_size = bundle_size + pending_tx_data.encoded_size() as u32; if next_bundle_size > domain_block_limit.max_block_size { - break; + if skipped < MAX_SKIPPED_TRANSACTIONS + && Percent::from_rational(bundle_size, domain_block_limit.max_block_size) + < BUNDLE_UTILIZATION_THRESHOLD + { + skipped += 1; + } else { + break; + } } - estimated_bundle_weight = next_estimated_bundle_weight; - bundle_size = next_bundle_size; - // Double check the transaction validity, because the tx pool are re-validate the transaction // in pool asynchronously so there is race condition that the operator imported a domain block // and start producing bundle immediately before the re-validation based on the latest block @@ -181,7 +252,12 @@ where continue; } + estimated_bundle_weight = next_estimated_bundle_weight; + bundle_size = next_bundle_size; extrinsics.push(pending_tx_data.clone()); + + self.previous_bundled_tx + .add_bundled(self.transaction_pool.hash_of(pending_tx_data)); } } diff --git a/domains/client/domain-operator/src/domain_worker.rs b/domains/client/domain-operator/src/domain_worker.rs index 6b5d1c9e3e..d1678348fe 100644 --- a/domains/client/domain-operator/src/domain_worker.rs +++ b/domains/client/domain-operator/src/domain_worker.rs @@ -1,35 +1,222 @@ -//! Shared domain worker functions. +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::bundle_processor::BundleProcessor; +use crate::domain_bundle_producer::DomainBundleProducer; use crate::utils::{BlockInfo, OperatorSlotInfo}; +use crate::{NewSlotNotification, OperatorStreams}; +use domain_runtime_primitives::DomainCoreApi; use futures::channel::mpsc; use futures::{SinkExt, Stream, StreamExt}; -use sc_client_api::{BlockBackend, BlockImportNotification, BlockchainEvents}; +use sc_client_api::{ + AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, ProofProvider, +}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sp_api::{ApiError, ApiExt, ProvideRuntimeApi}; -use sp_blockchain::{HashAndNumber, HeaderBackend}; -use sp_core::traits::SpawnEssentialNamed; -use sp_domains::{DomainsApi, OpaqueBundle}; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_block_builder::BlockBuilder; +use sp_blockchain::{HeaderBackend, HeaderMetadata}; +use sp_core::traits::{CodeExecutor, SpawnEssentialNamed}; +use sp_core::H256; +use sp_domains::{BundleProducerElectionApi, DomainsApi, OpaqueBundle, OperatorId}; +use sp_domains_fraud_proof::FraudProofApi; +use sp_messenger::MessengerApi; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use std::future::Future; -use std::pin::Pin; +use sp_transaction_pool::runtime_api::TaggedTransactionQueue; +use std::pin::pin; use std::sync::Arc; use subspace_runtime_primitives::Balance; +use tracing::{info, Instrument}; pub type OpaqueBundleFor = OpaqueBundle, ::Hash, ::Header, Balance>; +#[allow(clippy::type_complexity, clippy::too_many_arguments)] +pub(super) async fn start_worker< + Block, + CBlock, + Client, + CClient, + TransactionPool, + Backend, + IBNS, + CIBNS, + NSNS, + ASS, + E, +>( + spawn_essential: Box, + consensus_client: Arc, + consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, + maybe_operator_id: Option, + mut bundle_producer: DomainBundleProducer, + bundle_processor: BundleProcessor, + operator_streams: OperatorStreams, +) where + Block: BlockT, + Block::Hash: Into, + CBlock: BlockT, + NumberFor: From> + Into>, + CBlock::Hash: From, + Client: HeaderBackend + + BlockBackend + + AuxStore + + ProvideRuntimeApi + + ProofProvider + + Finalizer + + 'static, + Client::Api: DomainCoreApi + + MessengerApi> + + BlockBuilder + + sp_api::ApiExt + + TaggedTransactionQueue, + CClient: HeaderBackend + + HeaderMetadata + + BlockBackend + + ProofProvider + + ProvideRuntimeApi + + BlockchainEvents + + 'static, + CClient::Api: DomainsApi + + MessengerApi> + + BundleProducerElectionApi + + FraudProofApi, + TransactionPool: sc_transaction_pool_api::TransactionPool::Hash> + + 'static, + Backend: sc_client_api::Backend + 'static, + IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, + CIBNS: Stream> + Send + 'static, + NSNS: Stream + Send + 'static, + ASS: Stream> + Send + 'static, + E: CodeExecutor, +{ + let span = tracing::Span::current(); + + let OperatorStreams { + consensus_block_import_throttling_buffer_size, + block_importing_notification_stream, + imported_block_notification_stream, + new_slot_notification_stream, + acknowledgement_sender_stream, + _phantom, + } = operator_streams; + + let mut throttled_block_import_notification_stream = + throttling_block_import_notifications::( + spawn_essential, + consensus_client.clone(), + Box::pin(block_importing_notification_stream), + Box::pin(imported_block_notification_stream), + consensus_block_import_throttling_buffer_size, + ); + + if let Some(operator_id) = maybe_operator_id { + info!("👷 Running as Operator[{operator_id}]..."); + let mut new_slot_notification_stream = pin!(new_slot_notification_stream); + let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream); + loop { + tokio::select! { + // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream` + // NOTE: this is only necessary for the test. + biased; + + Some((slot, global_randomness)) = new_slot_notification_stream.next() => { + let res = bundle_producer + .produce_bundle( + operator_id, + OperatorSlotInfo { + slot, + global_randomness, + }, + ) + .instrument(span.clone()) + .await; + match res { + Err(err) => { + tracing::error!(?slot, ?err, "Error at producing bundle."); + } + Ok(Some(opaque_bundle)) => { + let best_hash = consensus_client.info().best_hash; + let mut runtime_api = consensus_client.runtime_api(); + runtime_api.register_extension(consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash)); + if let Err(err) = runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle) { + tracing::error!(?slot, ?err, "Error at submitting bundle."); + } + } + Ok(None) => {} + } + } + Some(maybe_block_info) = throttled_block_import_notification_stream.next() => { + if let Some(block_info) = maybe_block_info { + if let Err(error) = bundle_processor + .clone() + .process_bundles(( + block_info.hash, + block_info.number, + block_info.is_new_best, + )) + .instrument(span.clone()) + .await + { + tracing::error!(?error, "Failed to process consensus block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; + } + } + } + // In production the `acknowledgement_sender_stream` is an empty stream, it only set to + // real stream in test + Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => { + if let Err(err) = acknowledgement_sender.send(()).await { + tracing::error!( + ?err, + "Failed to send acknowledgement" + ); + } + } + } + } + } else { + info!("🧑‍ Running as Full node..."); + drop(new_slot_notification_stream); + drop(acknowledgement_sender_stream); + while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await { + if let Some(block_info) = maybe_block_info { + if let Err(error) = bundle_processor + .clone() + .process_bundles((block_info.hash, block_info.number, block_info.is_new_best)) + .instrument(span.clone()) + .await + { + tracing::error!(?error, "Failed to process consensus block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; + } + } + } + } +} + /// Throttle the consensus block import notification based on the `consensus_block_import_throttling_buffer_size` /// to pause the consensus block import in case the consensus chain runs much faster than the domain. /// /// Return the throttled block import notification stream #[allow(clippy::too_many_arguments)] -pub(crate) fn throttling_block_import_notifications< - Block, - CBlock, - CClient, - BlocksImporting, - BlocksImported, ->( +fn throttling_block_import_notifications( spawn_essential: Box, consensus_client: Arc, mut blocks_importing: BlocksImporting, @@ -117,48 +304,3 @@ where block_info_receiver } - -pub(crate) async fn on_new_slot( - consensus_client: &CClient, - consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - bundler: &BundlerFn, - operator_slot_info: OperatorSlotInfo, -) -> Result<(), ApiError> -where - Block: BlockT, - CBlock: BlockT, - CClient: HeaderBackend + ProvideRuntimeApi, - CClient::Api: DomainsApi, - BundlerFn: Fn( - HashAndNumber, - OperatorSlotInfo, - ) -> Pin>> + Send>> - + Send - + Sync, -{ - let best_hash = consensus_client.info().best_hash; - let best_number = consensus_client.info().best_number; - - let consensus_block_info = HashAndNumber { - number: best_number, - hash: best_hash, - }; - - let slot = operator_slot_info.slot; - let opaque_bundle = match bundler(consensus_block_info, operator_slot_info).await { - Some(opaque_bundle) => opaque_bundle, - None => { - tracing::debug!("No bundle produced on slot {slot}"); - return Ok(()); - } - }; - - let mut runtime_api = consensus_client.runtime_api(); - // Register the offchain tx pool to be able to use it from the runtime. - runtime_api.register_extension( - consensus_offchain_tx_pool_factory.offchain_transaction_pool(best_hash), - ); - runtime_api.submit_bundle_unsigned(best_hash, opaque_bundle)?; - - Ok(()) -} diff --git a/domains/client/domain-operator/src/domain_worker_starter.rs b/domains/client/domain-operator/src/domain_worker_starter.rs deleted file mode 100644 index a01e8e7f71..0000000000 --- a/domains/client/domain-operator/src/domain_worker_starter.rs +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -use crate::bundle_processor::BundleProcessor; -use crate::domain_bundle_producer::DomainBundleProducer; -use crate::domain_worker::{on_new_slot, throttling_block_import_notifications}; -use crate::utils::OperatorSlotInfo; -use crate::{NewSlotNotification, OperatorStreams}; -use domain_runtime_primitives::DomainCoreApi; -use futures::channel::mpsc; -use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt}; -use sc_client_api::{ - AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, ProofProvider, -}; -use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sp_api::ProvideRuntimeApi; -use sp_block_builder::BlockBuilder; -use sp_blockchain::{HeaderBackend, HeaderMetadata}; -use sp_core::traits::{CodeExecutor, SpawnEssentialNamed}; -use sp_core::H256; -use sp_domains::{BundleProducerElectionApi, DomainsApi, OperatorId}; -use sp_domains_fraud_proof::FraudProofApi; -use sp_messenger::MessengerApi; -use sp_runtime::traits::{Block as BlockT, NumberFor}; -use sp_transaction_pool::runtime_api::TaggedTransactionQueue; -use std::pin::pin; -use std::sync::Arc; -use subspace_runtime_primitives::Balance; -use tracing::{info, Instrument}; - -#[allow(clippy::type_complexity, clippy::too_many_arguments)] -pub(super) async fn start_worker< - Block, - CBlock, - Client, - CClient, - TransactionPool, - Backend, - IBNS, - CIBNS, - NSNS, - ASS, - E, ->( - spawn_essential: Box, - consensus_client: Arc, - consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - maybe_operator_id: Option, - bundle_producer: DomainBundleProducer, - bundle_processor: BundleProcessor, - operator_streams: OperatorStreams, -) where - Block: BlockT, - Block::Hash: Into, - CBlock: BlockT, - NumberFor: From> + Into>, - CBlock::Hash: From, - Client: HeaderBackend - + BlockBackend - + AuxStore - + ProvideRuntimeApi - + ProofProvider - + Finalizer - + 'static, - Client::Api: DomainCoreApi - + MessengerApi> - + BlockBuilder - + sp_api::ApiExt - + TaggedTransactionQueue, - CClient: HeaderBackend - + HeaderMetadata - + BlockBackend - + ProofProvider - + ProvideRuntimeApi - + BlockchainEvents - + 'static, - CClient::Api: DomainsApi - + MessengerApi> - + BundleProducerElectionApi - + FraudProofApi, - TransactionPool: sc_transaction_pool_api::TransactionPool + 'static, - Backend: sc_client_api::Backend + 'static, - IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, - CIBNS: Stream> + Send + 'static, - NSNS: Stream + Send + 'static, - ASS: Stream> + Send + 'static, - E: CodeExecutor, -{ - let span = tracing::Span::current(); - - let OperatorStreams { - consensus_block_import_throttling_buffer_size, - block_importing_notification_stream, - imported_block_notification_stream, - new_slot_notification_stream, - acknowledgement_sender_stream, - _phantom, - } = operator_streams; - - let mut throttled_block_import_notification_stream = - throttling_block_import_notifications::( - spawn_essential, - consensus_client.clone(), - Box::pin(block_importing_notification_stream), - Box::pin(imported_block_notification_stream), - consensus_block_import_throttling_buffer_size, - ); - - if let Some(operator_id) = maybe_operator_id { - info!("👷 Running as Operator[{operator_id}]..."); - let bundler_fn = { - let span = span.clone(); - move |consensus_block_info: sp_blockchain::HashAndNumber, slot_info| { - bundle_producer - .clone() - .produce_bundle(operator_id, consensus_block_info.clone(), slot_info) - .instrument(span.clone()) - .unwrap_or_else(move |error| { - tracing::error!( - ?consensus_block_info, - ?error, - "Error at producing bundle." - ); - None - }) - .boxed() - } - }; - let mut new_slot_notification_stream = pin!(new_slot_notification_stream); - let mut acknowledgement_sender_stream = pin!(acknowledgement_sender_stream); - loop { - tokio::select! { - // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream` - // NOTE: this is only necessary for the test. - biased; - - Some((slot, global_randomness)) = new_slot_notification_stream.next() => { - if let Err(error) = on_new_slot::( - consensus_client.as_ref(), - consensus_offchain_tx_pool_factory.clone(), - &bundler_fn, - OperatorSlotInfo { - slot, - global_randomness, - }, - ) - .await - { - tracing::error!( - ?error, - "Error occurred on producing a bundle at slot {slot}" - ); - break; - } - } - Some(maybe_block_info) = throttled_block_import_notification_stream.next() => { - if let Some(block_info) = maybe_block_info { - if let Err(error) = bundle_processor - .clone() - .process_bundles(( - block_info.hash, - block_info.number, - block_info.is_new_best, - )) - .instrument(span.clone()) - .await - { - tracing::error!(?error, "Failed to process consensus block"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - break; - } - } - } - // In production the `acknowledgement_sender_stream` is an empty stream, it only set to - // real stream in test - Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => { - if let Err(err) = acknowledgement_sender.send(()).await { - tracing::error!( - ?err, - "Failed to send acknowledgement" - ); - } - } - } - } - } else { - info!("🧑‍ Running as Full node..."); - drop(new_slot_notification_stream); - drop(acknowledgement_sender_stream); - while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await { - if let Some(block_info) = maybe_block_info { - if let Err(error) = bundle_processor - .clone() - .process_bundles((block_info.hash, block_info.number, block_info.is_new_best)) - .instrument(span.clone()) - .await - { - tracing::error!(?error, "Failed to process consensus block"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - break; - } - } - } - } -} diff --git a/domains/client/domain-operator/src/lib.rs b/domains/client/domain-operator/src/lib.rs index 23f9744a01..9c7bf9fdce 100644 --- a/domains/client/domain-operator/src/lib.rs +++ b/domains/client/domain-operator/src/lib.rs @@ -68,7 +68,6 @@ mod domain_block_processor; pub mod domain_bundle_producer; pub mod domain_bundle_proposer; mod domain_worker; -mod domain_worker_starter; mod fetch_domain_bootstrap_info; mod fraud_proof; mod operator; diff --git a/domains/client/domain-operator/src/operator.rs b/domains/client/domain-operator/src/operator.rs index 14f33b1e12..da8c9521e3 100644 --- a/domains/client/domain-operator/src/operator.rs +++ b/domains/client/domain-operator/src/operator.rs @@ -94,7 +94,8 @@ where + BundleProducerElectionApi + FraudProofApi, Backend: sc_client_api::Backend + Send + Sync + 'static, - TransactionPool: sc_transaction_pool_api::TransactionPool + 'static, + TransactionPool: sc_transaction_pool_api::TransactionPool::Hash> + + 'static, E: CodeExecutor, { /// Create a new instance. @@ -177,7 +178,7 @@ where spawn_essential.spawn_essential_blocking( "domain-operator-worker", None, - crate::domain_worker_starter::start_worker( + crate::domain_worker::start_worker( spawn_essential.clone(), params.consensus_client.clone(), params.consensus_offchain_tx_pool_factory.clone(), diff --git a/domains/client/domain-operator/src/tests.rs b/domains/client/domain-operator/src/tests.rs index be304e68c3..f74730d91f 100644 --- a/domains/client/domain-operator/src/tests.rs +++ b/domains/client/domain-operator/src/tests.rs @@ -8,9 +8,9 @@ use codec::{Decode, Encode}; use domain_runtime_primitives::{DomainCoreApi, Hash}; use domain_test_primitives::{OnchainStateApi, TimestampApi}; use domain_test_service::evm_domain_test_runtime::{Header, UncheckedExtrinsic}; -use domain_test_service::EcdsaKeyring::{Alice, Bob, Charlie}; +use domain_test_service::EcdsaKeyring::{Alice, Bob, Charlie, Eve}; use domain_test_service::Sr25519Keyring::{self, Ferdie}; -use domain_test_service::GENESIS_DOMAIN_ID; +use domain_test_service::{construct_extrinsic_generic, GENESIS_DOMAIN_ID}; use futures::StreamExt; use sc_client_api::{Backend, BlockBackend, BlockchainEvents, HeaderBackend}; use sc_consensus::SharedBlockImport; @@ -389,7 +389,7 @@ async fn test_domain_block_deriving_from_multiple_bundles() { ); // Run Alice (a evm domain authority node) - let mut alice = domain_test_service::DomainNodeBuilder::new( + let alice = domain_test_service::DomainNodeBuilder::new( tokio_handle.clone(), Alice, BasePath::new(directory.path().join("alice")), @@ -399,15 +399,18 @@ async fn test_domain_block_deriving_from_multiple_bundles() { produce_blocks!(ferdie, alice, 3).await.unwrap(); - let pre_bob_free_balance = alice.free_balance(Bob.to_account_id()); - let alice_account_nonce = alice.account_nonce(); - for i in 0..3 { - let tx = alice.construct_extrinsic( - alice_account_nonce + i, + let pre_eve_free_balance = alice.free_balance(Eve.to_account_id()); + for caller in [Alice, Bob, Charlie] { + let tx = construct_extrinsic_generic::( + &alice.client, pallet_balances::Call::transfer_allow_death { - dest: Bob.to_account_id(), + dest: Eve.to_account_id(), value: 1, }, + caller, + false, + 0, + 0u128, ); alice .send_extrinsic(tx) @@ -415,20 +418,17 @@ async fn test_domain_block_deriving_from_multiple_bundles() { .expect("Failed to send extrinsic"); // Produce a bundle and submit to the tx pool of the consensus node - let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; - assert!(bundle.is_some()); - - // In the last iteration, produce a consensus block which will included all the bundles - // and drive the corresponding domain block - if i == 2 { - produce_block_with!(ferdie.produce_block_with_slot(slot), alice) - .await - .unwrap(); - } + let (_, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); } + + let slot = ferdie.produce_slot(); + produce_block_with!(ferdie.produce_block_with_slot(slot), alice) + .await + .unwrap(); assert_eq!( - alice.free_balance(Bob.to_account_id()), - pre_bob_free_balance + 3 + alice.free_balance(Eve.to_account_id()), + pre_eve_free_balance + 3 ); let domain_block_number = alice.client.info().best_number; @@ -3668,7 +3668,7 @@ async fn test_bad_receipt_chain() { .into() }; - let bundle_producer = { + let mut bundle_producer = { let domain_bundle_proposer = DomainBundleProposer::new( GENESIS_DOMAIN_ID, alice.client.clone(), @@ -3732,15 +3732,9 @@ async fn test_bad_receipt_chain() { let parent_bad_receipt_hash = bad_receipt_hash; let slot = ferdie.produce_slot(); let bundle = { - let consensus_block_info = sp_blockchain::HashAndNumber { - number: ferdie.client.info().best_number, - hash: ferdie.client.info().best_hash, - }; bundle_producer - .clone() .produce_bundle( 0, - consensus_block_info, OperatorSlotInfo { slot, global_randomness: Randomness::from(Hash::random().to_fixed_bytes()), @@ -3851,3 +3845,160 @@ async fn test_domain_chain_storage_price_should_be_aligned_with_the_consensus_ch .unwrap(); assert_eq!(consensus_chain_byte_fee, operator_consensus_chain_byte_fee); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_skip_duplicated_tx_in_previous_bundle() { + let directory = TempDir::new().expect("Must be able to create temporary directory"); + + let mut builder = sc_cli::LoggerBuilder::new(""); + builder.with_colors(false); + let _ = builder.init(); + + let tokio_handle = tokio::runtime::Handle::current(); + + // Start Ferdie + let mut ferdie = MockConsensusNode::run( + tokio_handle.clone(), + Ferdie, + BasePath::new(directory.path().join("ferdie")), + ); + + // Run Alice (a evm domain authority node) + let alice = domain_test_service::DomainNodeBuilder::new( + tokio_handle.clone(), + Alice, + BasePath::new(directory.path().join("alice")), + ) + .build_evm_node(Role::Authority, GENESIS_DOMAIN_ID, &mut ferdie) + .await; + + let bob_pre_balance = alice.free_balance(Bob.to_account_id()); + let call = pallet_balances::Call::transfer_allow_death { + dest: Bob.to_account_id(), + value: 1, + }; + + // Send a tx and produce a bundle, it will include the tx + alice + .construct_and_send_extrinsic_with(alice.account_nonce(), 0u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); + + // Produce a few more bundles, all of them will be empty since the only tx in the tx pool is already pick + // up by the previous bundle + for _ in 0..3 { + let (_, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert!(bundle.unwrap().extrinsics.is_empty()); + } + + // Produce a domain that include all the bundles + produce_block_with!(ferdie.produce_block_with_slot(slot), alice) + .await + .unwrap(); + assert_eq!(alice.free_balance(Bob.to_account_id()), bob_pre_balance + 1); + + // Produce a bundle with a tx but not include it in the next consensus block + alice + .construct_and_send_extrinsic_with(alice.account_nonce(), 0u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); + ferdie + .produce_block_with_slot_at(slot, ferdie.client.info().best_hash, Some(vec![])) + .await + .unwrap(); + + // Even the tx is inclued in a previous bundle, after the consensus chain's tip changed, the operator + // will resubmit the tx in the next bundle as retry + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); + + produce_block_with!(ferdie.produce_block_with_slot(slot), alice) + .await + .unwrap(); + assert_eq!(alice.free_balance(Bob.to_account_id()), bob_pre_balance + 2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_handle_duplicated_tx_with_diff_nonce_in_previous_bundle() { + let directory = TempDir::new().expect("Must be able to create temporary directory"); + + let mut builder = sc_cli::LoggerBuilder::new(""); + builder.with_colors(false); + let _ = builder.init(); + + let tokio_handle = tokio::runtime::Handle::current(); + + // Start Ferdie + let mut ferdie = MockConsensusNode::run( + tokio_handle.clone(), + Ferdie, + BasePath::new(directory.path().join("ferdie")), + ); + + // Run Alice (a evm domain authority node) + let alice = domain_test_service::DomainNodeBuilder::new( + tokio_handle.clone(), + Alice, + BasePath::new(directory.path().join("alice")), + ) + .build_evm_node(Role::Authority, GENESIS_DOMAIN_ID, &mut ferdie) + .await; + + let nonce = alice.account_nonce(); + let bob_pre_balance = alice.free_balance(Bob.to_account_id()); + let call = pallet_balances::Call::transfer_allow_death { + dest: Bob.to_account_id(), + value: 1, + }; + + // Send a tx and produce a bundle, it will include the tx + alice + .construct_and_send_extrinsic_with(nonce, 0u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (_, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); + + // Send a new tx with the same `nonce` and a tip then produce a bundle, this tx will replace + // the previous tx in the tx pool and included in the bundle + alice + .construct_and_send_extrinsic_with(nonce, 1u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (_, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 1); + + // Send a tx with `nonce + 1` and produce a bundle, it won't include this tx because the tx + // with `nonce` is included in previous bundle and is not submitted to the consensus chain yet + alice + .construct_and_send_extrinsic_with(nonce + 1, 0u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert!(bundle.unwrap().extrinsics.is_empty()); + + // Produce a domain that include all the bundles + produce_block_with!(ferdie.produce_block_with_slot(slot), alice) + .await + .unwrap(); + assert_eq!(alice.free_balance(Bob.to_account_id()), bob_pre_balance + 1); + + // Send a tx with `nonce + 2` and produce a bundle, it will include both the previous `nonce + 1` + // tx and the `nonce + 2` tx + alice + .construct_and_send_extrinsic_with(nonce + 2, 0u32.into(), call.clone()) + .await + .expect("Failed to send extrinsic"); + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + assert_eq!(bundle.unwrap().extrinsics.len(), 2); + + produce_block_with!(ferdie.produce_block_with_slot(slot), alice) + .await + .unwrap(); + assert_eq!(alice.free_balance(Bob.to_account_id()), bob_pre_balance + 3); + assert_eq!(alice.account_nonce(), nonce + 3); +} diff --git a/domains/test/service/src/domain.rs b/domains/test/service/src/domain.rs index a65b1a6f67..4d37fc5fef 100644 --- a/domains/test/service/src/domain.rs +++ b/domains/test/service/src/domain.rs @@ -312,14 +312,25 @@ where pub async fn construct_and_send_extrinsic( &mut self, function: impl Into<::RuntimeCall>, + ) -> Result { + self.construct_and_send_extrinsic_with(self.account_nonce(), 0.into(), function) + .await + } + + /// Construct an extrinsic with the given nonce and tip for the node account and send it to this node. + pub async fn construct_and_send_extrinsic_with( + &self, + nonce: u32, + tip: BalanceOf, + function: impl Into<::RuntimeCall>, ) -> Result { let extrinsic = construct_extrinsic_generic::( &self.client, function, self.key, false, - self.account_nonce(), - 0.into(), + nonce, + tip, ); self.rpc_handlers.send_transaction(extrinsic.into()).await }