From fd7e28e786394068540c35b960814b39c466ce9c Mon Sep 17 00:00:00 2001 From: prames <134806363+0xprames@users.noreply.github.com> Date: Wed, 9 Aug 2023 11:40:38 -0700 Subject: [PATCH] feat(txpool) modify txpool guard to be for pipeline syncs only (#4075) Co-authored-by: Matthias Seitz --- crates/interfaces/src/sync.rs | 6 + crates/net/network-api/src/lib.rs | 3 + crates/net/network-api/src/noop.rs | 4 + crates/net/network/src/network.rs | 22 ++- crates/net/network/src/transactions.rs | 190 ++++++++++++++++++++++--- 5 files changed, 202 insertions(+), 23 deletions(-) diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index 8becffcda09d..622df29a3ca4 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -7,6 +7,9 @@ use reth_primitives::Head; pub trait SyncStateProvider: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; + + /// Returns `true` if the network is undergoing an initial (pipeline) sync. + fn is_initially_syncing(&self) -> bool; } /// An updater for updating the [SyncState] and status of the network. @@ -54,6 +57,9 @@ impl SyncStateProvider for NoopSyncStateUpdater { fn is_syncing(&self) -> bool { false } + fn is_initially_syncing(&self) -> bool { + false + } } impl NetworkSyncUpdater for NoopSyncStateUpdater { diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index f880fe745e66..f15d3fec4aa6 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -49,6 +49,9 @@ pub trait NetworkInfo: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; + + /// Returns `true` when the node is undergoing the very first Pipeline sync. + fn is_initially_syncing(&self) -> bool; } /// Provides general purpose information about Peers in the network. diff --git a/crates/net/network-api/src/noop.rs b/crates/net/network-api/src/noop.rs index 9a5309993f77..dc1ef17a93ca 100644 --- a/crates/net/network-api/src/noop.rs +++ b/crates/net/network-api/src/noop.rs @@ -45,6 +45,10 @@ impl NetworkInfo for NoopNetwork { fn is_syncing(&self) -> bool { false } + + fn is_initially_syncing(&self) -> bool { + false + } } impl PeersInfo for NoopNetwork { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 9a3c8926caf5..1175eea863e8 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -55,6 +55,7 @@ impl NetworkHandle { network_mode, bandwidth_meter, is_syncing: Arc::new(AtomicBool::new(false)), + initial_sync_done: Arc::new(AtomicBool::new(false)), chain_id, }; Self { inner: Arc::new(inner) } @@ -247,18 +248,33 @@ impl NetworkInfo for NetworkHandle { fn is_syncing(&self) -> bool { SyncStateProvider::is_syncing(self) } + + fn is_initially_syncing(&self) -> bool { + SyncStateProvider::is_initially_syncing(self) + } } impl SyncStateProvider for NetworkHandle { fn is_syncing(&self) -> bool { self.inner.is_syncing.load(Ordering::Relaxed) } + // used to guard the txpool + fn is_initially_syncing(&self) -> bool { + if self.inner.initial_sync_done.load(Ordering::Relaxed) { + return false + } + self.inner.is_syncing.load(Ordering::Relaxed) + } } impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { - let is_syncing = state.is_syncing(); - self.inner.is_syncing.store(is_syncing, Ordering::Relaxed) + let future_state = state.is_syncing(); + let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed); + let syncing_to_idle_state_transition = prev_state && !future_state; + if syncing_to_idle_state_transition { + self.inner.initial_sync_done.store(true, Ordering::Relaxed); + } } /// Update the status of the node. @@ -285,6 +301,8 @@ struct NetworkInner { bandwidth_meter: BandwidthMeter, /// Represents if the network is currently syncing. is_syncing: Arc, + /// Used to differentiate between an initial pipeline sync or a live sync + initial_sync_done: Arc, /// The chain id chain_id: Arc, } diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 9f9dfd0682ff..7345893d1e74 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -7,7 +7,7 @@ use crate::{ metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, NetworkHandle, }; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt}; use reth_eth_wire::{ EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, @@ -29,7 +29,6 @@ use reth_transaction_pool::{ }; use std::{ collections::{hash_map::Entry, HashMap}, - future::Future, num::NonZeroUsize, pin::Pin, sync::Arc, @@ -206,8 +205,8 @@ where /// transactions to a fraction of peers usually ensures that all nodes receive the transaction /// and won't need to request it. fn on_new_transactions(&mut self, hashes: impl IntoIterator) { - // Nothing to propagate while syncing - if self.network.is_syncing() { + // Nothing to propagate while initially syncing + if self.network.is_initially_syncing() { return } @@ -310,8 +309,8 @@ where peer_id: PeerId, msg: NewPooledTransactionHashes, ) { - // If the node is currently syncing, ignore transactions - if self.network.is_syncing() { + // If the node is initially syncing, ignore transactions + if self.network.is_initially_syncing() { return } @@ -405,7 +404,7 @@ where // Send a `NewPooledTransactionHashes` to the peer with up to // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the // pool - if !self.network.is_syncing() { + if !self.network.is_initially_syncing() { let peer = self.peers.get_mut(&peer_id).expect("is present; qed"); let mut msg_builder = PooledTransactionsHashesBuilder::new(version); @@ -437,8 +436,8 @@ where transactions: Vec, source: TransactionSource, ) { - // If the node is currently syncing, ignore transactions - if self.network.is_syncing() { + // If the node is pipeline syncing, ignore transactions + if self.network.is_initially_syncing() { return } @@ -595,9 +594,11 @@ where this.on_good_import(hash); } Err(err) => { - // if we're syncing and the transaction is bad we ignore it, otherwise we - // penalize the peer that sent the bad transaction with the assumption that the - // peer should have known that this transaction is bad. (e.g. consensus rules) + // if we're _currently_ syncing and the transaction is bad we ignore it, + // otherwise we penalize the peer that sent the bad + // transaction with the assumption that the peer should have + // known that this transaction is bad. (e.g. consensus + // rules) if err.is_bad_transaction() && !this.network.is_syncing() { trace!(target: "net::tx", ?err, "Bad transaction import"); this.on_bad_import(*err.hash()); @@ -794,12 +795,23 @@ mod tests { use reth_rlp::Decodable; use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; use secp256k1::SecretKey; + use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] #[cfg_attr(not(feature = "geth-tests"), ignore)] - async fn test_ignored_tx_broadcasts_while_syncing() { + async fn test_ignored_tx_broadcasts_while_initially_syncing() { reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + + drop(handles); + let handle = net.spawn(); + let listener0 = handle0.event_listener(); + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); let secret_key = SecretKey::new(&mut rand::thread_rng()); let client = NoopProvider::default(); @@ -808,7 +820,7 @@ mod tests { .disable_discovery() .listener_port(0) .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) .await .unwrap() .into_builder() @@ -817,17 +829,143 @@ mod tests { tokio::task::spawn(network); - handle.update_sync_state(SyncState::Syncing); - assert!(NetworkInfo::is_syncing(&handle)); - - let peer_id = PeerId::random(); + // go to syncing (pipeline sync) + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + assert!(NetworkInfo::is_initially_syncing(&network_handle)); + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), }); - + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; assert!(pool.is_empty()); + handle.terminate().await; + } + + #[tokio::test(flavor = "multi_thread")] + #[cfg_attr(not(feature = "geth-tests"), ignore)] + async fn test_tx_broadcasts_through_two_syncs() { + reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + + drop(handles); + let handle = net.spawn(); + + let listener0 = handle0.event_listener(); + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let client = NoopProvider::default(); + let pool = testing_pool(); + let config = NetworkConfigBuilder::new(secret_key) + .disable_discovery() + .listener_port(0) + .build(client); + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + + tokio::task::spawn(network); + + // go to syncing (pipeline sync) to idle and then to syncing (live) + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + network_handle.update_sync_state(SyncState::Idle); + assert!(!NetworkInfo::is_syncing(&network_handle)); + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); + transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), + }); + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + assert!(!NetworkInfo::is_initially_syncing(&network_handle)); + assert!(NetworkInfo::is_syncing(&network_handle)); + assert!(!pool.is_empty()); + handle.terminate().await; } #[tokio::test(flavor = "multi_thread")] @@ -906,6 +1044,16 @@ mod tests { *handle1.peer_id(), transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0] ); + + // advance the transaction manager future + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + + assert!(!pool.is_empty()); + assert!(pool.get(&signed_tx.hash).is_some()); handle.terminate().await; }