Skip to content

Commit

Permalink
feat(txpool) modify txpool guard to be for pipeline syncs only (#4075)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
0xprames and mattsse authored Aug 9, 2023
1 parent a8a2cfa commit fd7e28e
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 23 deletions.
6 changes: 6 additions & 0 deletions crates/interfaces/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use reth_primitives::Head;
pub trait SyncStateProvider: Send + Sync {
/// Returns `true` if the network is undergoing sync.
fn is_syncing(&self) -> bool;

/// Returns `true` if the network is undergoing an initial (pipeline) sync.
fn is_initially_syncing(&self) -> bool;
}

/// An updater for updating the [SyncState] and status of the network.
Expand Down Expand Up @@ -54,6 +57,9 @@ impl SyncStateProvider for NoopSyncStateUpdater {
fn is_syncing(&self) -> bool {
false
}
fn is_initially_syncing(&self) -> bool {
false
}
}

impl NetworkSyncUpdater for NoopSyncStateUpdater {
Expand Down
3 changes: 3 additions & 0 deletions crates/net/network-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub trait NetworkInfo: Send + Sync {

/// Returns `true` if the network is undergoing sync.
fn is_syncing(&self) -> bool;

/// Returns `true` when the node is undergoing the very first Pipeline sync.
fn is_initially_syncing(&self) -> bool;
}

/// Provides general purpose information about Peers in the network.
Expand Down
4 changes: 4 additions & 0 deletions crates/net/network-api/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl NetworkInfo for NoopNetwork {
fn is_syncing(&self) -> bool {
false
}

fn is_initially_syncing(&self) -> bool {
false
}
}

impl PeersInfo for NoopNetwork {
Expand Down
22 changes: 20 additions & 2 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl NetworkHandle {
network_mode,
bandwidth_meter,
is_syncing: Arc::new(AtomicBool::new(false)),
initial_sync_done: Arc::new(AtomicBool::new(false)),
chain_id,
};
Self { inner: Arc::new(inner) }
Expand Down Expand Up @@ -247,18 +248,33 @@ impl NetworkInfo for NetworkHandle {
fn is_syncing(&self) -> bool {
SyncStateProvider::is_syncing(self)
}

fn is_initially_syncing(&self) -> bool {
SyncStateProvider::is_initially_syncing(self)
}
}

impl SyncStateProvider for NetworkHandle {
fn is_syncing(&self) -> bool {
self.inner.is_syncing.load(Ordering::Relaxed)
}
// used to guard the txpool
fn is_initially_syncing(&self) -> bool {
if self.inner.initial_sync_done.load(Ordering::Relaxed) {
return false
}
self.inner.is_syncing.load(Ordering::Relaxed)
}
}

impl NetworkSyncUpdater for NetworkHandle {
fn update_sync_state(&self, state: SyncState) {
let is_syncing = state.is_syncing();
self.inner.is_syncing.store(is_syncing, Ordering::Relaxed)
let future_state = state.is_syncing();
let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
let syncing_to_idle_state_transition = prev_state && !future_state;
if syncing_to_idle_state_transition {
self.inner.initial_sync_done.store(true, Ordering::Relaxed);
}
}

/// Update the status of the node.
Expand All @@ -285,6 +301,8 @@ struct NetworkInner {
bandwidth_meter: BandwidthMeter,
/// Represents if the network is currently syncing.
is_syncing: Arc<AtomicBool>,
/// Used to differentiate between an initial pipeline sync or a live sync
initial_sync_done: Arc<AtomicBool>,
/// The chain id
chain_id: Arc<AtomicU64>,
}
Expand Down
190 changes: 169 additions & 21 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
use reth_eth_wire::{
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, PooledTransactions, Transactions,
Expand All @@ -29,7 +29,6 @@ use reth_transaction_pool::{
};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -206,8 +205,8 @@ where
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
/// and won't need to request it.
fn on_new_transactions(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
// Nothing to propagate while syncing
if self.network.is_syncing() {
// Nothing to propagate while initially syncing
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -310,8 +309,8 @@ where
peer_id: PeerId,
msg: NewPooledTransactionHashes,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -405,7 +404,7 @@ where
// Send a `NewPooledTransactionHashes` to the peer with up to
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
// pool
if !self.network.is_syncing() {
if !self.network.is_initially_syncing() {
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");

let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
Expand Down Expand Up @@ -437,8 +436,8 @@ where
transactions: Vec<TransactionSigned>,
source: TransactionSource,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
// If the node is pipeline syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -595,9 +594,11 @@ where
this.on_good_import(hash);
}
Err(err) => {
// if we're syncing and the transaction is bad we ignore it, otherwise we
// penalize the peer that sent the bad transaction with the assumption that the
// peer should have known that this transaction is bad. (e.g. consensus rules)
// if we're _currently_ syncing and the transaction is bad we ignore it,
// otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
trace!(target: "net::tx", ?err, "Bad transaction import");
this.on_bad_import(*err.hash());
Expand Down Expand Up @@ -794,12 +795,23 @@ mod tests {
use reth_rlp::Decodable;
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
use secp256k1::SecretKey;
use std::future::poll_fn;

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_ignored_tx_broadcasts_while_syncing() {
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();

drop(handles);
let handle = net.spawn();

let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
Expand All @@ -808,7 +820,7 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let (handle, network, mut transactions, _) = NetworkManager::new(config)
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
Expand All @@ -817,17 +829,143 @@ mod tests {

tokio::task::spawn(network);

handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&handle));

let peer_id = PeerId::random();
// go to syncing (pipeline sync)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
assert!(NetworkInfo::is_initially_syncing(&network_handle));

// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg: Transactions(vec![TransactionSigned::default()]),
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});

poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;
assert!(pool.is_empty());
handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_tx_broadcasts_through_two_syncs() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();

drop(handles);
let handle = net.spawn();

let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(client);
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();

tokio::task::spawn(network);

// go to syncing (pipeline sync) to idle and then to syncing (live)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
network_handle.update_sync_state(SyncState::Idle);
assert!(!NetworkInfo::is_syncing(&network_handle));
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));

// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});
poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;
assert!(!NetworkInfo::is_initially_syncing(&network_handle));
assert!(NetworkInfo::is_syncing(&network_handle));
assert!(!pool.is_empty());
handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -906,6 +1044,16 @@ mod tests {
*handle1.peer_id(),
transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0]
);

// advance the transaction manager future
poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;

assert!(!pool.is_empty());
assert!(pool.get(&signed_tx.hash).is_some());
handle.terminate().await;
}

Expand Down

0 comments on commit fd7e28e

Please sign in to comment.