Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement BlocksClient for working with blocks #671

Merged
merged 23 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
738cd82
rpc: Fill in any missing finalized blocks
lexnv Sep 29, 2022
453ca83
tests: Move fill blocks test to RPC location
lexnv Sep 29, 2022
8e919f7
events: Remove the fill in strategy
lexnv Sep 29, 2022
4985682
Merge remote-tracking branch 'origin/master' into lexnv/fill_blocks
lexnv Sep 29, 2022
65a2960
blocks: Introduce blocks client
lexnv Sep 30, 2022
bc8ec5c
client: Enable the block API
lexnv Sep 30, 2022
e1892c7
blocks: Simplify `subscribe_finalized_headers` method
lexnv Sep 30, 2022
be1a20c
tests: Add tests for `subscribe_finalized_headers`
lexnv Sep 30, 2022
2250701
blocks: Implement `subscribe_headers`
lexnv Sep 30, 2022
a86237a
tests: Add tests for `subscribe_headers`
lexnv Sep 30, 2022
15b0478
tests: Move `missing_block_headers_will_be_filled_in` to blocks
lexnv Sep 30, 2022
af98f7f
events: Use the new subscribe to blocks
lexnv Sep 30, 2022
cea372e
Merge remote-tracking branch 'origin/master' into lexnv/fill_blocks
lexnv Oct 4, 2022
64dfd79
blocks: Change API to return future similar to events
lexnv Oct 4, 2022
2b5066b
events: Use blocks API for subscribing to blocks
lexnv Oct 4, 2022
c069473
Update subxt/src/blocks/blocks_client.rs
lexnv Oct 4, 2022
1a70b4a
blocks: Simplify docs for `subscribe_finalized_headers`
lexnv Oct 4, 2022
dc9a435
blocks: Use `PhantomDataSendSync` to avoid other bounds on `T: Config`
lexnv Oct 4, 2022
bedc6a4
blocks: Add docs for best blocks
lexnv Oct 4, 2022
75bc024
blocks: Avoid one clone for the `client.rpc()`
lexnv Oct 4, 2022
bf5863f
Update testing/integration-tests/src/blocks/mod.rs
lexnv Oct 4, 2022
ae7f5ff
blocks: Improve `subscribe_headers` doc
lexnv Oct 4, 2022
afcf759
Merge remote-tracking branch 'origin/lexnv/fill_blocks' into lexnv/fi…
lexnv Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 3 additions & 59 deletions subxt/src/events/events_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@ use crate::{
Events,
FinalizedEventSub,
},
rpc::Rpc,
Config,
};
use derivative::Derivative;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use sp_core::{
storage::StorageKey,
twox_128,
Expand Down Expand Up @@ -181,66 +176,15 @@ where
let sub = client.rpc().subscribe_finalized_blocks().await?;

// Fill in any gaps between the block above and the finalized blocks reported.
let block_subscription = subscribe_to_block_headers_filling_in_gaps(
client.clone(),
let block_subscription = Rpc::subscribe_to_block_headers_filling_in_gaps(
client.rpc().clone(),
last_finalized_block_number,
sub,
);

Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, Client, S, E>(
client: Client,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
T: Config,
Client: OnlineClientT<T> + Send + Sync,
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let client = client.clone();

// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let client = client.clone();
async move {
let hash = client.rpc().block_hash(Some(n.into())).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

// The storage key needed to access events.
fn system_events_key() -> StorageKey {
let mut storage_key = twox_128(b"System").to_vec();
Expand Down
6 changes: 1 addition & 5 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ pub use event_subscription::{
EventSubscription,
FinalizedEventSub,
};
pub use events_client::{
// Exposed only for testing:
subscribe_to_block_headers_filling_in_gaps,
EventsClient,
};
pub use events_client::EventsClient;
pub use events_type::{
EventDetails,
Events,
Expand Down
98 changes: 98 additions & 0 deletions subxt/src/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ use codec::{
Encode,
};
use frame_metadata::RuntimeMetadataPrefixed;
use futures::{
future::Either,
stream,
Stream,
StreamExt,
};
use serde::{
Deserialize,
Serialize,
Expand All @@ -74,6 +80,7 @@ use sp_runtime::{
Block,
SignedBlock,
},
traits::Header,
ApplyExtrinsicResult,
};
use std::collections::HashMap;
Expand Down Expand Up @@ -569,6 +576,97 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}

/// Subscribe to finalized blocks and ensure that all missing blocks are filled.
///
/// The Substrate's RPC does not guarantee that all finalized blocks are provided.
/// This function ensures that all missed blocks are fetched and returned in order.
///
/// # Example
///
/// The following is a possible scenario where Substrate does not generate an event
/// for the second block.
///
/// ```sh
/// substrate | Block A | --- | Block B | --- | Block C |
/// client | Block A | --- --- | Block C |
/// ```
///
/// This function mitigates this by filling the gaps of missing blocks.
///
/// ```sh
/// substrate | Block A | --- | Block B | --- | Block C |
/// client | Block A | --- | Block B | --- | Block C |
/// ^^^ filled
/// ```
pub async fn subscribe_finalized_blocks_filled(
&self,
) -> Result<impl Stream<Item = Result<T::Header, Error>> + Send + '_, Error> {
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = self.finalized_head().await?;
let last_finalized_block_num = self
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = self.subscribe_finalized_blocks().await?;

// Adjust the subscription stream to fill in any missing blocks.
Ok(Rpc::subscribe_to_block_headers_filling_in_gaps(
self.clone(),
last_finalized_block_num,
sub,
))
}

/// Note: This is exposed for testing but is not considered stable and may change
/// without notice in a patch release.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<S, E>(
client: Rpc<T>,
mut last_block_num: Option<u64>,
sub: S,
) -> impl Stream<Item = Result<T::Header, Error>> + Send
where
S: Stream<Item = Result<T::Header, E>> + Send,
E: Into<Error> + Send + 'static,
{
sub.flat_map(move |s| {
let rpc = client.clone();

// Get the header, or return a stream containing just the error.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// We want all previous details up to, but not including this current block num.
let end_block_num = (*header.number()).into();

// This is one after the last block we returned details for last time.
let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers = stream::iter(start_block_num..end_block_num)
.then(move |n| {
let rpc = rpc.clone();
async move {
let hash = rpc.block_hash(Some(n.into())).await?;
let header = rpc.header(hash).await?;
Ok::<_, Error>(header)
}
})
.filter_map(|h| async { h.transpose() });

// On the next iteration, we'll get details starting just after this end block.
last_block_num = Some(end_block_num);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
})
}

/// Subscribe to runtime version updates that produce changes in the metadata.
pub async fn subscribe_runtime_version(
&self,
Expand Down
51 changes: 50 additions & 1 deletion testing/integration-tests/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use crate::{
wait_for_blocks,
},
};
use futures::StreamExt;
use sp_core::{
sr25519::Pair as Sr25519Pair,
storage::well_known_keys,
Pair,
};
use sp_keyring::AccountKeyring;
use subxt::error::DispatchError;
use subxt::{
error::DispatchError,
rpc::Rpc,
};

#[tokio::test]
async fn insert_key() {
Expand Down Expand Up @@ -91,6 +95,51 @@ async fn chain_subscribe_finalized_blocks() {
blocks.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();

// Manually subscribe to the next 6 finalized block headers, but deliberately
// filter out some in the middle so we get back b _ _ b _ b. This guarantees
// that there will be some gaps, even if there aren't any from the subscription.
let some_finalized_blocks = api
.rpc()
.subscribe_finalized_blocks()
.await?
.enumerate()
.take(6)
.filter(|(n, _)| {
let n = *n;
async move { n == 0 || n == 3 || n == 5 }
})
.map(|(_, h)| h);

// This should spot any gaps in the middle and fill them back in.
let all_finalized_blocks = Rpc::subscribe_to_block_headers_filling_in_gaps(
ctx.client().rpc().clone(),
None,
some_finalized_blocks,
);
futures::pin_mut!(all_finalized_blocks);

// Iterate the block headers, making sure we get them all in order.
let mut last_block_number = None;
while let Some(header) = all_finalized_blocks.next().await {
let header = header?;

use sp_runtime::traits::Header;
let block_number: u128 = (*header.number()).into();

if let Some(last) = last_block_number {
assert_eq!(last + 1, block_number);
}
last_block_number = Some(block_number);
}

Ok(())
}

#[tokio::test]
async fn fetch_keys() {
let ctx = test_context().await;
Expand Down
51 changes: 0 additions & 51 deletions testing/integration-tests/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,57 +169,6 @@ async fn balance_transfer_subscription() -> Result<(), subxt::Error> {
Ok(())
}

#[tokio::test]
async fn missing_block_headers_will_be_filled_in() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();

// This function is not publically available to use, but contains
// the key logic for filling in missing blocks, so we want to test it.
// This is used in `subscribe_finalized` to ensure no block headers are
// missed.
use subxt::events::subscribe_to_block_headers_filling_in_gaps;

// Manually subscribe to the next 6 finalized block headers, but deliberately
// filter out some in the middle so we get back b _ _ b _ b. This guarantees
// that there will be some gaps, even if there aren't any from the subscription.
let some_finalized_blocks = api
.rpc()
.subscribe_finalized_blocks()
.await?
.enumerate()
.take(6)
.filter(|(n, _)| {
let n = *n;
async move { n == 0 || n == 3 || n == 5 }
})
.map(|(_, h)| h);

// This should spot any gaps in the middle and fill them back in.
let all_finalized_blocks = subscribe_to_block_headers_filling_in_gaps(
ctx.client(),
None,
some_finalized_blocks,
);
futures::pin_mut!(all_finalized_blocks);

// Iterate the block headers, making sure we get them all in order.
let mut last_block_number = None;
while let Some(header) = all_finalized_blocks.next().await {
let header = header?;

use sp_runtime::traits::Header;
let block_number: u128 = (*header.number()).into();

if let Some(last) = last_block_number {
assert_eq!(last + 1, block_number);
}
last_block_number = Some(block_number);
}

Ok(())
}

// This is just a compile-time check that we can subscribe to events in
// a context that requires the event subscription/filtering to be Send-able.
// We test a typical use of EventSubscription and FilterEvents. We don't need
Expand Down