Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(txpool) modify txpool guard to be for pipeline syncs only #4075

Merged
merged 9 commits into from
Aug 9, 2023
6 changes: 6 additions & 0 deletions crates/interfaces/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use reth_primitives::Head;
pub trait SyncStateProvider: Send + Sync {
/// Returns `true` if the network is undergoing sync.
fn is_syncing(&self) -> bool;

/// Returns `true` if the network is undergoing an initial (pipeline) sync.
fn is_initially_syncing(&self) -> bool;
}

/// An updater for updating the [SyncState] and status of the network.
Expand Down Expand Up @@ -54,6 +57,9 @@ impl SyncStateProvider for NoopSyncStateUpdater {
fn is_syncing(&self) -> bool {
false
}
fn is_initially_syncing(&self) -> bool {
false
}
}

impl NetworkSyncUpdater for NoopSyncStateUpdater {
Expand Down
3 changes: 3 additions & 0 deletions crates/net/network-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub trait NetworkInfo: Send + Sync {

/// Returns `true` if the network is undergoing sync.
fn is_syncing(&self) -> bool;

/// Returns `true` when the node is undergoing the very first Pipeline sync.
fn is_initially_syncing(&self) -> bool;
}

/// Provides general purpose information about Peers in the network.
Expand Down
4 changes: 4 additions & 0 deletions crates/net/network-api/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl NetworkInfo for NoopNetwork {
fn is_syncing(&self) -> bool {
false
}

fn is_initially_syncing(&self) -> bool {
false
}
}

impl PeersInfo for NoopNetwork {
Expand Down
22 changes: 20 additions & 2 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl NetworkHandle {
network_mode,
bandwidth_meter,
is_syncing: Arc::new(AtomicBool::new(false)),
initial_sync_done: Arc::new(AtomicBool::new(false)),
mattsse marked this conversation as resolved.
Show resolved Hide resolved
chain_id,
};
Self { inner: Arc::new(inner) }
Expand Down Expand Up @@ -247,18 +248,33 @@ impl NetworkInfo for NetworkHandle {
fn is_syncing(&self) -> bool {
SyncStateProvider::is_syncing(self)
}

fn is_initially_syncing(&self) -> bool {
SyncStateProvider::is_initially_syncing(self)
}
}

impl SyncStateProvider for NetworkHandle {
fn is_syncing(&self) -> bool {
self.inner.is_syncing.load(Ordering::Relaxed)
}
// used to guard the txpool
fn is_initially_syncing(&self) -> bool {
if self.inner.initial_sync_done.load(Ordering::Relaxed) {
return false
}
self.inner.is_syncing.load(Ordering::Relaxed)
}
}

impl NetworkSyncUpdater for NetworkHandle {
fn update_sync_state(&self, state: SyncState) {
let is_syncing = state.is_syncing();
self.inner.is_syncing.store(is_syncing, Ordering::Relaxed)
let future_state = state.is_syncing();
let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
let syncing_to_idle_state_transition = prev_state && !future_state;
if syncing_to_idle_state_transition {
self.inner.initial_sync_done.store(true, Ordering::Relaxed);
}
}

/// Update the status of the node.
Expand All @@ -285,6 +301,8 @@ struct NetworkInner {
bandwidth_meter: BandwidthMeter,
/// Represents if the network is currently syncing.
is_syncing: Arc<AtomicBool>,
/// Used to differentiate between an initial pipeline sync or a live sync
initial_sync_done: Arc<AtomicBool>,
/// The chain id
chain_id: Arc<AtomicU64>,
}
Expand Down
190 changes: 169 additions & 21 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
use reth_eth_wire::{
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, PooledTransactions, Transactions,
Expand All @@ -29,7 +29,6 @@ use reth_transaction_pool::{
};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -206,8 +205,8 @@ where
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
/// and won't need to request it.
fn on_new_transactions(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
// Nothing to propagate while syncing
if self.network.is_syncing() {
// Nothing to propagate while initially syncing
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -310,8 +309,8 @@ where
peer_id: PeerId,
msg: NewPooledTransactionHashes,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -405,7 +404,7 @@ where
// Send a `NewPooledTransactionHashes` to the peer with up to
// `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the
// pool
if !self.network.is_syncing() {
if !self.network.is_initially_syncing() {
let peer = self.peers.get_mut(&peer_id).expect("is present; qed");

let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
Expand Down Expand Up @@ -437,8 +436,8 @@ where
transactions: Vec<TransactionSigned>,
source: TransactionSource,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
// If the node is pipeline syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}

Expand Down Expand Up @@ -595,9 +594,11 @@ where
this.on_good_import(hash);
}
Err(err) => {
// if we're syncing and the transaction is bad we ignore it, otherwise we
// penalize the peer that sent the bad transaction with the assumption that the
// peer should have known that this transaction is bad. (e.g. consensus rules)
// if we're _currently_ syncing and the transaction is bad we ignore it,
// otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
trace!(target: "net::tx", ?err, "Bad transaction import");
this.on_bad_import(*err.hash());
Expand Down Expand Up @@ -794,12 +795,23 @@ mod tests {
use reth_rlp::Decodable;
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
use secp256k1::SecretKey;
use std::future::poll_fn;

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_ignored_tx_broadcasts_while_syncing() {
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();

drop(handles);
let handle = net.spawn();

let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
Expand All @@ -808,7 +820,7 @@ mod tests {
.disable_discovery()
.listener_port(0)
.build(client);
let (handle, network, mut transactions, _) = NetworkManager::new(config)
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
Expand All @@ -817,17 +829,143 @@ mod tests {

tokio::task::spawn(network);

handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&handle));

let peer_id = PeerId::random();
// go to syncing (pipeline sync)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
assert!(NetworkInfo::is_initially_syncing(&network_handle));

// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg: Transactions(vec![TransactionSigned::default()]),
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});

poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;
assert!(pool.is_empty());
handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(not(feature = "geth-tests"), ignore)]
async fn test_tx_broadcasts_through_two_syncs() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();

drop(handles);
let handle = net.spawn();

let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());

let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(client);
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();

tokio::task::spawn(network);

// go to syncing (pipeline sync) to idle and then to syncing (live)
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));
network_handle.update_sync_state(SyncState::Idle);
assert!(!NetworkInfo::is_syncing(&network_handle));
network_handle.update_sync_state(SyncState::Syncing);
assert!(NetworkInfo::is_syncing(&network_handle));

// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});
poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;
assert!(!NetworkInfo::is_initially_syncing(&network_handle));
assert!(NetworkInfo::is_syncing(&network_handle));
assert!(!pool.is_empty());
handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -906,6 +1044,16 @@ mod tests {
*handle1.peer_id(),
transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0]
);

// advance the transaction manager future
poll_fn(|cx| {
let _ = transactions.poll_unpin(cx);
Poll::Ready(())
})
.await;

assert!(!pool.is_empty());
assert!(pool.get(&signed_tx.hash).is_some());
handle.terminate().await;
}

Expand Down