diff --git a/Cargo.lock b/Cargo.lock index bda451ea0ab..2aaa8aa35ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -684,6 +684,7 @@ dependencies = [ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "hash 0.9.0-pre", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.0 (git+https://github.com/nervosnetwork/lru-cache)", "numext-fixed-hash 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "numext-fixed-uint 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 916303754ad..c88a510fa99 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -28,6 +28,7 @@ ckb-traits = { path = "../traits" } failure = "0.1.5" bytes = "0.4.12" hash = {path = "../util/hash"} +lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" } [dev-dependencies] ckb-notify = { path = "../notify" } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 88dae3673a1..16c134cd3b2 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -35,12 +35,15 @@ use flatbuffers::FlatBufferBuilder; use fnv::{FnvHashMap, FnvHashSet}; use log::warn; use log::{debug, info}; +use lru_cache::LruCache; use numext_fixed_hash::H256; use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; pub const TX_PROPOSAL_TOKEN: u64 = 0; +pub const MAX_RELAY_PEERS: usize = 128; +pub const TX_FILTER_SIZE: usize = 1000; #[derive(Clone)] pub struct Relayer { @@ -171,17 +174,27 @@ where pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc) { let ret = self.chain.process_block(Arc::clone(&block)); + if ret.is_ok() { + let block_hash = block.header().hash(); let fbb = &mut FlatBufferBuilder::new(); let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new()); fbb.finish(message, None); - for peer_id in nc.connected_peers() { - if peer_id != peer { - let ret = nc.send(peer_id, fbb.finished_data().to_vec()); - if ret.is_err() { - warn!(target: "relay", "relay compact_block error {:?}", ret); - } + let mut known_blocks = self.peers.known_blocks.lock(); + let selected_peers: Vec = nc + .connected_peers() + .into_iter() + .filter(|peer_index| { + known_blocks.insert(*peer_index, block_hash.clone()) && (*peer_index != peer) + }) + .take(MAX_RELAY_PEERS) + .collect(); + + for peer_id in selected_peers { + let ret = nc.send(peer_id, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "relay compact_block error {:?}", ret); } } } else { @@ -350,9 +363,20 @@ where } } -#[derive(Default)] pub struct RelayState { pub pending_compact_blocks: Mutex>, pub inflight_proposals: Mutex>, pub pending_proposals_request: Mutex>>, + pub tx_filter: Mutex>, +} + +impl Default for RelayState { + fn default() -> Self { + RelayState { + pending_compact_blocks: Mutex::new(FnvHashMap::default()), + inflight_proposals: Mutex::new(FnvHashSet::default()), + pending_proposals_request: Mutex::new(FnvHashMap::default()), + tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)), + } + } } diff --git a/sync/src/relayer/transaction_process.rs b/sync/src/relayer/transaction_process.rs index 8101301527e..96114ba2405 100644 --- a/sync/src/relayer/transaction_process.rs +++ b/sync/src/relayer/transaction_process.rs @@ -1,4 +1,5 @@ use crate::relayer::Relayer; +use crate::relayer::MAX_RELAY_PEERS; use ckb_core::{transaction::Transaction, Cycle}; use ckb_network::{CKBProtocolContext, PeerIndex}; use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction}; @@ -10,6 +11,7 @@ use ckb_verification::TransactionError; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use log::{debug, warn}; +use numext_fixed_hash::H256; use std::time::Duration; const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3); @@ -41,6 +43,13 @@ where pub fn execute(self) -> Result<(), FailureError> { let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?; + let tx_hash = tx.hash(); + + if self.already_known(tx_hash.clone()) { + debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash); + return Ok(()); + } + let tx_result = { let chain_state = self.relayer.shared.chain_state().lock(); let max_block_cycles = self.relayer.shared.consensus().max_block_cycles(); @@ -54,21 +63,21 @@ where let message = RelayMessage::build_transaction(fbb, &tx, cycles); fbb.finish(message, None); - for peer in self.nc.connected_peers() { - if peer != self.peer - && self - .relayer - .peers() - .transaction_filters - .read() - .get(&peer) - .map_or(true, |filter| filter.contains(&tx)) - { - let ret = self.nc.send(peer, fbb.finished_data().to_vec()); + let mut known_txs = self.relayer.peers.known_txs.lock(); + let selected_peers: Vec = self + .nc + .connected_peers() + .into_iter() + .filter(|peer_index| { + known_txs.insert(*peer_index, tx_hash.clone()) && (self.peer != *peer_index) + }) + .take(MAX_RELAY_PEERS) + .collect(); - if ret.is_err() { - warn!(target: "relay", "relay Transaction error {:?}", ret); - } + for peer in selected_peers { + let ret = self.nc.send(peer, fbb.finished_data().to_vec()); + if ret.is_err() { + warn!(target: "relay", "relay Transaction error {:?}", ret); } } } @@ -93,4 +102,9 @@ where Ok(()) } + + fn already_known(&self, hash: H256) -> bool { + let mut tx_filter = self.relayer.state.tx_filter.lock(); + tx_filter.insert(hash, ()).is_none() + } } diff --git a/sync/src/types.rs b/sync/src/types.rs index 55aaf6691cb..7affefd3702 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -5,14 +5,19 @@ use ckb_core::block::Block; use ckb_core::header::{BlockNumber, Header}; use ckb_core::transaction::Transaction; use ckb_network::PeerIndex; +use ckb_util::Mutex; use ckb_util::RwLock; use faketime::unix_time_as_millis; use fnv::{FnvHashMap, FnvHashSet}; use log::debug; +use lru_cache::LruCache; use numext_fixed_hash::H256; use numext_fixed_uint::U256; +use std::collections::hash_map::Entry; use std::hash::{BuildHasher, Hasher}; +const FILTER_SIZE: usize = 500; + // State used to enforce CHAIN_SYNC_TIMEOUT // Only in effect for outbound, non-manual connections, with // m_protect == false @@ -57,6 +62,28 @@ pub struct PeerState { pub chain_sync: ChainSyncState, } +#[derive(Clone, Default)] +pub struct KnownFilter { + inner: FnvHashMap>, +} + +impl KnownFilter { + /// Adds a value to the filter. + /// If the filter did not have this value present, `true` is returned. + /// If the filter did have this value present, `false` is returned. + pub fn insert(&mut self, index: PeerIndex, hash: H256) -> bool { + match self.inner.entry(index) { + Entry::Occupied(mut o) => o.get_mut().insert(hash, ()).is_none(), + Entry::Vacant(v) => { + let mut lru = LruCache::new(FILTER_SIZE); + lru.insert(hash, ()); + v.insert(lru); + true + } + } + } +} + #[derive(Default)] pub struct Peers { pub state: RwLock>, @@ -65,6 +92,8 @@ pub struct Peers { pub best_known_headers: RwLock>, pub last_common_headers: RwLock>, pub transaction_filters: RwLock>, + pub known_txs: Mutex, + pub known_blocks: Mutex, } #[derive(Debug, Clone)]