Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download range of headers during syncing #1270

Merged
merged 50 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
fb481a0
Add requests for blocks to p2p service with test
MitchTurner Aug 1, 2023
2ce71ea
Create stream from singlee headers request rather than multiple requests
MitchTurner Aug 2, 2023
18cad5f
WIP fix tests
MitchTurner Aug 3, 2023
47b7347
Finish tests, remove unused functions
MitchTurner Aug 3, 2023
2df5a9b
Appease Clippy-sama
MitchTurner Aug 3, 2023
964e798
Get headers range test working
MitchTurner Aug 4, 2023
341e54b
Cleanup, make test more robust
MitchTurner Aug 4, 2023
7c25646
Rename tests
MitchTurner Aug 4, 2023
e677568
Fix back-pressure tests
MitchTurner Aug 4, 2023
fa39cd5
Fix new service test
MitchTurner Aug 4, 2023
3ab1ec1
Add impl for adapter
MitchTurner Aug 4, 2023
ce4c4b6
Fix data mapping
MitchTurner Aug 4, 2023
5abf18a
Appease Clippy-sama
MitchTurner Aug 4, 2023
318afda
Merge branch 'master' into download-block-range
xgreenx Aug 6, 2023
560ff4e
Remove not used variant
xgreenx Aug 6, 2023
b61b3b7
Merge remote-tracking branch 'origin/download-block-range' into downl…
xgreenx Aug 6, 2023
9ed60e4
Improve test, use range instead of start/end
MitchTurner Aug 7, 2023
a3854e8
Merge branch 'download-block-range' of github.com:FuelLabs/fuel-core …
MitchTurner Aug 7, 2023
432d49c
Improve the header filtering, rename helper, cleanup missed comments
MitchTurner Aug 7, 2023
53136b7
Appease clippy-sama
MitchTurner Aug 7, 2023
0aa691a
Merge remote-tracking branch 'origin/download-block-range' into downl…
xgreenx Aug 7, 2023
9b6d9ee
Fix off-by-one error, rename field
MitchTurner Aug 7, 2023
2523661
Merge branch 'download-block-range' of github.com:FuelLabs/fuel-core …
MitchTurner Aug 7, 2023
2733c5d
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 10, 2023
b1e970f
Update Changelog
MitchTurner Aug 10, 2023
ee9efdd
Get headers in batches
MitchTurner Aug 11, 2023
efb7703
Cleanup commented code
MitchTurner Aug 11, 2023
66b7d7c
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 11, 2023
0215ed1
Add new fields to sync config
MitchTurner Aug 11, 2023
1972302
Instrument
MitchTurner Aug 11, 2023
fabb2b3
Helm lint
MitchTurner Aug 11, 2023
2b75c60
Address most of the PR comments
MitchTurner Aug 14, 2023
b5df8d4
Merge branch 'master' into download-block-range
MitchTurner Aug 14, 2023
7ec0ac0
Fix compilation errors
MitchTurner Aug 14, 2023
1d4328a
Update crates/services/p2p/src/service.rs
MitchTurner Aug 15, 2023
3283587
Update crates/services/sync/src/ports.rs
MitchTurner Aug 15, 2023
d1f809a
Update crates/services/sync/src/import.rs
MitchTurner Aug 15, 2023
94fc48b
Update crates/services/sync/src/import.rs
MitchTurner Aug 15, 2023
c6a4b3d
Make PR requested changes
MitchTurner Aug 15, 2023
9c0412e
Add ignore for RustSec advisory
MitchTurner Aug 15, 2023
5d63451
Undo bump, remove unused imports
MitchTurner Aug 15, 2023
9a174b5
Add max requests check with config
MitchTurner Aug 16, 2023
98856e6
Remove sealed header (singular) path from p2p service
MitchTurner Aug 16, 2023
91d5253
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 16, 2023
5332d01
Update crates/services/p2p/src/p2p_service.rs
MitchTurner Aug 16, 2023
f2d37b0
Manually format macro
MitchTurner Aug 16, 2023
978b488
Merge branch 'master' into download-block-range
MitchTurner Aug 17, 2023
5970d6a
Add max headers per request to deployment env vars
MitchTurner Aug 17, 2023
6132f5d
Helm lint, add other vars
MitchTurner Aug 17, 2023
46e2c17
Wrap headers in option to represent ambiguous failed request
MitchTurner Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ Description of the upcoming release here.

### Changed

- Something changed here 1
- Something changed here 2
- [#1270](https://github.com/FuelLabs/fuel-core/pull/1270): Modify the way block headers are retrieved from peers to be done in batches.

#### Breaking
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.
Expand Down
12 changes: 8 additions & 4 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,15 @@ pub struct P2PArgs {

#[derive(Debug, Clone, Args)]
pub struct SyncArgs {
/// The maximum number of get header requests to make in a single batch.
#[clap(long = "sync-max-get-header", default_value = "10", env)]
pub max_get_header_requests: usize,
/// The maximum number of get transaction requests to make in a single batch.
#[clap(long = "sync-max-get-txns", default_value = "10", env)]
pub max_get_txns_requests: usize,
/// The maximum number of headers to request in a single batch.
#[clap(long = "sync-header-batch-size", default_value = "10", env)]
pub header_batch_size: u32,
/// The maximum number of header batch requests to have active at one time.
#[clap(long = "sync-max-header-batch-requests", default_value = "10", env)]
pub max_header_batch_requests: usize,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -206,8 +209,9 @@ impl KeypairArg {
impl From<SyncArgs> for fuel_core::sync::Config {
fn from(value: SyncArgs) -> Self {
Self {
max_get_header_requests: value.max_get_header_requests,
max_get_txns_requests: value.max_get_txns_requests,
header_batch_size: value.header_batch_size,
max_header_batch_requests: value.max_header_batch_requests,
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions crates/fuel-core/src/database/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use fuel_core_types::{
use itertools::Itertools;
use std::{
borrow::{
Borrow,
BorrowMut,
Cow,
},
Expand Down Expand Up @@ -271,9 +270,8 @@ impl Database {
.get(commit_block_height)?
.ok_or(not_found!(FuelBlockMerkleMetadata))?;

let storage = self.borrow();
let tree: MerkleTree<FuelBlockMerkleData, _> =
MerkleTree::load(storage, commit_merkle_metadata.version)
MerkleTree::load(self, commit_merkle_metadata.version)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like an orthogonal change to the PR. Perhaps this can be done separately. On a personal note, I think I like having the intermediate borrow, because it demonstrates that the first argument to load is something called storage, which is more intuitive than self.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember the context I was in when I changed this. I think I was just reading and felt like this was some misdirection in the code and removed the borrow to see what would happen.

I can't argue with your point though. I do like documentation with code. So I can change it back.

.map_err(|err| StorageError::Other(err.into()))?;

let proof_index = message_merkle_metadata.version - 1;
Expand Down
15 changes: 15 additions & 0 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
};
use std::ops::Range;

impl DatabaseColumn for SealedBlockConsensus {
fn column() -> Column {
Expand Down Expand Up @@ -93,6 +94,20 @@ impl Database {
self.get_sealed_block_header(&block_id)
}

pub fn get_sealed_block_headers(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
let headers = block_height_range
.map(BlockHeight::from)
.map(|height| self.get_sealed_block_header_by_height(&height))
.collect::<StorageResult<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
Ok(headers)
}

pub fn get_sealed_block_header(
&self,
block_id: &BlockId,
Expand Down
8 changes: 8 additions & 0 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
};
use std::ops::Range;

impl P2pDb for Database {
fn get_sealed_block(
Expand All @@ -31,6 +32,13 @@ impl P2pDb for Database {
self.get_sealed_block_header_by_height(height)
}

fn get_sealed_headers_range(
&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>> {
self.get_sealed_block_headers(block_height_range)
}

fn get_transactions(
&self,
block_id: &BlockId,
Expand Down
30 changes: 29 additions & 1 deletion crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ use fuel_core_types::{
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::SourcePeer,
services::p2p::{
PeerId,
SourcePeer,
},
};
use std::ops::Range;

#[async_trait::async_trait]
impl PeerToPeerPort for P2PAdapter {
Expand Down Expand Up @@ -56,6 +60,30 @@ impl PeerToPeerPort for P2PAdapter {
}
}

async fn get_sealed_block_headers(
&self,
block_range_height: Range<u32>,
) -> anyhow::Result<Vec<SourcePeer<SealedBlockHeader>>> {
if let Some(service) = &self.service {
Ok(service
.get_sealed_block_headers(block_range_height)
.await?
.map(|(peer_id, headers)| {
let peer_id: PeerId = peer_id.into();
headers
.into_iter()
.map(|header| SourcePeer {
peer_id: peer_id.clone(),
data: header,
})
.collect()
})
.unwrap_or(Vec::new()))
} else {
Ok(Vec::new())
}
}

async fn get_transactions(
&self,
block: SourcePeer<BlockId>,
Expand Down
8 changes: 8 additions & 0 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ impl RequestResponseConverter for PostcardCodec {

Ok(ResponseMessage::Transactions(response))
}
NetworkResponse::Headers(response) => {
let deserialized = self.deserialize(response)?;
Ok(ResponseMessage::SealedHeaders(deserialized))
}
}
}

Expand Down Expand Up @@ -257,6 +261,10 @@ impl RequestResponseConverter for PostcardCodec {

Ok(NetworkResponse::Transactions(response))
}
OutboundResponse::SealedHeadersRangeInclusive(headers) => {
let serialized = self.serialize(headers)?;
Ok(NetworkResponse::Headers(serialized))
}
}
}
}
Expand Down
87 changes: 76 additions & 11 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,17 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
);
}
}
(
Some(ResponseChannelItem::SealedHeaders(channel)),
Ok(ResponseMessage::SealedHeaders(headers)),
) => {
if channel.send(Some((peer, headers))).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
);
}
}

(Some(_), Err(e)) => {
debug!("Failed to convert IntermediateResponse into a ResponseMessage {:?} with {:?}", response, e);
Expand Down Expand Up @@ -692,7 +703,10 @@ mod tests {
Consensus,
ConsensusVote,
},
header::PartialBlockHeader,
header::{
BlockHeader,
PartialBlockHeader,
},
primitives::BlockId,
SealedBlock,
SealedBlockHeader,
Expand Down Expand Up @@ -1514,6 +1528,27 @@ mod tests {
}
}

fn arbitrary_headers() -> Vec<SealedBlockHeader> {
let mut blocks = Vec::new();
for i in 2..=5 {
let mut header: BlockHeader = Default::default();
header.consensus.height = i.into();

let sealed_block = SealedBlockHeader {
entity: header,
consensus: Consensus::PoA(PoAConsensus::new(Default::default())),
};
blocks.push(sealed_block);
}
blocks
}

// Metadata gets skipped during serialization, so this is the fuzzy way to compare blocks
fn eq_except_metadata(a: &SealedBlockHeader, b: &SealedBlockHeader) -> bool {
a.entity.application == b.entity.application
&& a.entity.consensus == b.entity.consensus
}

async fn request_response_works_with(request_msg: RequestMessage) {
let mut p2p_config = Config::default_initialized("request_response_works_with");

Expand All @@ -1524,15 +1559,15 @@ mod tests {
p2p_config.bootstrap_nodes = node_a.multiaddrs();
let mut node_b = build_service_from_config(p2p_config.clone()).await;

let (tx_test_end, mut rx_test_end) = mpsc::channel(1);
let (tx_test_end, mut rx_test_end) = mpsc::channel::<bool>(1);

let mut request_sent = false;

loop {
tokio::select! {
message_sent = rx_test_end.recv() => {
// we received a signal to end the test
assert_eq!(message_sent, Some(true), "Receuved incorrect or missing missing messsage");
assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage");
break;
}
node_a_event = node_a.next_event() => {
Expand All @@ -1542,10 +1577,10 @@ mod tests {
if !peer_addresses.is_empty() && !request_sent {
request_sent = true;

match request_msg {
match &request_msg {
RequestMessage::Block(_) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::Block(tx_orchestrator)).is_ok());
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Block(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();

tokio::spawn(async move {
Expand All @@ -1562,7 +1597,7 @@ mod tests {
}
RequestMessage::SealedHeader(_) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::SealedHeader(tx_orchestrator)).is_ok());
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::SealedHeader(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();

tokio::spawn(async move {
Expand All @@ -1576,9 +1611,28 @@ mod tests {
}
});
}
RequestMessage::SealedHeaders(_) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::SealedHeaders(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();

tokio::spawn(async move {
let response_message = rx_orchestrator.await;

let expected = arbitrary_headers();

if let Ok(Some((_, sealed_headers))) = response_message {
let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
}
});
}
RequestMessage::Transactions(_) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::Transactions(tx_orchestrator)).is_ok());
assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();

tokio::spawn(async move {
Expand All @@ -1601,7 +1655,7 @@ mod tests {
},
node_b_event = node_b.next_event() => {
// 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator
if let Some(FuelP2PEvent::RequestMessage{ request_id, request_message: received_request_message }) = node_b_event {
if let Some(FuelP2PEvent::RequestMessage{ request_id, request_message: received_request_message }) = &node_b_event {
match received_request_message {
RequestMessage::Block(_) => {
let block = Block::new(PartialBlockHeader::default(), (0..5).map(|_| Transaction::default_test_tx()).collect(), &[]);
Expand All @@ -1611,7 +1665,7 @@ mod tests {
consensus: Consensus::PoA(PoAConsensus::new(Default::default())),
};

let _ = node_b.send_response_msg(request_id, OutboundResponse::Block(Some(Arc::new(sealed_block))));
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Block(Some(Arc::new(sealed_block))));
}
RequestMessage::SealedHeader(_) => {
let header = Default::default();
Expand All @@ -1621,11 +1675,16 @@ mod tests {
consensus: Consensus::PoA(PoAConsensus::new(Default::default())),
};

let _ = node_b.send_response_msg(request_id, OutboundResponse::SealedHeader(Some(Arc::new(sealed_header))));
let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeader(Some(Arc::new(sealed_header))));
}
RequestMessage::SealedHeaders(_) => {
let sealed_headers: Vec<_> = arbitrary_headers();

let _ = node_b.send_response_msg(*request_id, OutboundResponse::SealedHeadersRangeInclusive(sealed_headers));
}
RequestMessage::Transactions(_) => {
let transactions = (0..5).map(|_| Transaction::default_test_tx()).collect();
let _ = node_b.send_response_msg(request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
}
}

Expand All @@ -1650,6 +1709,12 @@ mod tests {
request_response_works_with(RequestMessage::Block(0.into())).await
}

#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_headers_range_inclusive() {
request_response_works_with(RequestMessage::SealedHeaders(0..0)).await
}

#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_header() {
Expand Down
6 changes: 6 additions & 0 deletions crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fuel_core_types::{
fuel_tx::Transaction,
fuel_types::BlockHeight,
};
use std::ops::Range;

pub trait P2pDb: Send + Sync {
fn get_sealed_block(
Expand All @@ -21,6 +22,11 @@ pub trait P2pDb: Send + Sync {
height: &BlockHeight,
) -> StorageResult<Option<SealedBlockHeader>>;

fn get_sealed_headers_range(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider simply calling this get_sealed_headers? I think _range is not idiomatic in the context of the rest of our interfaces, where we may already accept a range without putting that in the function name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Totally. I'll fix it.

&self,
block_height_range: Range<u32>,
) -> StorageResult<Vec<SealedBlockHeader>>;

fn get_transactions(
&self,
block_id: &BlockId,
Expand Down
Loading