Skip to content

Commit

Permalink
feat: relay known filter
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Apr 11, 2019
1 parent 04f090e commit cc4f1f3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ckb-traits = { path = "../traits" }
failure = "0.1.5"
bytes = "0.4.12"
hash = {path = "../util/hash"}
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

[dev-dependencies]
ckb-notify = { path = "../notify" }
Expand Down
38 changes: 31 additions & 7 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ use flatbuffers::FlatBufferBuilder;
use fnv::{FnvHashMap, FnvHashSet};
use log::warn;
use log::{debug, info};
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

pub const TX_PROPOSAL_TOKEN: u64 = 0;
pub const MAX_RELAY_PEERS: usize = 128;
pub const TX_FILTER_SIZE: usize = 1000;

#[derive(Clone)]
pub struct Relayer<CI: ChainIndex> {
Expand Down Expand Up @@ -171,17 +174,27 @@ where

pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc<Block>) {
let ret = self.chain.process_block(Arc::clone(&block));

if ret.is_ok() {
let block_hash = block.header().hash();
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new());
fbb.finish(message, None);

for peer_id in nc.connected_peers() {
if peer_id != peer {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
let mut known_blocks = self.peers.known_blocks.lock();
let selected_peers: Vec<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_blocks.insert(*peer_index, block_hash.clone()) && (*peer_index != peer)
})
.take(MAX_RELAY_PEERS)
.collect();

for peer_id in selected_peers {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
}
} else {
Expand Down Expand Up @@ -350,9 +363,20 @@ where
}
}

#[derive(Default)]
pub struct RelayState {
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
pub tx_filter: Mutex<LruCache<H256, ()>>,
}

impl Default for RelayState {
fn default() -> Self {
RelayState {
pending_compact_blocks: Mutex::new(FnvHashMap::default()),
inflight_proposals: Mutex::new(FnvHashSet::default()),
pending_proposals_request: Mutex::new(FnvHashMap::default()),
tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)),
}
}
}
42 changes: 28 additions & 14 deletions sync/src/relayer/transaction_process.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::relayer::Relayer;
use crate::relayer::MAX_RELAY_PEERS;
use ckb_core::{transaction::Transaction, Cycle};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction};
Expand All @@ -10,6 +11,7 @@ use ckb_verification::TransactionError;
use failure::Error as FailureError;
use flatbuffers::FlatBufferBuilder;
use log::{debug, warn};
use numext_fixed_hash::H256;
use std::time::Duration;

const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
Expand Down Expand Up @@ -41,6 +43,13 @@ where

pub fn execute(self) -> Result<(), FailureError> {
let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?;
let tx_hash = tx.hash();

if self.already_known(tx_hash.clone()) {
debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash);
return Ok(());
}

let tx_result = {
let chain_state = self.relayer.shared.chain_state().lock();
let max_block_cycles = self.relayer.shared.consensus().max_block_cycles();
Expand All @@ -54,21 +63,21 @@ where
let message = RelayMessage::build_transaction(fbb, &tx, cycles);
fbb.finish(message, None);

for peer in self.nc.connected_peers() {
if peer != self.peer
&& self
.relayer
.peers()
.transaction_filters
.read()
.get(&peer)
.map_or(true, |filter| filter.contains(&tx))
{
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
let mut known_txs = self.relayer.peers.known_txs.lock();
let selected_peers: Vec<PeerIndex> = self
.nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_txs.insert(*peer_index, tx_hash.clone()) && (self.peer != *peer_index)
})
.take(MAX_RELAY_PEERS)
.collect();

if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
for peer in selected_peers {
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
}
}
Expand All @@ -93,4 +102,9 @@ where

Ok(())
}

fn already_known(&self, hash: H256) -> bool {
let mut tx_filter = self.relayer.state.tx_filter.lock();
tx_filter.insert(hash, ()).is_none()
}
}
29 changes: 29 additions & 0 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ use ckb_core::block::Block;
use ckb_core::header::{BlockNumber, Header};
use ckb_core::transaction::Transaction;
use ckb_network::PeerIndex;
use ckb_util::Mutex;
use ckb_util::RwLock;
use faketime::unix_time_as_millis;
use fnv::{FnvHashMap, FnvHashSet};
use log::debug;
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use numext_fixed_uint::U256;
use std::collections::hash_map::Entry;
use std::hash::{BuildHasher, Hasher};

const FILTER_SIZE: usize = 500;

// State used to enforce CHAIN_SYNC_TIMEOUT
// Only in effect for outbound, non-manual connections, with
// m_protect == false
Expand Down Expand Up @@ -57,6 +62,28 @@ pub struct PeerState {
pub chain_sync: ChainSyncState,
}

#[derive(Clone, Default)]
pub struct KnownFilter {
inner: FnvHashMap<PeerIndex, LruCache<H256, ()>>,
}

impl KnownFilter {
/// Adds a value to the filter.
/// If the filter did not have this value present, `true` is returned.
/// If the filter did have this value present, `false` is returned.
pub fn insert(&mut self, index: PeerIndex, hash: H256) -> bool {
match self.inner.entry(index) {
Entry::Occupied(mut o) => o.get_mut().insert(hash, ()).is_none(),
Entry::Vacant(v) => {
let mut lru = LruCache::new(FILTER_SIZE);
lru.insert(hash, ());
v.insert(lru);
true
}
}
}
}

#[derive(Default)]
pub struct Peers {
pub state: RwLock<FnvHashMap<PeerIndex, PeerState>>,
Expand All @@ -65,6 +92,8 @@ pub struct Peers {
pub best_known_headers: RwLock<FnvHashMap<PeerIndex, HeaderView>>,
pub last_common_headers: RwLock<FnvHashMap<PeerIndex, Header>>,
pub transaction_filters: RwLock<FnvHashMap<PeerIndex, TransactionFilter>>,
pub known_txs: Mutex<KnownFilter>,
pub known_blocks: Mutex<KnownFilter>,
}

#[derive(Debug, Clone)]
Expand Down

0 comments on commit cc4f1f3

Please sign in to comment.