feat: relay known filter #448

Merged 2 commits on Apr 12, 2019
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion network/src/peers_registry.rs
@@ -321,7 +321,7 @@ impl PeersRegistry {
         &self.peers
     }

-    fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
+    pub fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
         &self.peer_id_by_index
     }

1 change: 1 addition & 0 deletions sync/Cargo.toml
@@ -28,6 +28,7 @@ ckb-traits = { path = "../traits" }
 failure = "0.1.5"
 bytes = "0.4.12"
 hash = {path = "../util/hash"}
+lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

 [dev-dependencies]
 ckb-notify = { path = "../notify" }
38 changes: 31 additions & 7 deletions sync/src/relayer/mod.rs
@@ -36,12 +36,15 @@ use flatbuffers::FlatBufferBuilder;
 use fnv::{FnvHashMap, FnvHashSet};
 use log::warn;
 use log::{debug, info};
+use lru_cache::LruCache;
 use numext_fixed_hash::H256;
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;

 pub const TX_PROPOSAL_TOKEN: u64 = 0;
+pub const MAX_RELAY_PEERS: usize = 128;
+pub const TX_FILTER_SIZE: usize = 1000;

 pub struct Relayer<CI> {
     chain: ChainController,
@@ -168,17 +171,27 @@ impl<CI: ChainIndex> Relayer<CI> {

     pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc<Block>) {
         let ret = self.chain.process_block(Arc::clone(&block));
+
         if ret.is_ok() {
+            let block_hash = block.header().hash();
             let fbb = &mut FlatBufferBuilder::new();
             let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new());
             fbb.finish(message, None);

-            for peer_id in nc.connected_peers() {
-                if peer_id != peer {
-                    let ret = nc.send(peer_id, fbb.finished_data().to_vec());
-                    if ret.is_err() {
-                        warn!(target: "relay", "relay compact_block error {:?}", ret);
-                    }
+            let mut known_blocks = self.peers.known_blocks.lock();
+            let selected_peers: Vec<PeerIndex> = nc
+                .connected_peers()
+                .into_iter()
+                .filter(|peer_index| {
+                    known_blocks.insert(*peer_index, block_hash.clone()) && (*peer_index != peer)
+                })
+                .take(MAX_RELAY_PEERS)
+                .collect();
+
+            for peer_id in selected_peers {
+                let ret = nc.send(peer_id, fbb.finished_data().to_vec());
+                if ret.is_err() {
+                    warn!(target: "relay", "relay compact_block error {:?}", ret);
                 }
             }
         } else {
@@ -344,9 +357,20 @@ impl<CI: ChainIndex> CKBProtocolHandler for Relayer<CI> {
     }
 }

-#[derive(Default)]
 pub struct RelayState {
     pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
     pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
     pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
+    pub tx_filter: Mutex<LruCache<H256, ()>>,
 }
+
+impl Default for RelayState {
+    fn default() -> Self {
+        RelayState {
+            pending_compact_blocks: Mutex::new(FnvHashMap::default()),
+            inflight_proposals: Mutex::new(FnvHashSet::default()),
+            pending_proposals_request: Mutex::new(FnvHashMap::default()),
+            tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)),
+        }
+    }
+}
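
The block-relay change above can be read in isolation: a peer is selected only if inserting the block hash into its known-blocks filter reports the hash as new and the peer is not the one the block arrived from, and the fan-out is capped at MAX_RELAY_PEERS. A minimal standalone sketch of that selection policy, with std HashMap/HashSet standing in for the PR's FnvHashMap-backed, LRU-bounded KnownFilter (all names below are illustrative, not the crate's API):

use std::collections::{HashMap, HashSet};

type PeerIndex = usize; // stand-in for ckb_network::PeerIndex
type BlockHash = [u8; 32]; // stand-in for numext_fixed_hash::H256

const MAX_RELAY_PEERS: usize = 128;

/// Simplified stand-in for the PR's KnownFilter (no LRU bound here).
#[derive(Default)]
struct KnownFilter {
    inner: HashMap<PeerIndex, HashSet<BlockHash>>,
}

impl KnownFilter {
    /// Returns true if `hash` was not yet marked as known for `index`.
    fn insert(&mut self, index: PeerIndex, hash: BlockHash) -> bool {
        self.inner.entry(index).or_default().insert(hash)
    }
}

/// Mirrors the filter in `accept_block`: mark the hash as known for every
/// connected peer, drop peers that already knew it, drop the sender, and
/// cap the fan-out.
fn select_relay_peers(
    known_blocks: &mut KnownFilter,
    connected: Vec<PeerIndex>,
    origin: PeerIndex,
    block_hash: BlockHash,
) -> Vec<PeerIndex> {
    connected
        .into_iter()
        .filter(|peer| known_blocks.insert(*peer, block_hash) && *peer != origin)
        .take(MAX_RELAY_PEERS)
        .collect()
}

fn main() {
    let mut known = KnownFilter::default();
    let hash = [0u8; 32];
    known.insert(2, hash); // peer 2 already saw this block
    let targets = select_relay_peers(&mut known, vec![1, 2, 3, 4], 1, hash);
    assert_eq!(targets, vec![3, 4]); // peer 1 is the sender, peer 2 already knew it
    println!("relay to {:?}", targets);
}

Note that, as in the PR's filter expression, the hash is recorded for the originating peer as well (the insert runs before the sender check), so the node will not relay the same block back to it later.
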
42 changes: 28 additions & 14 deletions sync/src/relayer/transaction_process.rs
@@ -1,4 +1,5 @@
 use crate::relayer::Relayer;
+use crate::relayer::MAX_RELAY_PEERS;
 use ckb_core::{transaction::Transaction, Cycle};
 use ckb_network::{CKBProtocolContext, PeerIndex};
 use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction};
@@ -9,6 +10,7 @@ use ckb_verification::TransactionError;
 use failure::Error as FailureError;
 use flatbuffers::FlatBufferBuilder;
 use log::{debug, warn};
+use numext_fixed_hash::H256;
 use std::convert::TryInto;
 use std::time::Duration;

@@ -38,6 +40,13 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

     pub fn execute(self) -> Result<(), FailureError> {
         let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?;
+        let tx_hash = tx.hash();
+
+        if self.already_known(tx_hash.clone()) {
+            debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash);
+            return Ok(());
+        }
+
         let tx_result = {
             let chain_state = self.relayer.shared.chain_state().lock();
             let max_block_cycles = self.relayer.shared.consensus().max_block_cycles();
@@ -51,21 +60,21 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {
             let message = RelayMessage::build_transaction(fbb, &tx, cycles);
             fbb.finish(message, None);

-            for peer in self.nc.connected_peers() {
-                if peer != self.peer
-                    && self
-                        .relayer
-                        .peers()
-                        .transaction_filters
-                        .read()
-                        .get(&peer)
-                        .map_or(true, |filter| filter.contains(&tx))
-                {
-                    let ret = self.nc.send(peer, fbb.finished_data().to_vec());
+            let mut known_txs = self.relayer.peers.known_txs.lock();
+            let selected_peers: Vec<PeerIndex> = self
+                .nc
+                .connected_peers()
+                .into_iter()
+                .filter(|peer_index| {
+                    known_txs.insert(*peer_index, tx_hash.clone()) && (self.peer != *peer_index)
+                })
+                .take(MAX_RELAY_PEERS)
+                .collect();

-                    if ret.is_err() {
-                        warn!(target: "relay", "relay Transaction error {:?}", ret);
-                    }
+            for peer in selected_peers {
+                let ret = self.nc.send(peer, fbb.finished_data().to_vec());
+                if ret.is_err() {
+                    warn!(target: "relay", "relay Transaction error {:?}", ret);
                 }
             }
         }
@@ -91,4 +100,9 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

         Ok(())
     }
+
+    fn already_known(&self, hash: H256) -> bool {
+        let mut tx_filter = self.relayer.state.tx_filter.lock();
+        tx_filter.insert(hash, ()).is_some()
+    }
 }
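
The new already_known check gives transaction relay a bounded dedup set: LruCache::insert returns the previous value, so a transaction is dropped exactly when its hash was already present, and TX_FILTER_SIZE keeps the set from growing without bound. A rough standalone sketch of that behaviour, using a FIFO-evicting map as a stand-in for the lru-cache crate (the names and the eviction policy here are illustrative only):

use std::collections::{HashMap, VecDeque};

type TxHash = [u8; 32]; // stand-in for numext_fixed_hash::H256

/// A capacity-bounded "seen" set. Eviction here is FIFO, a rough stand-in
/// for the LruCache<H256, ()> the PR stores in RelayState::tx_filter.
struct BoundedFilter {
    capacity: usize,
    order: VecDeque<TxHash>,
    seen: HashMap<TxHash, ()>,
}

impl BoundedFilter {
    fn new(capacity: usize) -> Self {
        BoundedFilter {
            capacity,
            order: VecDeque::new(),
            seen: HashMap::new(),
        }
    }

    /// Mirrors the shape of `already_known`: record the hash and report
    /// whether it had been seen before.
    fn already_known(&mut self, hash: TxHash) -> bool {
        if self.seen.insert(hash, ()).is_some() {
            return true; // duplicate: the caller discards the transaction
        }
        self.order.push_back(hash);
        if self.order.len() > self.capacity {
            if let Some(evicted) = self.order.pop_front() {
                self.seen.remove(&evicted);
            }
        }
        false
    }
}

fn main() {
    let mut filter = BoundedFilter::new(1000); // cf. TX_FILTER_SIZE
    let tx_hash = [1u8; 32];
    assert!(!filter.already_known(tx_hash)); // first sighting: relay it
    assert!(filter.already_known(tx_hash)); // second sighting: discard
    println!("duplicate suppressed");
}
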
37 changes: 6 additions & 31 deletions sync/src/synchronizer/get_blocks_process.rs
@@ -40,37 +40,12 @@
             debug!(target: "sync", "get_blocks {:x}", block_hash);
             if let Some(block) = self.synchronizer.get_block(&block_hash) {
                 debug!(target: "sync", "respond_block {} {:x}", block.header().number(), block.header().hash());
-                if let Some(filter) = self
-                    .synchronizer
-                    .peers
-                    .transaction_filters
-                    .read()
-                    .get(&self.peer)
-                {
-                    let transactions_index = block
-                        .commit_transactions()
-                        .iter()
-                        .enumerate()
-                        .filter(|(_index, tx)| filter.contains(tx))
-                        .map(|ti| ti.0)
-                        .collect::<Vec<_>>();
-
-                    let fbb = &mut FlatBufferBuilder::new();
-                    let message =
-                        SyncMessage::build_filtered_block(fbb, &block, &transactions_index);
-                    fbb.finish(message, None);
-                    let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
-                    if ret.is_err() {
-                        warn!(target: "relay", "response GetBlocks error {:?}", ret);
-                    }
-                } else {
-                    let fbb = &mut FlatBufferBuilder::new();
-                    let message = SyncMessage::build_block(fbb, &block);
-                    fbb.finish(message, None);
-                    let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
-                    if ret.is_err() {
-                        warn!(target: "relay", "response GetBlocks error {:?}", ret);
-                    }
+                let fbb = &mut FlatBufferBuilder::new();
+                let message = SyncMessage::build_block(fbb, &block);
+                fbb.finish(message, None);
+                let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
+                if ret.is_err() {
+                    warn!(target: "relay", "response GetBlocks error {:?}", ret);
                 }
             } else {
                 // TODO response not found
18 changes: 2 additions & 16 deletions sync/src/synchronizer/mod.rs
@@ -1,15 +1,13 @@
 mod block_fetcher;
 mod block_pool;
 mod block_process;
-mod filter_process;
 mod get_blocks_process;
 mod get_headers_process;
 mod headers_process;

 use self::block_fetcher::BlockFetcher;
 use self::block_pool::OrphanBlockPool;
 use self::block_process::BlockProcess;
-use self::filter_process::{AddFilterProcess, ClearFilterProcess, SetFilterProcess};
 use self::get_blocks_process::GetBlocksProcess;
 use self::get_headers_process::GetHeadersProcess;
 use self::headers_process::HeadersProcess;
@@ -158,22 +156,10 @@ impl<CI: ChainIndex> Synchronizer<CI> {
             SyncPayload::Block => {
                 BlockProcess::new(&cast!(message.payload_as_block())?, self, peer, nc).execute()?;
             }
-            SyncPayload::SetFilter => {
-                SetFilterProcess::new(&cast!(message.payload_as_set_filter())?, self, peer)
-                    .execute()?;
-            }
-            SyncPayload::AddFilter => {
-                AddFilterProcess::new(&cast!(message.payload_as_add_filter())?, self, peer)
-                    .execute()?;
-            }
-            SyncPayload::ClearFilter => {
-                ClearFilterProcess::new(self, peer).execute()?;
-            }
-            SyncPayload::FilteredBlock => {
-                // ignore, should not receive FilteredBlock in full node mode
-            }
             SyncPayload::NONE => {
                 cast!(None)?;
             }
+            _ => {
+                cast!(None)?;
+            }
         }
1 change: 0 additions & 1 deletion sync/src/tests/mod.rs
@@ -9,7 +9,6 @@ use std::sync::Arc;
 use std::thread;
 use std::time::Duration;

-mod filter;
 #[cfg(not(disable_faketime))]
 mod relayer;
 #[cfg(not(disable_faketime))]
103 changes: 29 additions & 74 deletions sync/src/types.rs
@@ -1,17 +1,17 @@
-use bloom_filters::{
-    BloomFilter, ClassicBloomFilter, DefaultBuildHashKernels, UpdatableBloomFilter,
-};
 use ckb_core::block::Block;
 use ckb_core::header::{BlockNumber, Header};
-use ckb_core::transaction::Transaction;
 use ckb_network::PeerIndex;
+use ckb_util::Mutex;
 use ckb_util::RwLock;
 use faketime::unix_time_as_millis;
 use fnv::{FnvHashMap, FnvHashSet};
 use log::debug;
+use lru_cache::LruCache;
 use numext_fixed_hash::H256;
 use numext_fixed_uint::U256;
-use std::hash::{BuildHasher, Hasher};
+use std::collections::hash_map::Entry;

+const FILTER_SIZE: usize = 500;
+
 // State used to enforce CHAIN_SYNC_TIMEOUT
 // Only in effect for outbound, non-manual connections, with
@@ -57,14 +57,37 @@ pub struct PeerState {
     pub chain_sync: ChainSyncState,
 }

+#[derive(Clone, Default)]
+pub struct KnownFilter {
+    inner: FnvHashMap<PeerIndex, LruCache<H256, ()>>,
+}
+
+impl KnownFilter {
+    /// Adds a value to the filter.
+    /// If the filter did not have this value present, `true` is returned.
+    /// If the filter did have this value present, `false` is returned.
+    pub fn insert(&mut self, index: PeerIndex, hash: H256) -> bool {
+        match self.inner.entry(index) {
+            Entry::Occupied(mut o) => o.get_mut().insert(hash, ()).is_none(),
+            Entry::Vacant(v) => {
+                let mut lru = LruCache::new(FILTER_SIZE);
+                lru.insert(hash, ());
+                v.insert(lru);
+                true
+            }
+        }
+    }
+}
+
 #[derive(Default)]
 pub struct Peers {
     pub state: RwLock<FnvHashMap<PeerIndex, PeerState>>,
     pub misbehavior: RwLock<FnvHashMap<PeerIndex, u32>>,
     pub blocks_inflight: RwLock<FnvHashMap<PeerIndex, BlocksInflight>>,
     pub best_known_headers: RwLock<FnvHashMap<PeerIndex, HeaderView>>,
     pub last_common_headers: RwLock<FnvHashMap<PeerIndex, Header>>,
-    pub transaction_filters: RwLock<FnvHashMap<PeerIndex, TransactionFilter>>,
+    pub known_txs: Mutex<KnownFilter>,
+    pub known_blocks: Mutex<KnownFilter>,
 }

 #[derive(Debug, Clone)]
@@ -230,71 +253,3 @@ impl HeaderView {
         self.inner
     }
 }
-
-pub struct TransactionFilter {
-    filter: ClassicBloomFilter<DefaultBuildHashKernels<HighLowBytesBuildHasher>>,
-}
-
-impl TransactionFilter {
-    pub fn new(raw_data: &[u8], k: usize, hash_seed: usize) -> Self {
-        Self {
-            filter: ClassicBloomFilter::with_raw_data(
-                raw_data,
-                k,
-                DefaultBuildHashKernels::new(hash_seed, HighLowBytesBuildHasher),
-            ),
-        }
-    }
-
-    pub fn update(&mut self, raw_data: &[u8]) {
-        self.filter.update(raw_data)
-    }
-
-    pub fn insert(&mut self, hash: &H256) {
-        self.filter.insert(hash);
-    }
-
-    pub fn contains(&self, transaction: &Transaction) -> bool {
-        self.filter.contains(&transaction.hash())
-            || transaction
-                .inputs()
-                .iter()
-                .any(|input| self.filter.contains(&input.previous_output.hash))
-            || transaction
-                .outputs()
-                .iter()
-                .any(|output| self.filter.contains(&output.lock.hash()))
-    }
-}
-
-struct HighLowBytesBuildHasher;
-
-impl BuildHasher for HighLowBytesBuildHasher {
-    type Hasher = HighLowBytesHasher;
-
-    fn build_hasher(&self) -> Self::Hasher {
-        HighLowBytesHasher(0)
-    }
-}
-
-/// a hasher which only accepts H256 bytes and use high / low bytes as hash value
-struct HighLowBytesHasher(u64);
-
-impl Hasher for HighLowBytesHasher {
-    fn write(&mut self, bytes: &[u8]) {
-        if bytes.len() == 32 {
-            self.0 = (u64::from(bytes[0]) << 56)
-                + (u64::from(bytes[1]) << 48)
-                + (u64::from(bytes[2]) << 40)
-                + (u64::from(bytes[3]) << 32)
-                + (u64::from(bytes[28]) << 24)
-                + (u64::from(bytes[29]) << 16)
-                + (u64::from(bytes[30]) << 8)
-                + u64::from(bytes[31]);
-        }
-    }
-
-    fn finish(&self) -> u64 {
-        self.0
-    }
-}