Skip to content

Commit

Permalink
feat: introduce FileClient for importing blocks (#1111)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Feb 1, 2023
1 parent 0149bde commit a588f79
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 41 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions crates/net/downloaders/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,20 @@ tokio-stream = "0.1"
tracing = "0.1.37"
metrics = "0.20.1"

thiserror = { version = "1", optional = true }
reth-rlp = { path = "../../common/rlp", optional = true }

[dev-dependencies]
reth-db = { path = "../../storage/db", features = ["test-utils"] }
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }

assert_matches = "1.5.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
reth-rlp = { path = "../../common/rlp" }

thiserror = "1"
tempfile = "3.3"

[features]
test-utils = ["dep:reth-rlp", "dep:thiserror"]
2 changes: 1 addition & 1 deletion crates/net/downloaders/src/bodies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ mod queue;
mod request;

#[cfg(test)]
mod test_utils;
pub mod test_utils;
4 changes: 2 additions & 2 deletions crates/net/downloaders/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ pub mod headers;
/// Common downloader metrics.
pub mod metrics;

#[cfg(test)]
mod test_utils;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
183 changes: 183 additions & 0 deletions crates/net/downloaders/src/test_utils/file_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::{
collections::HashMap,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::RequestError,
priority::Priority,
},
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_primitives::{BlockHash, BlockNumber, Header, PeerId, H256};
use thiserror::Error;
use tokio::{fs::File, io::BufReader};
use tracing::warn;

/// Front-end API for fetching chain data from a file.
///
/// The file is assumed to contain blocks written back to back as rlp bytes, with nothing
/// in between them.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// the matching bodies in memory for use in the bodies stage.
///
/// NOTE(review): in the code visible here nothing reads from `reader` yet; body requests are
/// answered purely from the in-memory `bodies` map.
#[derive(Debug)]
pub struct FileClient {
    /// Buffered reader over the opened block file.
    reader: BufReader<File>,

    /// Headers buffered in memory, keyed by block number.
    headers: HashMap<BlockNumber, Header>,

    /// Block bodies buffered in memory, keyed by block hash. This is the map that
    /// body requests are served from.
    bodies: HashMap<BlockHash, BlockBody>,

    /// Shared flag recording whether the client was last told that a sync is in progress.
    is_syncing: Arc<AtomicBool>,
}

/// An error that can occur when constructing and using a [`FileClient`](FileClient).
///
/// Both variants are `transparent`: their `Display` output and error source are forwarded
/// from the wrapped error, and `#[from]` lets `?` convert the wrapped error automatically.
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    #[error(transparent)]
    Rlp(#[from] reth_rlp::DecodeError),
}

impl FileClient {
    /// Create a new file client by opening the file at `path`.
    ///
    /// # Errors
    ///
    /// Returns [`FileClientError::Io`] if the file cannot be opened.
    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        Self::from_file(file)
    }

    /// Initialize the [`FileClient`](FileClient) with an already opened file.
    ///
    /// The client starts with empty header and body buffers and is marked as not syncing.
    /// Nothing is read from the file here, so this currently always returns `Ok`; the
    /// `Result` return type is kept for callers.
    pub(crate) fn from_file(file: File) -> Result<Self, FileClientError> {
        Ok(Self {
            reader: BufReader::new(file),
            headers: HashMap::new(),
            bodies: HashMap::new(),
            // Spelled out rather than `Default::default()` so the initial state is obvious.
            is_syncing: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Use the provided bodies as the file client's block body buffer, replacing whatever
    /// was buffered before.
    pub(crate) fn with_bodies(mut self, bodies: HashMap<BlockHash, BlockBody>) -> Self {
        self.bodies = bodies;
        self
    }

    /// Use the provided headers as the file client's header buffer, replacing whatever
    /// was buffered before.
    pub(crate) fn with_headers(mut self, headers: HashMap<BlockNumber, Header>) -> Self {
        self.headers = headers;
        self
    }
}

impl BodiesClient for FileClient {
type Output = BodiesFut;

fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> Self::Output {
// this just searches the buffer, and fails if it can't find the block
let mut bodies = Vec::new();

// check if any are an error
// could unwrap here
for hash in hashes {
match self.bodies.get(&hash).cloned() {
Some(body) => bodies.push(body),
None => return Box::pin(async move { Err(RequestError::BadResponse) }),
}
}

Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
}
}

impl DownloadClient for FileClient {
    /// Only logs a warning; the peer id is ignored and no other action is taken.
    fn report_bad_message(&self, _peer_id: PeerId) {
        warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
    }

    /// Always returns `1`.
    ///
    /// A file has no real peers. NOTE(review): presumably the constant keeps callers that
    /// wait for at least one connected peer from stalling; confirm against the downloader.
    fn num_connected_peers(&self) -> usize {
        1
    }
}

impl SyncStateProvider for FileClient {
fn is_syncing(&self) -> bool {
self.is_syncing.load(Ordering::Relaxed)
}
}

impl SyncStateUpdater for FileClient {
    /// Stores whether the given state counts as syncing into the shared flag (relaxed
    /// atomic store).
    fn update_sync_state(&self, state: SyncState) {
        self.is_syncing.store(state.is_syncing(), Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        test_utils::generate_bodies,
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use reth_db::mdbx::{test_utils::create_test_db, EnvKind, WriteMap};
    use reth_interfaces::{p2p::bodies::downloader::BodyDownloader, test_utils::TestConsensus};
    use std::sync::Arc;

    /// A downloader backed by a [`FileClient`] whose body buffer was pre-filled must yield
    /// exactly the generated blocks.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Random headers and bodies for blocks 0..20; only the headers go into the database.
        let db = create_test_db::<WriteMap>(EnvKind::RW);
        let (headers, mut bodies) = generate_bodies(0..20);

        insert_headers(&db, &headers);

        // The backing file stays empty: every body is served from the injected buffer.
        let empty_file = tempfile::tempfile().unwrap();
        let file_client =
            FileClient::from_file(empty_file.into()).unwrap().with_bodies(bodies.clone());
        let client = Arc::new(file_client);

        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            db,
        );
        downloader.set_download_range(0..20).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }
}
38 changes: 38 additions & 0 deletions crates/net/downloaders/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#![allow(unused)]
//! Test helper impls
use reth_eth_wire::BlockBody;
use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::{SealedHeader, H256};
use std::collections::HashMap;

/// Metrics scope name used by the downloader tests.
pub(crate) const TEST_SCOPE: &str = "downloaders.test";

/// Generate random blocks for the block numbers in `rng` and split them into their sealed
/// headers and their bodies.
///
/// Returns the headers in generation order, plus a map from each block's hash to its
/// [`BlockBody`] (transactions and unsealed ommer headers).
///
/// NOTE(review): `H256::zero()` and `0..2` are passed straight to `random_block_range`;
/// presumably they are the starting parent hash and the per-block transaction count range.
/// Confirm against the generator.
pub(crate) fn generate_bodies(
    rng: std::ops::Range<u64>,
) -> (Vec<SealedHeader>, HashMap<H256, BlockBody>) {
    let blocks = random_block_range(rng, H256::zero(), 0..2);

    let mut headers = Vec::with_capacity(blocks.len());
    let mut bodies = HashMap::with_capacity(blocks.len());

    for block in blocks {
        headers.push(block.header.clone());

        // Take the hash before the block's fields are moved out below.
        let hash = block.hash();
        let body = BlockBody {
            transactions: block.body,
            ommers: block.ommers.into_iter().map(|ommer| ommer.unseal()).collect(),
        };
        bodies.insert(hash, body);
    }

    (headers, bodies)
}

mod file_client;
mod test_client;

pub use file_client::{FileClient, FileClientError};
pub use test_client::TestBodiesClient;
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
//! Test helper impls
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
priority::Priority,
},
test_utils::generators::random_block_range,
use reth_interfaces::p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
priority::Priority,
};
use reth_primitives::{PeerId, SealedHeader, H256};
use reth_primitives::{PeerId, H256};
use std::{
collections::HashMap,
fmt::Debug,
Expand All @@ -21,35 +16,9 @@ use std::{
};
use tokio::sync::Mutex;

/// Metrics scope used for testing.
pub(crate) const TEST_SCOPE: &str = "downloaders.test";

/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
rng: std::ops::Range<u64>,
) -> (Vec<SealedHeader>, HashMap<H256, BlockBody>) {
let blocks = random_block_range(rng, H256::zero(), 0..2);

let headers = blocks.iter().map(|block| block.header.clone()).collect();
let bodies = blocks
.into_iter()
.map(|block| {
(
block.hash(),
BlockBody {
transactions: block.body,
ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(),
},
)
})
.collect();

(headers, bodies)
}

/// A [BodiesClient] for testing.
#[derive(Debug, Default)]
pub(crate) struct TestBodiesClient {
pub struct TestBodiesClient {
bodies: Arc<Mutex<HashMap<H256, BlockBody>>>,
should_delay: bool,
max_batch_size: Option<usize>,
Expand Down Expand Up @@ -97,7 +66,7 @@ impl BodiesClient for TestBodiesClient {
) -> Self::Output {
let should_delay = self.should_delay;
let bodies = self.bodies.clone();
let max_batch_size = self.max_batch_size.clone();
let max_batch_size = self.max_batch_size;

self.times_requested.fetch_add(1, Ordering::Relaxed);

Expand Down

0 comments on commit a588f79

Please sign in to comment.