From a588f790232cbd1f54f22d8241fd33d0728d43d5 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 1 Feb 2023 02:51:15 -0500 Subject: [PATCH] feat: introduce FileClient for importing blocks (#1111) --- Cargo.lock | 3 + crates/net/downloaders/Cargo.toml | 10 + crates/net/downloaders/src/bodies/mod.rs | 2 +- crates/net/downloaders/src/lib.rs | 4 +- .../downloaders/src/test_utils/file_client.rs | 183 ++++++++++++++++++ crates/net/downloaders/src/test_utils/mod.rs | 38 ++++ .../test_client.rs} | 45 +---- 7 files changed, 244 insertions(+), 41 deletions(-) create mode 100644 crates/net/downloaders/src/test_utils/file_client.rs create mode 100644 crates/net/downloaders/src/test_utils/mod.rs rename crates/net/downloaders/src/{test_utils.rs => test_utils/test_client.rs} (65%) diff --git a/Cargo.lock b/Cargo.lock index ff6fdb8adda8..c267637eb61b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4162,7 +4162,10 @@ dependencies = [ "reth-interfaces", "reth-metrics-derive", "reth-primitives", + "reth-rlp", "reth-tracing", + "tempfile", + "thiserror", "tokio", "tokio-stream", "tracing", diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index 81fc7e1b17a2..75505fdba48e 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -26,6 +26,9 @@ tokio-stream = "0.1" tracing = "0.1.37" metrics = "0.20.1" +thiserror = { version = "1", optional = true } +reth-rlp = { path = "../../common/rlp", optional = true } + [dev-dependencies] reth-db = { path = "../../storage/db", features = ["test-utils"] } reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } @@ -33,3 +36,10 @@ reth-tracing = { path = "../../tracing" } assert_matches = "1.5.0" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +reth-rlp = { path = "../../common/rlp" } + +thiserror = "1" +tempfile = "3.3" + +[features] +test-utils = ["dep:reth-rlp", "dep:thiserror"] diff --git a/crates/net/downloaders/src/bodies/mod.rs b/crates/net/downloaders/src/bodies/mod.rs index b6ce04829723..685026f414db 100644 --- a/crates/net/downloaders/src/bodies/mod.rs +++ b/crates/net/downloaders/src/bodies/mod.rs @@ -9,4 +9,4 @@ mod queue; mod request; #[cfg(test)] -mod test_utils; +pub mod test_utils; diff --git a/crates/net/downloaders/src/lib.rs b/crates/net/downloaders/src/lib.rs index d909cf4ef93c..c192c9231c6e 100644 --- a/crates/net/downloaders/src/lib.rs +++ b/crates/net/downloaders/src/lib.rs @@ -17,5 +17,5 @@ pub mod headers; /// Common downloader metrics. pub mod metrics; -#[cfg(test)] -mod test_utils; +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; diff --git a/crates/net/downloaders/src/test_utils/file_client.rs b/crates/net/downloaders/src/test_utils/file_client.rs new file mode 100644 index 000000000000..518e03949953 --- /dev/null +++ b/crates/net/downloaders/src/test_utils/file_client.rs @@ -0,0 +1,183 @@ +use std::{ + collections::HashMap, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use reth_eth_wire::BlockBody; +use reth_interfaces::{ + p2p::{ + bodies::client::{BodiesClient, BodiesFut}, + download::DownloadClient, + error::RequestError, + priority::Priority, + }, + sync::{SyncState, SyncStateProvider, SyncStateUpdater}, +}; +use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256}; +use thiserror::Error; +use tokio::{fs::File, io::BufReader}; +use tracing::warn; + +/// Front-end API for fetching chain data from a file. +/// +/// Blocks are assumed to be written one after another in a file, as rlp bytes. +/// +/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows: +/// rlp(block1) || rlp(block2) || rlp(block3) +/// +/// Blocks are assumed to have populated transactions, so reading headers will also buffer +/// transactions in memory for use in the bodies stage. +#[derive(Debug)] +pub struct FileClient { + /// The open reader for the file. + reader: BufReader, + + /// The buffered headers retrieved when fetching new bodies. + headers: HashMap, + + /// The buffered bodies retrieved when fetching new headers. + bodies: HashMap, + + /// Represents if we are currently syncing. + is_syncing: Arc, +} + +/// An error that can occur when constructing and using a [`FileClient`](FileClient). +#[derive(Debug, Error)] +pub enum FileClientError { + /// An error occurred when opening or reading the file. + #[error(transparent)] + Io(#[from] std::io::Error), + + /// An error occurred when decoding blocks, headers, or rlp headers from the file. + #[error(transparent)] + Rlp(#[from] reth_rlp::DecodeError), +} + +impl FileClient { + /// Create a new file client from a file path. + pub async fn new>(path: P) -> Result { + let file = File::open(path).await?; + FileClient::from_file(file) + } + + /// Initialize the [`FileClient`](FileClient) with a file directly. + pub(crate) fn from_file(file: File) -> Result { + let reader = BufReader::new(file); + + Ok(Self { + reader, + headers: HashMap::new(), + bodies: HashMap::new(), + is_syncing: Arc::new(Default::default()), + }) + } + + /// Use the provided bodies as the file client's block body buffer. + pub(crate) fn with_bodies(mut self, bodies: HashMap) -> Self { + self.bodies = bodies; + self + } + + /// Use the provided headers as the file client's block body buffer. + pub(crate) fn with_headers(mut self, headers: HashMap) -> Self { + self.headers = headers; + self + } +} + +impl BodiesClient for FileClient { + type Output = BodiesFut; + + fn get_block_bodies_with_priority( + &self, + hashes: Vec, + _priority: Priority, + ) -> Self::Output { + // this just searches the buffer, and fails if it can't find the block + let mut bodies = Vec::new(); + + // check if any are an error + // could unwrap here + for hash in hashes { + match self.bodies.get(&hash).cloned() { + Some(body) => bodies.push(body), + None => return Box::pin(async move { Err(RequestError::BadResponse) }), + } + } + + Box::pin(async move { Ok((PeerId::default(), bodies).into()) }) + } +} + +impl DownloadClient for FileClient { + fn report_bad_message(&self, _peer_id: PeerId) { + warn!("Reported a bad message on a file client, the file may be corrupted or invalid"); + // noop + } + + fn num_connected_peers(&self) -> usize { + // no such thing as connected peers when we are just using a file + 1 + } +} + +impl SyncStateProvider for FileClient { + fn is_syncing(&self) -> bool { + self.is_syncing.load(Ordering::Relaxed) + } +} + +impl SyncStateUpdater for FileClient { + fn update_sync_state(&self, state: SyncState) { + let is_syncing = state.is_syncing(); + self.is_syncing.store(is_syncing, Ordering::Relaxed) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + bodies::{ + bodies::BodiesDownloaderBuilder, + test_utils::{insert_headers, zip_blocks}, + }, + test_utils::generate_bodies, + }; + use assert_matches::assert_matches; + use futures_util::stream::StreamExt; + use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap}; + use reth_interfaces::{p2p::bodies::downloader::BodyDownloader, test_utils::TestConsensus}; + use std::sync::Arc; + + #[tokio::test] + async fn streams_bodies_from_buffer() { + // Generate some random blocks + let db = create_test_db::(EnvKind::RW); + let (headers, mut bodies) = generate_bodies(0..20); + + insert_headers(&db, &headers); + + // create an empty file + let file = tempfile::tempfile().unwrap(); + + let client = + Arc::new(FileClient::from_file(file.into()).unwrap().with_bodies(bodies.clone())); + let mut downloader = BodiesDownloaderBuilder::default().build( + client.clone(), + Arc::new(TestConsensus::default()), + db, + ); + downloader.set_download_range(0..20).expect("failed to set download range"); + + assert_matches!( + downloader.next().await, + Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies)) + ); + } +} diff --git a/crates/net/downloaders/src/test_utils/mod.rs b/crates/net/downloaders/src/test_utils/mod.rs new file mode 100644 index 000000000000..9bee5ea6267c --- /dev/null +++ b/crates/net/downloaders/src/test_utils/mod.rs @@ -0,0 +1,38 @@ +#![allow(unused)] +//! Test helper impls +use reth_eth_wire::BlockBody; +use reth_interfaces::test_utils::generators::random_block_range; +use reth_primitives::{SealedHeader, H256}; +use std::collections::HashMap; + +/// Metrics scope used for testing. +pub(crate) const TEST_SCOPE: &str = "downloaders.test"; + +/// Generate a set of bodies and their corresponding block hashes +pub(crate) fn generate_bodies( + rng: std::ops::Range, +) -> (Vec, HashMap) { + let blocks = random_block_range(rng, H256::zero(), 0..2); + + let headers = blocks.iter().map(|block| block.header.clone()).collect(); + let bodies = blocks + .into_iter() + .map(|block| { + ( + block.hash(), + BlockBody { + transactions: block.body, + ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(), + }, + ) + }) + .collect(); + + (headers, bodies) +} + +mod file_client; +mod test_client; + +pub use file_client::{FileClient, FileClientError}; +pub use test_client::TestBodiesClient; diff --git a/crates/net/downloaders/src/test_utils.rs b/crates/net/downloaders/src/test_utils/test_client.rs similarity index 65% rename from crates/net/downloaders/src/test_utils.rs rename to crates/net/downloaders/src/test_utils/test_client.rs index ddf3e3469da1..562baad3b931 100644 --- a/crates/net/downloaders/src/test_utils.rs +++ b/crates/net/downloaders/src/test_utils/test_client.rs @@ -1,15 +1,10 @@ -//! Test helper impls - use reth_eth_wire::BlockBody; -use reth_interfaces::{ - p2p::{ - bodies::client::{BodiesClient, BodiesFut}, - download::DownloadClient, - priority::Priority, - }, - test_utils::generators::random_block_range, +use reth_interfaces::p2p::{ + bodies::client::{BodiesClient, BodiesFut}, + download::DownloadClient, + priority::Priority, }; -use reth_primitives::{PeerId, SealedHeader, H256}; +use reth_primitives::{PeerId, H256}; use std::{ collections::HashMap, fmt::Debug, @@ -21,35 +16,9 @@ use std::{ }; use tokio::sync::Mutex; -/// Metrics scope used for testing. -pub(crate) const TEST_SCOPE: &str = "downloaders.test"; - -/// Generate a set of bodies and their corresponding block hashes -pub(crate) fn generate_bodies( - rng: std::ops::Range, -) -> (Vec, HashMap) { - let blocks = random_block_range(rng, H256::zero(), 0..2); - - let headers = blocks.iter().map(|block| block.header.clone()).collect(); - let bodies = blocks - .into_iter() - .map(|block| { - ( - block.hash(), - BlockBody { - transactions: block.body, - ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(), - }, - ) - }) - .collect(); - - (headers, bodies) -} - /// A [BodiesClient] for testing. #[derive(Debug, Default)] -pub(crate) struct TestBodiesClient { +pub struct TestBodiesClient { bodies: Arc>>, should_delay: bool, max_batch_size: Option, @@ -97,7 +66,7 @@ impl BodiesClient for TestBodiesClient { ) -> Self::Output { let should_delay = self.should_delay; let bodies = self.bodies.clone(); - let max_batch_size = self.max_batch_size.clone(); + let max_batch_size = self.max_batch_size; self.times_requested.fetch_add(1, Ordering::Relaxed);