From 75e615ac869b452b3ca7589a447b71f0196382ac Mon Sep 17 00:00:00 2001 From: James Wilson Date: Mon, 21 Feb 2022 16:13:56 +0000 Subject: [PATCH 01/18] Split events.rs into multiple files and start work on FilterEvents --- subxt/src/events/decoding.rs | 488 ++++++++++++++++++++ subxt/src/events/event_subscription.rs | 178 ++++++++ subxt/src/{ => events}/events.rs | 609 +------------------------ subxt/src/events/filter_events.rs | 47 ++ subxt/src/events/mod.rs | 41 ++ subxt/tests/integration/storage.rs | 4 +- 6 files changed, 769 insertions(+), 598 deletions(-) create mode 100644 subxt/src/events/decoding.rs create mode 100644 subxt/src/events/event_subscription.rs rename subxt/src/{ => events}/events.rs (58%) create mode 100644 subxt/src/events/filter_events.rs create mode 100644 subxt/src/events/mod.rs diff --git a/subxt/src/events/decoding.rs b/subxt/src/events/decoding.rs new file mode 100644 index 0000000000..c3f6cb59d2 --- /dev/null +++ b/subxt/src/events/decoding.rs @@ -0,0 +1,488 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Dynamically decoding events. + +use crate::{ + error::BasicError, + metadata::MetadataError, +}; +use bitvec::{ + order::Lsb0, + vec::BitVec, +}; +use codec::{ + Codec, + Compact, + Decode, +}; +use scale_info::{ + PortableRegistry, + TypeDef, + TypeDefPrimitive, +}; + +/// Given a type Id and a type registry, attempt to consume the bytes +/// corresponding to that type from our input. +pub fn decode_and_consume_type( + type_id: u32, + types: &PortableRegistry, + input: &mut &[u8], +) -> Result<(), BasicError> { + let ty = types + .resolve(type_id) + .ok_or(MetadataError::TypeNotFound(type_id))?; + + fn consume_type(input: &mut &[u8]) -> Result<(), BasicError> { + T::decode(input)?; + Ok(()) + } + + match ty.type_def() { + TypeDef::Composite(composite) => { + for field in composite.fields() { + decode_and_consume_type(field.ty().id(), types, input)? + } + Ok(()) + } + TypeDef::Variant(variant) => { + let variant_index = u8::decode(input)?; + let variant = variant + .variants() + .iter() + .find(|v| v.index() == variant_index) + .ok_or_else(|| { + BasicError::Other(format!("Variant {} not found", variant_index)) + })?; + for field in variant.fields() { + decode_and_consume_type(field.ty().id(), types, input)?; + } + Ok(()) + } + TypeDef::Sequence(seq) => { + let len = >::decode(input)?; + for _ in 0..len.0 { + decode_and_consume_type(seq.type_param().id(), types, input)?; + } + Ok(()) + } + TypeDef::Array(arr) => { + for _ in 0..arr.len() { + decode_and_consume_type(arr.type_param().id(), types, input)?; + } + Ok(()) + } + TypeDef::Tuple(tuple) => { + for field in tuple.fields() { + decode_and_consume_type(field.id(), types, input)?; + } + Ok(()) + } + TypeDef::Primitive(primitive) => { + match primitive { + TypeDefPrimitive::Bool => consume_type::(input), + TypeDefPrimitive::Char => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char) + .into(), + ) + } + TypeDefPrimitive::Str => consume_type::(input), + TypeDefPrimitive::U8 => consume_type::(input), + TypeDefPrimitive::U16 => consume_type::(input), + TypeDefPrimitive::U32 => consume_type::(input), + TypeDefPrimitive::U64 => consume_type::(input), + TypeDefPrimitive::U128 => consume_type::(input), + TypeDefPrimitive::U256 => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256) + .into(), + ) + } + TypeDefPrimitive::I8 => consume_type::(input), + TypeDefPrimitive::I16 => consume_type::(input), + TypeDefPrimitive::I32 => consume_type::(input), + TypeDefPrimitive::I64 => consume_type::(input), + TypeDefPrimitive::I128 => consume_type::(input), + TypeDefPrimitive::I256 => { + Err( + EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256) + .into(), + ) + } + } + } + TypeDef::Compact(compact) => { + let inner = types + .resolve(compact.type_param().id()) + .ok_or(MetadataError::TypeNotFound(type_id))?; + let mut decode_compact_primitive = |primitive: &TypeDefPrimitive| { + match primitive { + TypeDefPrimitive::U8 => consume_type::>(input), + TypeDefPrimitive::U16 => consume_type::>(input), + TypeDefPrimitive::U32 => consume_type::>(input), + TypeDefPrimitive::U64 => consume_type::>(input), + TypeDefPrimitive::U128 => consume_type::>(input), + prim => { + Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone()) + .into()) + } + } + }; + match inner.type_def() { + TypeDef::Primitive(primitive) => decode_compact_primitive(primitive), + TypeDef::Composite(composite) => { + match composite.fields() { + [field] => { + let field_ty = + types.resolve(field.ty().id()).ok_or_else(|| { + MetadataError::TypeNotFound(field.ty().id()) + })?; + if let TypeDef::Primitive(primitive) = field_ty.type_def() { + decode_compact_primitive(primitive) + } else { + Err(EventsDecodingError::InvalidCompactType( + "Composite type must have a single primitive field" + .into(), + ) + .into()) + } + } + _ => { + Err(EventsDecodingError::InvalidCompactType( + "Composite type must have a single field".into(), + ) + .into()) + } + } + } + _ => { + Err(EventsDecodingError::InvalidCompactType( + "Compact type must be a primitive or a composite type".into(), + ) + .into()) + } + } + } + TypeDef::BitSequence(bitseq) => { + let bit_store_def = types + .resolve(bitseq.bit_store_type().id()) + .ok_or(MetadataError::TypeNotFound(type_id))? + .type_def(); + + // We just need to consume the correct number of bytes. Roughly, we encode this + // as a Compact length, and then a slice of T of that length, where T is the + // bit store type. So, we ignore the bit order and only care that the bit store type + // used lines up in terms of the number of bytes it will take to encode/decode it. + match bit_store_def { + TypeDef::Primitive(TypeDefPrimitive::U8) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U16) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U32) => { + consume_type::>(input) + } + TypeDef::Primitive(TypeDefPrimitive::U64) => { + consume_type::>(input) + } + store => { + return Err(EventsDecodingError::InvalidBitSequenceType(format!( + "{:?}", + store + )) + .into()) + } + } + } + } +} + +/// The possible errors that we can run into attempting to decode events. +#[derive(Debug, thiserror::Error)] +pub enum EventsDecodingError { + /// Unsupported primitive type + #[error("Unsupported primitive type {0:?}")] + UnsupportedPrimitive(TypeDefPrimitive), + /// Invalid compact type, must be an unsigned int. + #[error("Invalid compact primitive {0:?}")] + InvalidCompactPrimitive(TypeDefPrimitive), + /// Invalid compact type; error details in string. + #[error("Invalid compact composite type {0}")] + InvalidCompactType(String), + /// Invalid bit sequence type; bit store type or bit order type used aren't supported. + #[error("Invalid bit sequence type; bit store type {0} is not supported")] + InvalidBitSequenceType(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + error::GenericError::{ + Codec, + EventsDecoding, + Other, + }, + }; + use assert_matches::assert_matches; + use codec::Encode; + use scale_info::{ + TypeInfo, + }; + + type TypeId = scale_info::interner::UntrackedSymbol; + + /// Build a type registry that knows about the single type provided. + fn singleton_type_registry( + ) -> (TypeId, PortableRegistry) { + let m = scale_info::MetaType::new::(); + let mut types = scale_info::Registry::new(); + let id = types.register_type(&m); + let portable_registry: PortableRegistry = types.into(); + + (id, portable_registry) + } + + fn decode_and_consume_type_consumes_all_bytes< + T: codec::Encode + scale_info::TypeInfo + 'static, + >( + val: T, + ) { + let (type_id, registry) = singleton_type_registry::(); + let bytes = val.encode(); + let cursor = &mut &*bytes; + + decode_and_consume_type(type_id.id(), ®istry, cursor).unwrap(); + assert_eq!(cursor.len(), 0); + } + + #[test] + fn decode_bitvec() { + use bitvec::order::Msb0; + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1], + ); + decode_and_consume_type_consumes_all_bytes( + bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0], + ); + } + + #[test] + fn decode_primitive() { + decode_and_consume_type_consumes_all_bytes(false); + decode_and_consume_type_consumes_all_bytes(true); + + let dummy_data = vec![0u8]; + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!( + res, + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char))) + ); + + decode_and_consume_type_consumes_all_bytes("str".to_string()); + + decode_and_consume_type_consumes_all_bytes(1u8); + decode_and_consume_type_consumes_all_bytes(1i8); + + decode_and_consume_type_consumes_all_bytes(1u16); + decode_and_consume_type_consumes_all_bytes(1i16); + + decode_and_consume_type_consumes_all_bytes(1u32); + decode_and_consume_type_consumes_all_bytes(1i32); + + decode_and_consume_type_consumes_all_bytes(1u64); + decode_and_consume_type_consumes_all_bytes(1i64); + + decode_and_consume_type_consumes_all_bytes(1u128); + decode_and_consume_type_consumes_all_bytes(1i128); + } + + #[test] + fn decode_tuple() { + decode_and_consume_type_consumes_all_bytes(()); + + decode_and_consume_type_consumes_all_bytes((true,)); + + decode_and_consume_type_consumes_all_bytes((true, "str")); + + // Incomplete bytes for decoding + let dummy_data = false.encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::<(bool, &'static str)>(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Codec(_))); + + // Incomplete bytes for decoding, with invalid char type + let dummy_data = (false, "str", 0u8).encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!( + res, + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char))) + ); + // The last byte (0x0 u8) should not be consumed + assert_eq!(dummy_cursor.len(), 1); + } + + #[test] + fn decode_array_and_seq() { + decode_and_consume_type_consumes_all_bytes([0]); + decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]); + decode_and_consume_type_consumes_all_bytes([0; 500]); + decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]); + + decode_and_consume_type_consumes_all_bytes(vec![0]); + decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]); + decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]); + } + + #[test] + fn decode_variant() { + #[derive(Clone, Encode, TypeInfo)] + enum EnumVar { + A, + B((&'static str, u8)), + C { named: i16 }, + } + const INVALID_TYPE_ID: u32 = 1024; + + decode_and_consume_type_consumes_all_bytes(EnumVar::A); + decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1))); + decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 }); + + // Invalid variant index + let dummy_data = 3u8.encode(); + let dummy_cursor = &mut &*dummy_data; + let (id, reg) = singleton_type_registry::(); + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Other(_))); + + // Valid index, incomplete data + let dummy_data = 2u8.encode(); + let dummy_cursor = &mut &*dummy_data; + let res = decode_and_consume_type(id.id(), ®, dummy_cursor); + assert_matches!(res, Err(Codec(_))); + + let res = decode_and_consume_type(INVALID_TYPE_ID, ®, dummy_cursor); + assert_matches!(res, Err(crate::error::GenericError::Metadata(_))); + } + + #[test] + fn decode_composite() { + #[derive(Clone, Encode, TypeInfo)] + struct Composite {} + decode_and_consume_type_consumes_all_bytes(Composite {}); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV2 { + id: u32, + name: String, + } + decode_and_consume_type_consumes_all_bytes(CompositeV2 { + id: 10, + name: "str".to_string(), + }); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV3 { + id: u32, + extra: T, + } + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: vec![0, 1, 2], + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: ("str", 1), + }); + decode_and_consume_type_consumes_all_bytes(CompositeV3 { + id: 10, + extra: CompositeV2 { + id: 2, + name: "str".to_string(), + }, + }); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV4(u32, bool); + decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompositeV5(u32); + decode_and_consume_type_consumes_all_bytes(CompositeV5(1)); + } + + #[test] + fn decode_compact() { + #[derive(Clone, Encode, TypeInfo)] + enum Compact { + A(#[codec(compact)] u32), + } + decode_and_consume_type_consumes_all_bytes(Compact::A(1)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV2(#[codec(compact)] u32); + decode_and_consume_type_consumes_all_bytes(CompactV2(1)); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV3 { + #[codec(compact)] + val: u32, + } + decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 }); + + #[derive(Clone, Encode, TypeInfo)] + struct CompactV4 { + #[codec(compact)] + val: T, + } + decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 }); + decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 }); + } + +} \ No newline at end of file diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs new file mode 100644 index 0000000000..7977c799f9 --- /dev/null +++ b/subxt/src/events/event_subscription.rs @@ -0,0 +1,178 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Subscribing to events. + +use crate::{ + error::BasicError, + Client, + Config, +}; +use codec::{ + Decode, +}; +use derivative::Derivative; +use futures::{ + Future, + FutureExt, + Stream, + StreamExt, +}; +use jsonrpsee::core::client::Subscription; +use std::{ + marker::Unpin, + task::Poll, +}; + +pub use super::{ + at, + Events, + EventDetails, + RawEventDetails, + EventsDecodingError, + FilterEvents, + EventFilter, +}; + +/// Subscribe to events from blocks. +/// +/// **Note:** these blocks haven't necessarily been finalised yet; prefer +/// [`Events::subscribe_finalized()`] if that is important. +/// +/// **Note:** This function is hidden from the documentation +/// and is exposed only to be called via the codegen. Thus, prefer to use +/// `api.events().subscribe()` over calling this directly. +#[doc(hidden)] +pub async fn subscribe( + client: &'_ Client, +) -> Result, BasicError> { + let block_subscription = client.rpc().subscribe_blocks().await?; + Ok(EventSubscription::new(client, block_subscription)) +} + +/// Subscribe to events from finalized blocks. +/// +/// **Note:** This function is hidden from the documentation +/// and is exposed only to be called via the codegen. Thus, prefer to use +/// `api.events().subscribe_finalized()` over calling this directly. +#[doc(hidden)] +pub async fn subscribe_finalized( + client: &'_ Client, +) -> Result, BasicError> { + let block_subscription = client.rpc().subscribe_finalized_blocks().await?; + Ok(EventSubscription::new(client, block_subscription)) +} + +/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. +#[derive(Derivative)] +#[derivative(Debug(bound = ""))] +pub struct EventSubscription<'a, T: Config, Evs: 'static> { + finished: bool, + client: &'a Client, + block_header_subscription: Subscription, + #[derivative(Debug = "ignore")] + at: Option< + std::pin::Pin< + Box, BasicError>> + 'a>, + >, + >, + _event_type: std::marker::PhantomData, +} + +impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { + fn new( + client: &'a Client, + block_header_subscription: Subscription, + ) -> Self { + EventSubscription { + finished: false, + client, + block_header_subscription, + at: None, + _event_type: std::marker::PhantomData, + } + } + + /// Return only specific events based on the types provided. + pub fn filter_events(self) -> FilterEvents<'a, T, Evs, EvFilter> { + FilterEvents::new(self) + } +} + +impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} + +// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose +// way to roughly implement the following function: +// +// ``` +// fn subscribe_events(client: &'_ Client, block_sub: Subscription) -> impl Stream, BasicError>> + '_ { +// use futures::StreamExt; +// block_sub.then(move |block_header_res| async move { +// use sp_runtime::traits::Header; +// let block_header = block_header_res?; +// let block_hash = block_header.hash(); +// at(client, block_hash).await +// }) +// } +// ``` +// +// The advantage of this manual implementation is that we have a named type that we (and others) +// can derive things on, store away, alias etc. +impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { + type Item = Result, BasicError>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // We are finished; return None. + if self.finished { + return Poll::Ready(None) + } + + // If there isn't an `at` function yet that's busy resolving a block hash into + // some event details, then poll the block header subscription to get one. + if self.at.is_none() { + match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) { + None => { + self.finished = true; + return Poll::Ready(None) + } + Some(Err(e)) => { + self.finished = true; + return Poll::Ready(Some(Err(e.into()))) + } + Some(Ok(block_header)) => { + use sp_runtime::traits::Header; + // Note [jsdw]: We may be able to get rid of the per-item allocation + // with https://github.com/oblique/reusable-box-future. + self.at = Some(Box::pin(at(self.client, block_header.hash()))); + // Continue, so that we poll this function future we've just created. + } + } + } + + // If we get here, there will be an `at` function stored. Unwrap it and poll it to + // completion to get our events, throwing it away as soon as it is ready. + let at_fn = self + .at + .as_mut() + .expect("'at' function should have been set above'"); + let events = futures::ready!(at_fn.poll_unpin(cx)); + self.at = None; + Poll::Ready(Some(events)) + } +} diff --git a/subxt/src/events.rs b/subxt/src/events/events.rs similarity index 58% rename from subxt/src/events.rs rename to subxt/src/events/events.rs index a650b80b4c..4a137b2eec 100644 --- a/subxt/src/events.rs +++ b/subxt/src/events/events.rs @@ -14,50 +14,29 @@ // You should have received a copy of the GNU General Public License // along with subxt. If not, see . -//! For working with events. +//! A representation of a block of events. use crate::{ error::BasicError, - metadata::MetadataError, Client, Config, Event, Metadata, Phase, }; -use bitvec::{ - order::Lsb0, - vec::BitVec, -}; use codec::{ - Codec, Compact, Decode, Error as CodecError, Input, }; use derivative::Derivative; -use futures::{ - Future, - FutureExt, - Stream, - StreamExt, -}; -use jsonrpsee::core::client::Subscription; -use scale_info::{ - PortableRegistry, - TypeDef, - TypeDefPrimitive, -}; use sp_core::{ - storage::StorageKey, - twox_128, Bytes, + twox_128, + storage::StorageKey, }; -use std::{ - marker::Unpin, - task::Poll, -}; +use super::decoding; /// Obtain events at some block hash. The generic parameter is what we /// will attempt to decode each event into if using [`Events::iter()`], @@ -100,136 +79,18 @@ pub async fn at( }) } -/// Subscribe to events from blocks. -/// -/// **Note:** these blocks haven't necessarily been finalised yet; prefer -/// [`Events::subscribe_finalized()`] if that is important. -/// -/// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe()` over calling this directly. -#[doc(hidden)] -pub async fn subscribe( - client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) -} - -/// Subscribe to events from finalized blocks. -/// -/// **Note:** This function is hidden from the documentation -/// and is exposed only to be called via the codegen. Thus, prefer to use -/// `api.events().subscribe_finalized()` over calling this directly. -#[doc(hidden)] -pub async fn subscribe_finalized( - client: &'_ Client, -) -> Result, BasicError> { - let block_subscription = client.rpc().subscribe_finalized_blocks().await?; - Ok(EventSubscription::new(client, block_subscription)) -} - -/// A subscription to events that implements [`Stream`], and returns [`Events`] objects for each block. -#[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct EventSubscription<'a, T: Config, Evs: Decode + 'static> { - finished: bool, - client: &'a Client, - block_header_subscription: Subscription, - #[derivative(Debug = "ignore")] - at: Option< - std::pin::Pin< - Box, BasicError>> + 'a>, - >, - >, - _event_type: std::marker::PhantomData, -} - -impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { - fn new( - client: &'a Client, - block_header_subscription: Subscription, - ) -> Self { - EventSubscription { - finished: false, - client, - block_header_subscription, - at: None, - _event_type: std::marker::PhantomData, - } - } -} - -impl<'a, T: Config, Evs: Decode> Unpin for EventSubscription<'a, T, Evs> {} - -// We want `EventSubscription` to implement Stream. The below implementation is the rather verbose -// way to roughly implement the following function: -// -// ``` -// fn subscribe_events(client: &'_ Client, block_sub: Subscription) -> impl Stream, BasicError>> + '_ { -// use futures::StreamExt; -// block_sub.then(move |block_header_res| async move { -// use sp_runtime::traits::Header; -// let block_header = block_header_res?; -// let block_hash = block_header.hash(); -// at(client, block_hash).await -// }) -// } -// ``` -// -// The advantage of this manual implementation is that we have a named type that we (and others) -// can derive things on, store away, alias etc. -impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { - type Item = Result, BasicError>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // We are finished; return None. - if self.finished { - return Poll::Ready(None) - } - - // If there isn't an `at` function yet that's busy resolving a block hash into - // some event details, then poll the block header subscription to get one. - if self.at.is_none() { - match futures::ready!(self.block_header_subscription.poll_next_unpin(cx)) { - None => { - self.finished = true; - return Poll::Ready(None) - } - Some(Err(e)) => { - self.finished = true; - return Poll::Ready(Some(Err(e.into()))) - } - Some(Ok(block_header)) => { - use sp_runtime::traits::Header; - // Note [jsdw]: We may be able to get rid of the per-item allocation - // with https://github.com/oblique/reusable-box-future. - self.at = Some(Box::pin(at(self.client, block_header.hash()))); - // Continue, so that we poll this function future we've just created. - } - } - } - - // If we get here, there will be an `at` function stored. Unwrap it and poll it to - // completion to get our events, throwing it away as soon as it is ready. - let at_fn = self - .at - .as_mut() - .expect("'at' function should have been set above'"); - let events = futures::ready!(at_fn.poll_unpin(cx)); - self.at = None; - Poll::Ready(Some(events)) - } +// The storage key needed to access events. +fn system_events_key() -> StorageKey { + let mut storage_key = twox_128(b"System").to_vec(); + storage_key.extend(twox_128(b"Events").to_vec()); + StorageKey(storage_key) } /// A collection of events obtained from a block, bundled with the necessary /// information needed to decode and iterate over them. #[derive(Derivative)] #[derivative(Debug(bound = ""))] -pub struct Events<'a, T: Config, Evs: Decode> { +pub struct Events<'a, T: Config, Evs> { metadata: &'a Metadata, block_hash: T::Hash, // Note; raw event bytes are prefixed with a Compact containing @@ -457,7 +318,7 @@ fn decode_raw_event_details( let type_id = arg.ty().id(); let all_bytes = *input; // consume some bytes, moving the cursor forward: - decode_and_consume_type(type_id, &metadata.runtime_metadata().types, input)?; + decoding::decode_and_consume_type(type_id, &metadata.runtime_metadata().types, input)?; // count how many bytes were consumed based on remaining length: let consumed_len = all_bytes.len() - input.len(); // move those consumed bytes to the output vec unaltered: @@ -480,222 +341,14 @@ fn decode_raw_event_details( }) } -// The storage key needed to access events. -fn system_events_key() -> StorageKey { - let mut storage_key = twox_128(b"System").to_vec(); - storage_key.extend(twox_128(b"Events").to_vec()); - StorageKey(storage_key) -} - -// Given a type Id and a type registry, attempt to consume the bytes -// corresponding to that type from our input. -fn decode_and_consume_type( - type_id: u32, - types: &PortableRegistry, - input: &mut &[u8], -) -> Result<(), BasicError> { - let ty = types - .resolve(type_id) - .ok_or(MetadataError::TypeNotFound(type_id))?; - - fn consume_type(input: &mut &[u8]) -> Result<(), BasicError> { - T::decode(input)?; - Ok(()) - } - - match ty.type_def() { - TypeDef::Composite(composite) => { - for field in composite.fields() { - decode_and_consume_type(field.ty().id(), types, input)? - } - Ok(()) - } - TypeDef::Variant(variant) => { - let variant_index = u8::decode(input)?; - let variant = variant - .variants() - .iter() - .find(|v| v.index() == variant_index) - .ok_or_else(|| { - BasicError::Other(format!("Variant {} not found", variant_index)) - })?; - for field in variant.fields() { - decode_and_consume_type(field.ty().id(), types, input)?; - } - Ok(()) - } - TypeDef::Sequence(seq) => { - let len = >::decode(input)?; - for _ in 0..len.0 { - decode_and_consume_type(seq.type_param().id(), types, input)?; - } - Ok(()) - } - TypeDef::Array(arr) => { - for _ in 0..arr.len() { - decode_and_consume_type(arr.type_param().id(), types, input)?; - } - Ok(()) - } - TypeDef::Tuple(tuple) => { - for field in tuple.fields() { - decode_and_consume_type(field.id(), types, input)?; - } - Ok(()) - } - TypeDef::Primitive(primitive) => { - match primitive { - TypeDefPrimitive::Bool => consume_type::(input), - TypeDefPrimitive::Char => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char) - .into(), - ) - } - TypeDefPrimitive::Str => consume_type::(input), - TypeDefPrimitive::U8 => consume_type::(input), - TypeDefPrimitive::U16 => consume_type::(input), - TypeDefPrimitive::U32 => consume_type::(input), - TypeDefPrimitive::U64 => consume_type::(input), - TypeDefPrimitive::U128 => consume_type::(input), - TypeDefPrimitive::U256 => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::U256) - .into(), - ) - } - TypeDefPrimitive::I8 => consume_type::(input), - TypeDefPrimitive::I16 => consume_type::(input), - TypeDefPrimitive::I32 => consume_type::(input), - TypeDefPrimitive::I64 => consume_type::(input), - TypeDefPrimitive::I128 => consume_type::(input), - TypeDefPrimitive::I256 => { - Err( - EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::I256) - .into(), - ) - } - } - } - TypeDef::Compact(compact) => { - let inner = types - .resolve(compact.type_param().id()) - .ok_or(MetadataError::TypeNotFound(type_id))?; - let mut decode_compact_primitive = |primitive: &TypeDefPrimitive| { - match primitive { - TypeDefPrimitive::U8 => consume_type::>(input), - TypeDefPrimitive::U16 => consume_type::>(input), - TypeDefPrimitive::U32 => consume_type::>(input), - TypeDefPrimitive::U64 => consume_type::>(input), - TypeDefPrimitive::U128 => consume_type::>(input), - prim => { - Err(EventsDecodingError::InvalidCompactPrimitive(prim.clone()) - .into()) - } - } - }; - match inner.type_def() { - TypeDef::Primitive(primitive) => decode_compact_primitive(primitive), - TypeDef::Composite(composite) => { - match composite.fields() { - [field] => { - let field_ty = - types.resolve(field.ty().id()).ok_or_else(|| { - MetadataError::TypeNotFound(field.ty().id()) - })?; - if let TypeDef::Primitive(primitive) = field_ty.type_def() { - decode_compact_primitive(primitive) - } else { - Err(EventsDecodingError::InvalidCompactType( - "Composite type must have a single primitive field" - .into(), - ) - .into()) - } - } - _ => { - Err(EventsDecodingError::InvalidCompactType( - "Composite type must have a single field".into(), - ) - .into()) - } - } - } - _ => { - Err(EventsDecodingError::InvalidCompactType( - "Compact type must be a primitive or a composite type".into(), - ) - .into()) - } - } - } - TypeDef::BitSequence(bitseq) => { - let bit_store_def = types - .resolve(bitseq.bit_store_type().id()) - .ok_or(MetadataError::TypeNotFound(type_id))? - .type_def(); - - // We just need to consume the correct number of bytes. Roughly, we encode this - // as a Compact length, and then a slice of T of that length, where T is the - // bit store type. So, we ignore the bit order and only care that the bit store type - // used lines up in terms of the number of bytes it will take to encode/decode it. - match bit_store_def { - TypeDef::Primitive(TypeDefPrimitive::U8) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U16) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U32) => { - consume_type::>(input) - } - TypeDef::Primitive(TypeDefPrimitive::U64) => { - consume_type::>(input) - } - store => { - return Err(EventsDecodingError::InvalidBitSequenceType(format!( - "{:?}", - store - )) - .into()) - } - } - } - } -} - -/// The possible errors that we can run into attempting to decode events. -#[derive(Debug, thiserror::Error)] -pub enum EventsDecodingError { - /// Unsupported primitive type - #[error("Unsupported primitive type {0:?}")] - UnsupportedPrimitive(TypeDefPrimitive), - /// Invalid compact type, must be an unsigned int. - #[error("Invalid compact primitive {0:?}")] - InvalidCompactPrimitive(TypeDefPrimitive), - /// Invalid compact type; error details in string. - #[error("Invalid compact composite type {0}")] - InvalidCompactType(String), - /// Invalid bit sequence type; bit store type or bit order type used aren't supported. - #[error("Invalid bit sequence type; bit store type {0} is not supported")] - InvalidBitSequenceType(String), -} - #[cfg(test)] mod tests { use super::*; use crate::{ - error::GenericError::{ - Codec, - EventsDecoding, - Other, - }, - events::EventsDecodingError::UnsupportedPrimitive, Config, DefaultConfig, Phase, }; - use assert_matches::assert_matches; use codec::Encode; use frame_metadata::{ v14::{ @@ -712,8 +365,6 @@ mod tests { }; use std::convert::TryFrom; - type TypeId = scale_info::interner::UntrackedSymbol; - /// An "outer" events enum containing exactly one event. #[derive(Encode, Decode, TypeInfo, Clone, Debug, PartialEq)] pub enum AllEvents { @@ -739,17 +390,6 @@ mod tests { } } - /// Build a type registry that knows about the single type provided. - fn singleton_type_registry( - ) -> (TypeId, PortableRegistry) { - let m = scale_info::MetaType::new::(); - let mut types = scale_info::Registry::new(); - let id = types.register_type(&m); - let portable_registry: PortableRegistry = types.into(); - - (id, portable_registry) - } - /// Build fake metadata consisting of a single pallet that knows /// about the event type provided. fn metadata() -> Metadata { @@ -807,18 +447,7 @@ mod tests { } } - fn decode_and_consume_type_consumes_all_bytes< - T: codec::Encode + scale_info::TypeInfo + 'static, - >( - val: T, - ) { - let (type_id, registry) = singleton_type_registry::(); - let bytes = val.encode(); - let cursor = &mut &*bytes; - - decode_and_consume_type(type_id.id(), ®istry, cursor).unwrap(); - assert_eq!(cursor.len(), 0); - } + #[test] fn statically_decode_single_event() { @@ -1294,218 +923,4 @@ mod tests { }] ); } - - #[test] - fn decode_bitvec() { - use bitvec::order::Msb0; - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u8; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u16; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u16; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u32; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u32; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Lsb0, u64; 0, 1, 1, 0, 1], - ); - decode_and_consume_type_consumes_all_bytes( - bitvec::bitvec![Msb0, u64; 0, 1, 1, 0, 1, 0, 1, 0, 0], - ); - } - - #[test] - fn decode_primitive() { - decode_and_consume_type_consumes_all_bytes(false); - decode_and_consume_type_consumes_all_bytes(true); - - let dummy_data = vec![0u8]; - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!( - res, - Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char))) - ); - - decode_and_consume_type_consumes_all_bytes("str".to_string()); - - decode_and_consume_type_consumes_all_bytes(1u8); - decode_and_consume_type_consumes_all_bytes(1i8); - - decode_and_consume_type_consumes_all_bytes(1u16); - decode_and_consume_type_consumes_all_bytes(1i16); - - decode_and_consume_type_consumes_all_bytes(1u32); - decode_and_consume_type_consumes_all_bytes(1i32); - - decode_and_consume_type_consumes_all_bytes(1u64); - decode_and_consume_type_consumes_all_bytes(1i64); - - decode_and_consume_type_consumes_all_bytes(1u128); - decode_and_consume_type_consumes_all_bytes(1i128); - } - - #[test] - fn decode_tuple() { - decode_and_consume_type_consumes_all_bytes(()); - - decode_and_consume_type_consumes_all_bytes((true,)); - - decode_and_consume_type_consumes_all_bytes((true, "str")); - - // Incomplete bytes for decoding - let dummy_data = false.encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::<(bool, &'static str)>(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Codec(_))); - - // Incomplete bytes for decoding, with invalid char type - let dummy_data = (false, "str", 0u8).encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::<(bool, &'static str, char)>(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!( - res, - Err(EventsDecoding(UnsupportedPrimitive(TypeDefPrimitive::Char))) - ); - // The last byte (0x0 u8) should not be consumed - assert_eq!(dummy_cursor.len(), 1); - } - - #[test] - fn decode_array_and_seq() { - decode_and_consume_type_consumes_all_bytes([0]); - decode_and_consume_type_consumes_all_bytes([1, 2, 3, 4, 5]); - decode_and_consume_type_consumes_all_bytes([0; 500]); - decode_and_consume_type_consumes_all_bytes(["str", "abc", "cde"]); - - decode_and_consume_type_consumes_all_bytes(vec![0]); - decode_and_consume_type_consumes_all_bytes(vec![1, 2, 3, 4, 5]); - decode_and_consume_type_consumes_all_bytes(vec!["str", "abc", "cde"]); - } - - #[test] - fn decode_variant() { - #[derive(Clone, Encode, TypeInfo)] - enum EnumVar { - A, - B((&'static str, u8)), - C { named: i16 }, - } - const INVALID_TYPE_ID: u32 = 1024; - - decode_and_consume_type_consumes_all_bytes(EnumVar::A); - decode_and_consume_type_consumes_all_bytes(EnumVar::B(("str", 1))); - decode_and_consume_type_consumes_all_bytes(EnumVar::C { named: 1 }); - - // Invalid variant index - let dummy_data = 3u8.encode(); - let dummy_cursor = &mut &*dummy_data; - let (id, reg) = singleton_type_registry::(); - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Other(_))); - - // Valid index, incomplete data - let dummy_data = 2u8.encode(); - let dummy_cursor = &mut &*dummy_data; - let res = decode_and_consume_type(id.id(), ®, dummy_cursor); - assert_matches!(res, Err(Codec(_))); - - let res = decode_and_consume_type(INVALID_TYPE_ID, ®, dummy_cursor); - assert_matches!(res, Err(crate::error::GenericError::Metadata(_))); - } - - #[test] - fn decode_composite() { - #[derive(Clone, Encode, TypeInfo)] - struct Composite {} - decode_and_consume_type_consumes_all_bytes(Composite {}); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV2 { - id: u32, - name: String, - } - decode_and_consume_type_consumes_all_bytes(CompositeV2 { - id: 10, - name: "str".to_string(), - }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV3 { - id: u32, - extra: T, - } - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: vec![0, 1, 2], - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: bitvec::bitvec![Lsb0, u8; 0, 1, 1, 0, 1], - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: ("str", 1), - }); - decode_and_consume_type_consumes_all_bytes(CompositeV3 { - id: 10, - extra: CompositeV2 { - id: 2, - name: "str".to_string(), - }, - }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV4(u32, bool); - decode_and_consume_type_consumes_all_bytes(CompositeV4(1, true)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompositeV5(u32); - decode_and_consume_type_consumes_all_bytes(CompositeV5(1)); - } - - #[test] - fn decode_compact() { - #[derive(Clone, Encode, TypeInfo)] - enum Compact { - A(#[codec(compact)] u32), - } - decode_and_consume_type_consumes_all_bytes(Compact::A(1)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV2(#[codec(compact)] u32); - decode_and_consume_type_consumes_all_bytes(CompactV2(1)); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV3 { - #[codec(compact)] - val: u32, - } - decode_and_consume_type_consumes_all_bytes(CompactV3 { val: 1 }); - - #[derive(Clone, Encode, TypeInfo)] - struct CompactV4 { - #[codec(compact)] - val: T, - } - decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 }); - decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 }); - } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs new file mode 100644 index 0000000000..3ce18a301d --- /dev/null +++ b/subxt/src/events/filter_events.rs @@ -0,0 +1,47 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! Filtering individual events from subscriptions. + +use codec::Decode; +use super::{ Events, EventSubscription }; +use crate::{ Config, BasicError }; + +/// A stream which returns zero or more of the filtered event +/// types back. +pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter> { + sub: EventSubscription<'a, T, Evs>, + filter: std::marker::PhantomData, +} + +impl <'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { + pub (crate) fn new(sub: EventSubscription<'a, T, Evs>) -> Self { + Self { + sub, + filter: std::marker::PhantomData + } + } +} + +/// This trait is implemented for tuples of Event types; any +/// such tuple can be used to filter an event subscription to return +/// only the specified events. +pub trait EventFilter { + /// The type we'll be handed back from filtering. + type ReturnType; + /// Filter the events based on the type implementing this trait. + fn filter(events: &Events<'_, T, Evs>) -> Result; +} \ No newline at end of file diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs new file mode 100644 index 0000000000..386e143a2b --- /dev/null +++ b/subxt/src/events/mod.rs @@ -0,0 +1,41 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! For working with events. + +mod events; +mod filter_events; +mod decoding; +mod event_subscription; + +pub use events::{ + at, + Events, + EventDetails, + RawEventDetails, +}; +pub use event_subscription::{ + subscribe, + subscribe_finalized, + EventSubscription, +}; +pub use filter_events::{ + EventFilter, + FilterEvents, +}; +pub use decoding::{ + EventsDecodingError, +}; diff --git a/subxt/tests/integration/storage.rs b/subxt/tests/integration/storage.rs index 31093a4d9d..20ae278a9e 100644 --- a/subxt/tests/integration/storage.rs +++ b/subxt/tests/integration/storage.rs @@ -28,7 +28,9 @@ use sp_keyring::AccountKeyring; async fn storage_plain_lookup() -> Result<(), subxt::Error> { let ctx = test_context().await; - // Look up a plain value + // Look up a plain value. Wait long enough that we don't get the genesis block data, + // because it may have no storage associated with it. + async_std::task::sleep(std::time::Duration::from_secs(6)); let entry = ctx.api.storage().timestamp().now(None).await?; assert!(entry > 0); From 9e187fec733b01c06ec1b933db10aa7a280987ad Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 12:48:45 +0000 Subject: [PATCH 02/18] First pass filtering event(s) --- subxt/src/events/events.rs | 43 +++++++++ subxt/src/events/filter_events.rs | 142 +++++++++++++++++++++++++++--- 2 files changed, 172 insertions(+), 13 deletions(-) diff --git a/subxt/src/events/events.rs b/subxt/src/events/events.rs index 4a137b2eec..1ef47967b5 100644 --- a/subxt/src/events/events.rs +++ b/subxt/src/events/events.rs @@ -213,6 +213,49 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> { }) } + /// Iterate over all of the events, using metadata to dynamically + /// decode them as we go, and returning the raw bytes and other associated + /// details. If an error occurs, all subsequent iterations return `None`. + /// + /// This method is safe to use even if you do not statically know about + /// all of the possible events; it splits events up using the metadata + /// obtained at runtime, which does. + /// + /// Unlike [`Events::iter_raw()`] this consumes `self`, which can be useful + /// if you need to store the iterator somewhere and avoid lifetime issues. + pub fn into_iter_raw( + self, + ) -> impl Iterator> + 'a { + let mut pos = 0; + let mut index = 0; + std::iter::from_fn(move || { + let cursor = &mut &self.event_bytes[pos..]; + let start_len = cursor.len(); + + if start_len == 0 || self.num_events == index { + None + } else { + match decode_raw_event_details::(self.metadata, index, cursor) { + Ok(raw_event) => { + // Skip over decoded bytes in next iteration: + pos += start_len - cursor.len(); + // Increment the index: + index += 1; + // Return the event details: + Some(Ok(raw_event)) + } + Err(e) => { + // By setting the position to the "end" of the event bytes, + // the cursor len will become 0 and the iterator will return `None` + // from now on: + pos = self.event_bytes.len(); + Some(Err(e)) + } + } + } + }) + } + /// Iterate through the events using metadata to dynamically decode and skip /// them, and return only those which should decode to the provided `Ev` type. /// If an error occurs, all subsequent iterations return `None`. diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 3ce18a301d..af29ef5f4a 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -17,31 +17,147 @@ //! Filtering individual events from subscriptions. use codec::Decode; -use super::{ Events, EventSubscription }; -use crate::{ Config, BasicError }; +use super::{ RawEventDetails, EventSubscription }; +use crate::{ Config, BasicError, Event }; +use futures::{ Stream, StreamExt }; +use std::task::Poll; +use std::marker::Unpin; -/// A stream which returns zero or more of the filtered event -/// types back. -pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter> { +/// A stream which returns tuples containing exactly one of the +/// given event types back on each iteration. +pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, - filter: std::marker::PhantomData, + // Once we get a block back, we'll + events: Option> + 'a>> } +impl <'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} + impl <'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { pub (crate) fn new(sub: EventSubscription<'a, T, Evs>) -> Self { Self { sub, - filter: std::marker::PhantomData + events: None } } } -/// This trait is implemented for tuples of Event types; any -/// such tuple can be used to filter an event subscription to return -/// only the specified events. -pub trait EventFilter { +impl <'a, T: Config, Evs: Decode, Filter: EventFilter> Stream for FilterEvents<'a, T, Evs, Filter> { + type Item = Result; + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + loop { + // Drain the current events we're iterating over first: + if let Some(events_iter) = self.events.as_mut() { + match events_iter.next() { + Some(res) => return Poll::Ready(Some(res)), + None => { + self.events = None; + } + } + } + + // Wait for new events to come in: + match futures::ready!(self.sub.poll_next_unpin(cx)) { + None => return Poll::Ready(None), + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + Some(Ok(events)) => { + self.events = Some(Filter::filter(events.into_iter_raw())); + } + }; + } + } +} + +/// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be +/// used to filter an event subscription to return only the specified events. +pub trait EventFilter: private::Sealed { /// The type we'll be handed back from filtering. type ReturnType; /// Filter the events based on the type implementing this trait. - fn filter(events: &Events<'_, T, Evs>) -> Result; -} \ No newline at end of file + fn filter<'a>(events: impl Iterator> + 'a) -> Box> + 'a>; +} + +// Prevent userspace implementations of the above trait; the interface is not considered stable +// and is not a particularly nice API to work with (particularly because it involves boxing, which +// would be nice to get rid of eventually). +pub (crate) mod private { + pub trait Sealed {} +} + +// A special case impl for searching for a tuple of exactly one event (in this case, we don't +// need to return an `(Option,)`; we can just return `Event`. +impl private::Sealed for (A,) {} +impl EventFilter for (A,) { + type ReturnType = A; + fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { + // Return an iterator that populates exactly 1 of the tuple options each pass, + // or bails with None if none of them could be populated. + Box::new(std::iter::from_fn(move || { + while let Some(ev) = events.next() { + // Forward any error immediately: + let ev = match ev { + Ok(ev) => ev, + Err(e) => return Some(Err(e.into())) + }; + // Try decoding each type until we hit a match or an error: + let ev = ev.as_event::(); + if let Ok(Some(ev)) = ev { + // We found a match; return our tuple. + return Some(Ok(ev)); + } + if let Err(e) = ev { + // We hit an error. Return it. + return Some(Err(e.into())) + } + } + None + })) + } +} + +// A generalised impl for tuples of sizes greater than 1: +macro_rules! impl_event_filter { + ($($ty:ident $idx:tt),+) => { + impl <$($ty: Event),+> private::Sealed for ( $($ty,)+ ) {} + impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) { + type ReturnType = ( $(Option<$ty>,)+ ); + fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { + // Return an iterator that populates exactly 1 of the tuple options each pass, + // or bails with None if none of them could be populated. + Box::new(std::iter::from_fn(move || { + let mut out: ( $(Option<$ty>,)+ ) = Default::default(); + while let Some(ev) = events.next() { + // Forward any error immediately: + let ev = match ev { + Ok(ev) => ev, + Err(e) => return Some(Err(e.into())) + }; + // Try decoding each type until we hit a match or an error: + $({ + let ev = ev.as_event::<$ty>(); + if let Ok(Some(ev)) = ev { + // We found a match; return our tuple. + out.$idx = Some(ev); + return Some(Ok(out)); + } + if let Err(e) = ev { + // We hit an error. Return it. + return Some(Err(e.into())) + } + })+ + } + None + })) + } + } + } +} + +impl_event_filter!(A 0, B 1); +impl_event_filter!(A 0, B 1, C 2); +impl_event_filter!(A 0, B 1, C 2, D 3); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6); +impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7); + From a3383daa591e40c4f795c5a452b374808eb12611 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 14:08:33 +0000 Subject: [PATCH 03/18] Tweak event examples to show filter_events --- examples/examples/subscribe_one_event.rs | 22 ++--- examples/examples/subscribe_some_events.rs | 103 +++++++++++++++++++++ subxt/src/events/filter_events.rs | 17 ++-- 3 files changed, 116 insertions(+), 26 deletions(-) create mode 100644 examples/examples/subscribe_some_events.rs diff --git a/examples/examples/subscribe_one_event.rs b/examples/examples/subscribe_one_event.rs index e58a92dba0..77eaaff6e5 100644 --- a/examples/examples/subscribe_one_event.rs +++ b/examples/examples/subscribe_one_event.rs @@ -22,11 +22,7 @@ //! polkadot --dev --tmp //! ``` -use futures::{ - future, - stream, - StreamExt, -}; +use futures::StreamExt; use sp_keyring::AccountKeyring; use std::time::Duration; use subxt::{ @@ -51,21 +47,15 @@ async fn main() -> Result<(), Box> { .await? .to_runtime_api::>>(); - // Subscribe to just balance transfer events, making use of `flat_map` and - // `filter_map` from the StreamExt trait to filter everything else out. + // Subscribe to just balance transfer events, making use of `filter_events` + // to select a single event type (note the 1-tuple) to filter out and return. let mut transfer_events = api .events() .subscribe() .await? - // Ignore errors returning events: - .filter_map(|events| future::ready(events.ok())) - // Map events to just the one we care about: - .flat_map(|events| { - let transfer_events = events - .find::() - .collect::>(); - stream::iter(transfer_events) - }); + .filter_events::<( + polkadot::balances::events::Transfer, + )>(); // While this subscription is active, we imagine some balance transfers are made somewhere else: async_std::task::spawn(async { diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs new file mode 100644 index 0000000000..274fc72a13 --- /dev/null +++ b/examples/examples/subscribe_some_events.rs @@ -0,0 +1,103 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// This file is part of subxt. +// +// subxt is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// subxt is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with subxt. If not, see . + +//! To run this example, a local polkadot node should be running. Example verified against polkadot 0.9.13-82616422d0-aarch64-macos. +//! +//! E.g. +//! ```bash +//! curl "https://github.com/paritytech/polkadot/releases/download/v0.9.13/polkadot" --output /usr/local/bin/polkadot --location +//! polkadot --dev --tmp +//! ``` + +use futures::StreamExt; +use sp_keyring::AccountKeyring; +use std::time::Duration; +use subxt::{ + ClientBuilder, + DefaultConfig, + DefaultExtra, + PairSigner, +}; + +#[subxt::subxt(runtime_metadata_path = "examples/polkadot_metadata.scale")] +pub mod polkadot {} + +/// Subscribe to all events, and then manually look through them and +/// pluck out the events that we care about. +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Subscribe to any events that occur: + let api = ClientBuilder::new() + .build() + .await? + .to_runtime_api::>>(); + + // Subscribe to several balance related events. If we ask for more than one event, + // we'll be given a correpsonding tuple of `Option`'s, with exactly one + // variant populated each time. + let mut balance_events = api + .events() + .subscribe() + .await? + .filter_events::<( + polkadot::balances::events::Withdraw, + polkadot::balances::events::Transfer, + polkadot::balances::events::Deposit, + )>(); + + // While this subscription is active, we imagine some balance transfers are made somewhere else: + async_std::task::spawn(async { + let signer = PairSigner::new(AccountKeyring::Alice.pair()); + let api = ClientBuilder::new() + .build() + .await + .unwrap() + .to_runtime_api::>>(); + + // Make small balance transfers from Alice to Bob in a loop: + loop { + api.tx() + .balances() + .transfer(AccountKeyring::Bob.to_account_id().into(), 1_000_000_000) + .sign_and_submit(&signer) + .await + .unwrap(); + async_std::task::sleep(Duration::from_secs(10)).await; + } + }); + + // Our subscription will see all of the balance events we're filtering on: + while let Some(ev) = balance_events.next().await { + match ev? { + (Some(withdraw), _, _) => { + println!("Withdraw event: {withdraw:?}"); + }, + (_, Some(transfer), _) => { + println!("Transfer event: {transfer:?}"); + }, + (_, _, Some(deposit)) => { + println!("Deposit event: {deposit:?}"); + }, + _ => { + unreachable!(); + } + } + } + + Ok(()) +} diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index af29ef5f4a..6243fae99e 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -27,7 +27,6 @@ use std::marker::Unpin; /// given event types back on each iteration. pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, - // Once we get a block back, we'll events: Option> + 'a>> } @@ -86,12 +85,10 @@ pub (crate) mod private { // A special case impl for searching for a tuple of exactly one event (in this case, we don't // need to return an `(Option,)`; we can just return `Event`. -impl private::Sealed for (A,) {} -impl EventFilter for (A,) { - type ReturnType = A; - fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { - // Return an iterator that populates exactly 1 of the tuple options each pass, - // or bails with None if none of them could be populated. +impl private::Sealed for (Ev,) {} +impl EventFilter for (Ev,) { + type ReturnType = Ev; + fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { Box::new(std::iter::from_fn(move || { while let Some(ev) = events.next() { // Forward any error immediately: @@ -100,7 +97,7 @@ impl EventFilter for (A,) { Err(e) => return Some(Err(e.into())) }; // Try decoding each type until we hit a match or an error: - let ev = ev.as_event::(); + let ev = ev.as_event::(); if let Ok(Some(ev)) = ev { // We found a match; return our tuple. return Some(Ok(ev)); @@ -122,8 +119,8 @@ macro_rules! impl_event_filter { impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) { type ReturnType = ( $(Option<$ty>,)+ ); fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { - // Return an iterator that populates exactly 1 of the tuple options each pass, - // or bails with None if none of them could be populated. + // Return an iterator that populates exactly 1 of the tuple options each, + // iteration, or bails with None if none of them could be populated. Box::new(std::iter::from_fn(move || { let mut out: ( $(Option<$ty>,)+ ) = Default::default(); while let Some(ev) = events.next() { From fc25bc4bdb0c74153045af2f54922fbc2a1fb118 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 14:13:37 +0000 Subject: [PATCH 04/18] cargo clippy + fmt --- examples/examples/subscribe_one_event.rs | 4 +- examples/examples/subscribe_some_events.rs | 20 +++--- subxt/src/events/decoding.rs | 25 ++++--- subxt/src/events/event_subscription.rs | 14 ++-- .../src/events/{events.rs => events_type.rs} | 14 ++-- subxt/src/events/filter_events.rs | 69 ++++++++++++------- subxt/src/events/mod.rs | 20 +++--- 7 files changed, 88 insertions(+), 78 deletions(-) rename subxt/src/events/{events.rs => events_type.rs} (99%) diff --git a/examples/examples/subscribe_one_event.rs b/examples/examples/subscribe_one_event.rs index 77eaaff6e5..b6e6c65519 100644 --- a/examples/examples/subscribe_one_event.rs +++ b/examples/examples/subscribe_one_event.rs @@ -53,9 +53,7 @@ async fn main() -> Result<(), Box> { .events() .subscribe() .await? - .filter_events::<( - polkadot::balances::events::Transfer, - )>(); + .filter_events::<(polkadot::balances::events::Transfer,)>(); // While this subscription is active, we imagine some balance transfers are made somewhere else: async_std::task::spawn(async { diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs index 274fc72a13..a8b82ff30c 100644 --- a/examples/examples/subscribe_some_events.rs +++ b/examples/examples/subscribe_some_events.rs @@ -50,15 +50,11 @@ async fn main() -> Result<(), Box> { // Subscribe to several balance related events. If we ask for more than one event, // we'll be given a correpsonding tuple of `Option`'s, with exactly one // variant populated each time. - let mut balance_events = api - .events() - .subscribe() - .await? - .filter_events::<( - polkadot::balances::events::Withdraw, - polkadot::balances::events::Transfer, - polkadot::balances::events::Deposit, - )>(); + let mut balance_events = api.events().subscribe().await?.filter_events::<( + polkadot::balances::events::Withdraw, + polkadot::balances::events::Transfer, + polkadot::balances::events::Deposit, + )>(); // While this subscription is active, we imagine some balance transfers are made somewhere else: async_std::task::spawn(async { @@ -86,13 +82,13 @@ async fn main() -> Result<(), Box> { match ev? { (Some(withdraw), _, _) => { println!("Withdraw event: {withdraw:?}"); - }, + } (_, Some(transfer), _) => { println!("Transfer event: {transfer:?}"); - }, + } (_, _, Some(deposit)) => { println!("Deposit event: {deposit:?}"); - }, + } _ => { unreachable!(); } diff --git a/subxt/src/events/decoding.rs b/subxt/src/events/decoding.rs index c3f6cb59d2..e2ea7834e8 100644 --- a/subxt/src/events/decoding.rs +++ b/subxt/src/events/decoding.rs @@ -232,18 +232,14 @@ pub enum EventsDecodingError { #[cfg(test)] mod tests { use super::*; - use crate::{ - error::GenericError::{ - Codec, - EventsDecoding, - Other, - }, + use crate::error::GenericError::{ + Codec, + EventsDecoding, + Other, }; use assert_matches::assert_matches; use codec::Encode; - use scale_info::{ - TypeInfo, - }; + use scale_info::TypeInfo; type TypeId = scale_info::interner::UntrackedSymbol; @@ -315,7 +311,9 @@ mod tests { let res = decode_and_consume_type(id.id(), ®, dummy_cursor); assert_matches!( res, - Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char))) + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive( + TypeDefPrimitive::Char + ))) ); decode_and_consume_type_consumes_all_bytes("str".to_string()); @@ -358,7 +356,9 @@ mod tests { let res = decode_and_consume_type(id.id(), ®, dummy_cursor); assert_matches!( res, - Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive(TypeDefPrimitive::Char))) + Err(EventsDecoding(EventsDecodingError::UnsupportedPrimitive( + TypeDefPrimitive::Char + ))) ); // The last byte (0x0 u8) should not be consumed assert_eq!(dummy_cursor.len(), 1); @@ -484,5 +484,4 @@ mod tests { decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 0u8 }); decode_and_consume_type_consumes_all_bytes(CompactV4 { val: 1u16 }); } - -} \ No newline at end of file +} diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 7977c799f9..2d44e8510e 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -21,9 +21,7 @@ use crate::{ Client, Config, }; -use codec::{ - Decode, -}; +use codec::Decode; use derivative::Derivative; use futures::{ Future, @@ -39,12 +37,12 @@ use std::{ pub use super::{ at, - Events, EventDetails, - RawEventDetails, + EventFilter, + Events, EventsDecodingError, FilterEvents, - EventFilter, + RawEventDetails, }; /// Subscribe to events from blocks. @@ -107,7 +105,9 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { } /// Return only specific events based on the types provided. - pub fn filter_events(self) -> FilterEvents<'a, T, Evs, EvFilter> { + pub fn filter_events( + self, + ) -> FilterEvents<'a, T, Evs, EvFilter> { FilterEvents::new(self) } } diff --git a/subxt/src/events/events.rs b/subxt/src/events/events_type.rs similarity index 99% rename from subxt/src/events/events.rs rename to subxt/src/events/events_type.rs index 1ef47967b5..f1dd8bd816 100644 --- a/subxt/src/events/events.rs +++ b/subxt/src/events/events_type.rs @@ -16,6 +16,7 @@ //! A representation of a block of events. +use super::decoding; use crate::{ error::BasicError, Client, @@ -32,11 +33,10 @@ use codec::{ }; use derivative::Derivative; use sp_core::{ - Bytes, - twox_128, storage::StorageKey, + twox_128, + Bytes, }; -use super::decoding; /// Obtain events at some block hash. The generic parameter is what we /// will attempt to decode each event into if using [`Events::iter()`], @@ -361,7 +361,11 @@ fn decode_raw_event_details( let type_id = arg.ty().id(); let all_bytes = *input; // consume some bytes, moving the cursor forward: - decoding::decode_and_consume_type(type_id, &metadata.runtime_metadata().types, input)?; + decoding::decode_and_consume_type( + type_id, + &metadata.runtime_metadata().types, + input, + )?; // count how many bytes were consumed based on remaining length: let consumed_len = all_bytes.len() - input.len(); // move those consumed bytes to the output vec unaltered: @@ -490,8 +494,6 @@ mod tests { } } - - #[test] fn statically_decode_single_event() { #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 6243fae99e..30660f483d 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -16,34 +16,48 @@ //! Filtering individual events from subscriptions. +use super::{ + EventSubscription, + RawEventDetails, +}; +use crate::{ + BasicError, + Config, + Event, +}; use codec::Decode; -use super::{ RawEventDetails, EventSubscription }; -use crate::{ Config, BasicError, Event }; -use futures::{ Stream, StreamExt }; -use std::task::Poll; -use std::marker::Unpin; +use futures::{ + Stream, + StreamExt, +}; +use std::{ + marker::Unpin, + task::Poll, +}; /// A stream which returns tuples containing exactly one of the /// given event types back on each iteration. pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, - events: Option> + 'a>> + events: Option> + 'a>>, } -impl <'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} +impl<'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} -impl <'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { - pub (crate) fn new(sub: EventSubscription<'a, T, Evs>) -> Self { - Self { - sub, - events: None - } +impl<'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { + pub(crate) fn new(sub: EventSubscription<'a, T, Evs>) -> Self { + Self { sub, events: None } } } -impl <'a, T: Config, Evs: Decode, Filter: EventFilter> Stream for FilterEvents<'a, T, Evs, Filter> { +impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream + for FilterEvents<'a, T, Evs, Filter> +{ type Item = Result; - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { loop { // Drain the current events we're iterating over first: if let Some(events_iter) = self.events.as_mut() { @@ -73,34 +87,38 @@ pub trait EventFilter: private::Sealed { /// The type we'll be handed back from filtering. type ReturnType; /// Filter the events based on the type implementing this trait. - fn filter<'a>(events: impl Iterator> + 'a) -> Box> + 'a>; + fn filter<'a>( + events: impl Iterator> + 'a, + ) -> Box> + 'a>; } // Prevent userspace implementations of the above trait; the interface is not considered stable // and is not a particularly nice API to work with (particularly because it involves boxing, which // would be nice to get rid of eventually). -pub (crate) mod private { +pub(crate) mod private { pub trait Sealed {} } // A special case impl for searching for a tuple of exactly one event (in this case, we don't // need to return an `(Option,)`; we can just return `Event`. -impl private::Sealed for (Ev,) {} -impl EventFilter for (Ev,) { +impl private::Sealed for (Ev,) {} +impl EventFilter for (Ev,) { type ReturnType = Ev; - fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { + fn filter<'a>( + mut events: impl Iterator> + 'a, + ) -> Box> + 'a> { Box::new(std::iter::from_fn(move || { - while let Some(ev) = events.next() { + for ev in events.by_ref() { // Forward any error immediately: let ev = match ev { Ok(ev) => ev, - Err(e) => return Some(Err(e.into())) + Err(e) => return Some(Err(e)), }; // Try decoding each type until we hit a match or an error: let ev = ev.as_event::(); if let Ok(Some(ev)) = ev { // We found a match; return our tuple. - return Some(Ok(ev)); + return Some(Ok(ev)) } if let Err(e) = ev { // We hit an error. Return it. @@ -123,11 +141,11 @@ macro_rules! impl_event_filter { // iteration, or bails with None if none of them could be populated. Box::new(std::iter::from_fn(move || { let mut out: ( $(Option<$ty>,)+ ) = Default::default(); - while let Some(ev) = events.next() { + for ev in events.by_ref() { // Forward any error immediately: let ev = match ev { Ok(ev) => ev, - Err(e) => return Some(Err(e.into())) + Err(e) => return Some(Err(e)) }; // Try decoding each type until we hit a match or an error: $({ @@ -157,4 +175,3 @@ impl_event_filter!(A 0, B 1, C 2, D 3, E 4); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7); - diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index 386e143a2b..ebb70d1f32 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -16,26 +16,24 @@ //! For working with events. -mod events; -mod filter_events; mod decoding; mod event_subscription; +mod events_type; +mod filter_events; -pub use events::{ - at, - Events, - EventDetails, - RawEventDetails, -}; +pub use decoding::EventsDecodingError; pub use event_subscription::{ subscribe, subscribe_finalized, EventSubscription, }; +pub use events_type::{ + at, + EventDetails, + Events, + RawEventDetails, +}; pub use filter_events::{ EventFilter, FilterEvents, }; -pub use decoding::{ - EventsDecodingError, -}; From c6702f93358a75b47ff4f8369fe57def114cbcc7 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 14:45:52 +0000 Subject: [PATCH 05/18] consistify and tidy --- examples/examples/submit_and_watch.rs | 10 +++++----- examples/examples/subscribe_all_events.rs | 2 +- examples/examples/subscribe_some_events.rs | 23 ++++++++++------------ subxt/src/events/events_type.rs | 2 +- subxt/src/transaction.rs | 4 ++-- subxt/tests/integration/events.rs | 4 ++-- subxt/tests/integration/frame/balances.rs | 8 ++++---- subxt/tests/integration/frame/contracts.rs | 8 ++++---- subxt/tests/integration/storage.rs | 2 +- 9 files changed, 30 insertions(+), 33 deletions(-) diff --git a/examples/examples/submit_and_watch.rs b/examples/examples/submit_and_watch.rs index c1d72ca2fe..6ca86a57ad 100644 --- a/examples/examples/submit_and_watch.rs +++ b/examples/examples/submit_and_watch.rs @@ -67,7 +67,7 @@ async fn simple_transfer() -> Result<(), Box> { .await?; let transfer_event = - balance_transfer.find_first_event::()?; + balance_transfer.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); @@ -108,7 +108,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box()?; + events.find_first::()?; if let Some(_ev) = failed_event { // We found a failed event; the transfer didn't succeed. @@ -117,7 +117,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box()?; + events.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); } else { @@ -161,7 +161,7 @@ async fn handle_transfer_events() -> Result<(), Box> { let events = details.wait_for_success().await?; let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(event) = transfer_event { println!( @@ -181,7 +181,7 @@ async fn handle_transfer_events() -> Result<(), Box> { let events = details.wait_for_success().await?; let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(event) = transfer_event { println!("Balance transfer success: {event:?}"); diff --git a/examples/examples/subscribe_all_events.rs b/examples/examples/subscribe_all_events.rs index 92b5cae1b1..6c23cd95aa 100644 --- a/examples/examples/subscribe_all_events.rs +++ b/examples/examples/subscribe_all_events.rs @@ -103,7 +103,7 @@ async fn main() -> Result<(), Box> { // Or we can dynamically find the first transfer event, ignoring any others: let transfer_event = - events.find_first_event::()?; + events.find_first::()?; if let Some(ev) = transfer_event { println!(" - Balance transfer success: value: {:?}", ev.amount); diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs index a8b82ff30c..513eb8c179 100644 --- a/examples/examples/subscribe_some_events.rs +++ b/examples/examples/subscribe_some_events.rs @@ -79,19 +79,16 @@ async fn main() -> Result<(), Box> { // Our subscription will see all of the balance events we're filtering on: while let Some(ev) = balance_events.next().await { - match ev? { - (Some(withdraw), _, _) => { - println!("Withdraw event: {withdraw:?}"); - } - (_, Some(transfer), _) => { - println!("Transfer event: {transfer:?}"); - } - (_, _, Some(deposit)) => { - println!("Deposit event: {deposit:?}"); - } - _ => { - unreachable!(); - } + let ev = ev?; + + if let (Some(withdraw), _, _) = &ev { + println!("Withdraw event: {withdraw:?}"); + } + if let (_, Some(transfer), _) = &ev { + println!("Transfer event: {transfer:?}"); + } + if let (_, _, Some(deposit)) = &ev { + println!("Deposit event: {deposit:?}"); } } diff --git a/subxt/src/events/events_type.rs b/subxt/src/events/events_type.rs index f1dd8bd816..e39fb4fd17 100644 --- a/subxt/src/events/events_type.rs +++ b/subxt/src/events/events_type.rs @@ -274,7 +274,7 @@ impl<'a, T: Config, Evs: Decode> Events<'a, T, Evs> { /// /// **Note:** This method internally uses [`Events::iter_raw()`], so it is safe to /// use even if you do not statically know about all of the possible events. - pub fn find_first_event(&self) -> Result, BasicError> { + pub fn find_first(&self) -> Result, BasicError> { self.find::().next().transpose() } diff --git a/subxt/src/transaction.rs b/subxt/src/transaction.rs index 188dc70b2d..db64288236 100644 --- a/subxt/src/transaction.rs +++ b/subxt/src/transaction.rs @@ -514,9 +514,9 @@ impl<'client, T: Config, Evs: Decode> TransactionEvents<'client, T, Evs> { /// Iterate through the transaction events using metadata to dynamically decode and skip /// them, and return the first event found which decodes to the provided `Ev` type. /// - /// This works in the same way that [`events::Events::find_first_event()`] does, with the + /// This works in the same way that [`events::Events::find_first()`] does, with the /// exception that it ignores events not related to the submitted extrinsic. - pub fn find_first_event(&self) -> Result, BasicError> { + pub fn find_first(&self) -> Result, BasicError> { self.find::().next().transpose() } diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 2262860354..2cb2938c62 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -78,7 +78,7 @@ async fn subscription_produces_events_each_block() -> Result<(), subxt::BasicErr .await .expect("events expected each block")?; let success_event = events - .find_first_event::() + .find_first::() .expect("decode error"); // Every now and then I get no bytes back for the first block events; // I assume that this might be the case for the genesis block, so don't @@ -104,7 +104,7 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { async move { let events = events.ok()?; events - .find_first_event::() + .find_first::() .ok()? } }); diff --git a/subxt/tests/integration/frame/balances.rs b/subxt/tests/integration/frame/balances.rs index 918c273dbf..4d92136ca2 100644 --- a/subxt/tests/integration/frame/balances.rs +++ b/subxt/tests/integration/frame/balances.rs @@ -63,11 +63,11 @@ async fn tx_basic_transfer() -> Result<(), subxt::Error> { .wait_for_finalized_success() .await?; let event = events - .find_first_event::() + .find_first::() .expect("Failed to decode balances::events::Transfer") .expect("Failed to find balances::events::Transfer"); let _extrinsic_success = events - .find_first_event::() + .find_first::() .expect("Failed to decode ExtrinisicSuccess") .expect("Failed to find ExtrinisicSuccess"); @@ -125,7 +125,7 @@ async fn storage_balance_lock() -> Result<(), subxt::Error> { .await? .wait_for_finalized_success() .await? - .find_first_event::()? + .find_first::()? .expect("No ExtrinsicSuccess Event found"); let locks = cxt @@ -205,7 +205,7 @@ async fn transfer_implicit_subscription() { .wait_for_finalized_success() .await .unwrap() - .find_first_event::() + .find_first::() .expect("Can decode events") .expect("Can find balance transfer event"); diff --git a/subxt/tests/integration/frame/contracts.rs b/subxt/tests/integration/frame/contracts.rs index 169b92c55f..45d5bc5c8e 100644 --- a/subxt/tests/integration/frame/contracts.rs +++ b/subxt/tests/integration/frame/contracts.rs @@ -100,13 +100,13 @@ impl ContractsTestContext { .await?; let code_stored = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a CodeStored event".into()))?; let instantiated = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?; let _extrinsic_success = events - .find_first_event::()? + .find_first::()? .ok_or_else(|| { Error::Other("Failed to find a ExtrinsicSuccess event".into()) })?; @@ -141,7 +141,7 @@ impl ContractsTestContext { log::info!("Instantiate result: {:?}", result); let instantiated = result - .find_first_event::()? + .find_first::()? .ok_or_else(|| Error::Other("Failed to find a Instantiated event".into()))?; Ok(instantiated.contract) diff --git a/subxt/tests/integration/storage.rs b/subxt/tests/integration/storage.rs index 20ae278a9e..3f7823d428 100644 --- a/subxt/tests/integration/storage.rs +++ b/subxt/tests/integration/storage.rs @@ -30,7 +30,7 @@ async fn storage_plain_lookup() -> Result<(), subxt::Error> { // Look up a plain value. Wait long enough that we don't get the genesis block data, // because it may have no storage associated with it. - async_std::task::sleep(std::time::Duration::from_secs(6)); + async_std::task::sleep(std::time::Duration::from_secs(6)).await; let entry = ctx.api.storage().timestamp().now(None).await?; assert!(entry > 0); From f6cc1bd2394212d829bd33b37b8a75f06fd46639 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 15:22:49 +0000 Subject: [PATCH 06/18] cargo fmt --- subxt/tests/integration/events.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 2cb2938c62..de1962dbde 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -103,9 +103,7 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { let event_sub = ctx.api.events().subscribe().await?.filter_map(|events| { async move { let events = events.ok()?; - events - .find_first::() - .ok()? + events.find_first::().ok()? } }); From 7ff6ce6805964d38a238e9505eb1ad38b7be1913 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 15:30:59 +0000 Subject: [PATCH 07/18] Tweak a couple of comments --- subxt/src/events/event_subscription.rs | 7 ++++--- subxt/src/events/filter_events.rs | 11 ++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 2d44e8510e..fdf4818577 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -104,10 +104,11 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { } } - /// Return only specific events based on the types provided. - pub fn filter_events( + /// Return only specific events matching the tuple of 1 or more event + /// types that has been provided as the `Filter` type parameter. + pub fn filter_events( self, - ) -> FilterEvents<'a, T, Evs, EvFilter> { + ) -> FilterEvents<'a, T, Evs, Filter> { FilterEvents::new(self) } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 30660f483d..68d97c11cb 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -35,8 +35,11 @@ use std::{ task::Poll, }; -/// A stream which returns tuples containing exactly one of the -/// given event types back on each iteration. +/// A stream which filters events based on the `Filter` param provided. +/// If `Filter` is a 1-tuple of a single `Event` type, it will return every +/// instance of that event as it's found. If `filter` is ` tuple of multiple +/// `Event` types, it will return a corresponding tuple of `Option`s, where +/// exactly one of these will be `Some(event)` each iteration. pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, events: Option> + 'a>>, @@ -136,7 +139,9 @@ macro_rules! impl_event_filter { impl <$($ty: Event),+> private::Sealed for ( $($ty,)+ ) {} impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) { type ReturnType = ( $(Option<$ty>,)+ ); - fn filter<'a>(mut events: impl Iterator> + 'a) -> Box> + 'a> { + fn filter<'a>( + mut events: impl Iterator> + 'a + ) -> Box> + 'a> { // Return an iterator that populates exactly 1 of the tuple options each, // iteration, or bails with None if none of them could be populated. Box::new(std::iter::from_fn(move || { From 35c98ade9ce719417e3c4bd0a5b2cf2165b82253 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 15:57:27 +0000 Subject: [PATCH 08/18] Expose phase and block_hash of filtered events, too --- examples/examples/subscribe_some_events.rs | 18 ++++-- subxt/src/events/filter_events.rs | 75 ++++++++++++++-------- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/examples/examples/subscribe_some_events.rs b/examples/examples/subscribe_some_events.rs index 513eb8c179..f57a67f01f 100644 --- a/examples/examples/subscribe_some_events.rs +++ b/examples/examples/subscribe_some_events.rs @@ -79,16 +79,20 @@ async fn main() -> Result<(), Box> { // Our subscription will see all of the balance events we're filtering on: while let Some(ev) = balance_events.next().await { - let ev = ev?; + let event_details = ev?; - if let (Some(withdraw), _, _) = &ev { - println!("Withdraw event: {withdraw:?}"); + let block_hash = event_details.block_hash; + let event = event_details.event; + println!("Event at {:?}:", block_hash); + + if let (Some(withdraw), _, _) = &event { + println!(" Withdraw event: {withdraw:?}"); } - if let (_, Some(transfer), _) = &ev { - println!("Transfer event: {transfer:?}"); + if let (_, Some(transfer), _) = &event { + println!(" Transfer event: {transfer:?}"); } - if let (_, _, Some(deposit)) = &ev { - println!("Deposit event: {deposit:?}"); + if let (_, _, Some(deposit)) = &event { + println!(" Deposit event: {deposit:?}"); } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 68d97c11cb..85d4a63978 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -17,13 +17,14 @@ //! Filtering individual events from subscriptions. use super::{ + Events, EventSubscription, - RawEventDetails, }; use crate::{ BasicError, Config, Event, + Phase }; use codec::Decode; use futures::{ @@ -42,7 +43,7 @@ use std::{ /// exactly one of these will be `Some(event)` each iteration. pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, - events: Option> + 'a>>, + events: Option, BasicError>> + 'a>>, } impl<'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} @@ -56,7 +57,7 @@ impl<'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream for FilterEvents<'a, T, Evs, Filter> { - type Item = Result; + type Item = Result, BasicError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -77,22 +78,34 @@ impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream None => return Poll::Ready(None), Some(Err(e)) => return Poll::Ready(Some(Err(e))), Some(Ok(events)) => { - self.events = Some(Filter::filter(events.into_iter_raw())); + self.events = Some(Filter::filter(events)); } }; } } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct FilteredEventDetails { + /// During which [`Phase`] was the event produced? + pub phase: Phase, + /// Hash of the block that this event came from. + pub block_hash: BlockHash, + /// A type containing an event that we've filtered on. + /// Depending on the filter type, this may be a tuple + /// or a single event. + pub event: Evs +} + /// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be /// used to filter an event subscription to return only the specified events. pub trait EventFilter: private::Sealed { /// The type we'll be handed back from filtering. type ReturnType; /// Filter the events based on the type implementing this trait. - fn filter<'a>( - events: impl Iterator> + 'a, - ) -> Box> + 'a>; + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs>, + ) -> Box, BasicError>> + 'a>; } // Prevent userspace implementations of the above trait; the interface is not considered stable @@ -107,21 +120,27 @@ pub(crate) mod private { impl private::Sealed for (Ev,) {} impl EventFilter for (Ev,) { type ReturnType = Ev; - fn filter<'a>( - mut events: impl Iterator> + 'a, - ) -> Box> + 'a> { + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs>, + ) -> Box, BasicError>> + 'a> { + let block_hash = events.block_hash(); + let mut iter = events.into_iter_raw(); Box::new(std::iter::from_fn(move || { - for ev in events.by_ref() { + for ev in iter.by_ref() { // Forward any error immediately: - let ev = match ev { + let raw_event = match ev { Ok(ev) => ev, Err(e) => return Some(Err(e)), }; // Try decoding each type until we hit a match or an error: - let ev = ev.as_event::(); - if let Ok(Some(ev)) = ev { + let ev = raw_event.as_event::(); + if let Ok(Some(event)) = ev { // We found a match; return our tuple. - return Some(Ok(ev)) + return Some(Ok(FilteredEventDetails { + phase: raw_event.phase, + block_hash: block_hash, + event: event + })) } if let Err(e) = ev { // We hit an error. Return it. @@ -139,30 +158,34 @@ macro_rules! impl_event_filter { impl <$($ty: Event),+> private::Sealed for ( $($ty,)+ ) {} impl <$($ty: Event),+> EventFilter for ( $($ty,)+ ) { type ReturnType = ( $(Option<$ty>,)+ ); - fn filter<'a>( - mut events: impl Iterator> + 'a - ) -> Box> + 'a> { - // Return an iterator that populates exactly 1 of the tuple options each, - // iteration, or bails with None if none of them could be populated. + fn filter<'a, T: Config, Evs: Decode + 'static>( + events: Events<'a, T, Evs> + ) -> Box, BasicError>> + 'a> { + let block_hash = events.block_hash(); + let mut iter = events.into_iter_raw(); Box::new(std::iter::from_fn(move || { let mut out: ( $(Option<$ty>,)+ ) = Default::default(); - for ev in events.by_ref() { + for ev in iter.by_ref() { // Forward any error immediately: - let ev = match ev { + let raw_event = match ev { Ok(ev) => ev, Err(e) => return Some(Err(e)) }; // Try decoding each type until we hit a match or an error: $({ - let ev = ev.as_event::<$ty>(); + let ev = raw_event.as_event::<$ty>(); if let Ok(Some(ev)) = ev { // We found a match; return our tuple. out.$idx = Some(ev); - return Some(Ok(out)); + return Some(Ok(FilteredEventDetails { + phase: raw_event.phase, + block_hash: block_hash, + event: out + })) } if let Err(e) = ev { - // We hit an error. Return it. - return Some(Err(e.into())) + // We hit an error. Return it. + return Some(Err(e.into())) } })+ } From f9d535d27050dd6abe586b2e69ad57b64df4b45d Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 15:58:01 +0000 Subject: [PATCH 09/18] cargo fmt --- subxt/src/events/event_subscription.rs | 4 +-- subxt/src/events/filter_events.rs | 36 +++++++++++++++++++------- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index fdf4818577..3814818c83 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -106,9 +106,7 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { /// Return only specific events matching the tuple of 1 or more event /// types that has been provided as the `Filter` type parameter. - pub fn filter_events( - self, - ) -> FilterEvents<'a, T, Evs, Filter> { + pub fn filter_events(self) -> FilterEvents<'a, T, Evs, Filter> { FilterEvents::new(self) } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 85d4a63978..04d55cf572 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -17,14 +17,14 @@ //! Filtering individual events from subscriptions. use super::{ - Events, EventSubscription, + Events, }; use crate::{ BasicError, Config, Event, - Phase + Phase, }; use codec::Decode; use futures::{ @@ -43,7 +43,16 @@ use std::{ /// exactly one of these will be `Some(event)` each iteration. pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { sub: EventSubscription<'a, T, Evs>, - events: Option, BasicError>> + 'a>>, + events: Option< + Box< + dyn Iterator< + Item = Result< + FilteredEventDetails, + BasicError, + >, + > + 'a, + >, + >, } impl<'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} @@ -57,7 +66,7 @@ impl<'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream for FilterEvents<'a, T, Evs, Filter> { - type Item = Result, BasicError>; + type Item = Result, BasicError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -94,7 +103,7 @@ pub struct FilteredEventDetails { /// A type containing an event that we've filtered on. /// Depending on the filter type, this may be a tuple /// or a single event. - pub event: Evs + pub event: Evs, } /// This trait is implemented for tuples of Event types; any such tuple (up to size 8) can be @@ -105,7 +114,14 @@ pub trait EventFilter: private::Sealed { /// Filter the events based on the type implementing this trait. fn filter<'a, T: Config, Evs: Decode + 'static>( events: Events<'a, T, Evs>, - ) -> Box, BasicError>> + 'a>; + ) -> Box< + dyn Iterator< + Item = Result< + FilteredEventDetails, + BasicError, + >, + > + 'a, + >; } // Prevent userspace implementations of the above trait; the interface is not considered stable @@ -122,7 +138,9 @@ impl EventFilter for (Ev,) { type ReturnType = Ev; fn filter<'a, T: Config, Evs: Decode + 'static>( events: Events<'a, T, Evs>, - ) -> Box, BasicError>> + 'a> { + ) -> Box< + dyn Iterator, BasicError>> + 'a, + > { let block_hash = events.block_hash(); let mut iter = events.into_iter_raw(); Box::new(std::iter::from_fn(move || { @@ -138,8 +156,8 @@ impl EventFilter for (Ev,) { // We found a match; return our tuple. return Some(Ok(FilteredEventDetails { phase: raw_event.phase, - block_hash: block_hash, - event: event + block_hash, + event, })) } if let Err(e) = ev { From b2ddf6b06aac88439831c91e109ae394b6c1f800 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 16:00:04 +0000 Subject: [PATCH 10/18] expose FilteredEventDetails --- subxt/src/events/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/subxt/src/events/mod.rs b/subxt/src/events/mod.rs index ebb70d1f32..846019563a 100644 --- a/subxt/src/events/mod.rs +++ b/subxt/src/events/mod.rs @@ -36,4 +36,5 @@ pub use events_type::{ pub use filter_events::{ EventFilter, FilterEvents, + FilteredEventDetails, }; From 84893945bd5326d9562b3461b6530b9e67521342 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Tue, 22 Feb 2022 16:02:01 +0000 Subject: [PATCH 11/18] Add docs --- subxt/src/events/filter_events.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 04d55cf572..0e51ca1f45 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -94,6 +94,9 @@ impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream } } +/// This is returned from the [`FilterEvents`] impl of [`Stream`]. It contains +/// some type representing an event we've filtered on, along with couple of additional +/// pieces of information about that event. #[derive(Debug, Clone, Eq, PartialEq)] pub struct FilteredEventDetails { /// During which [`Phase`] was the event produced? From 329e78532a94aa20f53999b9630cf9010d95057e Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 10:39:27 +0000 Subject: [PATCH 12/18] cargo clippy --- subxt/src/events/filter_events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 0e51ca1f45..af4573d2e2 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -200,7 +200,7 @@ macro_rules! impl_event_filter { out.$idx = Some(ev); return Some(Ok(FilteredEventDetails { phase: raw_event.phase, - block_hash: block_hash, + block_hash, event: out })) } From 9da2438c6a9e124cfe1a1282f45d9e73c2413617 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 11:02:40 +0000 Subject: [PATCH 13/18] remove FilterEvents knowledge of EventSubscription so it's easier to unit test --- subxt/src/events/event_subscription.rs | 2 +- subxt/src/events/filter_events.rs | 30 ++++++++++++++++---------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index 3814818c83..ee3c32e97e 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -106,7 +106,7 @@ impl<'a, T: Config, Evs: Decode> EventSubscription<'a, T, Evs> { /// Return only specific events matching the tuple of 1 or more event /// types that has been provided as the `Filter` type parameter. - pub fn filter_events(self) -> FilterEvents<'a, T, Evs, Filter> { + pub fn filter_events(self) -> FilterEvents<'a, Self, T, Filter> { FilterEvents::new(self) } } diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index af4573d2e2..146d2879ca 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -16,10 +16,7 @@ //! Filtering individual events from subscriptions. -use super::{ - EventSubscription, - Events, -}; +use super::Events; use crate::{ BasicError, Config, @@ -41,8 +38,12 @@ use std::{ /// instance of that event as it's found. If `filter` is ` tuple of multiple /// `Event` types, it will return a corresponding tuple of `Option`s, where /// exactly one of these will be `Some(event)` each iteration. -pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { - sub: EventSubscription<'a, T, Evs>, +pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> { + // A subscription; in order for the Stream impl to apply, this will + // impl `Stream, BasicError>> + Unpin + 'a`. + sub: Sub, + // Each time we get Events from our subscription, they are stored here + // and iterated through in future stream iterations until exhausted. events: Option< Box< dyn Iterator< @@ -55,16 +56,23 @@ pub struct FilterEvents<'a, T: Config, Evs: 'static, Filter: EventFilter> { >, } -impl<'a, T: Config, Evs, Filter: EventFilter> Unpin for FilterEvents<'a, T, Evs, Filter> {} +impl<'a, Sub: 'a, T: Config, Filter: EventFilter> Unpin + for FilterEvents<'a, Sub, T, Filter> +{ +} -impl<'a, T: Config, Evs, Filter: EventFilter> FilterEvents<'a, T, Evs, Filter> { - pub(crate) fn new(sub: EventSubscription<'a, T, Evs>) -> Self { +impl<'a, Sub: 'a, T: Config, Filter: EventFilter> FilterEvents<'a, Sub, T, Filter> { + pub(crate) fn new(sub: Sub) -> Self { Self { sub, events: None } } } -impl<'a, T: Config, Evs: Decode, Filter: EventFilter> Stream - for FilterEvents<'a, T, Evs, Filter> +impl<'a, Sub, T, Evs, Filter> Stream for FilterEvents<'a, Sub, T, Filter> +where + Sub: Stream, BasicError>> + Unpin + 'a, + T: Config, + Evs: Decode + 'static, + Filter: EventFilter, { type Item = Result, BasicError>; fn poll_next( From fdfb9f76afa7de3ac66565b143f7d10f3c913016 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 13:44:54 +0000 Subject: [PATCH 14/18] unit test filter_events --- subxt/src/events/events_type.rs | 50 +++++--- subxt/src/events/filter_events.rs | 184 ++++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+), 16 deletions(-) diff --git a/subxt/src/events/events_type.rs b/subxt/src/events/events_type.rs index e39fb4fd17..3c9175aca3 100644 --- a/subxt/src/events/events_type.rs +++ b/subxt/src/events/events_type.rs @@ -388,8 +388,9 @@ fn decode_raw_event_details( }) } +/// Event related test utilities made use of outside this module. #[cfg(test)] -mod tests { +pub(crate) mod test_utils { use super::*; use crate::{ Config, @@ -415,7 +416,7 @@ mod tests { /// An "outer" events enum containing exactly one event. #[derive(Encode, Decode, TypeInfo, Clone, Debug, PartialEq)] pub enum AllEvents { - E(Ev), + Test(Ev), } /// This encodes to the same format an event is expected to encode to @@ -429,17 +430,17 @@ mod tests { /// Build an EventRecord, which encoded events in the format expected /// to be handed back from storage queries to System.Events. - fn event_record(phase: Phase, event: E) -> EventRecord { + pub fn event_record(phase: Phase, event: E) -> EventRecord { EventRecord { phase, - event: AllEvents::E(event), + event: AllEvents::Test(event), topics: vec![], } } /// Build fake metadata consisting of a single pallet that knows /// about the event type provided. - fn metadata() -> Metadata { + pub fn metadata() -> Metadata { let pallets = vec![PalletMetadata { name: "Test", storage: None, @@ -466,7 +467,7 @@ mod tests { /// Build an `Events` object for test purposes, based on the details provided, /// and with a default block hash. - fn events( + pub fn events( metadata: &'_ Metadata, event_records: Vec>, ) -> Events<'_, DefaultConfig, AllEvents> { @@ -480,7 +481,7 @@ mod tests { /// Much like [`events`], but takes pre-encoded events and event count, so that we can /// mess with the bytes in tests if we need to. - fn events_raw( + pub fn events_raw( metadata: &'_ Metadata, event_bytes: Vec, num_events: u32, @@ -493,6 +494,23 @@ mod tests { _event_type: std::marker::PhantomData, } } +} + +#[cfg(test)] +mod tests { + use super::{ + test_utils::{ + event_record, + events, + events_raw, + metadata, + AllEvents, + }, + *, + }; + use crate::Phase; + use codec::Encode; + use scale_info::TypeInfo; #[test] fn statically_decode_single_event() { @@ -518,7 +536,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }] ); } @@ -553,17 +571,17 @@ mod tests { EventDetails { index: 0, phase: Phase::Initialization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }, EventDetails { index: 1, phase: Phase::ApplyExtrinsic(123), - event: AllEvents::E(Event::B(true)) + event: AllEvents::Test(Event::B(true)) }, EventDetails { index: 2, phase: Phase::Finalization, - event: AllEvents::E(Event::A(234)) + event: AllEvents::Test(Event::A(234)) }, ] ); @@ -603,7 +621,7 @@ mod tests { EventDetails { index: 0, phase: Phase::Initialization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) } ); assert_eq!( @@ -611,7 +629,7 @@ mod tests { EventDetails { index: 1, phase: Phase::ApplyExtrinsic(123), - event: AllEvents::E(Event::B(true)) + event: AllEvents::Test(Event::B(true)) } ); @@ -825,7 +843,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(1)) + event: AllEvents::Test(Event::A(1)) }] ); @@ -883,7 +901,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(CompactWrapper(1))) + event: AllEvents::Test(Event::A(CompactWrapper(1))) }] ); @@ -942,7 +960,7 @@ mod tests { vec![EventDetails { index: 0, phase: Phase::Finalization, - event: AllEvents::E(Event::A(MyType::B)) + event: AllEvents::Test(Event::A(MyType::B)) }] ); diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 146d2879ca..5f2b92aeea 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -232,3 +232,187 @@ impl_event_filter!(A 0, B 1, C 2, D 3, E 4); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6); impl_event_filter!(A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7); + +#[cfg(test)] +mod test { + use super::{ + super::events_type::test_utils::{ + event_record, + events, + metadata, + AllEvents, + }, + *, + }; + use crate::{ + Config, + DefaultConfig, + Metadata, + }; + use codec::Encode; + use futures::{ + stream, + Stream, + StreamExt, + }; + use scale_info::TypeInfo; + + // Some pretend events in a pallet + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + enum PalletEvents { + A(EventA), + B(EventB), + C(EventC), + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventA(u8); + impl crate::Event for EventA { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "A"; + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventB(bool); + impl crate::Event for EventB { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "B"; + } + + // An event in our pallet that we can filter on. + #[derive(Clone, Debug, PartialEq, Decode, Encode, TypeInfo)] + struct EventC(u8, bool); + impl crate::Event for EventC { + const PALLET: &'static str = "Test"; + const EVENT: &'static str = "C"; + } + + // A stream of fake events for us to try filtering on. + fn events_stream( + metadata: &'_ Metadata, + ) -> impl Stream< + Item = Result>, BasicError>, + > { + stream::iter(vec![ + events::( + &metadata, + vec![ + event_record(Phase::Initialization, PalletEvents::A(EventA(1))), + event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))), + event_record(Phase::Finalization, PalletEvents::A(EventA(2))), + ], + ), + events::( + &metadata, + vec![event_record( + Phase::ApplyExtrinsic(1), + PalletEvents::B(EventB(false)), + )], + ), + events::( + &metadata, + vec![ + event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))), + event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))), + ], + ), + ]) + .map(|item| Ok::<_, BasicError>(item)) + } + + #[async_std::test] + async fn filter_one_event_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventA` only. + let actual: Vec<_> = + FilterEvents::<_, DefaultConfig, (EventA,)>::new(events_stream(&metadata)) + .map(|e| e.unwrap()) + .collect() + .await; + + let expected = vec![ + FilteredEventDetails { + phase: Phase::Initialization, + block_hash: ::Hash::default(), + event: EventA(1), + }, + FilteredEventDetails { + phase: Phase::Finalization, + block_hash: ::Hash::default(), + event: EventA(2), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(3), + block_hash: ::Hash::default(), + event: EventA(3), + }, + ]; + + assert_eq!(actual, expected); + } + + #[async_std::test] + async fn filter_some_events_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventA` or `EventB`. + let actual: Vec<_> = FilterEvents::<_, DefaultConfig, (EventA, EventB)>::new( + events_stream(&metadata), + ) + .map(|e| e.unwrap()) + .collect() + .await; + + let expected = vec![ + FilteredEventDetails { + phase: Phase::Initialization, + block_hash: ::Hash::default(), + event: (Some(EventA(1)), None), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(0), + block_hash: ::Hash::default(), + event: (None, Some(EventB(true))), + }, + FilteredEventDetails { + phase: Phase::Finalization, + block_hash: ::Hash::default(), + event: (Some(EventA(2)), None), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(1), + block_hash: ::Hash::default(), + event: (None, Some(EventB(false))), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(2), + block_hash: ::Hash::default(), + event: (None, Some(EventB(true))), + }, + FilteredEventDetails { + phase: Phase::ApplyExtrinsic(3), + block_hash: ::Hash::default(), + event: (Some(EventA(3)), None), + }, + ]; + + assert_eq!(actual, expected); + } + + #[async_std::test] + async fn filter_no_events_from_stream() { + let metadata = metadata::(); + + // Filter out fake event stream to select events matching `EventC` (none exist). + let actual: Vec<_> = + FilterEvents::<_, DefaultConfig, (EventC,)>::new(events_stream(&metadata)) + .map(|e| e.unwrap()) + .collect() + .await; + + assert_eq!(actual, vec![]); + } +} From 0df05eccdcfe5950a8dcd4fa1ecd7089c236d330 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 13:50:01 +0000 Subject: [PATCH 15/18] tweak an integration test to use filter_events --- subxt/tests/integration/events.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index de1962dbde..660ab0d34e 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -100,12 +100,11 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { let ctx = test_context().await; // Subscribe to balance transfer events, ignoring all else. - let event_sub = ctx.api.events().subscribe().await?.filter_map(|events| { - async move { - let events = events.ok()?; - events.find_first::().ok()? - } - }); + let event_sub = ctx.api + .events() + .subscribe() + .await? + .filter_events::<(balances::events::Transfer,)>(); // Calling `.next()` on the above borrows it, and the `filter_map` // means it's no longer `Unpin`, so we pin it on the stack: @@ -123,7 +122,7 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { // Wait for the next balance transfer event in our subscription stream // and check that it lines up: - let event = event_sub.next().await.unwrap(); + let event = event_sub.next().await.unwrap().unwrap().event; assert_eq!( event, balances::events::Transfer { From 8584078e2b339858a77fa722d8b32795f0b92099 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 13:55:38 +0000 Subject: [PATCH 16/18] cargo fmt --- subxt/tests/integration/events.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 660ab0d34e..7741ee78da 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -100,7 +100,8 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { let ctx = test_context().await; // Subscribe to balance transfer events, ignoring all else. - let event_sub = ctx.api + let event_sub = ctx + .api .events() .subscribe() .await? From 333101c3fb8324cec4d1bdc5efec05e41deda8a3 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Wed, 23 Feb 2022 14:53:45 +0000 Subject: [PATCH 17/18] cargo clippy --- subxt/src/events/filter_events.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 5f2b92aeea..84f5479b45 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -297,7 +297,7 @@ mod test { > { stream::iter(vec![ events::( - &metadata, + metadata, vec![ event_record(Phase::Initialization, PalletEvents::A(EventA(1))), event_record(Phase::ApplyExtrinsic(0), PalletEvents::B(EventB(true))), @@ -305,21 +305,21 @@ mod test { ], ), events::( - &metadata, + metadata, vec![event_record( Phase::ApplyExtrinsic(1), PalletEvents::B(EventB(false)), )], ), events::( - &metadata, + metadata, vec![ event_record(Phase::ApplyExtrinsic(2), PalletEvents::B(EventB(true))), event_record(Phase::ApplyExtrinsic(3), PalletEvents::A(EventA(3))), ], ), ]) - .map(|item| Ok::<_, BasicError>(item)) + .map(Ok::<_, BasicError>) } #[async_std::test] From 486f8aa4167475fdbdabac297dda6d76f01bcb39 Mon Sep 17 00:00:00 2001 From: James Wilson Date: Thu, 24 Feb 2022 17:02:38 +0000 Subject: [PATCH 18/18] Tweak a comment Co-authored-by: David --- subxt/src/events/events_type.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/src/events/events_type.rs b/subxt/src/events/events_type.rs index 3c9175aca3..192d9a0276 100644 --- a/subxt/src/events/events_type.rs +++ b/subxt/src/events/events_type.rs @@ -388,7 +388,7 @@ fn decode_raw_event_details( }) } -/// Event related test utilities made use of outside this module. +/// Event related test utilities used outside this module. #[cfg(test)] pub(crate) mod test_utils { use super::*;