From da380a61b4071c95b5e876fb178dae95a262146f Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sat, 5 Aug 2023 06:04:32 -0400 Subject: [PATCH 1/9] feat(txpool) modify txpool guard to be for pipeline syncs only --- crates/interfaces/src/sync.rs | 6 ++ crates/net/network-api/src/lib.rs | 3 + crates/net/network-api/src/noop.rs | 4 ++ crates/net/network/src/network.rs | 27 +++++++- crates/net/network/src/transactions.rs | 85 +++++++++++++++++++++----- 5 files changed, 109 insertions(+), 16 deletions(-) diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index 8becffcda09d..622df29a3ca4 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -7,6 +7,9 @@ use reth_primitives::Head; pub trait SyncStateProvider: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; + + /// Returns `true` if the network is undergoing an initial (pipeline) sync. + fn is_initially_syncing(&self) -> bool; } /// An updater for updating the [SyncState] and status of the network. @@ -54,6 +57,9 @@ impl SyncStateProvider for NoopSyncStateUpdater { fn is_syncing(&self) -> bool { false } + fn is_initially_syncing(&self) -> bool { + false + } } impl NetworkSyncUpdater for NoopSyncStateUpdater { diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index f880fe745e66..b015e62f8ed2 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -49,6 +49,9 @@ pub trait NetworkInfo: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; + + /// Returns `true` if the network is undergoing an initial sync. + fn is_initially_syncing(&self) -> bool; } /// Provides general purpose information about Peers in the network. diff --git a/crates/net/network-api/src/noop.rs b/crates/net/network-api/src/noop.rs index 9a5309993f77..dc1ef17a93ca 100644 --- a/crates/net/network-api/src/noop.rs +++ b/crates/net/network-api/src/noop.rs @@ -45,6 +45,10 @@ impl NetworkInfo for NoopNetwork { fn is_syncing(&self) -> bool { false } + + fn is_initially_syncing(&self) -> bool { + false + } } impl PeersInfo for NoopNetwork { diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 9a3c8926caf5..2534c6d78a84 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -55,6 +55,7 @@ impl NetworkHandle { network_mode, bandwidth_meter, is_syncing: Arc::new(AtomicBool::new(false)), + initial_sync_done: Arc::new(AtomicBool::new(false)), chain_id, }; Self { inner: Arc::new(inner) } @@ -247,18 +248,38 @@ impl NetworkInfo for NetworkHandle { fn is_syncing(&self) -> bool { SyncStateProvider::is_syncing(self) } + + fn is_initially_syncing(&self) -> bool { + SyncStateProvider::is_initially_syncing(self) + } } impl SyncStateProvider for NetworkHandle { fn is_syncing(&self) -> bool { self.inner.is_syncing.load(Ordering::Relaxed) } + // used to guard the txpool + fn is_initially_syncing(&self) -> bool { + let init_sync_done = self.inner.initial_sync_done.load(Ordering::Relaxed); + let is_currently_syncing = self.inner.is_syncing.load(Ordering::Relaxed); + return is_currently_syncing && !init_sync_done + } } impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { - let is_syncing = state.is_syncing(); - self.inner.is_syncing.store(is_syncing, Ordering::Relaxed) + let future_state = state.is_syncing(); + let curr_state = self.inner.is_syncing.load(Ordering::Relaxed); + let init_sync_done = self.inner.initial_sync_done.load(Ordering::Relaxed); + self.inner.is_syncing.store(future_state, Ordering::Relaxed); + if !init_sync_done { + // we've either been in Idle or pipeline Syncing here, check to see if we're + // moving from a pipeline sync to idle + if curr_state && !future_state { + // set initial_sync_done flag to true here, every sync after this will be a livesync + self.inner.initial_sync_done.store(true, Ordering::Relaxed); + } + } } /// Update the status of the node. @@ -285,6 +306,8 @@ struct NetworkInner { bandwidth_meter: BandwidthMeter, /// Represents if the network is currently syncing. is_syncing: Arc, + /// Used to differentiate between an initial pipeline sync or a live sync + initial_sync_done: Arc, /// The chain id chain_id: Arc, } diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 9f9dfd0682ff..38aa575b81ce 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -7,7 +7,7 @@ use crate::{ metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, NetworkHandle, }; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt}; use reth_eth_wire::{ EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, @@ -29,7 +29,6 @@ use reth_transaction_pool::{ }; use std::{ collections::{hash_map::Entry, HashMap}, - future::Future, num::NonZeroUsize, pin::Pin, sync::Arc, @@ -206,8 +205,8 @@ where /// transactions to a fraction of peers usually ensures that all nodes receive the transaction /// and won't need to request it. fn on_new_transactions(&mut self, hashes: impl IntoIterator) { - // Nothing to propagate while syncing - if self.network.is_syncing() { + // Nothing to propagate while initially syncing + if self.network.is_initially_syncing() { return } @@ -310,8 +309,8 @@ where peer_id: PeerId, msg: NewPooledTransactionHashes, ) { - // If the node is currently syncing, ignore transactions - if self.network.is_syncing() { + // If the node is initially syncing, ignore transactions + if self.network.is_initially_syncing() { return } @@ -405,7 +404,7 @@ where // Send a `NewPooledTransactionHashes` to the peer with up to // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the // pool - if !self.network.is_syncing() { + if !self.network.is_initially_syncing() { let peer = self.peers.get_mut(&peer_id).expect("is present; qed"); let mut msg_builder = PooledTransactionsHashesBuilder::new(version); @@ -437,8 +436,8 @@ where transactions: Vec, source: TransactionSource, ) { - // If the node is currently syncing, ignore transactions - if self.network.is_syncing() { + // If the node is pipeline syncing, ignore transactions + if self.network.is_initially_syncing() { return } @@ -595,10 +594,11 @@ where this.on_good_import(hash); } Err(err) => { - // if we're syncing and the transaction is bad we ignore it, otherwise we - // penalize the peer that sent the bad transaction with the assumption that the - // peer should have known that this transaction is bad. (e.g. consensus rules) - if err.is_bad_transaction() && !this.network.is_syncing() { + // if we're initially syncing and the transaction is bad we ignore it, otherwise + // we penalize the peer that sent the bad transaction with + // the assumption that the peer should have known that this + // transaction is bad. (e.g. consensus rules) + if err.is_bad_transaction() && !this.network.is_initially_syncing() { trace!(target: "net::tx", ?err, "Bad transaction import"); this.on_bad_import(*err.hash()); continue @@ -826,10 +826,65 @@ mod tests { peer_id, msg: Transactions(vec![TransactionSigned::default()]), }); - + // this is a no-op assert that doesn't actually test the SyncState + // logic above, a rand PeerId short-circuits even if + // SyncState::Idle assert!(pool.is_empty()); } + #[tokio::test(flavor = "multi_thread")] + #[cfg_attr(not(feature = "geth-tests"), ignore)] + async fn test_tx_broadcasts_through_two_syncs() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let client = NoopProvider::default(); + let pool = testing_pool(); + let config = NetworkConfigBuilder::new(secret_key) + .disable_discovery() + .listener_port(0) + .build(client); + let (handle, network, mut transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + + tokio::task::spawn(network); + + // go to syncing (pipline sync) to idle and then to syncing (live) + handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&handle)); + handle.update_sync_state(SyncState::Idle); + assert!(!NetworkInfo::is_syncing(&handle)); + handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&handle)); + + let peer_id = PeerId::random(); + transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { + peer_id, + msg: Transactions(vec![TransactionSigned::default()]), + }); + assert!(!NetworkInfo::is_initially_syncing(&handle)); + assert!(NetworkInfo::is_syncing(&handle)); + // assert!(!pool.is_empty()); this keeps failing - indicating that there's a bug with these + // tests + + // if we attempt to modify the initial test above with SyncState::Idle, the same assert + // fails. + + // this is because the tx doesn't actually make its way to the pool with a + // Rand PeerId because there is no matching peer in the TM's peerset + + // so lets say we fix that by implementing the boiler plate SessionEstablished logic from + // the below tests - the assert !pool.empty() still fails because there doesn't seem + // to be anything polling the TM future to make progress on draining the pool_import + // futures to make progress on adding the txs to the pool + // The same can be seen in the tests below, try assert !pool.empty() and they will fail! + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_incoming_transactions() { reth_tracing::init_test_tracing(); @@ -906,6 +961,8 @@ mod tests { *handle1.peer_id(), transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0] ); + // assert!(!pool.is_empty()); This fails - since pool_imports doesnt make progress on adding + // the tx to the pool (?) handle.terminate().await; } From 0f01360197358db787ea9a31f8350c5d38ea4020 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sat, 5 Aug 2023 19:21:10 -0400 Subject: [PATCH 2/9] refactor/simplify init_sync_done logic, add pipeline sync context to doc comment for flag --- crates/net/network-api/src/lib.rs | 2 +- crates/net/network/src/network.rs | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index b015e62f8ed2..53c30dcdf9a5 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -50,7 +50,7 @@ pub trait NetworkInfo: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; - /// Returns `true` if the network is undergoing an initial sync. + /// Returns `true` when a fresh node is initially being synced, known as a Pipeline sync fn is_initially_syncing(&self) -> bool; } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 2534c6d78a84..d125c96c80a4 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -269,17 +269,17 @@ impl SyncStateProvider for NetworkHandle { impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { let future_state = state.is_syncing(); - let curr_state = self.inner.is_syncing.load(Ordering::Relaxed); - let init_sync_done = self.inner.initial_sync_done.load(Ordering::Relaxed); + let curr_syncing = self.inner.is_syncing.load(Ordering::Relaxed); self.inner.is_syncing.store(future_state, Ordering::Relaxed); - if !init_sync_done { - // we've either been in Idle or pipeline Syncing here, check to see if we're - // moving from a pipeline sync to idle - if curr_state && !future_state { - // set initial_sync_done flag to true here, every sync after this will be a livesync - self.inner.initial_sync_done.store(true, Ordering::Relaxed); - } - } + // explicitly throw this error away since we expect it to error every time after it's been + // set to true, i.e after every live sync + let _ = self.inner.initial_sync_done.compare_exchange( + false, + // on first move back from Syncing->Idle - swap and set init_sync_done to true + curr_syncing && !future_state, + Ordering::Relaxed, + Ordering::Relaxed, + ); } /// Update the status of the node. From 246ea67b20a641576ba65e26ee4be88507bed8c8 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Sat, 5 Aug 2023 19:51:01 -0400 Subject: [PATCH 3/9] remove unecessary ret statement, make linter happy --- crates/net/network/src/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index d125c96c80a4..6b64ce9b469e 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -262,7 +262,7 @@ impl SyncStateProvider for NetworkHandle { fn is_initially_syncing(&self) -> bool { let init_sync_done = self.inner.initial_sync_done.load(Ordering::Relaxed); let is_currently_syncing = self.inner.is_syncing.load(Ordering::Relaxed); - return is_currently_syncing && !init_sync_done + is_currently_syncing && !init_sync_done } } From 2bf479fb9642cab0acf11fe8d5e3ce320e5f1488 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Mon, 7 Aug 2023 10:44:12 -0400 Subject: [PATCH 4/9] simplify one more load/store into swap, more comment clarity on is_initial_syncing flag --- crates/net/network-api/src/lib.rs | 2 +- crates/net/network/src/network.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 53c30dcdf9a5..1428216679c9 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -50,7 +50,7 @@ pub trait NetworkInfo: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; - /// Returns `true` when a fresh node is initially being synced, known as a Pipeline sync + /// Returns `true` when a fresh node being bootstrapped is undergoing a Pipeline sync fn is_initially_syncing(&self) -> bool; } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 6b64ce9b469e..6d3d9c79a453 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -269,14 +269,13 @@ impl SyncStateProvider for NetworkHandle { impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { let future_state = state.is_syncing(); - let curr_syncing = self.inner.is_syncing.load(Ordering::Relaxed); - self.inner.is_syncing.store(future_state, Ordering::Relaxed); + let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed); // explicitly throw this error away since we expect it to error every time after it's been // set to true, i.e after every live sync let _ = self.inner.initial_sync_done.compare_exchange( false, // on first move back from Syncing->Idle - swap and set init_sync_done to true - curr_syncing && !future_state, + prev_state && !future_state, Ordering::Relaxed, Ordering::Relaxed, ); From 11fd70018915a7a548a2952f03e0f9a076a05c03 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Tue, 8 Aug 2023 10:45:22 -0400 Subject: [PATCH 5/9] review comments, showcase integ test failures w.r.t txpool --- crates/net/network-api/src/lib.rs | 2 +- crates/net/network/src/network.rs | 13 +++------ crates/net/network/src/transactions.rs | 37 +++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 1428216679c9..f15d3fec4aa6 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -50,7 +50,7 @@ pub trait NetworkInfo: Send + Sync { /// Returns `true` if the network is undergoing sync. fn is_syncing(&self) -> bool; - /// Returns `true` when a fresh node being bootstrapped is undergoing a Pipeline sync + /// Returns `true` when the node is undergoing the very first Pipeline sync. fn is_initially_syncing(&self) -> bool; } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 6d3d9c79a453..7e31f861fa39 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -270,15 +270,10 @@ impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { let future_state = state.is_syncing(); let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed); - // explicitly throw this error away since we expect it to error every time after it's been - // set to true, i.e after every live sync - let _ = self.inner.initial_sync_done.compare_exchange( - false, - // on first move back from Syncing->Idle - swap and set init_sync_done to true - prev_state && !future_state, - Ordering::Relaxed, - Ordering::Relaxed, - ); + let syncing_to_idle_state_transition = prev_state && !future_state; + if syncing_to_idle_state_transition { + self.inner.initial_sync_done.store(true, Ordering::Relaxed); + } } /// Update the status of the node. diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 38aa575b81ce..7bff49268484 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -832,6 +832,41 @@ mod tests { assert!(pool.is_empty()); } + // this test fails - because the transaction doesn't actually get propagated to the pool. + #[tokio::test(flavor = "multi_thread")] + #[cfg_attr(not(feature = "geth-tests"), ignore)] + async fn test_tx_broadcast_in_network_idle() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let client = NoopProvider::default(); + let pool = testing_pool(); + let config = NetworkConfigBuilder::new(secret_key) + .disable_discovery() + .listener_port(0) + .build(client); + let (handle, network, mut transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + + tokio::task::spawn(network); + + handle.update_sync_state(SyncState::Idle); + assert!(NetworkInfo::is_syncing(&handle)); + + let peer_id = PeerId::random(); + + transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { + peer_id, + msg: Transactions(vec![TransactionSigned::default()]), + }); + assert!(!pool.is_empty()); + } + #[tokio::test(flavor = "multi_thread")] #[cfg_attr(not(feature = "geth-tests"), ignore)] async fn test_tx_broadcasts_through_two_syncs() { @@ -869,7 +904,7 @@ mod tests { }); assert!(!NetworkInfo::is_initially_syncing(&handle)); assert!(NetworkInfo::is_syncing(&handle)); - // assert!(!pool.is_empty()); this keeps failing - indicating that there's a bug with these + assert!(!pool.is_empty()); // tests // if we attempt to modify the initial test above with SyncState::Idle, the same assert From 83ed55bc5bfd1d3347ed2e3eeda49fc67c92b825 Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Tue, 8 Aug 2023 10:59:08 -0400 Subject: [PATCH 6/9] break one more existing test to showcase hidden bug in tests --- crates/net/network/src/transactions.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 7bff49268484..c1dfcd36e994 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -996,8 +996,9 @@ mod tests { *handle1.peer_id(), transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0] ); - // assert!(!pool.is_empty()); This fails - since pool_imports doesnt make progress on adding - // the tx to the pool (?) + // This fails - since pool_imports doesnt make progress on adding the tx the pool, nothing + // is calling poll() + assert!(!pool.is_empty()); handle.terminate().await; } From 9997c9eb7eed80ee8ce401b30d1dd1a389361793 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 9 Aug 2023 14:31:19 +0200 Subject: [PATCH 7/9] some touchups --- crates/net/network/src/network.rs | 7 ++++--- crates/net/network/src/transactions.rs | 23 ++++++++++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 7e31f861fa39..1175eea863e8 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -260,9 +260,10 @@ impl SyncStateProvider for NetworkHandle { } // used to guard the txpool fn is_initially_syncing(&self) -> bool { - let init_sync_done = self.inner.initial_sync_done.load(Ordering::Relaxed); - let is_currently_syncing = self.inner.is_syncing.load(Ordering::Relaxed); - is_currently_syncing && !init_sync_done + if self.inner.initial_sync_done.load(Ordering::Relaxed) { + return false + } + self.inner.is_syncing.load(Ordering::Relaxed) } } diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index c1dfcd36e994..d5a8ba64da32 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -594,11 +594,12 @@ where this.on_good_import(hash); } Err(err) => { - // if we're initially syncing and the transaction is bad we ignore it, otherwise - // we penalize the peer that sent the bad transaction with - // the assumption that the peer should have known that this - // transaction is bad. (e.g. consensus rules) - if err.is_bad_transaction() && !this.network.is_initially_syncing() { + // if we're _currently_ syncing and the transaction is bad we ignore it, + // otherwise we penalize the peer that sent the bad + // transaction with the assumption that the peer should have + // known that this transaction is bad. (e.g. consensus + // rules) + if err.is_bad_transaction() && !this.network.is_syncing() { trace!(target: "net::tx", ?err, "Bad transaction import"); this.on_bad_import(*err.hash()); continue @@ -794,6 +795,7 @@ mod tests { use reth_rlp::Decodable; use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; use secp256k1::SecretKey; + use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] #[cfg_attr(not(feature = "geth-tests"), ignore)] @@ -996,9 +998,16 @@ mod tests { *handle1.peer_id(), transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0] ); - // This fails - since pool_imports doesnt make progress on adding the tx the pool, nothing - // is calling poll() + + // advance the transaction manager future + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + assert!(!pool.is_empty()); + assert!(pool.get(&signed_tx.hash).is_some()); handle.terminate().await; } From 2f820cf5f5a696958d53065ade429a3b5d61918c Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Wed, 9 Aug 2023 10:40:19 -0400 Subject: [PATCH 8/9] add mock peer sessions/poll_fn to two_sync test, works --- crates/net/network/src/transactions.rs | 85 ++++++++++++++++++-------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index d5a8ba64da32..b2e3786cff85 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -873,7 +873,17 @@ mod tests { #[cfg_attr(not(feature = "geth-tests"), ignore)] async fn test_tx_broadcasts_through_two_syncs() { reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + drop(handles); + let handle = net.spawn(); + + let listener0 = handle0.event_listener(); + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); let secret_key = SecretKey::new(&mut rand::thread_rng()); let client = NoopProvider::default(); @@ -882,7 +892,7 @@ mod tests { .disable_discovery() .listener_port(0) .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) .await .unwrap() .into_builder() @@ -892,34 +902,59 @@ mod tests { tokio::task::spawn(network); // go to syncing (pipline sync) to idle and then to syncing (live) - handle.update_sync_state(SyncState::Syncing); - assert!(NetworkInfo::is_syncing(&handle)); - handle.update_sync_state(SyncState::Idle); - assert!(!NetworkInfo::is_syncing(&handle)); - handle.update_sync_state(SyncState::Syncing); - assert!(NetworkInfo::is_syncing(&handle)); + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + network_handle.update_sync_state(SyncState::Idle); + assert!(!NetworkInfo::is_syncing(&network_handle)); + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); - let peer_id = PeerId::random(); + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), }); - assert!(!NetworkInfo::is_initially_syncing(&handle)); - assert!(NetworkInfo::is_syncing(&handle)); + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + assert!(!NetworkInfo::is_initially_syncing(&network_handle)); + assert!(NetworkInfo::is_syncing(&network_handle)); assert!(!pool.is_empty()); - // tests - - // if we attempt to modify the initial test above with SyncState::Idle, the same assert - // fails. - - // this is because the tx doesn't actually make its way to the pool with a - // Rand PeerId because there is no matching peer in the TM's peerset - - // so lets say we fix that by implementing the boiler plate SessionEstablished logic from - // the below tests - the assert !pool.empty() still fails because there doesn't seem - // to be anything polling the TM future to make progress on draining the pool_import - // futures to make progress on adding the txs to the pool - // The same can be seen in the tests below, try assert !pool.empty() and they will fail! + handle.terminate().await; } #[tokio::test(flavor = "multi_thread")] From dc6d636010341d19d24638600e3ea674a57fe40e Mon Sep 17 00:00:00 2001 From: 0xprames <0xprames@proton.me> Date: Wed, 9 Aug 2023 10:56:03 -0400 Subject: [PATCH 9/9] add poll/mock peer to existing ignore_while_syncing test --- crates/net/network/src/transactions.rs | 103 ++++++++++++++----------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index b2e3786cff85..7345893d1e74 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -799,47 +799,19 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[cfg_attr(not(feature = "geth-tests"), ignore)] - async fn test_ignored_tx_broadcasts_while_syncing() { + async fn test_ignored_tx_broadcasts_while_initially_syncing() { reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; - let secret_key = SecretKey::new(&mut rand::thread_rng()); - - let client = NoopProvider::default(); - let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key) - .disable_discovery() - .listener_port(0) - .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) - .await - .unwrap() - .into_builder() - .transactions(pool.clone()) - .split_with_handle(); - - tokio::task::spawn(network); - - handle.update_sync_state(SyncState::Syncing); - assert!(NetworkInfo::is_syncing(&handle)); - - let peer_id = PeerId::random(); - - transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), - }); - // this is a no-op assert that doesn't actually test the SyncState - // logic above, a rand PeerId short-circuits even if - // SyncState::Idle - assert!(pool.is_empty()); - } + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); - // this test fails - because the transaction doesn't actually get propagated to the pool. - #[tokio::test(flavor = "multi_thread")] - #[cfg_attr(not(feature = "geth-tests"), ignore)] - async fn test_tx_broadcast_in_network_idle() { - reth_tracing::init_test_tracing(); + drop(handles); + let handle = net.spawn(); + let listener0 = handle0.event_listener(); + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); let secret_key = SecretKey::new(&mut rand::thread_rng()); let client = NoopProvider::default(); @@ -848,7 +820,7 @@ mod tests { .disable_discovery() .listener_port(0) .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) .await .unwrap() .into_builder() @@ -857,16 +829,55 @@ mod tests { tokio::task::spawn(network); - handle.update_sync_state(SyncState::Idle); - assert!(NetworkInfo::is_syncing(&handle)); - - let peer_id = PeerId::random(); + // go to syncing (pipeline sync) + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + assert!(NetworkInfo::is_initially_syncing(&network_handle)); + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), }); - assert!(!pool.is_empty()); + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + assert!(pool.is_empty()); + handle.terminate().await; } #[tokio::test(flavor = "multi_thread")] @@ -901,7 +912,7 @@ mod tests { tokio::task::spawn(network); - // go to syncing (pipline sync) to idle and then to syncing (live) + // go to syncing (pipeline sync) to idle and then to syncing (live) network_handle.update_sync_state(SyncState::Syncing); assert!(NetworkInfo::is_syncing(&network_handle)); network_handle.update_sync_state(SyncState::Idle);