Skip to content

Commit

Permalink
Merge #448
Browse files Browse the repository at this point in the history
448: relay known filter r=TheWaWaR a=zhangsoledad



Co-authored-by: zhangsoledad <[email protected]>
  • Loading branch information
bors[bot] and zhangsoledad committed Apr 12, 2019
2 parents b58721d + 3ada34e commit 86d825e
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 152 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion network/src/peers_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl PeersRegistry {
&self.peers
}

fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
pub fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
&self.peer_id_by_index
}

Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ckb-traits = { path = "../traits" }
failure = "0.1.5"
bytes = "0.4.12"
hash = {path = "../util/hash"}
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

[dev-dependencies]
ckb-notify = { path = "../notify" }
Expand Down
22 changes: 15 additions & 7 deletions sync/src/relayer/block_proposal_process.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
use crate::relayer::Relayer;
use ckb_core::transaction::Transaction;
use ckb_network::PeerIndex;
use ckb_protocol::{cast, BlockProposal, FlatbuffersVectorIterator};
use ckb_shared::index::ChainIndex;
use ckb_traits::chain_provider::ChainProvider;
use ckb_util::TryInto;
use failure::Error as FailureError;
use log::warn;

pub struct BlockProposalProcess<'a, CI> {
pub struct BlockProposalProcess<'a, CI: ChainIndex + 'a> {
message: &'a BlockProposal<'a>,
relayer: &'a Relayer<CI>,
peer: PeerIndex,
}

impl<'a, CI: ChainIndex> BlockProposalProcess<'a, CI> {
pub fn new(message: &'a BlockProposal, relayer: &'a Relayer<CI>) -> Self {
BlockProposalProcess { message, relayer }
pub fn new(message: &'a BlockProposal, relayer: &'a Relayer<CI>, peer: PeerIndex) -> Self {
BlockProposalProcess {
message,
relayer,
peer,
}
}

pub fn execute(self) -> Result<(), FailureError> {
let chain_state = self.relayer.shared.chain_state().lock();
let txs = FlatbuffersVectorIterator::new(cast!(self.message.transactions())?);
let mut known_txs = self.relayer.peers.known_txs.lock();
for tx in txs {
let ret = chain_state.add_tx_to_pool(
TryInto::try_into(tx)?,
self.relayer.shared.consensus().max_block_cycles(),
);
let tx: Transaction = TryInto::try_into(tx)?;
known_txs.insert(self.peer, tx.hash());
let ret =
chain_state.add_tx_to_pool(tx, self.relayer.shared.consensus().max_block_cycles());
if ret.is_err() {
warn!(target: "relay", "BlockProposal add_tx_to_pool error {:?}", ret)
}
Expand Down
40 changes: 32 additions & 8 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ use flatbuffers::FlatBufferBuilder;
use fnv::{FnvHashMap, FnvHashSet};
use log::warn;
use log::{debug, info};
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

pub const TX_PROPOSAL_TOKEN: u64 = 0;
pub const MAX_RELAY_PEERS: usize = 128;
pub const TX_FILTER_SIZE: usize = 1000;

pub struct Relayer<CI> {
chain: ChainController,
Expand Down Expand Up @@ -113,7 +116,7 @@ impl<CI: ChainIndex> Relayer<CI> {
.execute()?;
}
RelayPayload::BlockProposal => {
BlockProposalProcess::new(&cast!(message.payload_as_block_proposal())?, self)
BlockProposalProcess::new(&cast!(message.payload_as_block_proposal())?, self, peer)
.execute()?;
}
RelayPayload::NONE => {
Expand Down Expand Up @@ -167,17 +170,27 @@ impl<CI: ChainIndex> Relayer<CI> {

pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc<Block>) {
let ret = self.chain.process_block(Arc::clone(&block));

if ret.is_ok() {
let block_hash = block.header().hash();
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new());
fbb.finish(message, None);

for peer_id in nc.connected_peers() {
if peer_id != peer {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
let mut known_blocks = self.peers.known_blocks.lock();
let selected_peers: Vec<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_blocks.insert(*peer_index, block_hash.clone()) && (*peer_index != peer)
})
.take(MAX_RELAY_PEERS)
.collect();

for peer_id in selected_peers {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
}
} else {
Expand Down Expand Up @@ -343,9 +356,20 @@ impl<CI: ChainIndex> CKBProtocolHandler for Relayer<CI> {
}
}

#[derive(Default)]
pub struct RelayState {
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
pub tx_filter: Mutex<LruCache<H256, ()>>,
}

impl Default for RelayState {
fn default() -> Self {
RelayState {
pending_compact_blocks: Mutex::new(FnvHashMap::default()),
inflight_proposals: Mutex::new(FnvHashSet::default()),
pending_proposals_request: Mutex::new(FnvHashMap::default()),
tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)),
}
}
}
42 changes: 28 additions & 14 deletions sync/src/relayer/transaction_process.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::relayer::Relayer;
use crate::relayer::MAX_RELAY_PEERS;
use ckb_core::{transaction::Transaction, Cycle};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction};
Expand All @@ -10,6 +11,7 @@ use ckb_verification::TransactionError;
use failure::Error as FailureError;
use flatbuffers::FlatBufferBuilder;
use log::{debug, warn};
use numext_fixed_hash::H256;
use std::time::Duration;

const DEFAULT_BAN_TIME: Duration = Duration::from_secs(3600 * 24 * 3);
Expand Down Expand Up @@ -38,6 +40,13 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

pub fn execute(self) -> Result<(), FailureError> {
let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?;
let tx_hash = tx.hash();

if self.already_known(tx_hash.clone()) {
debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash);
return Ok(());
}

let tx_result = {
let chain_state = self.relayer.shared.chain_state().lock();
let max_block_cycles = self.relayer.shared.consensus().max_block_cycles();
Expand All @@ -51,21 +60,21 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {
let message = RelayMessage::build_transaction(fbb, &tx, cycles);
fbb.finish(message, None);

for peer in self.nc.connected_peers() {
if peer != self.peer
&& self
.relayer
.peers()
.transaction_filters
.read()
.get(&peer)
.map_or(true, |filter| filter.contains(&tx))
{
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
let mut known_txs = self.relayer.peers.known_txs.lock();
let selected_peers: Vec<PeerIndex> = self
.nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_txs.insert(*peer_index, tx_hash.clone()) && (self.peer != *peer_index)
})
.take(MAX_RELAY_PEERS)
.collect();

if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
for peer in selected_peers {
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
}
}
Expand All @@ -91,4 +100,9 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

Ok(())
}

fn already_known(&self, hash: H256) -> bool {
let mut tx_filter = self.relayer.state.tx_filter.lock();
tx_filter.insert(hash, ()).is_some()
}
}
37 changes: 6 additions & 31 deletions sync/src/synchronizer/get_blocks_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,12 @@ where
debug!(target: "sync", "get_blocks {:x}", block_hash);
if let Some(block) = self.synchronizer.get_block(&block_hash) {
debug!(target: "sync", "respond_block {} {:x}", block.header().number(), block.header().hash());
if let Some(filter) = self
.synchronizer
.peers
.transaction_filters
.read()
.get(&self.peer)
{
let transactions_index = block
.commit_transactions()
.iter()
.enumerate()
.filter(|(_index, tx)| filter.contains(tx))
.map(|ti| ti.0)
.collect::<Vec<_>>();

let fbb = &mut FlatBufferBuilder::new();
let message =
SyncMessage::build_filtered_block(fbb, &block, &transactions_index);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
} else {
let fbb = &mut FlatBufferBuilder::new();
let message = SyncMessage::build_block(fbb, &block);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
let fbb = &mut FlatBufferBuilder::new();
let message = SyncMessage::build_block(fbb, &block);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
} else {
// TODO response not found
Expand Down
18 changes: 2 additions & 16 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
mod block_fetcher;
mod block_pool;
mod block_process;
mod filter_process;
mod get_blocks_process;
mod get_headers_process;
mod headers_process;

use self::block_fetcher::BlockFetcher;
use self::block_pool::OrphanBlockPool;
use self::block_process::BlockProcess;
use self::filter_process::{AddFilterProcess, ClearFilterProcess, SetFilterProcess};
use self::get_blocks_process::GetBlocksProcess;
use self::get_headers_process::GetHeadersProcess;
use self::headers_process::HeadersProcess;
Expand Down Expand Up @@ -158,22 +156,10 @@ impl<CI: ChainIndex> Synchronizer<CI> {
SyncPayload::Block => {
BlockProcess::new(&cast!(message.payload_as_block())?, self, peer, nc).execute()?;
}
SyncPayload::SetFilter => {
SetFilterProcess::new(&cast!(message.payload_as_set_filter())?, self, peer)
.execute()?;
}
SyncPayload::AddFilter => {
AddFilterProcess::new(&cast!(message.payload_as_add_filter())?, self, peer)
.execute()?;
}
SyncPayload::ClearFilter => {
ClearFilterProcess::new(self, peer).execute()?;
}
SyncPayload::FilteredBlock => {
// ignore, should not receive FilteredBlock in full node mode
SyncPayload::NONE => {
cast!(None)?;
}
SyncPayload::NONE => {
_ => {
cast!(None)?;
}
}
Expand Down
1 change: 0 additions & 1 deletion sync/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;

mod filter;
#[cfg(not(disable_faketime))]
mod relayer;
#[cfg(not(disable_faketime))]
Expand Down
Loading

0 comments on commit 86d825e

Please sign in to comment.