Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add next_events_from_metadata and rename next_event #545

Merged
merged 3 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/examples/event_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn main() {

// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_records = subscription.next_event::<RuntimeEvent, Hash>().unwrap().unwrap();
let event_records = subscription.next_events::<RuntimeEvent, Hash>().unwrap().unwrap();
for event_record in &event_records {
println!("decoded: {:?} {:?}", event_record.phase, event_record.event);
match &event_record.event {
Expand Down
3 changes: 0 additions & 3 deletions node-api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use sp_core::H256;

/// A collection of events obtained from a block, bundled with the necessary
/// information needed to decode and iterate over them.
//
// In subxt, this was generic over a `Config` type, but it's sole usage was to derive the
// hash type. We omitted this here and use the `ac_primitives::Hash` instead.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because outdated. There's no ac_primitves::Hash Type. No need to point out what Subxt did, because Subxt is constantly evolving as well.

#[derive(Clone, Debug)]
pub struct Events<Hash> {
metadata: Metadata,
Expand Down
61 changes: 40 additions & 21 deletions src/api/rpc_api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
GetChainInfo, GetStorage,
};
use ac_compose_macros::rpc_params;
use ac_node_api::{EventDetails, EventRecord, Events, Phase};
use ac_node_api::{metadata::Metadata, EventDetails, EventRecord, Events, Phase};
use ac_primitives::{ExtrinsicParams, FrameSystemConfig, StorageChangeSet};
use alloc::{vec, vec::Vec};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -83,22 +83,30 @@ where
/// Simplifies the event retrieval from the subscription.
pub struct EventSubscription<Subscription, Hash> {
pub subscription: Subscription,
pub metadata: Metadata,
_phantom: PhantomData<Hash>,
}

impl<Subscription, Hash> EventSubscription<Subscription, Hash> {
/// Create a new wrapper around the subscription.
pub fn new(subscription: Subscription, metadata: Metadata) -> Self {
Self { subscription, metadata, _phantom: Default::default() }
}

/// Update the metadata.
pub fn update_metadata(&mut self, metadata: Metadata) {
self.metadata = metadata
}
}

impl<Subscription, Hash> EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Hash: DeserializeOwned + Copy,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
/// Create a new wrapper around the subscription.
pub fn new(subscription: Subscription) -> Self {
Self { subscription, _phantom: Default::default() }
}

/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventRecord`.
pub fn next_event<RuntimeEvent: Decode, Topic: Decode>(
pub fn next_events<RuntimeEvent: Decode, Topic: Decode>(
&mut self,
) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
let change_set = match self.subscription.next()? {
Expand All @@ -109,8 +117,29 @@ where
// changes in the set. Also, we don't care about the key but only the data, so take
// the second value in the tuple of two.
let storage_data = change_set.changes[0].1.as_ref()?;
let events = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
Some(events)
let event_records = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
Some(event_records)
}

/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventDetails`.
//
// On the contrary to `next_events` this function only needs up-to-date metadata
// and is therefore updateable during runtime.
pub fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
let change_set = match self.subscription.next()? {
Ok(set) => set,
Err(e) => return Some(Err(Error::RpcClient(e))),
};
let block_hash = change_set.block;
// Since we subscribed to only the events key, we can simply take the first value of the
// changes in the set. Also, we don't care about the key but only the data, so take
// the second value in the tuple of two.
let storage_data = change_set.changes[0].1.as_ref()?;
let event_bytes = storage_data.0.clone();

let events = Events::<Hash>::new(self.metadata.clone(), block_hash, event_bytes);
Some(Ok(events))
}

/// Unsubscribe from the internal subscription.
Expand All @@ -119,16 +148,6 @@ where
}
}

impl<Subscription, Hash> From<Subscription> for EventSubscription<Subscription, Hash>
where
Hash: DeserializeOwned,
Subscription: HandleSubscription<StorageChangeSet<Hash>>,
{
fn from(subscription: Subscription) -> Self {
EventSubscription::new(subscription)
}
}

pub trait SubscribeEvents {
type Client: Subscribe;
type Hash: DeserializeOwned;
Expand All @@ -151,7 +170,7 @@ where
let subscription = self
.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.map(|sub| sub.into())?;
.map(|sub| EventSubscription::new(sub, self.metadata().clone()))?;
Ok(subscription)
}
}
Expand Down
16 changes: 15 additions & 1 deletion testing/examples/events_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() {
// Wait for event callbacks from the node, which are received via subscription.
for _ in 0..5 {
let event_records = event_subscription
.next_event::<RuntimeEvent, <Runtime as FrameSystemConfig>::Hash>()
.next_events::<RuntimeEvent, <Runtime as FrameSystemConfig>::Hash>()
.unwrap()
.unwrap();
for event_record in &event_records {
Expand All @@ -81,6 +81,20 @@ async fn main() {
}
}
}

// Wait for event callbacks from the node, which are received via subscription, in case no RuntimeEvents are accessible.
for _ in 0..5 {
let events = event_subscription.next_events_from_metadata().unwrap().unwrap();
for event in events.iter() {
let event = event.unwrap();
println!("got event: {:?} {:?}", event.pallet_name(), event.variant_name());
if let Ok(Some(_extrinisic_success)) = event.as_event::<ExtrinsicSuccess>() {
println!("Got System event, all good");
} else {
panic!("Unexpected event");
}
}
}
}

fn assert_assosciated_events_match_expected(events: Vec<EventDetails>) {
Expand Down