Skip to content

Commit

Permalink
add poll/mock peer to existing ignore_while_syncing test
Browse files Browse the repository at this point in the history
  • Loading branch information
0xprames committed Aug 9, 2023
1 parent 2f820cf commit dc6d636
Showing 1 changed file with 57 additions and 46 deletions.
103 changes: 57 additions & 46 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,47 +799,19 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_ignored_tx_broadcasts_while_syncing() {
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;

let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(client);
let (handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();

tokio::task::spawn(network);

handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&handle));

let peer_id = PeerId::random();

transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg: Transactions(vec![TransactionSigned::default()]),
});
// this is a no-op assert that doesn't actually test the SyncState
// logic above, a rand PeerId short-circuits even if
// SyncState::Idle
assert!(pool.is_empty());
}
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();

// this test fails - because the transaction doesn't actually get propagated to the pool.
#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_tx_broadcast_in_network_idle() {
reth_tracing::init_test_tracing();
drop(handles);
let handle = net.spawn();

let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
Expand All @@ -848,7 +820,7 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let (handle, network, mut transactions, _) = NetworkManager::new(config)
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
Expand All @@ -857,16 +829,55 @@ mod tests {

tokio::task::spawn(network);

handle.update_sync_state(SyncState::Idle);
assert!(NetworkInfo::is_syncing(&handle));

let peer_id = PeerId::random();
// go to syncing (pipeline sync)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
assert!(NetworkInfo::is_initially_syncing(&network_handle));

// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg: Transactions(vec![TransactionSigned::default()]),
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});
assert!(!pool.is_empty());
poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;
assert!(pool.is_empty());
handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -901,7 +912,7 @@ mod tests {

tokio::task::spawn(network);

// go to syncing (pipline sync) to idle and then to syncing (live)
// go to syncing (pipeline sync) to idle and then to syncing (live)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
network_handle.update_sync_state(SyncState::Idle);
Expand Down

0 comments on commit dc6d636

Please sign in to comment.