Skip to content

Commit

Permalink
Merge pull request #919 from keroro520/share-block-inflight
Browse files Browse the repository at this point in the history
feat: synchronizer and relayer share BlocksInflight
  • Loading branch information
doitian authored Jun 1, 2019
2 parents 0a54cee + f196dc1 commit e985fb0
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 118 deletions.
41 changes: 22 additions & 19 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_protocol::{cast, BlockTransactions, FlatbuffersVectorIterator};
use ckb_store::ChainStore;
use failure::Error as FailureError;
use std::collections::hash_map::Entry;
use std::convert::TryInto;
use std::sync::Arc;

Expand Down Expand Up @@ -31,29 +32,31 @@ impl<'a, CS: ChainStore + 'static> BlockTransactionsProcess<'a, CS> {

pub fn execute(self) -> Result<(), FailureError> {
let block_hash = cast!(self.message.block_hash())?.try_into()?;
if let Some(compact_block) = self
.relayer
.state
.pending_compact_blocks
.lock()
.remove(&block_hash)
{
let transactions: Vec<Transaction> =
FlatbuffersVectorIterator::new(cast!(self.message.transactions())?)
.map(TryInto::try_into)
.collect::<Result<_, FailureError>>()?;
let pending_compact_blocks = &self.relayer.state.pending_compact_blocks;
if let Entry::Occupied(mut pending) = pending_compact_blocks.lock().entry(block_hash) {
let (compact_block, peers_set) = pending.get_mut();
if peers_set.remove(&self.peer) {
let transactions: Vec<Transaction> =
FlatbuffersVectorIterator::new(cast!(self.message.transactions())?)
.map(TryInto::try_into)
.collect::<Result<_, FailureError>>()?;

let ret = {
let chain_state = self.relayer.shared.lock_chain_state();
self.relayer
.reconstruct_block(&chain_state, &compact_block, transactions)
};
let ret = {
let chain_state = self.relayer.shared.lock_chain_state();
self.relayer
.reconstruct_block(&chain_state, compact_block, transactions)
};

if let Ok(block) = ret {
self.relayer
.accept_block(self.nc.as_ref(), self.peer, &Arc::new(block));
// TODO Add this (compact_block, peer) into RecentRejects if reconstruct_block failed?
// TODO Add this block into RecentRejects if accept_block failed?
if let Ok(block) = ret {
pending.remove();
self.relayer
.accept_block(self.nc.as_ref(), self.peer, &Arc::new(block));
}
}
}

Ok(())
}
}
104 changes: 70 additions & 34 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ckb_traits::{BlockMedianTimeContext, ChainProvider};
use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier};
use failure::Error as FailureError;
use flatbuffers::FlatBufferBuilder;
use fnv::FnvHashMap;
use fnv::FnvHashSet;
use numext_fixed_hash::H256;
use std::convert::TryInto;
use std::sync::Arc;
Expand Down Expand Up @@ -43,26 +43,30 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> {
let compact_block: CompactBlock = (*self.message).try_into()?;
let block_hash = compact_block.header.hash().to_owned();

if self.relayer.state.already_known_compact_block(&block_hash) {
let parent = self
.relayer
.shared
.get_header_view(compact_block.header.parent_hash());
if parent.is_none() {
debug_target!(
crate::LOG_TARGET_RELAY,
"discarding already known compact block {:x}",
block_hash
"UnknownParent: {:#x}, send_getheaders_to_peer({})",
block_hash,
self.peer
);
self.relayer.shared.send_getheaders_to_peer(
self.nc.as_ref(),
self.peer,
self.relayer.shared.lock_chain_state().tip_header(),
);
return Ok(());
}
self.relayer
.state
.mark_as_known_compact_block(block_hash.clone());

if let Some(parent_header_view) = self
.relayer
.shared
.get_header_view(&compact_block.header.parent_hash())
{
let parent = parent.unwrap();
let best_known_header = self.relayer.shared.best_known_header();
let current_total_difficulty =
parent_header_view.total_difficulty() + compact_block.header.difficulty();
parent.total_difficulty() + compact_block.header.difficulty();
if current_total_difficulty <= *best_known_header.total_difficulty() {
debug_target!(
crate::LOG_TARGET_RELAY,
Expand All @@ -73,44 +77,58 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> {
);
return Ok(());
}
} else {
debug_target!(
crate::LOG_TARGET_RELAY,
"UnknownParent: {:#x}, send_getheaders_to_peer({})",
block_hash,
self.peer
);
self.relayer.shared.send_getheaders_to_peer(
self.nc.as_ref(),
self.peer,
self.relayer.shared.lock_chain_state().tip_header(),
);
return Ok(());
}

if let Some(flight) = self
.relayer
.peers
.blocks_inflight
.read()
.inflight_state_by_block(&block_hash)
{
if flight.peers.contains(&self.peer) {
debug_target!(
crate::LOG_TARGET_RELAY,
"discard already in-flight compact block {:x}",
block_hash,
);
return Ok(());
}
}

// The new arrived has greater difficulty than local best known chain
let mut missing_indexes: Vec<usize> = Vec::new();
{
// Verify compact block
let mut pending_compact_blocks = self.relayer.state.pending_compact_blocks.lock();
if pending_compact_blocks.get(&block_hash).is_some()
if pending_compact_blocks
.get(&block_hash)
.map(|(_, peers_set)| peers_set.contains(&self.peer))
.unwrap_or(false)
|| self.relayer.shared.get_block(&block_hash).is_some()
{
debug_target!(
crate::LOG_TARGET_RELAY,
"already processed compact block {:x}",
"discard already pending compact block {:x}",
block_hash
);
return Ok(());
} else {
let fn_get_pending_header = {
|block_hash| {
pending_compact_blocks
.get(&block_hash)
.map(|(compact_block, _)| compact_block.header.to_owned())
}
};
let resolver = HeaderResolverWrapper::new(
&compact_block.header,
self.relayer.shared.shared().to_owned(),
);
let header_verifier = HeaderVerifier::new(
CompactBlockMedianTimeView {
anchor_hash: compact_block.header.hash(),
pending_compact_blocks: &pending_compact_blocks,
fn_get_pending_header: Box::new(fn_get_pending_header),
shared: self.relayer.shared.shared(),
},
Arc::clone(&self.relayer.shared.consensus().pow_engine()),
Expand Down Expand Up @@ -145,16 +163,37 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> {
// into database
match ret {
Ok(block) => {
pending_compact_blocks.remove(&block_hash);
self.relayer
.accept_block(self.nc.as_ref(), self.peer, &Arc::new(block))
}
Err(missing) => {
missing_indexes = missing;
pending_compact_blocks.insert(block_hash.clone(), compact_block);
pending_compact_blocks
.entry(block_hash.clone())
.or_insert_with(|| (compact_block, FnvHashSet::default()))
.1
.insert(self.peer);
}
}
}
if !missing_indexes.is_empty() {
if !self
.relayer
.peers
.blocks_inflight
.write()
.insert(self.peer, block_hash.to_owned())
{
debug_target!(
crate::LOG_TARGET_RELAY,
"BlockInFlight reach limit or had requested, peer: {}, block: {:x}",
self.peer,
block_hash,
);
return Ok(());
}

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_get_block_transactions(
fbb,
Expand All @@ -174,7 +213,7 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> {

struct CompactBlockMedianTimeView<'a, CS> {
anchor_hash: &'a H256,
pending_compact_blocks: &'a FnvHashMap<H256, CompactBlock>,
fn_get_pending_header: Box<Fn(H256) -> Option<Header> + 'a>,
shared: &'a Shared<CS>,
}

Expand All @@ -183,10 +222,7 @@ where
CS: ChainStore,
{
fn get_header(&self, hash: &H256) -> Option<Header> {
self.pending_compact_blocks
.get(hash)
.map(|cb| cb.header.to_owned())
.or_else(|| self.shared.block_header(hash))
(self.fn_get_pending_header)(hash.to_owned()).or_else(|| self.shared.block_header(hash))
}
}

Expand Down
13 changes: 1 addition & 12 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub const ASK_FOR_TXS_TOKEN: u64 = 1;
pub const MAX_RELAY_PEERS: usize = 128;
pub const TX_FILTER_SIZE: usize = 50000;
pub const TX_ASKED_SIZE: usize = TX_FILTER_SIZE;
pub const COMPACT_BLOCK_FILTER_SIZE: usize = 8192;

pub struct Relayer<CS> {
chain: ChainController,
Expand Down Expand Up @@ -488,12 +487,11 @@ impl<CS: ChainStore + 'static> CKBProtocolHandler for Relayer<CS> {
}

pub struct RelayState {
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub pending_compact_blocks: Mutex<FnvHashMap<H256, (CompactBlock, FnvHashSet<PeerIndex>)>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
pub tx_filter: Mutex<LruCache<H256, ()>>,
pub tx_already_asked: Mutex<LruCache<H256, Instant>>,
pub compact_block_filter: Mutex<LruCache<H256, ()>>,
}

impl Default for RelayState {
Expand All @@ -504,7 +502,6 @@ impl Default for RelayState {
pending_proposals_request: Mutex::new(FnvHashMap::default()),
tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)),
tx_already_asked: Mutex::new(LruCache::new(TX_ASKED_SIZE)),
compact_block_filter: Mutex::new(LruCache::new(COMPACT_BLOCK_FILTER_SIZE)),
}
}
}
Expand All @@ -518,12 +515,4 @@ impl RelayState {
fn already_known_tx(&self, hash: &H256) -> bool {
self.tx_filter.lock().contains_key(hash)
}

fn already_known_compact_block(&self, hash: &H256) -> bool {
self.compact_block_filter.lock().contains_key(hash)
}

fn mark_as_known_compact_block(&self, hash: H256) {
self.compact_block_filter.lock().insert(hash, ());
}
}
Loading

0 comments on commit e985fb0

Please sign in to comment.