From 68b5bb66cf9a341fb762b29787ad88a3780e118f Mon Sep 17 00:00:00 2001 From: keroro520 Date: Wed, 29 May 2019 10:49:49 +0800 Subject: [PATCH 1/5] test: Add test for losing BlockTXN messages --- test/src/specs/relay/compact_block.rs | 147 ++++++++++++++++---------- 1 file changed, 94 insertions(+), 53 deletions(-) diff --git a/test/src/specs/relay/compact_block.rs b/test/src/specs/relay/compact_block.rs index c8ed5a7663..ea97f83a85 100644 --- a/test/src/specs/relay/compact_block.rs +++ b/test/src/specs/relay/compact_block.rs @@ -1,7 +1,7 @@ use crate::utils::{ build_compact_block, build_compact_block_with_prefilled, clear_messages, wait_until, }; -use crate::{Net, Spec, TestProtocol}; +use crate::{Net, Node, Spec, TestProtocol}; use ckb_chain_spec::ChainSpec; use ckb_core::header::HeaderBuilder; use ckb_network::PeerIndex; @@ -9,31 +9,33 @@ use ckb_protocol::{get_root, RelayMessage, RelayPayload, SyncMessage, SyncPayloa use ckb_sync::NetworkProtocol; use log::info; use numext_fixed_hash::{h256, H256}; +use std::time::Duration; pub struct CompactBlockBasic; impl CompactBlockBasic { // Case: Sent to node0 a parent-unknown empty block, node0 should be unable to reconstruct // it and send us back a `GetHeaders` message - pub fn test_empty_parent_unknown_compact_block(&self, net: &Net, peer_ids: &[PeerIndex]) { - // `node` generate 1 block to exit IBD mode. - // `net` ignore relay message from `node0` - let node0 = &net.nodes[0]; - let peer_id0 = peer_ids[0]; - node0.generate_block(); + pub fn test_empty_parent_unknown_compact_block( + &self, + net: &Net, + node: &Node, + peer_id: PeerIndex, + ) { + node.generate_block(); let _ = net.receive(); - let parent_unknown_block = node0 + let parent_unknown_block = node .new_block_builder(None, None, None) .header_builder(HeaderBuilder::default().parent_hash(h256!("0x123456"))) .build(); - let tip_block = node0.get_tip_block(); + let tip_block = node.get_tip_block(); net.send( NetworkProtocol::RELAY.into(), - peer_id0, + peer_id, build_compact_block(&parent_unknown_block), ); - let ret = wait_until(10, move || node0.get_tip_block() != tip_block); + let ret = wait_until(10, move || node.get_tip_block() != tip_block); assert!(!ret, "Node0 should reconstruct empty block failed"); let (_, _, data) = net.receive(); @@ -46,58 +48,48 @@ impl CompactBlockBasic { } // Case: Send to node0 a parent-known empty block, node0 should be able to reconstruct it - pub fn test_empty_compact_block(&self, net: &Net, peer_ids: &[PeerIndex]) { - // `node` generate 1 block to exit IBD mode. - // `net` ignore relay message from `node0` - let node0 = &net.nodes[0]; - let peer_id0 = peer_ids[0]; - node0.generate_block(); + pub fn test_empty_compact_block(&self, net: &Net, node: &Node, peer_id: PeerIndex) { + node.generate_block(); let _ = net.receive(); - let new_empty_block = node0.new_block(None, None, None); + let new_empty_block = node.new_block(None, None, None); net.send( NetworkProtocol::RELAY.into(), - peer_id0, + peer_id, build_compact_block(&new_empty_block), ); - let ret = wait_until(10, move || node0.get_tip_block() == new_empty_block); + let ret = wait_until(10, move || node.get_tip_block() == new_empty_block); assert!(ret, "Node0 should reconstruct empty block successfully"); clear_messages(net); } // Case: Send to node0 a block with all transactions prefilled, node0 should be able to reconstruct it - pub fn test_all_prefilled_compact_block(&self, net: &Net, peer_ids: &[PeerIndex]) { - // `node` generate 1 block to exit IBD mode. - // `net` ignore relay message from `node0` - let node0 = &net.nodes[0]; - let peer_id0 = peer_ids[0]; - node0.generate_block(); + pub fn test_all_prefilled_compact_block(&self, net: &Net, node: &Node, peer_id: PeerIndex) { + node.generate_block(); let _ = net.receive(); // Proposal a tx, and grow up into proposal window - let new_tx = node0.new_transaction(node0.get_tip_block().transactions()[0].hash().clone()); - node0.submit_block( - &node0 + let new_tx = node.new_transaction(node.get_tip_block().transactions()[0].hash().clone()); + node.submit_block( + &node .new_block_builder(None, None, None) .proposal(new_tx.proposal_short_id()) .build(), ); - (0..3).for_each(|_| { - node0.generate_block(); - }); + node.generate_blocks(3); // Relay a block contains `new_tx` as committed - let new_block = node0 + let new_block = node .new_block_builder(None, None, None) .transaction(new_tx) .build(); net.send( NetworkProtocol::RELAY.into(), - peer_id0, + peer_id, build_compact_block_with_prefilled(&new_block, vec![1]), ); - let ret = wait_until(10, move || node0.get_tip_block() == new_block); + let ret = wait_until(10, move || node.get_tip_block() == new_block); assert!( ret, "Node0 should reconstruct all-prefilled block successfully" @@ -108,21 +100,16 @@ impl CompactBlockBasic { // Case: Send to node0 a block which missing a tx, node0 should send `GetBlockTransactions` // back for requesting these missing txs - pub fn test_missing_txs_compact_block(&self, net: &Net, peer_ids: &[PeerIndex]) { - let node0 = &net.nodes[0]; - let peer_id0 = peer_ids[0]; - + pub fn test_missing_txs_compact_block(&self, net: &Net, node: &Node, peer_id: PeerIndex) { // Proposal a tx, and grow up into proposal window - let new_tx = node0.new_transaction(node0.get_tip_block().transactions()[0].hash().clone()); - node0.submit_block( - &node0 + let new_tx = node.new_transaction(node.get_tip_block().transactions()[0].hash().clone()); + node.submit_block( + &node .new_block_builder(None, None, None) .proposal(new_tx.proposal_short_id()) .build(), ); - (0..3).for_each(|_| { - node0.generate_block(); - }); + node.generate_blocks(3); // Net consume and ignore the recent blocks (0..4).for_each(|_| { @@ -130,16 +117,16 @@ impl CompactBlockBasic { }); // Relay a block contains `new_tx` as committed, but not include in prefilled - let new_block = node0 + let new_block = node .new_block_builder(None, None, None) .transaction(new_tx) .build(); net.send( NetworkProtocol::RELAY.into(), - peer_id0, + peer_id, build_compact_block(&new_block), ); - let ret = wait_until(10, move || node0.get_tip_block() == new_block); + let ret = wait_until(10, move || node.get_tip_block() == new_block); assert!(!ret, "Node0 should be unable to reconstruct the block"); let (_, _, data) = net.receive(); @@ -152,13 +139,66 @@ impl CompactBlockBasic { clear_messages(net); } + + pub fn test_lose_get_block_transactions( + &self, + net: &Net, + node0: &Node, + node1: &Node, + peer_id0: PeerIndex, + ) { + node0.generate_block(); + let new_tx = node0.new_transaction(node0.get_tip_block().transactions()[0].hash().clone()); + node0.submit_block( + &node0 + .new_block_builder(None, None, None) + .proposal(new_tx.proposal_short_id()) + .build(), + ); + // Proposal a tx, and grow up into proposal window + node0.generate_blocks(6); + + // Make node0 and node1 reach the same height + node1.generate_block(); + node0.connect(node1); + node0.waiting_for_sync(node1, node0.get_tip_block().header().number(), 20); + + // Net consume and ignore the recent blocks + clear_messages(net); + + // Construct a new block contains one transaction + let block = node0 + .new_block_builder(None, None, None) + .transaction(new_tx) + .build(); + + // Net send the compact block to node0, but dose not send the corresponding missing + // block transactions. It will make node0 unable to reconstruct the complete block + net.send( + NetworkProtocol::RELAY.into(), + peer_id0, + build_compact_block(&block), + ); + let (_, _, data) = net + .receive_timeout(Duration::from_secs(10)) + .expect("receive GetBlockTransactions"); + let message = get_root::(&data).unwrap(); + assert_eq!( + message.payload_type(), + RelayPayload::GetBlockTransactions, + "Node0 should send GetBlockTransactions message for missing transactions", + ); + + // Submit the new block to node1. We expect node1 will relay the new block to node0. + node1.submit_block(&block); + node1.waiting_for_sync(node0, node1.get_tip_block().header().number(), 20); + } } impl Spec for CompactBlockBasic { fn run(&self, net: Net) { info!("Running CompactBlockBasic"); - info!("Connect nodes"); let peer_ids = net .nodes .iter() @@ -170,10 +210,11 @@ impl Spec for CompactBlockBasic { .collect::>(); clear_messages(&net); - self.test_empty_compact_block(&net, &peer_ids); - self.test_empty_parent_unknown_compact_block(&net, &peer_ids); - self.test_all_prefilled_compact_block(&net, &peer_ids); - self.test_missing_txs_compact_block(&net, &peer_ids); + self.test_empty_compact_block(&net, &net.nodes[0], peer_ids[0]); + self.test_empty_parent_unknown_compact_block(&net, &net.nodes[0], peer_ids[0]); + self.test_all_prefilled_compact_block(&net, &net.nodes[0], peer_ids[0]); + self.test_missing_txs_compact_block(&net, &net.nodes[0], peer_ids[0]); + self.test_lose_get_block_transactions(&net, &net.nodes[0], &net.nodes[1], peer_ids[0]); } fn test_protocols(&self) -> Vec { From 321b4a71833606fdf69e3b24a62108548191de24 Mon Sep 17 00:00:00 2001 From: keroro520 Date: Wed, 29 May 2019 11:25:40 +0800 Subject: [PATCH 2/5] feat: Remove compact_block_filter --- sync/src/relayer/compact_block_process.rs | 12 ------------ sync/src/relayer/mod.rs | 11 ----------- 2 files changed, 23 deletions(-) diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 33b33163bd..fcbded6687 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -43,18 +43,6 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { let compact_block: CompactBlock = (*self.message).try_into()?; let block_hash = compact_block.header.hash().to_owned(); - if self.relayer.state.already_known_compact_block(&block_hash) { - debug_target!( - crate::LOG_TARGET_RELAY, - "discarding already known compact block {:x}", - block_hash - ); - return Ok(()); - } - self.relayer - .state - .mark_as_known_compact_block(block_hash.clone()); - if let Some(parent_header_view) = self .relayer .shared diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 16ee511596..1ab1ce417f 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -54,7 +54,6 @@ pub const ASK_FOR_TXS_TOKEN: u64 = 1; pub const MAX_RELAY_PEERS: usize = 128; pub const TX_FILTER_SIZE: usize = 50000; pub const TX_ASKED_SIZE: usize = TX_FILTER_SIZE; -pub const COMPACT_BLOCK_FILTER_SIZE: usize = 8192; pub struct Relayer { chain: ChainController, @@ -493,7 +492,6 @@ pub struct RelayState { pub pending_proposals_request: Mutex>>, pub tx_filter: Mutex>, pub tx_already_asked: Mutex>, - pub compact_block_filter: Mutex>, } impl Default for RelayState { @@ -504,7 +502,6 @@ impl Default for RelayState { pending_proposals_request: Mutex::new(FnvHashMap::default()), tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)), tx_already_asked: Mutex::new(LruCache::new(TX_ASKED_SIZE)), - compact_block_filter: Mutex::new(LruCache::new(COMPACT_BLOCK_FILTER_SIZE)), } } } @@ -518,12 +515,4 @@ impl RelayState { fn already_known_tx(&self, hash: &H256) -> bool { self.tx_filter.lock().contains_key(hash) } - - fn already_known_compact_block(&self, hash: &H256) -> bool { - self.compact_block_filter.lock().contains_key(hash) - } - - fn mark_as_known_compact_block(&self, hash: H256) { - self.compact_block_filter.lock().insert(hash, ()); - } } From 9cd1ffb84f1ba1719369fc85dd88b5327f85f2d1 Mon Sep 17 00:00:00 2001 From: keroro520 Date: Thu, 30 May 2019 15:28:30 +0800 Subject: [PATCH 3/5] feat: Filter duplicated compact blocks via BlocksInFlight Currently, we discard the new arriving compact blocks if we had already received a same one, via `RelayState::compact_block_filter`. The purpose is avoiding re-processing/re-requesting the same compact blocks. But the problem is, if we failed to reconstruct a compact block, we cannot process this compact block any more, before receiving the corresponding transactions from peer1. That's terrible if peer1's inventory was lost on the way. Here I remove `RelayState::compact_block_filter`, and insteadly use `Peers::blocks_inflight` to avoid re-requesting to same peers. --- sync/src/relayer/compact_block_process.rs | 28 +++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index fcbded6687..a9971e7e43 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -76,6 +76,19 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { return Ok(()); } + if let Some(flight) = self + .relayer + .peers + .blocks_inflight + .read() + .inflight_state_by_block(&block_hash) + { + if flight.peers.contains(&self.peer) { + debug!(target: "relay", "discarding already in-flight compact block {:x}", block_hash); + return Ok(()); + } + } + // The new arrived has greater difficulty than local best known chain let mut missing_indexes: Vec = Vec::new(); { @@ -143,6 +156,21 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { } } if !missing_indexes.is_empty() { + if !self + .relayer + .peers + .blocks_inflight + .write() + .insert(self.peer, block_hash.to_owned()) + { + debug!( + target: "relay", + "BlockInFlight reach limit or had requested, peer: {}, block: {:x}", + self.peer, block_hash, + ); + return Ok(()); + } + let fbb = &mut FlatBufferBuilder::new(); let message = RelayMessage::build_get_block_transactions( fbb, From 789f4bf558cf3c44821983468134290c6cd4bfd9 Mon Sep 17 00:00:00 2001 From: keroro520 Date: Thu, 30 May 2019 15:30:40 +0800 Subject: [PATCH 4/5] fix: Filter compact block at peer-level --- .../src/relayer/block_transactions_process.rs | 41 +++++----- sync/src/relayer/compact_block_process.rs | 79 ++++++++++++------- sync/src/relayer/mod.rs | 2 +- 3 files changed, 72 insertions(+), 50 deletions(-) diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index 981da2380e..6723c30682 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -4,6 +4,7 @@ use ckb_network::{CKBProtocolContext, PeerIndex}; use ckb_protocol::{cast, BlockTransactions, FlatbuffersVectorIterator}; use ckb_store::ChainStore; use failure::Error as FailureError; +use std::collections::hash_map::Entry; use std::convert::TryInto; use std::sync::Arc; @@ -31,29 +32,31 @@ impl<'a, CS: ChainStore + 'static> BlockTransactionsProcess<'a, CS> { pub fn execute(self) -> Result<(), FailureError> { let block_hash = cast!(self.message.block_hash())?.try_into()?; - if let Some(compact_block) = self - .relayer - .state - .pending_compact_blocks - .lock() - .remove(&block_hash) - { - let transactions: Vec = - FlatbuffersVectorIterator::new(cast!(self.message.transactions())?) - .map(TryInto::try_into) - .collect::>()?; + let pending_compact_blocks = &self.relayer.state.pending_compact_blocks; + if let Entry::Occupied(mut pending) = pending_compact_blocks.lock().entry(block_hash) { + let (compact_block, peers_set) = pending.get_mut(); + if peers_set.remove(&self.peer) { + let transactions: Vec = + FlatbuffersVectorIterator::new(cast!(self.message.transactions())?) + .map(TryInto::try_into) + .collect::>()?; - let ret = { - let chain_state = self.relayer.shared.lock_chain_state(); - self.relayer - .reconstruct_block(&chain_state, &compact_block, transactions) - }; + let ret = { + let chain_state = self.relayer.shared.lock_chain_state(); + self.relayer + .reconstruct_block(&chain_state, compact_block, transactions) + }; - if let Ok(block) = ret { - self.relayer - .accept_block(self.nc.as_ref(), self.peer, &Arc::new(block)); + // TODO Add this (compact_block, peer) into RecentRejects if reconstruct_block failed? + // TODO Add this block into RecentRejects if accept_block failed? + if let Ok(block) = ret { + pending.remove(); + self.relayer + .accept_block(self.nc.as_ref(), self.peer, &Arc::new(block)); + } } } + Ok(()) } } diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index a9971e7e43..751368929a 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -12,7 +12,7 @@ use ckb_traits::{BlockMedianTimeContext, ChainProvider}; use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier}; use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; -use fnv::FnvHashMap; +use fnv::FnvHashSet; use numext_fixed_hash::H256; use std::convert::TryInto; use std::sync::Arc; @@ -43,14 +43,30 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { let compact_block: CompactBlock = (*self.message).try_into()?; let block_hash = compact_block.header.hash().to_owned(); - if let Some(parent_header_view) = self + let parent = self .relayer .shared - .get_header_view(&compact_block.header.parent_hash()) + .get_header_view(compact_block.header.parent_hash()); + if parent.is_none() { + debug_target!( + crate::LOG_TARGET_RELAY, + "UnknownParent: {:#x}, send_getheaders_to_peer({})", + block_hash, + self.peer + ); + self.relayer.shared.send_getheaders_to_peer( + self.nc.as_ref(), + self.peer, + self.relayer.shared.lock_chain_state().tip_header(), + ); + return Ok(()); + } + { + let parent = parent.unwrap(); let best_known_header = self.relayer.shared.best_known_header(); let current_total_difficulty = - parent_header_view.total_difficulty() + compact_block.header.difficulty(); + parent.total_difficulty() + compact_block.header.difficulty(); if current_total_difficulty <= *best_known_header.total_difficulty() { debug_target!( crate::LOG_TARGET_RELAY, @@ -61,19 +77,6 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { ); return Ok(()); } - } else { - debug_target!( - crate::LOG_TARGET_RELAY, - "UnknownParent: {:#x}, send_getheaders_to_peer({})", - block_hash, - self.peer - ); - self.relayer.shared.send_getheaders_to_peer( - self.nc.as_ref(), - self.peer, - self.relayer.shared.lock_chain_state().tip_header(), - ); - return Ok(()); } if let Some(flight) = self @@ -84,7 +87,11 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { .inflight_state_by_block(&block_hash) { if flight.peers.contains(&self.peer) { - debug!(target: "relay", "discarding already in-flight compact block {:x}", block_hash); + debug_target!( + crate::LOG_TARGET_RELAY, + "discard already in-flight compact block {:x}", + block_hash, + ); return Ok(()); } } @@ -94,16 +101,26 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { { // Verify compact block let mut pending_compact_blocks = self.relayer.state.pending_compact_blocks.lock(); - if pending_compact_blocks.get(&block_hash).is_some() + if pending_compact_blocks + .get(&block_hash) + .map(|(_, peers_set)| peers_set.contains(&self.peer)) + .unwrap_or(false) || self.relayer.shared.get_block(&block_hash).is_some() { debug_target!( crate::LOG_TARGET_RELAY, - "already processed compact block {:x}", + "discard already pending compact block {:x}", block_hash ); return Ok(()); } else { + let fn_get_pending_header = { + |block_hash| { + pending_compact_blocks + .get(&block_hash) + .map(|(compact_block, _)| compact_block.header.to_owned()) + } + }; let resolver = HeaderResolverWrapper::new( &compact_block.header, self.relayer.shared.shared().to_owned(), @@ -111,7 +128,7 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { let header_verifier = HeaderVerifier::new( CompactBlockMedianTimeView { anchor_hash: compact_block.header.hash(), - pending_compact_blocks: &pending_compact_blocks, + fn_get_pending_header: Box::new(fn_get_pending_header), shared: self.relayer.shared.shared(), }, Arc::clone(&self.relayer.shared.consensus().pow_engine()), @@ -151,7 +168,11 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { } Err(missing) => { missing_indexes = missing; - pending_compact_blocks.insert(block_hash.clone(), compact_block); + pending_compact_blocks + .entry(block_hash.clone()) + .or_insert_with(|| (compact_block, FnvHashSet::default())) + .1 + .insert(self.peer); } } } @@ -163,10 +184,11 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { .write() .insert(self.peer, block_hash.to_owned()) { - debug!( - target: "relay", + debug_target!( + crate::LOG_TARGET_RELAY, "BlockInFlight reach limit or had requested, peer: {}, block: {:x}", - self.peer, block_hash, + self.peer, + block_hash, ); return Ok(()); } @@ -190,7 +212,7 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { struct CompactBlockMedianTimeView<'a, CS> { anchor_hash: &'a H256, - pending_compact_blocks: &'a FnvHashMap, + fn_get_pending_header: Box Option
+ 'a>, shared: &'a Shared, } @@ -199,10 +221,7 @@ where CS: ChainStore, { fn get_header(&self, hash: &H256) -> Option
{ - self.pending_compact_blocks - .get(hash) - .map(|cb| cb.header.to_owned()) - .or_else(|| self.shared.block_header(hash)) + (self.fn_get_pending_header)(hash.to_owned()).or_else(|| self.shared.block_header(hash)) } } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 1ab1ce417f..c5248b1af9 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -487,7 +487,7 @@ impl CKBProtocolHandler for Relayer { } pub struct RelayState { - pub pending_compact_blocks: Mutex>, + pub pending_compact_blocks: Mutex)>>, pub inflight_proposals: Mutex>, pub pending_proposals_request: Mutex>>, pub tx_filter: Mutex>, From f196dc16c7832804123611f87dd1ee5b4e199da3 Mon Sep 17 00:00:00 2001 From: keroro520 Date: Sat, 1 Jun 2019 10:49:40 +0800 Subject: [PATCH 5/5] fix: Clear pending compact blocks after successing to reconstrct --- sync/src/relayer/compact_block_process.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 751368929a..d2fc2430c0 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -163,6 +163,7 @@ impl<'a, CS: ChainStore + 'static> CompactBlockProcess<'a, CS> { // into database match ret { Ok(block) => { + pending_compact_blocks.remove(&block_hash); self.relayer .accept_block(self.nc.as_ref(), self.peer, &Arc::new(block)) }