Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return events from blocks skipped over during Finalization, too #473

Merged
merged 15 commits into from
Mar 10, 2022
Merged
4 changes: 2 additions & 2 deletions codegen/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ impl RuntimeGenerator {
::subxt::events::at::<T, Event>(self.client, block_hash).await
}

pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe::<T, Event>(self.client).await
}

pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError> {
pub async fn subscribe_finalized(&self) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError> {
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
}
Expand Down
5 changes: 4 additions & 1 deletion subxt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ pub trait Config: 'static {
+ Default
+ Copy
+ core::hash::Hash
+ core::str::FromStr;
+ core::str::FromStr
+ num_traits::One
+ core::ops::Add<Output = Self::BlockNumber>
+ MaybeSerializeDeserialize;

/// The output of the `Hashing` function.
type Hash: Parameter
Expand Down
159 changes: 138 additions & 21 deletions subxt/src/events/event_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ use crate::{
use codec::Decode;
use derivative::Derivative;
use futures::{
future::Either,
stream::{
self,
BoxStream,
},
Future,
FutureExt,
Stream,
StreamExt,
};
use jsonrpsee::core::client::Subscription;
use num_traits::One;
use sp_runtime::traits::Header;
use std::{
marker::Unpin,
task::Poll,
Expand All @@ -54,9 +61,9 @@ pub use super::{
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
pub async fn subscribe<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, EventSub<T::Header>, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
}
Expand All @@ -67,20 +74,108 @@ pub async fn subscribe<T: Config, Evs: Decode + 'static>(
/// and is exposed only to be called via the codegen. Thus, prefer to use
/// `api.events().subscribe_finalized()` over calling this directly.
#[doc(hidden)]
pub async fn subscribe_finalized<T: Config, Evs: Decode + 'static>(
client: &'_ Client<T>,
) -> Result<EventSubscription<'_, T, Evs>, BasicError> {
let block_subscription = client.rpc().subscribe_finalized_blocks().await?;
Ok(EventSubscription::new(client, block_subscription))
pub async fn subscribe_finalized<'a, T: Config, Evs: Decode + 'static>(
client: &'a Client<T>,
) -> Result<EventSubscription<'a, FinalizedEventSub<'a, T::Header>, T, Evs>, BasicError> {
// fetch the last finalised block details immediately, so that we'll get
// events from this block onwards.
let last_finalized_block_hash = client.rpc().finalized_head().await?;

let mut last_finalized_block_number = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| *h.number());

// The complexity here is because the finalized block subscription may skip over
// some blocks, so here we attempt to ensure that we pay attention to every block
// that was skipped over as well as the latest block we were told about.
let block_subscription =
client
.rpc()
.subscribe_finalized_blocks()
.await?
.flat_map(move |s| {
// Get the header, or return a stream containing just the error. Our EventSubscription
// stream will return `None` as soon as it hits an error like this.
let header = match s {
Ok(header) => header,
Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
};

// This is one after the last block we returned details for last time.
let start_block_num = last_finalized_block_number
.map(|n| n + One::one())
.unwrap_or(*header.number());

// We want all previous details up to, but not including this current block num.
let end_block_number = *header.number();

// Iterate over all of the previous blocks we need headers for, ignoring the current block
// (which we already have the header info for):
let previous_headers =
get_block_headers(client, start_block_num, end_block_number);

// On the next iteration, we'll get details starting just after this end block.
last_finalized_block_number = Some(end_block_number);

// Return a combination of any previous headers plus the new header.
Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
});

Ok(EventSubscription::new(client, Box::pin(block_subscription)))
}

/// Return a Stream of all block headers starting from `current_block_num` and ending just before `end_num`.
fn get_block_headers<'a, T: Config>(
client: &'a Client<T>,
mut current_block_num: T::BlockNumber,
end_num: T::BlockNumber,
) -> impl Stream<Item = Result<T::Header, BasicError>> + Unpin + Send + 'a {
// Iterate over all of the previous blocks we need headers for. We go from (start_num..end_num).
// If start_num == end_num, return nothing.
let block_numbers = std::iter::from_fn(move || {
let res = if current_block_num == end_num {
None
} else {
Some(current_block_num)
};

current_block_num = current_block_num + One::one();
res
});

// Produce a stream of all of the previous headers that finalization skipped over.
let block_headers = stream::iter(block_numbers)
.then(move |n| {
async move {
let hash = client.rpc().block_hash_internal(n).await?;
let header = client.rpc().header(hash).await?;
Ok::<_, BasicError>(header)
}
})
.filter_map(|h| async { h.transpose() });

Box::pin(block_headers)
}

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe_finalized`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type FinalizedEventSub<'a, Header> = BoxStream<'a, Result<Header, BasicError>>;

/// A `jsonrpsee` Subscription. This forms a part of the `EventSubscription` type handed back
/// in codegen from `subscribe`, and is exposed to be used in codegen.
#[doc(hidden)]
pub type EventSub<Item> = Subscription<Item>;

/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct EventSubscription<'a, T: Config, Evs: 'static> {
#[derivative(Debug(bound = "Sub: std::fmt::Debug"))]
pub struct EventSubscription<'a, Sub, T: Config, Evs: 'static> {
finished: bool,
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
block_header_subscription: Sub,
#[derivative(Debug = "ignore")]
at: Option<
std::pin::Pin<
Expand All @@ -90,11 +185,12 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
_event_type: std::marker::PhantomData<Evs>,
}

impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
fn new(
client: &'a Client<T>,
block_header_subscription: Subscription<T::Header>,
) -> Self {
impl<'a, Sub, T: Config, Evs: Decode, E: Into<BasicError>>
EventSubscription<'a, Sub, T, Evs>
where
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
{
fn new(client: &'a Client<T>, block_header_subscription: Sub) -> Self {
EventSubscription {
finished: false,
client,
Expand All @@ -111,7 +207,10 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> {
}
}

impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
impl<'a, T: Config, Sub: Unpin, Evs: Decode> Unpin
for EventSubscription<'a, Sub, T, Evs>
{
}

// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose
// way to roughly implement the following function:
Expand All @@ -130,7 +229,13 @@ impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {}
//
// The advantage of this manual implementation is that we have a named type that we (and others)
// can derive things on, store away, alias etc.
impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
impl<'a, Sub, T, Evs, E> Stream for EventSubscription<'a, Sub, T, Evs>
where
T: Config,
Evs: Decode,
Sub: Stream<Item = Result<T::Header, E>> + Unpin + 'a,
E: Into<BasicError>,
{
type Item = Result<Events<'a, T, Evs>, BasicError>;

fn poll_next(
Expand All @@ -155,7 +260,6 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
return Poll::Ready(Some(Err(e.into())))
}
Some(Ok(block_header)) => {
use sp_runtime::traits::Header;
// Note [jsdw]: We may be able to get rid of the per-item allocation
// with https://github.com/oblique/reusable-box-future.
self.at = Some(Box::pin(at(self.client, block_header.hash())));
Expand All @@ -181,9 +285,22 @@ mod test {
use super::*;

// Ensure `EventSubscription` can be sent; only actually a compile-time check.
#[test]
#[allow(unused)]
fn check_sendability() {
fn assert_send<T: Send>() {}
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
assert_send::<
EventSubscription<
EventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
assert_send::<
EventSubscription<
FinalizedEventSub<<crate::DefaultConfig as Config>::Header>,
crate::DefaultConfig,
(),
>,
>();
}
}
2 changes: 2 additions & 0 deletions subxt/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub use decoding::EventsDecodingError;
pub use event_subscription::{
subscribe,
subscribe_finalized,
EventSub,
EventSubscription,
FinalizedEventSub,
};
pub use events_type::{
at,
Expand Down
17 changes: 17 additions & 0 deletions subxt/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,23 @@ impl<T: Config> Rpc<T> {
}
}

/// Get a block hash given a T::BlockNumber from our config.
/// This may not be useful externally, but we need to be able to
/// get detail for T::BlockNumbers.
#[doc(hidden)]
pub async fn block_hash_internal(
&self,
block_number: T::BlockNumber,
) -> Result<Option<T::Hash>, BasicError> {
let block_number = ListOrValue::Value(block_number);
let params = rpc_params![block_number];
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of duplicating this block_hash function. The problem is that Config::BlockNumber isn't something that we can easily work with.

I could also require that Config::BlockNumberimplements something like Into<u128>, or I could remove the "other" copy of this function, but it sounds like some consideration went into it, so I was hesitant. What do you think @ascjones?

Copy link
Collaborator Author

@jsdw jsdw Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing abour requiring Into<u128> or similar would be that I can get rid of num_traits again, and the other required traits I added to BlockNumber, which I was just using to add a minimal amount of information about the BlockNumber so that I could iterate over them.

Is it reasonable to assume that nobody would use something more esoteric than a rust unsigned int to represent block numbers, and so Into<u128> shouldn't cause any issues?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is reasonable to use Into<u128>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went for Into<u64> in the end because u128's don't serialize to JSON so easily; I'm hoping u64 is also a safe target here (I assume it's basically a choice between u32 and u64 that people would tend to make?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Into<u64> should also be fine.


/// Get a block hash of the latest finalized block
pub async fn finalized_head(&self) -> Result<T::Hash, BasicError> {
let hash = self
Expand Down
4 changes: 2 additions & 2 deletions subxt/tests/integration/codegen/polkadot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27849,13 +27849,13 @@ pub mod api {
}
pub async fn subscribe(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::EventSub<T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe::<T, Event>(self.client).await
}
pub async fn subscribe_finalized(
&self,
) -> Result<::subxt::events::EventSubscription<'a, T, Event>, ::subxt::BasicError>
) -> Result<::subxt::events::EventSubscription<'a, ::subxt::events::FinalizedEventSub<'a, T::Header>, T, Event>, ::subxt::BasicError>
{
::subxt::events::subscribe_finalized::<T, Event>(self.client).await
}
Expand Down