From 5bcf6e5453d451063a1776fa38b4f14aaf07ac88 Mon Sep 17 00:00:00 2001 From: Brian Pearce Date: Mon, 28 Nov 2022 08:40:06 +0100 Subject: [PATCH] fix: node gets banned on reorg (#4949) Description --- Remove the `FetchBlocksByHash` handler. It was only called from a single place and although designed to handle multiple blocks it was only ever sending a single hash at once making the multi-block functionality useless. Instead, opt to use the existing `GetBlockByHash` handler and expand that handler to accept a new `orphans` flag. Passing this flag means we'll accept found blocks from the orphan pool, Motivation and Context --- Previously if a node had re-orged after a sync had started it may result in not providing the complete block for a block it claimed it had. This results in a brief ban. Make it also return blocks from the orphan pool and let the peer figure out what to do with it. How Has This Been Tested? --- Tests, and running nodes. Fixes: #4799 --- .../comms_interface/comms_request.rs | 36 ++------- .../comms_interface/comms_response.rs | 2 + .../comms_interface/inbound_handlers.rs | 76 ++++++++++--------- .../comms_interface/outbound_interface.rs | 19 ++--- .../core/src/base_node/proto/request.proto | 8 +- .../core/src/base_node/proto/request.rs | 23 ++---- .../core/src/base_node/proto/response.proto | 5 ++ .../core/src/base_node/proto/response.rs | 30 +++++++- 8 files changed, 100 insertions(+), 99 deletions(-) diff --git a/base_layer/core/src/base_node/comms_interface/comms_request.rs b/base_layer/core/src/base_node/comms_interface/comms_request.rs index b18e34f04e..4aa8033de5 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_request.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_request.rs @@ -46,38 +46,20 @@ pub enum NodeCommsRequest { FetchHeaders(RangeInclusive), FetchHeadersByHashes(Vec), FetchMatchingUtxos(Vec), - FetchMatchingBlocks { - range: RangeInclusive, - compact: bool, - }, - FetchBlocksByHash { - block_hashes: Vec, - compact: bool, - }, + FetchMatchingBlocks { range: RangeInclusive, compact: bool }, FetchBlocksByKernelExcessSigs(Vec), FetchBlocksByUtxos(Vec), GetHeaderByHash(HashOutput), GetBlockByHash(HashOutput), GetNewBlockTemplate(GetNewBlockTemplateRequest), GetNewBlock(NewBlockTemplate), + GetBlockFromAllChains(HashOutput), FetchKernelByExcessSig(Signature), - FetchMempoolTransactionsByExcessSigs { - excess_sigs: Vec, - }, - FetchValidatorNodesKeys { - height: u64, - }, - GetShardKey { - height: u64, - public_key: PublicKey, - }, - FetchTemplateRegistrations { - start_height: u64, - end_height: u64, - }, - FetchUnspentUtxosInBlock { - block_hash: BlockHash, - }, + FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec }, + FetchValidatorNodesKeys { height: u64 }, + GetShardKey { height: u64, public_key: PublicKey }, + FetchTemplateRegistrations { start_height: u64, end_height: u64 }, + FetchUnspentUtxosInBlock { block_hash: BlockHash }, } #[derive(Debug, Serialize, Deserialize)] @@ -100,15 +82,13 @@ impl Display for NodeCommsRequest { FetchMatchingBlocks { range, compact } => { write!(f, "FetchMatchingBlocks ({:?}, {})", range, compact) }, - FetchBlocksByHash { block_hashes, compact } => { - write!(f, "FetchBlocksByHash (n={}, {})", block_hashes.len(), compact) - }, FetchBlocksByKernelExcessSigs(v) => write!(f, "FetchBlocksByKernelExcessSigs (n={})", v.len()), FetchBlocksByUtxos(v) => write!(f, "FetchBlocksByUtxos (n={})", v.len()), GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()), GetBlockByHash(v) => write!(f, "GetBlockByHash({})", v.to_hex()), GetNewBlockTemplate(v) => write!(f, "GetNewBlockTemplate ({}) with weight {}", v.algo, v.max_weight), GetNewBlock(b) => write!(f, "GetNewBlock (Block Height={})", b.header.height), + GetBlockFromAllChains(v) => write!(f, "GetBlockFromAllChains({})", v.to_hex()), FetchKernelByExcessSig(s) => write!( f, "FetchKernelByExcessSig (signature=({}, {}))", diff --git a/base_layer/core/src/base_node/comms_interface/comms_response.rs b/base_layer/core/src/base_node/comms_interface/comms_response.rs index c6714a6fe2..028cbb709b 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_response.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_response.rs @@ -44,6 +44,7 @@ pub enum NodeCommsResponse { TransactionKernels(Vec), BlockHeaders(Vec), BlockHeader(Option), + Block(Box>), TransactionOutputs(Vec), HistoricalBlocks(Vec), HistoricalBlock(Box>), @@ -70,6 +71,7 @@ impl Display for NodeCommsResponse { TransactionKernels(_) => write!(f, "TransactionKernel"), BlockHeaders(_) => write!(f, "BlockHeaders"), BlockHeader(_) => write!(f, "BlockHeader"), + Block(_) => write!(f, "Block"), HistoricalBlock(_) => write!(f, "HistoricalBlock"), TransactionOutputs(_) => write!(f, "TransactionOutputs"), HistoricalBlocks(_) => write!(f, "HistoricalBlocks"), diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 2dcd1de90f..54a1d83c52 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -169,31 +169,6 @@ where B: BlockchainBackend + 'static let blocks = self.blockchain_db.fetch_blocks(range, compact).await?; Ok(NodeCommsResponse::HistoricalBlocks(blocks)) }, - NodeCommsRequest::FetchBlocksByHash { block_hashes, compact } => { - let mut blocks = Vec::with_capacity(block_hashes.len()); - for block_hash in block_hashes { - let block_hex = block_hash.to_hex(); - debug!( - target: LOG_TARGET, - "A peer has requested a block with hash {} (compact = {})", block_hex, compact - ); - - match self.blockchain_db.fetch_block_by_hash(block_hash, compact).await { - Ok(Some(block)) => blocks.push(block), - Ok(None) => warn!( - target: LOG_TARGET, - "Could not provide requested block {} to peer because not stored", block_hex, - ), - Err(e) => warn!( - target: LOG_TARGET, - "Could not provide requested block {} to peer because: {}", - block_hex, - e.to_string() - ), - } - } - Ok(NodeCommsResponse::HistoricalBlocks(blocks)) - }, NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => { if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS { return Err(CommsInterfaceError::InvalidRequest { @@ -342,6 +317,43 @@ where B: BlockchainBackend + 'static block: Some(block), }) }, + NodeCommsRequest::GetBlockFromAllChains(hash) => { + let block_hex = hash.to_hex(); + debug!( + target: LOG_TARGET, + "A peer has requested a block with hash {}", block_hex + ); + + let maybe_block = match self + .blockchain_db + .fetch_block_by_hash(hash, true) + .await + .unwrap_or_else(|e| { + warn!( + target: LOG_TARGET, + "Could not provide requested block {} to peer because: {}", + block_hex, + e.to_string() + ); + + None + }) { + None => self.blockchain_db.fetch_orphan(hash).await.map_or_else( + |e| { + warn!( + target: LOG_TARGET, + "Could not provide requested block {} to peer because: {}", block_hex, e, + ); + + None + }, + Some, + ), + Some(block) => Some(block.try_into_block()?), + }; + + Ok(NodeCommsResponse::Block(Box::new(maybe_block))) + }, NodeCommsRequest::FetchKernelByExcessSig(signature) => { let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await { Ok(Some((kernel, _))) => vec![kernel], @@ -596,16 +608,12 @@ where B: BlockchainBackend + 'static source_peer: NodeId, block_hash: BlockHash, ) -> Result { - let mut historical_block = self + return match self .outbound_nci - .request_blocks_by_hashes_from_peer(vec![block_hash], Some(source_peer.clone())) - .await?; - - return match historical_block.pop() { - Some(block) => { - let block = block.try_into_block()?; - Ok(block) - }, + .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone())) + .await? + { + Some(block) => Ok(block), None => { if let Err(e) = self .connectivity diff --git a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs index 662a58fa89..7efd8c614b 100644 --- a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs @@ -32,7 +32,7 @@ use crate::{ NodeCommsRequest, NodeCommsResponse, }, - blocks::{HistoricalBlock, NewBlock}, + blocks::{Block, NewBlock}, }; /// The OutboundNodeCommsInterface provides an interface to request information from remove nodes. @@ -60,22 +60,15 @@ impl OutboundNodeCommsInterface { /// Fetch the Blocks corresponding to the provided block hashes from a specific base node. pub async fn request_blocks_by_hashes_from_peer( &mut self, - block_hashes: Vec, + hash: BlockHash, node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::HistoricalBlocks(blocks) = self + ) -> Result, CommsInterfaceError> { + if let NodeCommsResponse::Block(block) = self .request_sender - .call(( - NodeCommsRequest::FetchBlocksByHash { - block_hashes, - // We always request compact inputs from peer - compact: true, - }, - node_id, - )) + .call((NodeCommsRequest::GetBlockFromAllChains(hash), node_id)) .await?? { - Ok(blocks) + Ok(*block) } else { Err(CommsInterfaceError::UnexpectedApiResponse) } diff --git a/base_layer/core/src/base_node/proto/request.proto b/base_layer/core/src/base_node/proto/request.proto index 1c60133514..f99e8e44e6 100644 --- a/base_layer/core/src/base_node/proto/request.proto +++ b/base_layer/core/src/base_node/proto/request.proto @@ -12,8 +12,7 @@ package tari.base_node; message BaseNodeServiceRequest { uint64 request_key = 1; oneof request { - // Indicates a FetchBlocksByHash request. - FetchBlocksByHashRequest fetch_blocks_by_hash = 8; + GetBlockFromAllChainsRequest get_block_from_all_chains = 8; ExcessSigs fetch_mempool_transactions_by_excess_sigs = 9; } } @@ -27,9 +26,8 @@ message BlockHeights { repeated uint64 heights = 1; } -message FetchBlocksByHashRequest { - repeated bytes block_hashes = 1; - bool compact = 2; +message GetBlockFromAllChainsRequest { + bytes hash = 1; } message Signatures { diff --git a/base_layer/core/src/base_node/proto/request.rs b/base_layer/core/src/base_node/proto/request.rs index d5c39fcdcd..09a2c95d10 100644 --- a/base_layer/core/src/base_node/proto/request.rs +++ b/base_layer/core/src/base_node/proto/request.rs @@ -35,18 +35,10 @@ impl TryInto for ProtoNodeCommsRequest { type Error = String; fn try_into(self) -> Result { - use ProtoNodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs}; + use ProtoNodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains}; let request = match self { - FetchBlocksByHash(req) => { - let block_hashes = req - .block_hashes - .into_iter() - .map(|hash| hash.try_into().map_err(|_| "Malformed hash".to_string())) - .collect::>()?; - NodeCommsRequest::FetchBlocksByHash { - block_hashes, - compact: req.compact, - } + GetBlockFromAllChains(req) => { + NodeCommsRequest::GetBlockFromAllChains(req.hash.try_into().map_err(|_| "Malformed hash".to_string())?) }, FetchMempoolTransactionsByExcessSigs(excess_sigs) => { let excess_sigs = excess_sigs @@ -66,13 +58,10 @@ impl TryFrom for ProtoNodeCommsRequest { type Error = String; fn try_from(request: NodeCommsRequest) -> Result { - use NodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs}; + use NodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains}; match request { - FetchBlocksByHash { block_hashes, compact } => Ok(ProtoNodeCommsRequest::FetchBlocksByHash( - proto::FetchBlocksByHashRequest { - block_hashes: block_hashes.into_iter().map(|hash| hash.to_vec()).collect(), - compact, - }, + GetBlockFromAllChains(hash) => Ok(ProtoNodeCommsRequest::GetBlockFromAllChains( + proto::GetBlockFromAllChainsRequest { hash: hash.to_vec() }, )), FetchMempoolTransactionsByExcessSigs { excess_sigs } => Ok( ProtoNodeCommsRequest::FetchMempoolTransactionsByExcessSigs(proto::ExcessSigs { diff --git a/base_layer/core/src/base_node/proto/response.proto b/base_layer/core/src/base_node/proto/response.proto index aad60c6067..5c86afc16f 100644 --- a/base_layer/core/src/base_node/proto/response.proto +++ b/base_layer/core/src/base_node/proto/response.proto @@ -13,6 +13,7 @@ package tari.base_node; message BaseNodeServiceResponse { uint64 request_key = 1; oneof response { + BlockResponse block_response = 5; // Indicates a HistoricalBlocks response. HistoricalBlocks historical_blocks = 6; FetchMempoolTransactionsResponse fetch_mempool_transactions_by_excess_sigs_response = 7; @@ -44,6 +45,10 @@ message HistoricalBlocks { repeated tari.core.HistoricalBlock blocks = 1; } +message BlockResponse { + tari.core.Block block = 1; +} + message NewBlockResponse { bool success = 1; string error = 2; diff --git a/base_layer/core/src/base_node/proto/response.rs b/base_layer/core/src/base_node/proto/response.rs index 6435026091..258bbf8f56 100644 --- a/base_layer/core/src/base_node/proto/response.rs +++ b/base_layer/core/src/base_node/proto/response.rs @@ -32,7 +32,7 @@ use tari_utilities::{convert::try_convert_all, ByteArray}; pub use crate::proto::base_node::base_node_service_response::Response as ProtoNodeCommsResponse; use crate::{ base_node::comms_interface::{FetchMempoolTransactionsResponse, NodeCommsResponse}, - blocks::{BlockHeader, HistoricalBlock}, + blocks::{Block, BlockHeader, HistoricalBlock}, proto, }; @@ -40,8 +40,9 @@ impl TryInto for ProtoNodeCommsResponse { type Error = String; fn try_into(self) -> Result { - use ProtoNodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks}; + use ProtoNodeCommsResponse::{BlockResponse, FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks}; let response = match self { + BlockResponse(block) => NodeCommsResponse::Block(Box::new(block.try_into()?)), HistoricalBlocks(blocks) => { let blocks = try_convert_all(blocks.blocks)?; NodeCommsResponse::HistoricalBlocks(blocks) @@ -76,6 +77,7 @@ impl TryFrom for ProtoNodeCommsResponse { fn try_from(response: NodeCommsResponse) -> Result { use NodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks}; match response { + NodeCommsResponse::Block(block) => Ok(ProtoNodeCommsResponse::BlockResponse((*block).try_into()?)), HistoricalBlocks(historical_blocks) => { let historical_blocks = historical_blocks .into_iter() @@ -151,6 +153,30 @@ impl TryInto> for proto::base_node::HistoricalBlockRespo } } +impl TryFrom> for proto::base_node::BlockResponse { + type Error = String; + + fn try_from(v: Option) -> Result { + Ok(Self { + block: v.map(TryInto::try_into).transpose()?, + }) + } +} + +impl TryInto> for proto::base_node::BlockResponse { + type Error = String; + + fn try_into(self) -> Result, Self::Error> { + match self.block { + Some(block) => { + let block = block.try_into()?; + Ok(Some(block)) + }, + None => Ok(None), + } + } +} + //---------------------------------- Collection impls --------------------------------------------// // The following allow `Iterator::collect` to collect into these repeated types