From dd5f8b6ca0d13435404caacd1e7af4bf6ae0b00d Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Thu, 21 Mar 2019 20:34:29 +0800 Subject: [PATCH] feat: use TryFrom convert protocol --- Cargo.lock | 1 + Makefile | 2 +- network/src/service/discovery_service.rs | 2 +- protocol/Cargo.toml | 1 + protocol/src/builder.rs | 5 +- protocol/src/convert.rs | 282 +++++++++++------- protocol/src/error.rs | 7 + protocol/src/lib.rs | 8 + sync/src/relayer/block_proposal_process.rs | 14 +- .../src/relayer/block_transactions_process.rs | 20 +- sync/src/relayer/compact_block.rs | 61 ++-- sync/src/relayer/compact_block_process.rs | 7 +- .../src/relayer/get_block_proposal_process.rs | 19 +- .../relayer/get_block_transactions_process.rs | 17 +- sync/src/relayer/mod.rs | 130 ++++---- sync/src/relayer/transaction_process.rs | 8 +- sync/src/synchronizer/block_process.rs | 7 +- sync/src/synchronizer/filter_process.rs | 18 +- sync/src/synchronizer/get_blocks_process.rs | 37 ++- sync/src/synchronizer/get_headers_process.rs | 74 ++--- sync/src/synchronizer/headers_process.rs | 42 ++- sync/src/synchronizer/mod.rs | 71 +++-- util/src/lib.rs | 1 + util/src/unstable/mod.rs | 2 + util/src/unstable/try_convert.rs | 28 ++ 25 files changed, 544 insertions(+), 320 deletions(-) create mode 100644 protocol/src/error.rs create mode 100644 util/src/unstable/try_convert.rs diff --git a/Cargo.lock b/Cargo.lock index d6878be97a..191ef7643e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -526,6 +526,7 @@ dependencies = [ "ckb-core 0.7.0-pre", "ckb-merkle-tree 0.7.0-pre", "ckb-util 0.7.0-pre", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "hash 0.7.0-pre", "numext-fixed-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Makefile b/Makefile index 59220abd26..6357134c39 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ fmt: cargo fmt ${VERBOSE} --all -- --check clippy: - cargo clippy ${VERBOSE} --all --all-targets --all-features -- -D warnings -D clippy::clone_on_ref_ptr -D clippy::enum_glob_use + cargo clippy ${VERBOSE} --all --all-targets --all-features -- -D warnings -D clippy::clone_on_ref_ptr -D clippy::enum_glob_use -D clippy::fallible_impl_from ci: fmt clippy test diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs index d2dcc56453..c065c16d3d 100644 --- a/network/src/service/discovery_service.rs +++ b/network/src/service/discovery_service.rs @@ -65,7 +65,7 @@ impl ServiceProtocol for DiscoveryProtocol { Ok(()) }) }) - .unwrap(); + .expect("Discovery init only once"); control.future_task(discovery_task); } diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index ae05962996..7847c04e01 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -15,3 +15,4 @@ siphasher = "0.3.0" rand = "0.6" ckb-util = { path = "../util" } ckb-merkle-tree = { path = "../util/merkle-tree"} +failure = "0.1.5" diff --git a/protocol/src/builder.rs b/protocol/src/builder.rs index 9e22015332..c2f6fc23f7 100644 --- a/protocol/src/builder.rs +++ b/protocol/src/builder.rs @@ -622,6 +622,7 @@ mod tests { use ckb_core::block::BlockBuilder; use ckb_core::header::HeaderBuilder; use ckb_core::transaction::TransactionBuilder; + use ckb_util::TryInto; use flatbuffers::get_root; #[test] @@ -632,7 +633,7 @@ mod tests { builder.finish(b, None); let fbs_header = get_root::(builder.finished_data()); - assert_eq!(header, fbs_header.into()); + assert_eq!(header, fbs_header.try_into().unwrap()); } #[test] @@ -645,7 +646,7 @@ mod tests { builder.finish(b, None); let fbs_block = get_root::(builder.finished_data()); - assert_eq!(block, fbs_block.into()); + assert_eq!(block, fbs_block.try_into().unwrap()); } #[test] diff --git a/protocol/src/convert.rs b/protocol/src/convert.rs index 764747d95e..95ed1e37dc 100644 --- a/protocol/src/convert.rs +++ b/protocol/src/convert.rs @@ -1,6 +1,9 @@ +use crate::cast; use crate::protocol_generated::ckb::protocol as ckb_protocol; use crate::FlatbuffersVectorIterator; use ckb_core; +use ckb_util::{TryFrom, TryInto}; +use failure::Error as FailureError; use numext_fixed_hash::H256; use numext_fixed_uint::U256; @@ -16,9 +19,11 @@ impl From<&H256> for ckb_protocol::H256 { } } -impl From<&ckb_protocol::H256> for H256 { - fn from(h256: &ckb_protocol::H256) -> Self { - H256::from_slice(&[ +impl TryFrom<&ckb_protocol::H256> for H256 { + type Error = FailureError; + + fn try_from(h256: &ckb_protocol::H256) -> Result { + let ret = H256::from_slice(&[ h256.u0(), h256.u1(), h256.u2(), @@ -51,8 +56,8 @@ impl From<&ckb_protocol::H256> for H256 { h256.u29(), h256.u30(), h256.u31(), - ]) - .unwrap() + ])?; + Ok(ret) } } @@ -66,9 +71,11 @@ impl From<&ckb_core::transaction::ProposalShortId> for ckb_protocol::ProposalSho } } -impl From<&ckb_protocol::ProposalShortId> for ckb_core::transaction::ProposalShortId { - fn from(short_id: &ckb_protocol::ProposalShortId) -> Self { - Self::from_slice(&[ +impl TryFrom<&ckb_protocol::ProposalShortId> for ckb_core::transaction::ProposalShortId { + type Error = FailureError; + + fn try_from(short_id: &ckb_protocol::ProposalShortId) -> Result { + let ret = cast!(Self::from_slice(&[ short_id.u0(), short_id.u1(), short_id.u2(), @@ -79,164 +86,221 @@ impl From<&ckb_protocol::ProposalShortId> for ckb_core::transaction::ProposalSho short_id.u7(), short_id.u8_(), short_id.u9(), - ]) - .unwrap() + ]))?; + Ok(ret) } } -impl<'a> From> for ckb_core::block::Block { - fn from(block: ckb_protocol::Block<'a>) -> Self { - let commit_transactions = - FlatbuffersVectorIterator::new(block.commit_transactions().unwrap()) - .map(Into::into) +impl<'a> TryFrom> for ckb_core::block::Block { + type Error = FailureError; + + fn try_from(block: ckb_protocol::Block<'a>) -> Result { + let commit_transactions: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(block.commit_transactions())?) + .map(TryInto::try_into) .collect(); - let uncles = FlatbuffersVectorIterator::new(block.uncles().unwrap()) - .map(Into::into) - .collect(); + let uncles: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(block.uncles())?) + .map(TryInto::try_into) + .collect(); - let proposal_transactions = block - .proposal_transactions() - .unwrap() + let proposal_transactions: Result< + Vec, + FailureError, + > = cast!(block.proposal_transactions())? .iter() - .map(Into::into) + .map(TryInto::try_into) .collect(); - ckb_core::block::BlockBuilder::default() - .header(block.header().unwrap().into()) - .uncles(uncles) - .commit_transactions(commit_transactions) - .proposal_transactions(proposal_transactions) - .build() + let header = cast!(block.header())?; + + Ok(ckb_core::block::BlockBuilder::default() + .header(TryInto::try_into(header)?) + .uncles(uncles?) + .commit_transactions(commit_transactions?) + .proposal_transactions(proposal_transactions?) + .build()) } } -impl<'a> From> for ckb_core::uncle::UncleBlock { - fn from(uncle_block: ckb_protocol::UncleBlock<'a>) -> Self { - ckb_core::uncle::UncleBlock { - header: uncle_block.header().unwrap().into(), - cellbase: uncle_block.cellbase().unwrap().into(), - proposal_transactions: uncle_block - .proposal_transactions() - .unwrap() - .iter() - .map(Into::into) - .collect(), - } +impl<'a> TryFrom> for ckb_core::uncle::UncleBlock { + type Error = FailureError; + + fn try_from(uncle_block: ckb_protocol::UncleBlock<'a>) -> Result { + let proposal_transactions: Result< + Vec, + FailureError, + > = cast!(uncle_block.proposal_transactions())? + .iter() + .map(TryInto::try_into) + .collect(); + let header = cast!(uncle_block.header())?; + let cellbase = cast!(uncle_block.cellbase())?; + + Ok(ckb_core::uncle::UncleBlock { + header: TryInto::try_into(header)?, + cellbase: TryInto::try_into(cellbase)?, + proposal_transactions: proposal_transactions?, + }) } } -impl<'a> From> for ckb_core::header::Header { - fn from(header: ckb_protocol::Header<'a>) -> Self { - ckb_core::header::HeaderBuilder::default() +impl<'a> TryFrom> for ckb_core::header::Header { + type Error = FailureError; + + fn try_from(header: ckb_protocol::Header<'a>) -> Result { + let parent_hash = cast!(header.parent_hash())?; + let txs_commit = cast!(header.txs_commit())?; + let txs_proposal = cast!(header.txs_proposal())?; + let cellbase_id = cast!(header.cellbase_id())?; + let uncles_hash = cast!(header.uncles_hash())?; + + Ok(ckb_core::header::HeaderBuilder::default() .version(header.version()) - .parent_hash(header.parent_hash().unwrap().into()) + .parent_hash(TryInto::try_into(parent_hash)?) .timestamp(header.timestamp()) .number(header.number()) - .txs_commit(header.txs_commit().unwrap().into()) - .txs_proposal(header.txs_proposal().unwrap().into()) - .difficulty( - U256::from_little_endian(header.difficulty().and_then(|b| b.seq()).unwrap()) - .unwrap(), - ) - .cellbase_id(header.cellbase_id().unwrap().into()) - .uncles_hash(header.uncles_hash().unwrap().into()) + .txs_commit(TryInto::try_into(txs_commit)?) + .txs_proposal(TryInto::try_into(txs_proposal)?) + .difficulty(U256::from_little_endian(cast!(header + .difficulty() + .and_then(|d| d.seq()))?)?) + .cellbase_id(TryInto::try_into(cellbase_id)?) + .uncles_hash(TryInto::try_into(uncles_hash)?) .nonce(header.nonce()) - .proof(header.proof().and_then(|b| b.seq()).unwrap().to_vec()) + .proof(cast!(header + .proof() + .and_then(|p| p.seq()) + .map(|p| p.to_vec()))?) .uncles_count(header.uncles_count()) - .build() + .build()) } } -impl<'a> From> for ckb_core::transaction::Transaction { - fn from(transaction: ckb_protocol::Transaction<'a>) -> Self { - let deps = FlatbuffersVectorIterator::new(transaction.deps().unwrap()) - .map(Into::into) - .collect(); +impl<'a> TryFrom> for ckb_core::transaction::Transaction { + type Error = FailureError; - let inputs = FlatbuffersVectorIterator::new(transaction.inputs().unwrap()) - .map(Into::into) - .collect(); + fn try_from(transaction: ckb_protocol::Transaction<'a>) -> Result { + let deps: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(transaction.deps())?) + .map(TryInto::try_into) + .collect(); - let outputs = FlatbuffersVectorIterator::new(transaction.outputs().unwrap()) - .map(Into::into) - .collect(); + let inputs: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(transaction.inputs())?) + .map(TryInto::try_into) + .collect(); + + let outputs: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(transaction.outputs())?) + .map(TryInto::try_into) + .collect(); - ckb_core::transaction::TransactionBuilder::default() + Ok(ckb_core::transaction::TransactionBuilder::default() .version(transaction.version()) - .deps(deps) - .inputs(inputs) - .outputs(outputs) - .build() + .deps(deps?) + .inputs(inputs?) + .outputs(outputs?) + .build()) } } -impl<'a> From> +impl<'a> TryFrom> for (ckb_core::transaction::Transaction, ckb_core::Cycle) { - fn from(tx: ckb_protocol::ValidTransaction<'a>) -> Self { - let cycles = tx.cycles(); - (tx.transaction().unwrap().into(), cycles) + type Error = FailureError; + + fn try_from(vtx: ckb_protocol::ValidTransaction<'a>) -> Result { + let tx = cast!(vtx.transaction())?; + let cycles = vtx.cycles(); + Ok((TryInto::try_into(tx)?, cycles)) } } -impl<'a> From> for ckb_core::transaction::OutPoint { - fn from(out_point: ckb_protocol::OutPoint<'a>) -> Self { - ckb_core::transaction::OutPoint { - hash: out_point.hash().unwrap().into(), +impl<'a> TryFrom> for ckb_core::transaction::OutPoint { + type Error = FailureError; + + fn try_from(out_point: ckb_protocol::OutPoint<'a>) -> Result { + let hash = cast!(out_point.hash())?; + Ok(ckb_core::transaction::OutPoint { + hash: TryInto::try_into(hash)?, index: out_point.index(), - } + }) } } -impl<'a> From> for ckb_core::script::Script { - fn from(script: ckb_protocol::Script<'a>) -> Self { - let args = FlatbuffersVectorIterator::new(script.args().unwrap()) - .map(|argument| argument.seq().unwrap().to_vec()) - .collect(); +impl<'a> TryFrom> for ckb_core::script::Script { + type Error = FailureError; - let signed_args = FlatbuffersVectorIterator::new(script.signed_args().unwrap()) - .map(|argument| argument.seq().unwrap().to_vec()) + fn try_from(script: ckb_protocol::Script<'a>) -> Result { + let args: Option>> = FlatbuffersVectorIterator::new(cast!(script.args())?) + .map(|argument| argument.seq().map(|s| s.to_vec())) .collect(); - ckb_core::script::Script { + let signed_args: Option>> = + FlatbuffersVectorIterator::new(cast!(script.signed_args())?) + .map(|argument| argument.seq().map(|s| s.to_vec())) + .collect(); + + let reference = match script.reference() { + Some(reference) => Some(TryInto::try_into(reference)?), + None => None, + }; + + Ok(ckb_core::script::Script { version: script.version(), - args, + args: cast!(args)?, binary: script.binary().and_then(|s| s.seq()).map(|s| s.to_vec()), - signed_args, - reference: script.reference().map(Into::into), - } + signed_args: cast!(signed_args)?, + reference, + }) } } -impl<'a> From> for ckb_core::transaction::CellInput { - fn from(cell_input: ckb_protocol::CellInput<'a>) -> Self { - ckb_core::transaction::CellInput { +impl<'a> TryFrom> for ckb_core::transaction::CellInput { + type Error = FailureError; + + fn try_from(cell_input: ckb_protocol::CellInput<'a>) -> Result { + let hash = cast!(cell_input.hash())?; + let unlock = cast!(cell_input.unlock())?; + Ok(ckb_core::transaction::CellInput { previous_output: ckb_core::transaction::OutPoint { - hash: cell_input.hash().unwrap().into(), + hash: TryInto::try_into(hash)?, index: cell_input.index(), }, - unlock: cell_input.unlock().unwrap().into(), - } + unlock: TryInto::try_into(unlock)?, + }) } } -impl<'a> From> for ckb_core::transaction::CellOutput { - fn from(cell_output: ckb_protocol::CellOutput<'a>) -> Self { - ckb_core::transaction::CellOutput { +impl<'a> TryFrom> for ckb_core::transaction::CellOutput { + type Error = FailureError; + + fn try_from(cell_output: ckb_protocol::CellOutput<'a>) -> Result { + let lock = cast!(cell_output.lock())?; + let type_ = match cell_output.type_() { + Some(type_) => Some(TryInto::try_into(type_)?), + None => None, + }; + + Ok(ckb_core::transaction::CellOutput { capacity: cell_output.capacity(), - data: cell_output.data().and_then(|b| b.seq()).unwrap().to_vec(), - lock: cell_output.lock().unwrap().into(), - type_: cell_output.type_().map(Into::into), - } + data: cast!(cell_output.data().and_then(|s| s.seq()))?.to_vec(), + lock: TryInto::try_into(lock)?, + type_, + }) } } -impl<'a> From> for ckb_core::transaction::IndexTransaction { - fn from(it: ckb_protocol::IndexTransaction<'a>) -> Self { - ckb_core::transaction::IndexTransaction { +impl<'a> TryFrom> for ckb_core::transaction::IndexTransaction { + type Error = FailureError; + + fn try_from(it: ckb_protocol::IndexTransaction<'a>) -> Result { + let transaction = cast!(it.transaction())?; + Ok(ckb_core::transaction::IndexTransaction { index: it.index() as usize, - transaction: it.transaction().unwrap().into(), - } + transaction: TryInto::try_into(transaction)?, + }) } } diff --git a/protocol/src/error.rs b/protocol/src/error.rs new file mode 100644 index 0000000000..0121d653ad --- /dev/null +++ b/protocol/src/error.rs @@ -0,0 +1,7 @@ +use failure::Fail; + +#[derive(Debug, Fail, PartialEq)] +pub enum Error { + #[fail(display = "Malformed message")] + Malformed, +} diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 83f4b24529..1c4402ae51 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,5 +1,6 @@ mod builder; mod convert; +pub mod error; #[rustfmt::skip] #[allow(clippy::all)] mod protocol_generated; @@ -69,3 +70,10 @@ pub fn short_transaction_id(key0: u64, key1: u64, transaction_hash: &H256) -> Sh short_transaction_id } + +#[macro_export] +macro_rules! cast { + ($expr:expr) => { + $expr.ok_or_else(|| $crate::error::Error::Malformed) + }; +} diff --git a/sync/src/relayer/block_proposal_process.rs b/sync/src/relayer/block_proposal_process.rs index c829557a8d..4f0c294c13 100644 --- a/sync/src/relayer/block_proposal_process.rs +++ b/sync/src/relayer/block_proposal_process.rs @@ -1,7 +1,9 @@ use crate::relayer::Relayer; -use ckb_protocol::{BlockProposal, FlatbuffersVectorIterator}; +use ckb_protocol::{cast, BlockProposal, FlatbuffersVectorIterator}; use ckb_shared::index::ChainIndex; use ckb_traits::chain_provider::ChainProvider; +use ckb_util::TryInto; +use failure::Error as FailureError; pub struct BlockProposalProcess<'a, CI: ChainIndex + 'a> { message: &'a BlockProposal<'a>, @@ -16,13 +18,15 @@ where BlockProposalProcess { message, relayer } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { let chain_state = self.relayer.shared.chain_state().lock(); - FlatbuffersVectorIterator::new(self.message.transactions().unwrap()).for_each(|tx| { + let txs = FlatbuffersVectorIterator::new(cast!(self.message.transactions())?); + for tx in txs { let _ = chain_state.add_tx_to_pool( - tx.into(), + TryInto::try_into(tx)?, self.relayer.shared.consensus().max_block_cycles(), ); - }) + } + Ok(()) } } diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index 18388a27dd..36e9ed5515 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -1,9 +1,10 @@ use crate::relayer::Relayer; use ckb_core::transaction::Transaction; -use ckb_network::CKBProtocolContext; -use ckb_network::PeerIndex; -use ckb_protocol::{BlockTransactions, FlatbuffersVectorIterator}; +use ckb_network::{CKBProtocolContext, PeerIndex}; +use ckb_protocol::{cast, BlockTransactions, FlatbuffersVectorIterator}; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use std::sync::Arc; pub struct BlockTransactionsProcess<'a, CI: ChainIndex + 'a> { @@ -31,8 +32,8 @@ where } } - pub fn execute(self) { - let hash = self.message.hash().unwrap().into(); + pub fn execute(self) -> Result<(), FailureError> { + let hash = cast!(self.message.hash())?.try_into()?; if let Some(compact_block) = self .relayer .state @@ -40,15 +41,15 @@ where .lock() .remove(&hash) { - let transactions: Vec = - FlatbuffersVectorIterator::new(self.message.transactions().unwrap()) - .map(Into::into) + let transactions: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(self.message.transactions())?) + .map(TryInto::try_into) .collect(); let ret = { let chain_state = self.relayer.shared.chain_state().lock(); self.relayer - .reconstruct_block(&chain_state, &compact_block, transactions) + .reconstruct_block(&chain_state, &compact_block, transactions?) }; if let Ok(block) = ret { @@ -56,5 +57,6 @@ where .accept_block(self.nc, self.peer, &Arc::new(block)); } } + Ok(()) } } diff --git a/sync/src/relayer/compact_block.rs b/sync/src/relayer/compact_block.rs index 47d000045e..be0bb428bd 100644 --- a/sync/src/relayer/compact_block.rs +++ b/sync/src/relayer/compact_block.rs @@ -1,7 +1,9 @@ use ckb_core::header::Header; use ckb_core::transaction::{IndexTransaction, ProposalShortId}; use ckb_core::uncle::UncleBlock; -use ckb_protocol::{self, FlatbuffersVectorIterator}; +use ckb_protocol::{self, cast, FlatbuffersVectorIterator}; +use ckb_util::{TryFrom, TryInto}; +use failure::Error as FailureError; pub type ShortTransactionID = [u8; 6]; @@ -15,34 +17,39 @@ pub struct CompactBlock { pub proposal_transactions: Vec, } -impl<'a> From> for CompactBlock { - fn from(b: ckb_protocol::CompactBlock<'a>) -> Self { - CompactBlock { - header: b.header().unwrap().into(), +impl<'a> TryFrom> for CompactBlock { + type Error = FailureError; + + fn try_from(b: ckb_protocol::CompactBlock<'a>) -> Result { + let header = cast!(b.header())?; + let short_ids = cast!(b.short_ids())?; + let prefilled_transactions: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(b.prefilled_transactions())?) + .map(TryInto::try_into) + .collect(); + + let uncles: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(b.uncles())?) + .map(TryInto::try_into) + .collect(); + let proposal_transactions: Result, FailureError> = cast!(b.proposal_transactions())? + .iter() + .map(TryInto::try_into) + .collect(); + + Ok(CompactBlock { + header: header.try_into()?, nonce: b.nonce(), - short_ids: FlatbuffersVectorIterator::new(b.short_ids().unwrap()) - .map(|bytes| { + short_ids: cast!(FlatbuffersVectorIterator::new(short_ids) + .map(|bytes| bytes.seq().map(|seq| { let mut short_id = [0u8; 6]; - short_id.copy_from_slice(bytes.seq().unwrap()); + short_id.copy_from_slice(seq); short_id - }) - .collect(), - prefilled_transactions: FlatbuffersVectorIterator::new( - b.prefilled_transactions().unwrap(), - ) - .map(Into::into) - .collect(), - - uncles: FlatbuffersVectorIterator::new(b.uncles().unwrap()) - .map(Into::into) - .collect(), - - proposal_transactions: b - .proposal_transactions() - .unwrap() - .iter() - .map(Into::into) - .collect(), - } + })) + .collect::>>())?, + prefilled_transactions: prefilled_transactions?, + uncles: uncles?, + proposal_transactions: proposal_transactions?, + }) } } diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index c5fd3212c1..94ba5f3f5c 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -5,7 +5,9 @@ use ckb_protocol::{CompactBlock as FbsCompactBlock, RelayMessage}; use ckb_shared::index::ChainIndex; use ckb_shared::shared::Shared; use ckb_traits::{BlockMedianTimeContext, ChainProvider}; +use ckb_util::TryInto; use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier}; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use fnv::FnvHashMap; use numext_fixed_hash::H256; @@ -36,8 +38,8 @@ where } } - pub fn execute(self) { - let compact_block: CompactBlock = (*self.message).into(); + pub fn execute(self) -> Result<(), FailureError> { + let compact_block: CompactBlock = (*self.message).try_into()?; let block_hash = compact_block.header.hash(); let mut missing_indexes: Vec = Vec::new(); { @@ -94,6 +96,7 @@ where fbb.finish(message, None); let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); } + Ok(()) } } diff --git a/sync/src/relayer/get_block_proposal_process.rs b/sync/src/relayer/get_block_proposal_process.rs index ba41995d31..2b98d3d4d1 100644 --- a/sync/src/relayer/get_block_proposal_process.rs +++ b/sync/src/relayer/get_block_proposal_process.rs @@ -1,7 +1,9 @@ use crate::relayer::Relayer; use ckb_network::{CKBProtocolContext, PeerIndex}; -use ckb_protocol::{GetBlockProposal, RelayMessage}; +use ckb_protocol::{cast, GetBlockProposal, RelayMessage}; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; pub struct GetBlockProposalProcess<'a, CI: ChainIndex + 'a> { @@ -29,17 +31,21 @@ where } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { let mut pending_proposals_request = self.relayer.state.pending_proposals_request.lock(); + let proposal_transactions = cast!(self.message.proposal_transactions())?; let transactions = { let chain_state = self.relayer.shared.chain_state().lock(); let tx_pool = chain_state.tx_pool(); - self.message - .proposal_transactions() - .unwrap() + + let proposals = proposal_transactions .iter() - .map(Into::into) + .map(TryInto::try_into) + .collect::, FailureError>>(); + + proposals? + .into_iter() .filter_map(|short_id| { tx_pool.get_tx(&short_id).or({ pending_proposals_request @@ -57,5 +63,6 @@ where fbb.finish(message, None); let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + Ok(()) } } diff --git a/sync/src/relayer/get_block_transactions_process.rs b/sync/src/relayer/get_block_transactions_process.rs index 1e2a96e732..0c7031b03f 100644 --- a/sync/src/relayer/get_block_transactions_process.rs +++ b/sync/src/relayer/get_block_transactions_process.rs @@ -1,7 +1,9 @@ use crate::relayer::Relayer; use ckb_network::{CKBProtocolContext, PeerIndex}; -use ckb_protocol::{GetBlockTransactions, RelayMessage}; +use ckb_protocol::{cast, GetBlockTransactions, RelayMessage}; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use log::debug; @@ -30,15 +32,14 @@ where } } - pub fn execute(self) { - let hash = self.message.hash().unwrap().into(); + pub fn execute(self) -> Result<(), FailureError> { + let hash = cast!(self.message.hash())?.try_into()?; debug!(target: "relay", "get_block_transactions {:?}", hash); + let indexes = cast!(self.message.indexes())?; + if let Some(block) = self.relayer.get_block(&hash) { - let transactions = self - .message - .indexes() - .unwrap() + let transactions = indexes .safe_slice() .iter() .filter_map(|i| block.commit_transactions().get(*i as usize).cloned()) @@ -50,5 +51,7 @@ where let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); } + + Ok(()) } } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 2c2f28f8f9..f2baf4d459 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -21,13 +21,16 @@ use bytes::Bytes; use ckb_chain::chain::ChainController; use ckb_core::block::{Block, BlockBuilder}; use ckb_core::transaction::{ProposalShortId, Transaction}; -use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, TimerToken}; -use ckb_protocol::{short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload}; +use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, Severity, TimerToken}; +use ckb_protocol::{ + cast, short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload, +}; use ckb_shared::chain_state::ChainState; use ckb_shared::index::ChainIndex; use ckb_shared::shared::Shared; use ckb_traits::ChainProvider; use ckb_util::Mutex; +use failure::Error as FailureError; use flatbuffers::{get_root, FlatBufferBuilder}; use fnv::{FnvHashMap, FnvHashSet}; use log::{debug, info}; @@ -60,48 +63,72 @@ where } } - fn process(&self, nc: &CKBProtocolContext, peer: PeerIndex, message: RelayMessage) { + fn try_process( + &self, + nc: &CKBProtocolContext, + peer: PeerIndex, + message: RelayMessage, + ) -> Result<(), FailureError> { match message.payload_type() { - RelayPayload::CompactBlock => CompactBlockProcess::new( - &message.payload_as_compact_block().unwrap(), - self, - peer, - nc, - ) - .execute(), - RelayPayload::ValidTransaction => TransactionProcess::new( - &message.payload_as_valid_transaction().unwrap(), - self, - peer, - nc, - ) - .execute(), - RelayPayload::GetBlockTransactions => GetBlockTransactionsProcess::new( - &message.payload_as_get_block_transactions().unwrap(), - self, - peer, - nc, - ) - .execute(), - RelayPayload::BlockTransactions => BlockTransactionsProcess::new( - &message.payload_as_block_transactions().unwrap(), - self, - peer, - nc, - ) - .execute(), - RelayPayload::GetBlockProposal => GetBlockProposalProcess::new( - &message.payload_as_get_block_proposal().unwrap(), - self, - peer, - nc, - ) - .execute(), + RelayPayload::CompactBlock => { + CompactBlockProcess::new( + &cast!(message.payload_as_compact_block())?, + self, + peer, + nc, + ) + .execute()?; + } + RelayPayload::ValidTransaction => { + TransactionProcess::new( + &cast!(message.payload_as_valid_transaction())?, + self, + peer, + nc, + ) + .execute()?; + } + RelayPayload::GetBlockTransactions => { + GetBlockTransactionsProcess::new( + &cast!(message.payload_as_get_block_transactions())?, + self, + peer, + nc, + ) + .execute()?; + } + RelayPayload::BlockTransactions => { + BlockTransactionsProcess::new( + &cast!(message.payload_as_block_transactions())?, + self, + peer, + nc, + ) + .execute()?; + } + RelayPayload::GetBlockProposal => { + GetBlockProposalProcess::new( + &cast!(message.payload_as_get_block_proposal())?, + self, + peer, + nc, + ) + .execute()?; + } RelayPayload::BlockProposal => { - BlockProposalProcess::new(&message.payload_as_block_proposal().unwrap(), self) - .execute() + BlockProposalProcess::new(&cast!(message.payload_as_block_proposal())?, self) + .execute()?; + } + RelayPayload::NONE => { + cast!(None)?; } - RelayPayload::NONE => {} + } + Ok(()) + } + + fn process(&self, nc: &CKBProtocolContext, peer: PeerIndex, message: RelayMessage) { + if self.try_process(nc, peer, message).is_err() { + nc.report_peer(peer, Severity::Bad("Malformed RelayMessage")); } } @@ -199,24 +226,27 @@ where // append remain transactions short_ids_iter.for_each(|short_id| block_transactions.push(txs_map.remove(short_id))); - let mut missing_indexes = Vec::new(); - - for (i, t) in block_transactions.iter().enumerate() { - if t.is_none() { - missing_indexes.push(i); - } - } + let missing = block_transactions.iter().any(|tx| tx.is_none()); - if missing_indexes.is_empty() { + if !missing { + let txs = block_transactions + .into_iter() + .collect::>>() + .expect("missing checked, should not fail"); let block = BlockBuilder::default() .header(compact_block.header.clone()) .uncles(compact_block.uncles.clone()) - .commit_transactions(block_transactions.into_iter().map(|t| t.unwrap()).collect()) + .commit_transactions(txs) .proposal_transactions(compact_block.proposal_transactions.clone()) .build(); Ok(block) } else { + let missing_indexes = block_transactions + .iter() + .enumerate() + .filter_map(|(i, t)| if t.is_none() { Some(i) } else { None }) + .collect(); Err(missing_indexes) } } diff --git a/sync/src/relayer/transaction_process.rs b/sync/src/relayer/transaction_process.rs index 750850ebca..3b3d3f71b8 100644 --- a/sync/src/relayer/transaction_process.rs +++ b/sync/src/relayer/transaction_process.rs @@ -5,7 +5,9 @@ use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction}; use ckb_shared::index::ChainIndex; use ckb_shared::tx_pool::types::PoolError; use ckb_traits::chain_provider::ChainProvider; +use ckb_util::TryInto; use ckb_verification::TransactionError; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use log::debug; use std::time::Duration; @@ -37,8 +39,8 @@ where } } - pub fn execute(self) { - let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).into(); + pub fn execute(self) -> Result<(), FailureError> { + let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?; let tx_result = { let chain_state = self.relayer.shared.chain_state().lock(); let max_block_cycles = self.relayer.shared.consensus().max_block_cycles(); @@ -83,5 +85,7 @@ where self.nc.ban_peer(self.peer, DEFAULT_BAN_TIME); } } + + Ok(()) } } diff --git a/sync/src/synchronizer/block_process.rs b/sync/src/synchronizer/block_process.rs index 40c350ca0d..2a1a722082 100644 --- a/sync/src/synchronizer/block_process.rs +++ b/sync/src/synchronizer/block_process.rs @@ -3,6 +3,8 @@ use ckb_core::block::Block; use ckb_network::{CKBProtocolContext, PeerIndex}; use ckb_protocol::Block as PBlock; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use log::debug; pub struct BlockProcess<'a, CI: ChainIndex + 'a> { @@ -28,11 +30,12 @@ where } } - pub fn execute(self) { - let block: Block = (*self.message).into(); + pub fn execute(self) -> Result<(), FailureError> { + let block: Block = (*self.message).try_into()?; debug!(target: "sync", "BlockProcess received block {} {:x}", block.header().number(), block.header().hash()); self.synchronizer.peers.block_received(self.peer, &block); self.synchronizer.process_new_block(self.peer, block); + Ok(()) } } diff --git a/sync/src/synchronizer/filter_process.rs b/sync/src/synchronizer/filter_process.rs index 0e8b6cc1b5..4a775999e5 100644 --- a/sync/src/synchronizer/filter_process.rs +++ b/sync/src/synchronizer/filter_process.rs @@ -1,8 +1,9 @@ use crate::synchronizer::Synchronizer; use crate::types::TransactionFilter; use ckb_network::PeerIndex; -use ckb_protocol::{AddFilter, SetFilter}; +use ckb_protocol::{cast, AddFilter, SetFilter}; use ckb_shared::index::ChainIndex; +use failure::Error as FailureError; pub struct SetFilterProcess<'a, CI: ChainIndex + 'a> { message: &'a SetFilter<'a>, @@ -26,16 +27,18 @@ where } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { // TODO add filter size and num_hashes max value checking let mut filters = self.synchronizer.peers.transaction_filters.write(); + let msg = cast!(self.message.filter())?; filters.entry(self.peer).or_insert_with(|| { TransactionFilter::new( - self.message.filter().unwrap(), + msg, self.message.num_hashes() as usize, self.message.hash_seed() as usize, ) }); + Ok(()) } } @@ -61,11 +64,13 @@ where } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { let mut filters = self.synchronizer.peers.transaction_filters.write(); + let msg = cast!(self.message.filter())?; filters .entry(self.peer) - .and_modify(|filter| filter.update(self.message.filter().unwrap())); + .and_modify(|filter| filter.update(msg)); + Ok(()) } } @@ -82,8 +87,9 @@ where Self { peer, synchronizer } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { let mut filters = self.synchronizer.peers.transaction_filters.write(); filters.remove(&self.peer); + Ok(()) } } diff --git a/sync/src/synchronizer/get_blocks_process.rs b/sync/src/synchronizer/get_blocks_process.rs index 023f28ae73..475f3b328c 100644 --- a/sync/src/synchronizer/get_blocks_process.rs +++ b/sync/src/synchronizer/get_blocks_process.rs @@ -1,7 +1,9 @@ use crate::synchronizer::Synchronizer; use ckb_network::{CKBProtocolContext, PeerIndex}; -use ckb_protocol::{GetBlocks, SyncMessage}; +use ckb_protocol::{cast, GetBlocks, SyncMessage}; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use log::debug; @@ -30,19 +32,32 @@ where } } - pub fn execute(self) { - self.message.block_hashes().unwrap().iter().for_each(|fbs_h256| { - let block_hash = fbs_h256.into(); + pub fn execute(self) -> Result<(), FailureError> { + let block_hashes = cast!(self.message.block_hashes())?; + + for fbs_h256 in block_hashes { + let block_hash = fbs_h256.try_into()?; debug!(target: "sync", "get_blocks {:x}", block_hash); if let Some(block) = self.synchronizer.get_block(&block_hash) { debug!(target: "sync", "respond_block {} {:x}", block.header().number(), block.header().hash()); - if let Some(filter) = self.synchronizer.peers.transaction_filters.read().get(&self.peer) { - let transactions_index = block.commit_transactions().iter().enumerate().filter(|(_index, tx)| - filter.contains(tx) - ).map(|ti| ti.0).collect::>(); + if let Some(filter) = self + .synchronizer + .peers + .transaction_filters + .read() + .get(&self.peer) + { + let transactions_index = block + .commit_transactions() + .iter() + .enumerate() + .filter(|(_index, tx)| filter.contains(tx)) + .map(|ti| ti.0) + .collect::>(); let fbb = &mut FlatBufferBuilder::new(); - let message = SyncMessage::build_filtered_block(fbb, &block, &transactions_index); + let message = + SyncMessage::build_filtered_block(fbb, &block, &transactions_index); fbb.finish(message, None); let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); } else { @@ -55,6 +70,8 @@ where // TODO response not found // TODO add timeout check in synchronizer } - }) + } + + Ok(()) } } diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs index 6a1f16d82c..e6af618e53 100644 --- a/sync/src/synchronizer/get_headers_process.rs +++ b/sync/src/synchronizer/get_headers_process.rs @@ -2,8 +2,10 @@ use crate::synchronizer::Synchronizer; use crate::MAX_LOCATOR_SIZE; use ckb_core::header::Header; use ckb_network::{CKBProtocolContext, PeerIndex, Severity}; -use ckb_protocol::{GetHeaders, SyncMessage}; +use ckb_protocol::{cast, GetHeaders, SyncMessage}; use ckb_shared::index::ChainIndex; +use ckb_util::TryInto; +use failure::Error as FailureError; use flatbuffers::FlatBufferBuilder; use log::{debug, info, warn}; use numext_fixed_hash::H256; @@ -33,48 +35,50 @@ where } } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { if self.synchronizer.is_initial_block_download() { info!(target: "sync", "Ignoring getheaders from peer={} because node is in initial block download", self.peer); - return; + return Ok(()); } - if let Some(locator) = self.message.block_locator_hashes() { - let locator_size = locator.len(); - if locator_size > MAX_LOCATOR_SIZE { - warn!(target: "sync", " getheaders locator size {} from peer={}", locator_size, self.peer); - self.nc - .report_peer(self.peer, Severity::Bad("over maximum locator size")); - return; - } - let hash_stop = H256::zero(); // TODO PENDING self.message.hash_stop().unwrap().into(); - let block_locator_hashes = locator.iter().map(Into::into).collect::>(); + let locator = cast!(self.message.block_locator_hashes())?; + let locator_size = locator.len(); + if locator_size > MAX_LOCATOR_SIZE { + warn!(target: "sync", " getheaders locator size {} from peer={}", locator_size, self.peer); + cast!(None)?; + } - if let Some(block_number) = self - .synchronizer - .locate_latest_common_block(&hash_stop, &block_locator_hashes[..]) - { - debug!(target: "sync", "\n\nheaders latest_common={} tip={} begin\n\n", block_number, {self.synchronizer.tip_header().number()}); + let hash_stop = H256::zero(); // TODO PENDING self.message.hash_stop().into(); + let block_locator_hashes = locator + .iter() + .map(TryInto::try_into) + .collect::, FailureError>>()?; - self.synchronizer.peers.getheaders_received(self.peer); - let headers: Vec
= self - .synchronizer - .get_locator_response(block_number, &hash_stop); - // response headers + if let Some(block_number) = self + .synchronizer + .locate_latest_common_block(&hash_stop, &block_locator_hashes[..]) + { + debug!(target: "sync", "\n\nheaders latest_common={} tip={} begin\n\n", block_number, {self.synchronizer.tip_header().number()}); + + self.synchronizer.peers.getheaders_received(self.peer); + let headers: Vec
= self + .synchronizer + .get_locator_response(block_number, &hash_stop); + // response headers - debug!(target: "sync", "\nheaders len={}\n", headers.len()); + debug!(target: "sync", "\nheaders len={}\n", headers.len()); - let fbb = &mut FlatBufferBuilder::new(); - let message = SyncMessage::build_headers(fbb, &headers); - fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); - } else { - warn!(target: "sync", "\n\nunknown block headers from peer {} {:#?}\n\n", self.peer, block_locator_hashes); - // Got 'headers' message without known blocks - // ban or close peers - self.nc - .report_peer(self.peer, Severity::Bad("without common headers")); - } + let fbb = &mut FlatBufferBuilder::new(); + let message = SyncMessage::build_headers(fbb, &headers); + fbb.finish(message, None); + let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + } else { + warn!(target: "sync", "\n\nunknown block headers from peer {} {:#?}\n\n", self.peer, block_locator_hashes); + // Got 'headers' message without known blocks + // ban or close peers + self.nc + .report_peer(self.peer, Severity::Bad("without common headers")); } + Ok(()) } } diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs index 3e07f46a23..845b8994a8 100644 --- a/sync/src/synchronizer/headers_process.rs +++ b/sync/src/synchronizer/headers_process.rs @@ -2,12 +2,13 @@ use crate::synchronizer::{BlockStatus, Synchronizer}; use crate::MAX_HEADERS_LEN; use ckb_core::header::Header; use ckb_network::{CKBProtocolContext, PeerIndex}; -use ckb_protocol::{FlatbuffersVectorIterator, Headers}; +use ckb_protocol::{cast, FlatbuffersVectorIterator, Headers}; use ckb_shared::index::ChainIndex; use ckb_traits::{BlockMedianTimeContext, ChainProvider}; +use ckb_util::TryInto; use ckb_verification::{Error as VerifyError, HeaderResolver, HeaderVerifier, Verifier}; -use log; -use log::{debug, log_enabled}; +use failure::Error as FailureError; +use log::{self, debug, log_enabled, warn}; use numext_fixed_hash::H256; use numext_fixed_uint::U256; use std::sync::Arc; @@ -163,14 +164,6 @@ where } } - fn is_empty(&self) -> bool { - self.message.headers().unwrap().len() == 0 - } - - fn is_oversize(&self) -> bool { - self.message.headers().unwrap().len() > MAX_HEADERS_LEN - } - fn is_continuous(&self, headers: &[Header]) -> bool { for window in headers.windows(2) { if let [parent, header] = &window { @@ -205,28 +198,30 @@ where acceptor.accept() } - pub fn execute(self) { + pub fn execute(self) -> Result<(), FailureError> { debug!(target: "sync", "HeadersProcess begin"); - if self.is_oversize() { + let headers = cast!(self.message.headers())?; + + if headers.len() > MAX_HEADERS_LEN { self.synchronizer.peers.misbehavior(self.peer, 20); - debug!(target: "sync", "HeadersProcess is_oversize"); - return; + warn!(target: "sync", "HeadersProcess is_oversize"); + return Ok(()); } - if self.is_empty() { + if headers.len() == 0 { debug!(target: "sync", "HeadersProcess is_empty"); - return; + return Ok(()); } - let headers = FlatbuffersVectorIterator::new(self.message.headers().unwrap()) - .map(Into::into) - .collect::>(); + let headers = FlatbuffersVectorIterator::new(headers) + .map(TryInto::try_into) + .collect::, FailureError>>()?; if !self.is_continuous(&headers) { self.synchronizer.peers.misbehavior(self.peer, 20); debug!(target: "sync", "HeadersProcess is not continuous"); - return; + return Ok(()); } let result = self.accept_first(&headers[0]); @@ -237,7 +232,7 @@ where .misbehavior(self.peer, result.misbehavior); } debug!(target: "sync", "\n\nHeadersProcess accept_first is_valid {:?} headers = {:?}\n\n", result, headers[0]); - return; + return Ok(()); } for window in headers.windows(2) { @@ -258,7 +253,7 @@ where .misbehavior(self.peer, result.misbehavior); } debug!(target: "sync", "HeadersProcess accept is invalid {:?}", result); - return; + return Ok(()); } } } @@ -300,6 +295,7 @@ where self.synchronizer .send_getheaders_to_peer(self.nc, self.peer, start); } + Ok(()) } } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 886ac968be..ec0503b974 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -27,7 +27,7 @@ use ckb_chain_spec::consensus::Consensus; use ckb_core::block::Block; use ckb_core::header::{BlockNumber, Header}; use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, Severity, TimerToken}; -use ckb_protocol::{SyncMessage, SyncPayload}; +use ckb_protocol::{cast, SyncMessage, SyncPayload}; use ckb_shared::index::ChainIndex; use ckb_shared::shared::Shared; use ckb_traits::ChainProvider; @@ -136,35 +136,53 @@ impl Synchronizer { outbound_peers_with_protect: Arc::new(AtomicUsize::new(0)), } } - - fn process(&self, nc: &CKBProtocolContext, peer: PeerIndex, message: SyncMessage) { + fn try_process( + &self, + nc: &CKBProtocolContext, + peer: PeerIndex, + message: SyncMessage, + ) -> Result<(), FailureError> { match message.payload_type() { SyncPayload::GetHeaders => { - GetHeadersProcess::new(&message.payload_as_get_headers().unwrap(), self, peer, nc) - .execute() + GetHeadersProcess::new(&cast!(message.payload_as_get_headers())?, self, peer, nc) + .execute()?; } SyncPayload::Headers => { - HeadersProcess::new(&message.payload_as_headers().unwrap(), self, peer, nc) - .execute() + HeadersProcess::new(&cast!(message.payload_as_headers())?, self, peer, nc) + .execute()?; } SyncPayload::GetBlocks => { - GetBlocksProcess::new(&message.payload_as_get_blocks().unwrap(), self, peer, nc) - .execute() + GetBlocksProcess::new(&cast!(message.payload_as_get_blocks())?, self, peer, nc) + .execute()?; } SyncPayload::Block => { - BlockProcess::new(&message.payload_as_block().unwrap(), self, peer, nc).execute() + BlockProcess::new(&cast!(message.payload_as_block())?, self, peer, nc).execute()?; } SyncPayload::SetFilter => { - SetFilterProcess::new(&message.payload_as_set_filter().unwrap(), self, peer) - .execute() + SetFilterProcess::new(&cast!(message.payload_as_set_filter())?, self, peer) + .execute()?; } SyncPayload::AddFilter => { - AddFilterProcess::new(&message.payload_as_add_filter().unwrap(), self, peer) - .execute() + AddFilterProcess::new(&cast!(message.payload_as_add_filter())?, self, peer) + .execute()?; + } + SyncPayload::ClearFilter => { + ClearFilterProcess::new(self, peer).execute()?; + } + SyncPayload::FilteredBlock => { + // ignore, should not receive FilteredBlock in full node mode + cast!(None)?; + } + SyncPayload::NONE => { + cast!(None)?; } - SyncPayload::ClearFilter => ClearFilterProcess::new(self, peer).execute(), - SyncPayload::FilteredBlock => {} // ignore, should not receive FilteredBlock in full node mode - SyncPayload::NONE => {} + } + Ok(()) + } + + fn process(&self, nc: &CKBProtocolContext, peer: PeerIndex, message: SyncMessage) { + if self.try_process(nc, peer, message).is_err() { + nc.report_peer(peer, Severity::Bad("Malformed SyncMessage")); } } @@ -453,12 +471,11 @@ impl Synchronizer { .block_header(&block.header().parent_hash()) .is_some() { - let ret = self.accept_block(peer, &block); - if ret.is_err() { + if let Err(e) = self.accept_block(peer, &block) { debug!( target: "sync", "[Synchronizer] accept_block {:?} error {:?}", block, - ret.unwrap_err() + e ); } } else { @@ -592,7 +609,11 @@ impl Synchronizer { self.send_getheaders_to_peer( nc, *peer, - &state.chain_sync.work_header.clone().unwrap(), + &state + .chain_sync + .work_header + .clone() + .expect("work_header be assigned"), ); } } @@ -1196,7 +1217,9 @@ mod tests { let fbs_headers = get_root::(fbb.finished_data()); let peer = 1usize; - HeadersProcess::new(&fbs_headers, &synchronizer1, peer, &mock_network_context(0)).execute(); + HeadersProcess::new(&fbs_headers, &synchronizer1, peer, &mock_network_context(0)) + .execute() + .unwrap(); let best_known_header = synchronizer1.peers.best_known_header(peer); @@ -1224,7 +1247,9 @@ mod tests { fbb.finish(fbs_block, None); let fbs_block = get_root::(fbb.finished_data()); - BlockProcess::new(&fbs_block, &synchronizer1, peer, &mock_network_context(0)).execute(); + BlockProcess::new(&fbs_block, &synchronizer1, peer, &mock_network_context(0)) + .execute() + .unwrap(); } assert_eq!( diff --git a/util/src/lib.rs b/util/src/lib.rs index fdaa9c30c3..7774644263 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -1,5 +1,6 @@ mod unstable; +pub use crate::unstable::{TryFrom, TryInto}; pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; /// Helper macro for reducing boilerplate code for matching `Option` together diff --git a/util/src/unstable/mod.rs b/util/src/unstable/mod.rs index 8b13789179..30abac43e2 100644 --- a/util/src/unstable/mod.rs +++ b/util/src/unstable/mod.rs @@ -1 +1,3 @@ +pub mod try_convert; +pub use try_convert::{TryFrom, TryInto}; diff --git a/util/src/unstable/try_convert.rs b/util/src/unstable/try_convert.rs new file mode 100644 index 0000000000..cfe8a76dfd --- /dev/null +++ b/util/src/unstable/try_convert.rs @@ -0,0 +1,28 @@ +pub trait TryInto: Sized { + /// The type returned in the event of a conversion error. + type Error; + + /// Performs the conversion. + fn try_into(self) -> Result; +} + +/// Attempt to construct `Self` via a conversion. +pub trait TryFrom: Sized { + /// The type returned in the event of a conversion error. + type Error; + + /// Performs the conversion. + fn try_from(value: T) -> Result; +} + +// TryFrom implies TryInto +impl TryInto for T +where + U: TryFrom, +{ + type Error = U::Error; + + fn try_into(self) -> Result { + U::try_from(self) + } +}