diff --git a/examples/examples/submit_and_watch.rs b/examples/examples/submit_and_watch.rs index c1d72ca2fe..6ca86a57ad 100644 --- a/examples/examples/submit_and_watch.rs +++ b/examples/examples/submit_and_watch.rs @@ -67,7 +67,7 @@ async fn simple_transfer() -> Result<(), Box> { .await?; let transfer_event = - balance_transfer.find_first_event::()?; + balance_transfer.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); @@ -108,7 +108,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box()?; + events.find_first::()?; if let Some(_ev) = failed_event { // We found a failed event; the transfer didn't succeed. @@ -117,7 +117,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box()?; + events.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); } else { @@ -161,7 +161,7 @@ async fn handle_transfer_events() -> Result<(), Box> { let events = details.wait_for_success().await?; let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(event) = transfer_event { println!( @@ -181,7 +181,7 @@ async fn handle_transfer_events() -> Result<(), Box> { let events = details.wait_for_success().await?; let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); diff --git a/examples/examples/subscribe_all_events.rs b/examples/examples/subscribe_all_events.rs index 92b5cae1b1..6c23cd95aa 100644 --- a/examples/examples/subscribe_all_events.rs +++ b/examples/examples/subscribe_all_events.rs @@ -103,7 +103,7 @@ async fn main() -> Result<(), Box> { // Or we can dynamically find the first transfer event, ignoring any others: let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(ev) = transfer_event { println!(" - Balance transfer success: value: {:?}", ev.amount); diff --git 
a/examples/examples/subscribe_one_event.rs b/examples/examples/subscribe_one_event.rs index e58a92dba0..b6e6c65519 100644 --- a/examples/examples/subscribe_one_event.rs +++ b/examples/examples/subscribe_one_event.rs @@ -22,11 +22,7 @@ //! polkadot --dev --tmp //! ``` -use futures::{ - future, - stream, - StreamExt, -}; +use futures::StreamExt; use sp_keyring::AccountKeyring; use std::time::Duration; use subxt::{ @@ -51,21 +47,13 @@ async fn main() -> Result<(), Box> { .await? .to_runtime_api::>>(); - // Subscribe to just balance transfer events, making use of `flat_map` and - // `filter_map` from the StreamExt trait to filter everything else out. + // Subscribe to just balance transfer events, making use of `filter_events` + // to select a single event type (note the 1-tuple) to filter out and return. let mut transfer_events = api .events() .subscribe() .await? - // Ignore errors returning events: - .filter_map(|events| future::ready(events.ok())) - // Map events to just the one we care about: - .flat_map(|events| { - let transfer_events = events - .find::() - .collect::>(); - stream::iter(transfer_events) - }); + .filter_events::<(polkadot::balances::events::Transfer,)>(); // While this subscription is active, we imagine some balance transfers are made somewhere else: async_std::task::spawn(async { diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs new file mode 100644 index 0000000000..f57a67f01f --- /dev/null +++ b/examples/examples/subscribe_some_events.rs @@ -0,0 +1,100 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! To run this example, a local polkadot node should be running. Example verified against polkadot 0.9.13-82616422d0-aarch64-macos. +//! +//! E.g. +//! ```bash +//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.13/polkadot" --output /usr/local/bin/polkadot --location +//! polkadot --dev --tmp +//! ``` + +use futures::StreamExt; +use sp_keyring::AccountKeyring; +use std::time::Duration; +use subxt::{ + ClientBuilder, + DefaultConfig, + DefaultExtra, + PairSigner, +}; + +#[subxt::subxt(runtime_metadata_path = "examples/polkadot_metadata.scale")] +pub mod polkadot {} + +/// Subscribe to all events, and then manually look through them and +/// pluck out the events that we care about. +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Subscribe to any events that occur: + let api = ClientBuilder::new() + .build() + .await? + .to_runtime_api::>>(); + + // Subscribe to several balance related events. If we ask for more than one event, + // we'll be given a corresponding tuple of `Option`'s, with exactly one + // variant populated each time. 
+ let mut balance_events = api.events().subscribe().await?.filter_events::<( + polkadot::balances::events::Withdraw, + polkadot::balances::events::Transfer, + polkadot::balances::events::Deposit, + )>(); + + // While this subscription is active, we imagine some balance transfers are made somewhere else: + async_std::task::spawn(async { + let signer = PairSigner::new(AccountKeyring::Alice.pair()); + let api = ClientBuilder::new() + .build() + .await + .unwrap() + .to_runtime_api::>>(); + + // Make small balance transfers from Alice to Bob in a loop: + loop { + api.tx() + .balances() + .transfer(AccountKeyring::Bob.to_account_id().into(), 1_000_000_000) + .sign_and_submit(&signer) + .await + .unwrap(); + async_std::task::sleep(Duration::from_secs(10)).await; + } + }); + + // Our subscription will see all of the balance events we're filtering on: + while let Some(ev) = balance_events.next().await { + let event_details = ev?; + + let block_hash = event_details.block_hash; + let event = event_details.event; + println!("Event at {:?}:", block_hash); + + if let (Some(withdraw), _, _) = &event { + println!(" Withdraw event: {withdraw:?}"); + } + if let (_, Some(transfer), _) = &event { + println!(" Transfer event: {transfer:?}"); + } + if let (_, _, Some(deposit)) = &event { + println!(" Deposit event: {deposit:?}"); + } + } + + Ok(()) +} diff --git a/subxt/src/events/decoding.rs b/subxt/src/events/decoding.rs new file mode 100644 index 0000000000..e2ea7834e8 --- /dev/null +++ b/subxt/src/events/decoding.rs @@ -0,0 +1,487 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Dynamically decoding events. + +use crate::{ + error::BasicError, + metadata::MetadataError, +}; +use bitvec::{ + order::Lsb0, + vec::BitVec, +}; +use codec::{ + Codec, + Compact, + Decode, +}; +use scale_info::{ + PortableRegistry, + TypeDef, + TypeDefPrimitive, +}; + +/// Given a type Id and a type registry, attempt to consume the bytes +/// corresponding to that type from our input. +pub fn decode_and_consume_type( + type_id: u32, + types: &PortableRegistry, + input: &mut &[u8], +) -> Result<(), BasicError> { + let ty = types + .resolve(type_id) + .ok_or(MetadataError::TypeNotFound(type_id))?; + + fn consume_type(input: &mut &[u8]) -> Result<(), BasicError> { + T::decode(input)?; + Ok(()) + } + + match ty.type_def() { + TypeDef::Composite(composite) => { + for field in composite.fields() { + decode_and_consume_type(field.ty().id(), types, input)? 
+ } + Ok(()) + } + TypeDef::Variant(variant) => { + let variant_index = u8::decode(input)?; + let variant = variant + .variants() + .iter() + .find(|v| v.index() == variant_index) + .ok_or_else(|| { + BasicError::Other(format!("Variant {} not found", variant_index)) + })?; + for field in variant.fields() { + decode_and_consume_type(field.ty().id(), types, input)?; + } + Ok(()) + } + TypeDef::Sequence(seq) => { + let len = >::decode(input)?; + for _ in 0..len.0 { + decode_and_consume_type(seq.type_param().id(), types, input)?; + } + Ok(()) + } + TypeDef::Array(arr) => { + for _ in 0..arr.len() { + decode_and_consume_type(arr.type_param().id(), types, input)?; + } + Ok(()) + } + TypeDef::Tuple(tuple) => { + for field in tuple.fields() { + decode_and_consume_type(field.id(), types, input)?; + } + Ok(()) + } + TypeDef::Primitive(primitive) => { + match primitive { + TypeDefPrimitive::Bool => consume_type::(input), + TypeDefPrimitive::Char => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char) + .into(), + ) + } + TypeDefPrimitive::Str => consume_type::(input), + TypeDefPrimitive::U8 => consume_type::(input), + TypeDefPrimitive::U16 => consume_type::(input), + TypeDefPrimitive::U32 => consume_type::(input), + TypeDefPrimitive::U64 => consume_type::(input), + TypeDefPrimitive::U128 => consume_type::(input), + TypeDefPrimitive::U256 => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256) + .into(), + ) + } + TypeDefPrimitive::I8 => consume_type::(input), + TypeDefPrimitive::I16 => consume_type::(input), + TypeDefPrimitive::I32 => consume_type::(input), + TypeDefPrimitive::I64 => consume_type::(input), + TypeDefPrimitive::I128 => consume_type::(input), + TypeDefPrimitive::I256 => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256) + .into(), + ) + } + } + } + TypeDef::Compact(compact) => { + let inner = types + .resolve(compact.type_param().id()) + .ok_or(MetadataError::TypeNotFound(type_id))?; + let 
mut decode_compact_primitive = |primitive: &TypeDefPrimitive| { + match primitive { + TypeDefPrimitive::U8 => consume_type::>(input), + TypeDefPrimitive::U16 => consume_type::>(input), + TypeDefPrimitive::U32 => consume_type::>(input), + TypeDefPrimitive::U64 => consume_type::>(input), + TypeDefPrimitive::U128 => consume_type::>(input), + prim => { + Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone()) + .into()) + } + } + }; + match inner.type_def() { + TypeDef::Primitive(primitive) => decode_compact_primitive(primitive), + TypeDef::Composite(composite) => { + match composite.fields() { + [field] => { + let field_ty = + types.resolve(field.ty().id()).ok_or_else(|| { + MetadataError::TypeNotFound(field.ty().id()) + })?; + if let TypeDef::Primitive(primitive) = field_ty.type_def() { + decode_compact_primitive(primitive) + } else { + Err(EventsDecodingError::InvalidCompactType( + "Composite type must have a single primitive field" + .into(), + ) + .into()) + } + } + _ => { + Err(EventsDecodingError::InvalidCompactType( + "Composite type must have a single field".into(), + ) + .into()) + } + } + } + _ => { + Err(EventsDecodingError::InvalidCompactType( + "Compact type must be a primitive or a composite type".into(), + ) + .into()) + } + } + } + TypeDef::BitSequence(bitseq) => { + let bit_store_def = types + .resolve(bitseq.bit_store_type().id()) + .ok_or(MetadataError::TypeNotFound(type_id))? + .type_def(); + + // We just need to consume the correct number of bytes. Roughly, we encode this + // as a Compact length, and then a slice of T of that length, where T is the + // bit store type. So, we ignore the bit order and only care that the bit store type + // used lines up in terms of the number of bytes it will take to encode/decode it. 
+ match bit_store_def { + TypeDef::Primitive(TypeDefPrimitive::U8) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U16) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U32) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U64) => { + consume_type::>(input) + } + store => { + return Err(EventsDecodingError::InvalidBitSequenceType(format!( + "{:?}", + store + )) + .into()) + } + } + } + } +} + +/// The possible errors that we can run into attempting to decode events. +#[derive(Debug, thiserror::Error)] +pub enum EventsDecodingError { + /// Unsupported primitive type + #[error("Unsupported primitive type {0:?}")] + UnsupportedPrimitive(TypeDefPrimitive), + /// Invalid compact type, must be an unsigned int. + #[error("Invalid compact primitive {0:?}")] + InvalidCompactPrimitive(TypeDefPrimitive), + /// Invalid compact type; error details in string. + #[error("Invalid compact composite type {0}")] + InvalidCompactType(String), + /// Invalid bit sequence type; bit store type or bit order type used aren't supported. + #[error("Invalid bit sequence type; bit store type {0} is not supported")] + InvalidBitSequenceType(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::GenericError::{ + Codec, + EventsDecoding, + Other, + }; + use assert_matches::assert_matches; + use codec::Encode; + use scale_info::TypeInfo; + + type TypeId = scale_info::interner::UntrackedSymbol; + + /// Build a type registry that knows about the single type provided. 
+ fn singleton_type_registry( + ) -> (TypeId, PortableRegistry) { + let m = scale_info::MetaType::new::(); + let mut types = scale_info::Registry::new(); + let id = types.register_type(&m); + let portable_registry: PortableRegistry = types.into(); + + (id, portable_registry) + } + + fn decode_and_consume_type_consumes_all_bytes< + T: codec::Encode + scale_info::TypeInfo + 'static, + >( + val: T, + ) { + let (type_id, registry) = singleton_type_registry::(); + let bytes = val.encode(); + let cursor = &mut &*bytes; + + decode_and_consume_type(type_id.id(), ®istry, cursor).unwrap(); + assert_eq!(cursor.len(), 0); + } + + #[test] + fn decode_bitvec() { + use bitvec::order::Msb0; + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + } + + #[test] + fn decode_primitive() { + decode_and_consume_type_consumes_all_bytes(false); + decode_and_consume_type_consumes_all_bytes(true); + + let dummy_data = vec![0u8]; + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!( + res, + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive( + TypeDefPrimitive::Char + ))) + ); + + 
decode_and_consume_type_consumes_all_bytes("str".to_string()); + + decode_and_consume_type_consumes_all_bytes(1u8); + decode_and_consume_type_consumes_all_bytes(1i8); + + decode_and_consume_type_consumes_all_bytes(1u16); + decode_and_consume_type_consumes_all_bytes(1i16); + + decode_and_consume_type_consumes_all_bytes(1u32); + decode_and_consume_type_consumes_all_bytes(1i32); + + decode_and_consume_type_consumes_all_bytes(1u64); + decode_and_consume_type_consumes_all_bytes(1i64); + + decode_and_consume_type_consumes_all_bytes(1u128); + decode_and_consume_type_consumes_all_bytes(1i128); + } + + #[test] + fn decode_tuple() { + decode_and_consume_type_consumes_all_bytes(()); + + decode_and_consume_type_consumes_all_bytes((true,)); + + decode_and_consume_type_consumes_all_bytes((true, "str")); + + // Incomplete bytes for decoding + let dummy_data = false.encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::<(bool, &'static str)>(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Codec(_))); + + // Incomplete bytes for decoding, with invalid char type + let dummy_data = (false, "str", 0u8).encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!( + res, + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive( + TypeDefPrimitive::Char + ))) + ); + // The last byte (0x0 u8) should not be consumed + assert_eq!(dummy_cursor.len(), 1); + } + + #[test] + fn decode_array_and_seq() { + decode_and_consume_type_consumes_all_bytes([0]); + decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]); + decode_and_consume_type_consumes_all_bytes([0; 500]); + decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]); + + decode_and_consume_type_consumes_all_bytes(vec![0]); + decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]); + 
decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]); + } + + #[test] + fn decode_variant() { + #[derive(Clone, Encode, TypeInfo)] + enum EnumVar { + A, + B((&'static str, u8)), + C { named: i16 }, + } + const INVALID_TYPE_ID: u32 = 1024; + + decode_and_consume_type_consumes_all_bytes(EnumVar::A); + decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1))); + decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 }); + + // Invalid variant index + let dummy_data = 3u8.encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Other(_))); + + // Valid index, incomplete data + let dummy_data = 2u8.encode(); + let dummy_cursor = &mut &*dummy_data; + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Codec(_))); + + let res = decode_and_consume_type(INVALID_TYPE_ID, ®, dummy_cursor); + assert_matches!(res, Err(crate::error::GenericError::Metadata(_))); + } + + #[test] + fn decode_composite() { + #[derive(Clone, Encode, TypeInfo)] + struct Composite {} + decode_and_consume_type_consumes_all_bytes(Composite {}); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV2 { + id: u32, + name: String, + } + decode_and_consume_type_consumes_all_bytes(CompositeV2 { + id: 10, + name: "str".to_string(), + }); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV3 { + id: u32, + extra: T, + } + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: vec![0, 1, 2], + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: ("str", 1), + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: CompositeV2 { + id: 2, + name: "str".to_string(), + }, + }); + + #[derive(Clone, Encode, 
TypeInfo)] + struct CompositeV4(u32, bool); + decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV5(u32); + decode_and_consume_type_consumes_all_bytes(CompositeV5(1)); + } + + #[test] + fn decode_compact() { + #[derive(Clone, Encode, TypeInfo)] + enum Compact { + A(#[codec(compact)] u32), + } + decode_and_consume_type_consumes_all_bytes(Compact::A(1)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV2(#[codec(compact)] u32); + decode_and_consume_type_consumes_all_bytes(CompactV2(1)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV3 { + #[codec(compact)] + val: u32, + } + decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 }); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV4 { + #[codec(compact)] + val: T, + } + decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 }); + decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 }); + } +} diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs new file mode 100644 index 0000000000..ee3c32e97e --- /dev/null +++ b/subxt/src/events/event_subscription.rs @@ -0,0 +1,177 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Subscribing to events. 
+ +use crate::{ + error::BasicError, + Client, + Config, +}; +use codec::Decode; +use derivative::Derivative; +use futures::{ + Future, + FutureExt, + Stream, + StreamExt, +}; +use jsonrpsee::core::client::Subscription; +use std::{ + marker::Unpin, + task::Poll, +}; + +pub use super::{ + at, + EventDetails, + EventFilter, + Events, + EventsDecodingError, + FilterEvents, + RawEventDetails, +}; + +/// Subscribe to events from blocks. +/// +/// **Note:** these blocks haven't necessarily been finalised yet; prefer +/// [`Events::subscribe_finalized()`] if that is important. +/// +/// **Note:** This function is hidden from the documentation +/// and is exposed only to be called via the codegen. Thus, prefer to use +/// `api.events().subscribe()` over calling this directly. +#[doc(hidden)] +pub async fn subscribe( + client: &'_ Client, +) -> Result, BasicError> { + let block_subscription = client.rpc().subscribe_blocks().await?; + Ok(EventSubscription::new(client, block_subscription)) +} + +/// Subscribe to events from finalized blocks. +/// +/// **Note:** This function is hidden from the documentation +/// and is exposed only to be called via the codegen. Thus, prefer to use +/// `api.events().subscribe_finalized()` over calling this directly. +#[doc(hidden)] +pub async fn subscribe_finalized( + client: &'_ Client, +) -> Result, BasicError> { + let block_subscription = client.rpc().subscribe_finalized_blocks().await?; + Ok(EventSubscription::new(client, block_subscription)) +} + +/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. 
+#[derive(Derivative)] +#[derivative(Debug(bound = ""))] +pub struct EventSubscription<'a, T: Config, Evs: 'static> { + finished: bool, + client: &'a Client, + block_header_subscription: Subscription, + #[derivative(Debug = "ignore")] + at: Option< + std::pin::Pin< + Box, BasicError>> + 'a>, + >, + >, + _event_type: std::marker::PhantomData, +} + +impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { + fn new( + client: &'a Client, + block_header_subscription: Subscription, + ) -> Self { + EventSubscription { + finished: false, + client, + block_header_subscription, + at: None, + _event_type: std::marker::PhantomData, + } + } + + /// Return only specific events matching the tuple of 1 or more event + /// types that has been provided as the `Filter` type parameter. + pub fn filter_events(self) -> FilterEvents<'a, Self, T, Filter> { + FilterEvents::new(self) + } +} + +impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} + +// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose +// way to roughly implement the following function: +// +// ``` +// fn subscribe_events(client: &'_ Client, block_sub: Subscription) -> impl Stream, BasicError>> + '_ { +// use futures::StreamExt; +// block_sub.then(move |block_header_res| async move { +// use sp_runtime::traits::Header; +// let block_header = block_header_res?; +// let block_hash = block_header.hash(); +// at(client, block_hash).await +// }) +// } +// ``` +// +// The advantage of this manual implementation is that we have a named type that we (and others) +// can derive things on, store away, alias etc. +impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { + type Item = Result, BasicError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // We are finished; return None. 
+ if self.finished { + return Poll::Ready(None) + } + + // If there isn't an `at` function yet that's busy resolving a block hash into + // some event details, then poll the block header subscription to get one. + if self.at.is_none() { + match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) { + None => { + self.finished = true; + return Poll::Ready(None) + } + Some(Err(e)) => { + self.finished = true; + return Poll::Ready(Some(Err(e.into()))) + } + Some(Ok(block_header)) => { + use sp_runtime::traits::Header; + // Note [jsdw]: We may be able to get rid of the per-item allocation + // with https://github.com/oblique/reusable-box-future. + self.at = Some(Box::pin(at(self.client, block_header.hash()))); + // Continue, so that we poll this function future we've just created. + } + } + } + + // If we get here, there will be an `at` function stored. Unwrap it and poll it to + // completion to get our events, throwing it away as soon as it is ready. + let at_fn = self + .at + .as_mut() + .expect("'at' function should have been set above'"); + let events = futures::ready!(at_fn.poll_unpin(cx)); + self.at = None; + Poll::Ready(Some(events)) + } +} diff --git a/subxt/src/events.rs b/subxt/src/events/events_type.rs similarity index 57% rename from subxt/src/events.rs rename to subxt/src/events/events_type.rs index a650b80b4c..192d9a0276 100644 --- a/subxt/src/events.rs +++ b/subxt/src/events/events_type.rs @@ -14,50 +14,29 @@ // You should have received a copy of the GNU General Public License // along with subxt. If not, see . -//! For working with events. +//! A representation of a block of events. 
+use super::decoding; use crate::{ error::BasicError, - metadata::MetadataError, Client, Config, Event, Metadata, Phase, }; -use bitvec::{ - order::Lsb0, - vec::BitVec, -}; use codec::{ - Codec, Compact, Decode, Error as CodecError, Input, }; use derivative::Derivative; -use futures::{ - Future, - FutureExt, - Stream, - StreamExt, -}; -use jsonrpsee::core::client::Subscription; -use scale_info::{ - PortableRegistry, - TypeDef, - TypeDefPrimitive, -}; use sp_core::{ storage::StorageKey, twox_128, Bytes, }; -use std::{ - marker::Unpin, - task::Poll, -}; /// Obtain events at some block hash. The generic parameter is what we /// will attempt to decode each event into if using [`Events::iter()`], @@ -100,136 +79,18 @@ pub async fn at( }) } -/// Subscribe to events from blocks. -/// -/// **Note:** these blocks haven't necessarily been finalised yet; prefer -/// [`Events::subscribe_finalized()`] if that is important. -/// -/// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe()` over calling this directly. -#[doc(hidden)] -pub async fn subscribe( - client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) -} - -/// Subscribe to events from finalized blocks. -/// -/// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe_finalized()` over calling this directly. -#[doc(hidden)] -pub async fn subscribe_finalized( - client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_finalized_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) -} - -/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. 
-#[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct EventSubscription<'a, T: Config, Evs: Decode + 'static> { - finished: bool, - client: &'a Client, - block_header_subscription: Subscription, - #[derivative(Debug = "ignore")] - at: Option< - std::pin::Pin< - Box, BasicError>> + 'a>, - >, - >, - _event_type: std::marker::PhantomData, -} - -impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { - fn new( - client: &'a Client, - block_header_subscription: Subscription, - ) -> Self { - EventSubscription { - finished: false, - client, - block_header_subscription, - at: None, - _event_type: std::marker::PhantomData, - } - } -} - -impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} - -// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose -// way to roughly implement the following function: -// -// ``` -// fn subscribe_events(client: &'_ Client, block_sub: Subscription) -> impl Stream, BasicError>> + '_ { -// use futures::StreamExt; -// block_sub.then(move |block_header_res| async move { -// use sp_runtime::traits::Header; -// let block_header = block_header_res?; -// let block_hash = block_header.hash(); -// at(client, block_hash).await -// }) -// } -// ``` -// -// The advantage of this manual implementation is that we have a named type that we (and others) -// can derive things on, store away, alias etc. -impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { - type Item = Result, BasicError>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // We are finished; return None. - if self.finished { - return Poll::Ready(None) - } - - // If there isn't an `at` function yet that's busy resolving a block hash into - // some event details, then poll the block header subscription to get one. 
- if self.at.is_none() { - match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) { - None => { - self.finished = true; - return Poll::Ready(None) - } - Some(Err(e)) => { - self.finished = true; - return Poll::Ready(Some(Err(e.into()))) - } - Some(Ok(block_header)) => { - use sp_runtime::traits::Header; - // Note [jsdw]: We may be able to get rid of the per-item allocation - // with https://github.com/oblique/reusable-box-future. - self.at = Some(Box::pin(at(self.client, block_header.hash()))); - // Continue, so that we poll this function future we've just created. - } - } - } - - // If we get here, there will be an `at` function stored. Unwrap it and poll it to - // completion to get our events, throwing it away as soon as it is ready. - let at_fn = self - .at - .as_mut() - .expect("'at' function should have been set above'"); - let events = futures::ready!(at_fn.poll_unpin(cx)); - self.at = None; - Poll::Ready(Some(events)) - } +// The storage key needed to access events. +fn system_events_key() -> StorageKey { + let mut storage_key = twox_128(b"System").to_vec(); + storage_key.extend(twox_128(b"Events").to_vec()); + StorageKey(storage_key) } /// A collection of events obtained from a block, bundled with the necessary /// information needed to decode and iterate over them. #[derive(Derivative)] #[derivative(Debug(bound = ""))] -pub struct Events<'a, T: Config, Evs: Decode> { +pub struct Events<'a, T: Config, Evs> { metadata: &'a Metadata, block_hash: T::Hash, // Note; raw event bytes are prefixed with a Compact containing @@ -352,6 +213,49 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> { }) } + /// Iterate over all of the events, using metadata to dynamically + /// decode them as we go, and returning the raw bytes and other associated + /// details. If an error occurs, all subsequent iterations return `None`. 
+ /// + /// This method is safe to use even if you do not statically know about + /// all of the possible events; it splits events up using the metadata + /// obtained at runtime, which does. + /// + /// Unlike [`Events::iter_raw()`] this consumes `self`, which can be useful + /// if you need to store the iterator somewhere and avoid lifetime issues. + pub fn into_iter_raw( + self, + ) -> impl Iterator> + 'a { + let mut pos = 0; + let mut index = 0; + std::iter::from_fn(move || { + let cursor = &mut &self.event_bytes[pos..]; + let start_len = cursor.len(); + + if start_len == 0 || self.num_events == index { + None + } else { + match decode_raw_event_details::(self.metadata, index, cursor) { + Ok(raw_event) => { + // Skip over decoded bytes in next iteration: + pos += start_len - cursor.len(); + // Increment the index: + index += 1; + // Return the event details: + Some(Ok(raw_event)) + } + Err(e) => { + // By setting the position to the "end" of the event bytes, + // the cursor len will become 0 and the iterator will return `None` + // from now on: + pos = self.event_bytes.len(); + Some(Err(e)) + } + } + } + }) + } + /// Iterate through the events using metadata to dynamically decode and skip /// them, and return only those which should decode to the provided `Ev` type. /// If an error occurs, all subsequent iterations return `None`. @@ -370,7 +274,7 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> { /// /// **Note:** This method internally uses [`Events::iter_raw()`], so it is safe to /// use even if you do not statically know about all of the possible events. 
- pub fn find_first_event(&self) -> Result, BasicError> { + pub fn find_first(&self) -> Result, BasicError> { self.find::().next().transpose() } @@ -457,7 +361,11 @@ fn decode_raw_event_details( let type_id = arg.ty().id(); let all_bytes = *input; // consume some bytes, moving the cursor forward: - decode_and_consume_type(type_id, &metadata.runtime_metadata().types, input)?; + decoding::decode_and_consume_type( + type_id, + &metadata.runtime_metadata().types, + input, + )?; // count how many bytes were consumed based on remaining length: let consumed_len = all_bytes.len() - input.len(); // move those consumed bytes to the output vec unaltered: @@ -480,222 +388,15 @@ fn decode_raw_event_details( }) } -// The storage key needed to access events. -fn system_events_key() -> StorageKey { - let mut storage_key = twox_128(b"System").to_vec(); - storage_key.extend(twox_128(b"Events").to_vec()); - StorageKey(storage_key) -} - -// Given a type Id and a type registry, attempt to consume the bytes -// corresponding to that type from our input. -fn decode_and_consume_type( - type_id: u32, - types: &PortableRegistry, - input: &mut &[u8], -) -> Result<(), BasicError> { - let ty = types - .resolve(type_id) - .ok_or(MetadataError::TypeNotFound(type_id))?; - - fn consume_type(input: &mut &[u8]) -> Result<(), BasicError> { - T::decode(input)?; - Ok(()) - } - - match ty.type_def() { - TypeDef::Composite(composite) => { - for field in composite.fields() { - decode_and_consume_type(field.ty().id(), types, input)? 
- } - Ok(()) - } - TypeDef::Variant(variant) => { - let variant_index = u8::decode(input)?; - let variant = variant - .variants() - .iter() - .find(|v| v.index() == variant_index) - .ok_or_else(|| { - BasicError::Other(format!("Variant {} not found", variant_index)) - })?; - for field in variant.fields() { - decode_and_consume_type(field.ty().id(), types, input)?; - } - Ok(()) - } - TypeDef::Sequence(seq) => { - let len = >::decode(input)?; - for _ in 0..len.0 { - decode_and_consume_type(seq.type_param().id(), types, input)?; - } - Ok(()) - } - TypeDef::Array(arr) => { - for _ in 0..arr.len() { - decode_and_consume_type(arr.type_param().id(), types, input)?; - } - Ok(()) - } - TypeDef::Tuple(tuple) => { - for field in tuple.fields() { - decode_and_consume_type(field.id(), types, input)?; - } - Ok(()) - } - TypeDef::Primitive(primitive) => { - match primitive { - TypeDefPrimitive::Bool => consume_type::(input), - TypeDefPrimitive::Char => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char) - .into(), - ) - } - TypeDefPrimitive::Str => consume_type::(input), - TypeDefPrimitive::U8 => consume_type::(input), - TypeDefPrimitive::U16 => consume_type::(input), - TypeDefPrimitive::U32 => consume_type::(input), - TypeDefPrimitive::U64 => consume_type::(input), - TypeDefPrimitive::U128 => consume_type::(input), - TypeDefPrimitive::U256 => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256) - .into(), - ) - } - TypeDefPrimitive::I8 => consume_type::(input), - TypeDefPrimitive::I16 => consume_type::(input), - TypeDefPrimitive::I32 => consume_type::(input), - TypeDefPrimitive::I64 => consume_type::(input), - TypeDefPrimitive::I128 => consume_type::(input), - TypeDefPrimitive::I256 => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256) - .into(), - ) - } - } - } - TypeDef::Compact(compact) => { - let inner = types - .resolve(compact.type_param().id()) - .ok_or(MetadataError::TypeNotFound(type_id))?; - let 
mut decode_compact_primitive = |primitive: &TypeDefPrimitive| { - match primitive { - TypeDefPrimitive::U8 => consume_type::>(input), - TypeDefPrimitive::U16 => consume_type::>(input), - TypeDefPrimitive::U32 => consume_type::>(input), - TypeDefPrimitive::U64 => consume_type::>(input), - TypeDefPrimitive::U128 => consume_type::>(input), - prim => { - Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone()) - .into()) - } - } - }; - match inner.type_def() { - TypeDef::Primitive(primitive) => decode_compact_primitive(primitive), - TypeDef::Composite(composite) => { - match composite.fields() { - [field] => { - let field_ty = - types.resolve(field.ty().id()).ok_or_else(|| { - MetadataError::TypeNotFound(field.ty().id()) - })?; - if let TypeDef::Primitive(primitive) = field_ty.type_def() { - decode_compact_primitive(primitive) - } else { - Err(EventsDecodingError::InvalidCompactType( - "Composite type must have a single primitive field" - .into(), - ) - .into()) - } - } - _ => { - Err(EventsDecodingError::InvalidCompactType( - "Composite type must have a single field".into(), - ) - .into()) - } - } - } - _ => { - Err(EventsDecodingError::InvalidCompactType( - "Compact type must be a primitive or a composite type".into(), - ) - .into()) - } - } - } - TypeDef::BitSequence(bitseq) => { - let bit_store_def = types - .resolve(bitseq.bit_store_type().id()) - .ok_or(MetadataError::TypeNotFound(type_id))? - .type_def(); - - // We just need to consume the correct number of bytes. Roughly, we encode this - // as a Compact length, and then a slice of T of that length, where T is the - // bit store type. So, we ignore the bit order and only care that the bit store type - // used lines up in terms of the number of bytes it will take to encode/decode it. 
- match bit_store_def { - TypeDef::Primitive(TypeDefPrimitive::U8) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U16) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U32) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U64) => { - consume_type::>(input) - } - store => { - return Err(EventsDecodingError::InvalidBitSequenceType(format!( - "{:?}", - store - )) - .into()) - } - } - } - } -} - -/// The possible errors that we can run into attempting to decode events. -#[derive(Debug, thiserror::Error)] -pub enum EventsDecodingError { - /// Unsupported primitive type - #[error("Unsupported primitive type {0:?}")] - UnsupportedPrimitive(TypeDefPrimitive), - /// Invalid compact type, must be an unsigned int. - #[error("Invalid compact primitive {0:?}")] - InvalidCompactPrimitive(TypeDefPrimitive), - /// Invalid compact type; error details in string. - #[error("Invalid compact composite type {0}")] - InvalidCompactType(String), - /// Invalid bit sequence type; bit store type or bit order type used aren't supported. - #[error("Invalid bit sequence type; bit store type {0} is not supported")] - InvalidBitSequenceType(String), -} - +/// Event related test utilities used outside this module. #[cfg(test)] -mod tests { +pub(crate) mod test_utils { use super::*; use crate::{ - error::GenericError::{ - Codec, - EventsDecoding, - Other, - }, - events::EventsDecodingError::UnsupportedPrimitive, Config, DefaultConfig, Phase, }; - use assert_matches::assert_matches; use codec::Encode; use frame_metadata::{ v14::{ @@ -712,12 +413,10 @@ mod tests { }; use std::convert::TryFrom; - type TypeId = scale_info::interner::UntrackedSymbol; - /// An "outer" events enum containing exactly one event. 
#[derive(Encode, Decode, TypeInfo, Clone, Debug, PartialEq)] pub enum AllEvents { - E(Ev), + Test(Ev), } /// This encodes to the same format an event is expected to encode to @@ -731,28 +430,17 @@ mod tests { /// Build an EventRecord, which encoded events in the format expected /// to be handed back from storage queries to System.Events. - fn event_record(phase: Phase, event: E) -> EventRecord { + pub fn event_record(phase: Phase, event: E) -> EventRecord { EventRecord { phase, - event: AllEvents::E(event), + event: AllEvents::Test(event), topics: vec![], } } - /// Build a type registry that knows about the single type provided. - fn singleton_type_registry( - ) -> (TypeId, PortableRegistry) { - let m = scale_info::MetaType::new::(); - let mut types = scale_info::Registry::new(); - let id = types.register_type(&m); - let portable_registry: PortableRegistry = types.into(); - - (id, portable_registry) - } - /// Build fake metadata consisting of a single pallet that knows /// about the event type provided. - fn metadata() -> Metadata { + pub fn metadata() -> Metadata { let pallets = vec![PalletMetadata { name: "Test", storage: None, @@ -779,7 +467,7 @@ mod tests { /// Build an `Events` object for test purposes, based on the details provided, /// and with a default block hash. - fn events( + pub fn events( metadata: &'_ Metadata, event_records: Vec>, ) -> Events<'_, DefaultConfig, AllEvents> { @@ -793,7 +481,7 @@ mod tests { /// Much like [`events`], but takes pre-encoded events and event count, so that we can /// mess with the bytes in tests if we need to. 
- fn events_raw( + pub fn events_raw( metadata: &'_ Metadata, event_bytes: Vec, num_events: u32, @@ -806,19 +494,23 @@ mod tests { _event_type: std::marker::PhantomData, } } +} - fn decode_and_consume_type_consumes_all_bytes< - T: codec::Encode + scale_info::TypeInfo + 'static, - >( - val: T, - ) { - let (type_id, registry) = singleton_type_registry::(); - let bytes = val.encode(); - let cursor = &mut &*bytes; - - decode_and_consume_type(type_id.id(), ®istry, cursor).unwrap(); - assert_eq!(cursor.len(), 0); - } +#[cfg(test)] +mod tests { + use super::{ + test_utils::{ + event_record, + events, + events_raw, + metadata, + AllEvents, + }, + *, + }; + use crate::Phase; + use codec::Encode; + use scale_info::TypeInfo; #[test] fn statically_decode_single_event() { @@ -844,7 +536,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }] ); } @@ -879,17 +571,17 @@ mod tests { EventDetails { index: 0, phase: Phase::Initialization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }, EventDetails { index: 1, phase: Phase::ApplyExtrinsic(123), - event: AllEvents::E(Event::B(true)) + event: AllEvents::Test(Event::B(true)) }, EventDetails { index: 2, phase: Phase::Finalization, - event: AllEvents::E(Event::A(234)) + event: AllEvents::Test(Event::A(234)) }, ] ); @@ -929,7 +621,7 @@ mod tests { EventDetails { index: 0, phase: Phase::Initialization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) } ); assert_eq!( @@ -937,7 +629,7 @@ mod tests { EventDetails { index: 1, phase: Phase::ApplyExtrinsic(123), - event: AllEvents::E(Event::B(true)) + event: AllEvents::Test(Event::B(true)) } ); @@ -1151,7 +843,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }] ); @@ -1209,7 +901,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, 
- event: AllEvents::E(Event::A(CompactWrapper(1))) + event: AllEvents::Test(Event::A(CompactWrapper(1))) }] ); @@ -1268,7 +960,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(MyType::B)) + event: AllEvents::Test(Event::A(MyType::B)) }] ); @@ -1294,218 +986,4 @@ mod tests { }] ); } - - #[test] - fn decode_bitvec() { - use bitvec::order::Msb0; - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - } - - #[test] - fn decode_primitive() { - decode_and_consume_type_consumes_all_bytes(false); - decode_and_consume_type_consumes_all_bytes(true); - - let dummy_data = vec![0u8]; - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!( - res, - Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char))) - ); - - decode_and_consume_type_consumes_all_bytes("str".to_string()); - - decode_and_consume_type_consumes_all_bytes(1u8); - decode_and_consume_type_consumes_all_bytes(1i8); - - decode_and_consume_type_consumes_all_bytes(1u16); - decode_and_consume_type_consumes_all_bytes(1i16); - - decode_and_consume_type_consumes_all_bytes(1u32); - 
decode_and_consume_type_consumes_all_bytes(1i32); - - decode_and_consume_type_consumes_all_bytes(1u64); - decode_and_consume_type_consumes_all_bytes(1i64); - - decode_and_consume_type_consumes_all_bytes(1u128); - decode_and_consume_type_consumes_all_bytes(1i128); - } - - #[test] - fn decode_tuple() { - decode_and_consume_type_consumes_all_bytes(()); - - decode_and_consume_type_consumes_all_bytes((true,)); - - decode_and_consume_type_consumes_all_bytes((true, "str")); - - // Incomplete bytes for decoding - let dummy_data = false.encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::<(bool, &'static str)>(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Codec(_))); - - // Incomplete bytes for decoding, with invalid char type - let dummy_data = (false, "str", 0u8).encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!( - res, - Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char))) - ); - // The last byte (0x0 u8) should not be consumed - assert_eq!(dummy_cursor.len(), 1); - } - - #[test] - fn decode_array_and_seq() { - decode_and_consume_type_consumes_all_bytes([0]); - decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]); - decode_and_consume_type_consumes_all_bytes([0; 500]); - decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]); - - decode_and_consume_type_consumes_all_bytes(vec![0]); - decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]); - decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]); - } - - #[test] - fn decode_variant() { - #[derive(Clone, Encode, TypeInfo)] - enum EnumVar { - A, - B((&'static str, u8)), - C { named: i16 }, - } - const INVALID_TYPE_ID: u32 = 1024; - - decode_and_consume_type_consumes_all_bytes(EnumVar::A); - 
decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1))); - decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 }); - - // Invalid variant index - let dummy_data = 3u8.encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Other(_))); - - // Valid index, incomplete data - let dummy_data = 2u8.encode(); - let dummy_cursor = &mut &*dummy_data; - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Codec(_))); - - let res = decode_and_consume_type(INVALID_TYPE_ID, ®, dummy_cursor); - assert_matches!(res, Err(crate::error::GenericError::Metadata(_))); - } - - #[test] - fn decode_composite() { - #[derive(Clone, Encode, TypeInfo)] - struct Composite {} - decode_and_consume_type_consumes_all_bytes(Composite {}); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV2 { - id: u32, - name: String, - } - decode_and_consume_type_consumes_all_bytes(CompositeV2 { - id: 10, - name: "str".to_string(), - }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV3 { - id: u32, - extra: T, - } - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: vec![0, 1, 2], - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: ("str", 1), - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: CompositeV2 { - id: 2, - name: "str".to_string(), - }, - }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV4(u32, bool); - decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV5(u32); - decode_and_consume_type_consumes_all_bytes(CompositeV5(1)); - } - - #[test] - fn decode_compact() { - #[derive(Clone, Encode, TypeInfo)] - 
enum Compact { - A(#[codec(compact)] u32), - } - decode_and_consume_type_consumes_all_bytes(Compact::A(1)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV2(#[codec(compact)] u32); - decode_and_consume_type_consumes_all_bytes(CompactV2(1)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV3 { - #[codec(compact)] - val: u32, - } - decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV4 { - #[codec(compact)] - val: T, - } - decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 }); - decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 }); - } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs new file mode 100644 index 0000000000..84f5479b45 --- /dev/null +++ b/subxt/src/events/filter_events.rs @@ -0,0 +1,418 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Filtering individual events from subscriptions. + +use super::Events; +use crate::{ + BasicError, + Config, + Event, + Phase, +}; +use codec::Decode; +use futures::{ + Stream, + StreamExt, +}; +use std::{ + marker::Unpin, + task::Poll, +}; + +/// A stream which filters events based on the `Filter` param provided. +/// If `Filter` is a 1-tuple of a single `Event` type, it will return every +/// instance of that event as it's found. 
If `filter` is ` tuple of multiple +/// `Event` types, it will return a corresponding tuple of `Option`s, where +/// exactly one of these will be `Some(event)` each iteration. +pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> { + // A subscription; in order for the Stream impl to apply, this will + // impl `Stream, BasicError>> + Unpin + 'a`. + sub: Sub, + // Each time we get Events from our subscription, they are stored here + // and iterated through in future stream iterations until exhausted. + events: Option< + Box< + dyn Iterator< + Item = Result< + FilteredEventDetails, + BasicError, + >, + > + 'a, + >, + >, +} + +impl<'a, Sub: 'a, T: Config, Filter: EventFilter> Unpin + for FilterEvents<'a, Sub, T, Filter> +{ +} + +impl<'a, Sub: 'a, T: Config, Filter: EventFilter> FilterEvents<'a, Sub, T, Filter> { + pub(crate) fn new(sub: Sub) -> Self { + Self { sub, events: None } + } +} + +impl<'a, Sub, T, Evs, Filter> Stream for FilterEvents<'a, Sub, T, Filter> +where + Sub: Stream, BasicError>> + Unpin + 'a, + T: Config, + Evs: Decode + 'static, + Filter: EventFilter, +{ + type Item = Result, BasicError>; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + // Drain the current events we're iterating over first: + if let Some(events_iter) = self.events.as_mut() { + match events_iter.next() { + Some(res) => return Poll::Ready(Some(res)), + None => { + self.events = None; + } + } + } + + // Wait for new events to come in: + match futures::ready!(self.sub.poll_next_unpin(cx)) { + None => return Poll::Ready(None), + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + Some(Ok(events)) => { + self.events = Some(Filter::filter(events)); + } + }; + } + } +} + +/// This is returned from the [`FilterEvents`] impl of [`Stream`]. It contains +/// some type representing an event we've filtered on, along with couple of additional +/// pieces of information about that event. 
+#[derive(Debug, Clone, Eq, PartialEq)] +pub struct FilteredEventDetails { + /// During which [`Phase`] was the event produced? + pub phase: Phase, + /// Hash of the block that this event came from. + pub block_hash: BlockHash, + /// A type containing an event that we've filtered on. + /// Depending on the filter type, this may be a tuple + /// or a single event. + pub event: Evs, +} + +/// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be +/// used to filter an event subscription to return only the specified events. +pub trait EventFilter: private::Sealed { + /// The type we'll be handed back from filtering. + type ReturnType; + /// Filter the events based on the type implementing this trait. + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs>, + ) -> Box< + dyn Iterator< + Item = Result< + FilteredEventDetails, + BasicError, + >, + > + 'a, + >; +} + +// Prevent userspace implementations of the above trait; the interface is not considered stable +// and is not a particularly nice API to work with (particularly because it involves boxing, which +// would be nice to get rid of eventually). +pub(crate) mod private { + pub trait Sealed {} +} + +// A special case impl for searching for a tuple of exactly one event (in this case, we don't +// need to return an `(Option,)`; we can just return `Event`. 
+impl private::Sealed for (Ev,) {} +impl EventFilter for (Ev,) { + type ReturnType = Ev; + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs>, + ) -> Box< + dyn Iterator, BasicError>> + 'a, + > { + let block_hash = events.block_hash(); + let mut iter = events.into_iter_raw(); + Box::new(std::iter::from_fn(move || { + for ev in iter.by_ref() { + // Forward any error immediately: + let raw_event = match ev { + Ok(ev) => ev, + Err(e) => return Some(Err(e)), + }; + // Try decoding each type until we hit a match or an error: + let ev = raw_event.as_event::(); + if let Ok(Some(event)) = ev { + // We found a match; return our tuple. + return Some(Ok(FilteredEventDetails { + phase: raw_event.phase, + block_hash, + event, + })) + } + if let Err(e) = ev { + // We hit an error. Return it. + return Some(Err(e.into())) + } + } + None + })) + } +} + +// A generalised impl for tuples of sizes greater than 1: +macro_rules! impl_event_filter { + ($($ty:ident $idx:tt),+) => { + impl <$($ty: Event),+> private::Sealed for ( $($ty,)+ ) {} + impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) { + type ReturnType = ( $(Option<$ty>,)+ ); + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs> + ) -> Box, BasicError>> + 'a> { + let block_hash = events.block_hash(); + let mut iter = events.into_iter_raw(); + Box::new(std::iter::from_fn(move || { + let mut out: ( $(Option<$ty>,)+ ) = Default::default(); + for ev in iter.by_ref() { + // Forward any error immediately: + let raw_event = match ev { + Ok(ev) => ev, + Err(e) => return Some(Err(e)) + }; + // Try decoding each type until we hit a match or an error: + $({ + let ev = raw_event.as_event::<$ty>(); + if let Ok(Some(ev)) = ev { + // We found a match; return our tuple. + out.$idx = Some(ev); + return Some(Ok(FilteredEventDetails { + phase: raw_event.phase, + block_hash, + event: out + })) + } + if let Err(e) = ev { + // We hit an error. Return it. 
+ return Some(Err(e.into())) + } + })+ + } + None + })) + } + } + } +} + +impl_event_filter!(A 0, B 1); +impl_event_filter!(A 0, B 1, C 2); +impl_event_filter!(A 0, B 1, C 2, D 3); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7); + +#[cfg(test)] +mod test { + use super::{ + super::events_type::test_utils::{ + event_record, + events, + metadata, + AllEvents, + }, + *, + }; + use crate::{ + Config, + DefaultConfig, + Metadata, + }; + use codec::Encode; + use futures::{ + stream, + Stream, + StreamExt, + }; + use scale_info::TypeInfo; + + // Some pretend events in a pallet + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + enum PalletEvents { + A(EventA), + B(EventB), + C(EventC), + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventA(u8); + impl crate::Event for EventA { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "A"; + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventB(bool); + impl crate::Event for EventB { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "B"; + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventC(u8, bool); + impl crate::Event for EventC { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "C"; + } + + // A stream of fake events for us to try filtering on. 
+ fn events_stream( + metadata: &'_ Metadata, + ) -> impl Stream< + Item = Result>, BasicError>, + > { + stream::iter(vec![ + events::( + metadata, + vec![ + event_record(Phase::Initialization, PalletEvents::A(EventA(1))), + event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))), + event_record(Phase::Finalization, PalletEvents::A(EventA(2))), + ], + ), + events::( + metadata, + vec![event_record( + Phase::ApplyExtrinsic(1), + PalletEvents::B(EventB(false)), + )], + ), + events::( + metadata, + vec![ + event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))), + event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))), + ], + ), + ]) + .map(Ok::<_, BasicError>) + } + + #[async_std::test] + async fn filter_one_event_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventA` only. + let actual: Vec<_> = + FilterEvents::<_, DefaultConfig, (EventA,)>::new(events_stream(&metadata)) + .map(|e| e.unwrap()) + .collect() + .await; + + let expected = vec![ + FilteredEventDetails { + phase: Phase::Initialization, + block_hash: ::Hash::default(), + event: EventA(1), + }, + FilteredEventDetails { + phase: Phase::Finalization, + block_hash: ::Hash::default(), + event: EventA(2), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(3), + block_hash: ::Hash::default(), + event: EventA(3), + }, + ]; + + assert_eq!(actual, expected); + } + + #[async_std::test] + async fn filter_some_events_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventA` or `EventB`. 
+ let actual: Vec<_> = FilterEvents::<_, DefaultConfig, (EventA, EventB)>::new( + events_stream(&metadata), + ) + .map(|e| e.unwrap()) + .collect() + .await; + + let expected = vec![ + FilteredEventDetails { + phase: Phase::Initialization, + block_hash: ::Hash::default(), + event: (Some(EventA(1)), None), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(0), + block_hash: ::Hash::default(), + event: (None, Some(EventB(true))), + }, + FilteredEventDetails { + phase: Phase::Finalization, + block_hash: ::Hash::default(), + event: (Some(EventA(2)), None), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(1), + block_hash: ::Hash::default(), + event: (None, Some(EventB(false))), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(2), + block_hash: ::Hash::default(), + event: (None, Some(EventB(true))), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(3), + block_hash: ::Hash::default(), + event: (Some(EventA(3)), None), + }, + ]; + + assert_eq!(actual, expected); + } + + #[async_std::test] + async fn filter_no_events_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventC` (none exist). + let actual: Vec<_> = + FilterEvents::<_, DefaultConfig, (EventC,)>::new(events_stream(&metadata)) + .map(|e| e.unwrap()) + .collect() + .await; + + assert_eq!(actual, vec![]); + } +} diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs new file mode 100644 index 0000000000..846019563a --- /dev/null +++ b/subxt/src/events/mod.rs @@ -0,0 +1,40 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! For working with events. + +mod decoding; +mod event_subscription; +mod events_type; +mod filter_events; + +pub use decoding::EventsDecodingError; +pub use event_subscription::{ + subscribe, + subscribe_finalized, + EventSubscription, +}; +pub use events_type::{ + at, + EventDetails, + Events, + RawEventDetails, +}; +pub use filter_events::{ + EventFilter, + FilterEvents, + FilteredEventDetails, +}; diff --git a/subxt/src/transaction.rs b/subxt/src/transaction.rs index 188dc70b2d..db64288236 100644 --- a/subxt/src/transaction.rs +++ b/subxt/src/transaction.rs @@ -514,9 +514,9 @@ impl<'client, T: Config, Evs: Decode> TransactionEvents<'client, T, Evs> { /// Iterate through the transaction events using metadata to dynamically decode and skip /// them, and return the first event found which decodes to the provided `Ev` type. /// - /// This works in the same way that [`events::Events::find_first_event()`] does, with the + /// This works in the same way that [`events::Events::find_first()`] does, with the /// exception that it ignores events not related to the submitted extrinsic. 
- pub fn find_first_event(&self) -> Result, BasicError> { + pub fn find_first(&self) -> Result, BasicError> { self.find::().next().transpose() } diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 2262860354..7741ee78da 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -78,7 +78,7 @@ async fn subscription_produces_events_each_block() -> Result<(), subxt::BasicErr .await .expect("events expected each block")?; let success_event = events - .find_first_event::() + .find_first::() .expect("decode error"); // Every now and then I get no bytes back for the first block events; // I assume that this might be the case for the genesis block, so don't @@ -100,14 +100,12 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { let ctx = test_context().await; // Subscribe to balance transfer events, ignoring all else. - let event_sub = ctx.api.events().subscribe().await?.filter_map(|events| { - async move { - let events = events.ok()?; - events - .find_first_event::() - .ok()? - } - }); + let event_sub = ctx + .api + .events() + .subscribe() + .await? 
+ .filter_events::<(balances::events::Transfer,)>(); // Calling `.next()` on the above borrows it, and the `filter_map` // means it's no longer `Unpin`, so we pin it on the stack: @@ -125,7 +123,7 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { // Wait for the next balance transfer event in our subscription stream // and check that it lines up: - let event = event_sub.next().await.unwrap(); + let event = event_sub.next().await.unwrap().unwrap().event; assert_eq!( event, balances::events::Transfer { diff --git a/subxt/tests/integration/frame/balances.rs b/subxt/tests/integration/frame/balances.rs index 918c273dbf..4d92136ca2 100644 --- a/subxt/tests/integration/frame/balances.rs +++ b/subxt/tests/integration/frame/balances.rs @@ -63,11 +63,11 @@ async fn tx_basic_transfer() -> Result<(), subxt::Error> { .wait_for_finalized_success() .await?; let event = events - .find_first_event::() + .find_first::() .expect("Failed to decode balances::events::Transfer") .expect("Failed to find balances::events::Transfer"); let _extrinsic_success = events - .find_first_event::() + .find_first::() .expect("Failed to decode ExtrinisicSuccess") .expect("Failed to find ExtrinisicSuccess"); @@ -125,7 +125,7 @@ async fn storage_balance_lock() -> Result<(), subxt::Error> { .await? .wait_for_finalized_success() .await? - .find_first_event::()? + .find_first::()? 
.expect("No ExtrinsicSuccess Event found"); let locks = cxt @@ -205,7 +205,7 @@ async fn transfer_implicit_subscription() { .wait_for_finalized_success() .await .unwrap() - .find_first_event::() + .find_first::() .expect("Can decode events") .expect("Can find balance transfer event"); diff --git a/subxt/tests/integration/frame/contracts.rs b/subxt/tests/integration/frame/contracts.rs index 169b92c55f..45d5bc5c8e 100644 --- a/subxt/tests/integration/frame/contracts.rs +++ b/subxt/tests/integration/frame/contracts.rs @@ -100,13 +100,13 @@ impl ContractsTestContext { .await?; let code_stored = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a CodeStored event".into()))?; let instantiated = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?; let _extrinsic_success = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| { Error::Other("Failed to find a ExtrinsicSuccess event".into()) })?; @@ -141,7 +141,7 @@ impl ContractsTestContext { log::info!("Instantiate result: {:?}", result); let instantiated = result - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?; Ok(instantiated.contract) diff --git a/subxt/tests/integration/storage.rs b/subxt/tests/integration/storage.rs index 31093a4d9d..3f7823d428 100644 --- a/subxt/tests/integration/storage.rs +++ b/subxt/tests/integration/storage.rs @@ -28,7 +28,9 @@ use sp_keyring::AccountKeyring; async fn storage_plain_lookup() -> Result<(), subxt::Error> { let ctx = test_context().await; - // Look up a plain value + // Look up a plain value. Wait long enough that we don't get the genesis block data, + // because it may have no storage associated with it. + async_std::task::sleep(std::time::Duration::from_secs(6)).await; let entry = ctx.api.storage().timestamp().now(None).await?; assert!(entry > 0);