diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index b2e3786cff85..7345893d1e74 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -799,47 +799,19 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[cfg_attr(not(feature = "geth-tests"), ignore)] - async fn test_ignored_tx_broadcasts_while_syncing() { + async fn test_ignored_tx_broadcasts_while_initially_syncing() { reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; - let secret_key = SecretKey::new(&mut rand::thread_rng()); - - let client = NoopProvider::default(); - let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key) - .disable_discovery() - .listener_port(0) - .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) - .await - .unwrap() - .into_builder() - .transactions(pool.clone()) - .split_with_handle(); - - tokio::task::spawn(network); - - handle.update_sync_state(SyncState::Syncing); - assert!(NetworkInfo::is_syncing(&handle)); - - let peer_id = PeerId::random(); - - transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), - }); - // this is a no-op assert that doesn't actually test the SyncState - // logic above, a rand PeerId short-circuits even if - // SyncState::Idle - assert!(pool.is_empty()); - } + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); - // this test fails - because the transaction doesn't actually get propagated to the pool. - #[tokio::test(flavor = "multi_thread")] - #[cfg_attr(not(feature = "geth-tests"), ignore)] - async fn test_tx_broadcast_in_network_idle() { - reth_tracing::init_test_tracing(); + drop(handles); + let handle = net.spawn(); + let listener0 = handle0.event_listener(); + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); let secret_key = SecretKey::new(&mut rand::thread_rng()); let client = NoopProvider::default(); @@ -848,7 +820,7 @@ mod tests { .disable_discovery() .listener_port(0) .build(client); - let (handle, network, mut transactions, _) = NetworkManager::new(config) + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) .await .unwrap() .into_builder() @@ -857,16 +829,55 @@ mod tests { tokio::task::spawn(network); - handle.update_sync_state(SyncState::Idle); - assert!(NetworkInfo::is_syncing(&handle)); - - let peer_id = PeerId::random(); + // go to syncing (pipeline sync) + network_handle.update_sync_state(SyncState::Syncing); + assert!(NetworkInfo::is_syncing(&network_handle)); + assert!(NetworkInfo::is_initially_syncing(&network_handle)); + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + remote_addr, + client_version, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { - peer_id, - msg: Transactions(vec![TransactionSigned::default()]), + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), }); - assert!(!pool.is_empty()); + poll_fn(|cx| { + let _ = transactions.poll_unpin(cx); + Poll::Ready(()) + }) + .await; + assert!(pool.is_empty()); + handle.terminate().await; } #[tokio::test(flavor = "multi_thread")] @@ -901,7 +912,7 @@ mod tests { tokio::task::spawn(network); - // go to syncing (pipline sync) to idle and then to syncing (live) + // go to syncing (pipeline sync) to idle and then to syncing (live) network_handle.update_sync_state(SyncState::Syncing); assert!(NetworkInfo::is_syncing(&network_handle)); network_handle.update_sync_state(SyncState::Idle);