Skip to content

Commit

Permalink
Make event subscription logic more generic.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Jun 23, 2020
1 parent 3080ec9 commit d229489
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 70 deletions.
26 changes: 26 additions & 0 deletions src/frame/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ mod tests {
Error,
RuntimeError,
},
events::EventsDecoder,
signer::{
PairSigner,
Signer,
},
subscription::EventSubscription,
system::AccountStoreExt,
tests::{
test_client,
Expand Down Expand Up @@ -201,4 +203,28 @@ mod tests {
panic!("expected an error");
}
}

#[async_std::test]
async fn test_transfer_subscription() {
env_logger::try_init().ok();
let alice = PairSigner::new(AccountKeyring::Alice.pair());
let bob = AccountKeyring::Bob.to_account_id();
let (client, _) = test_client().await;
let sub = client.subscribe_events().await.unwrap();
let mut decoder = EventsDecoder::<TestRuntime>::new(client.metadata().clone());
decoder.with_balances();
let mut sub = EventSubscription::<TestRuntime>::new(sub, decoder);
sub.filter_event::<TransferEvent<_>>();
client.transfer(&alice, &bob, 10_000).await.unwrap();
let raw = sub.next().await.unwrap().unwrap();
let event = TransferEvent::<TestRuntime>::decode(&mut &raw.data[..]).unwrap();
assert_eq!(
event,
TransferEvent {
from: alice.account_id().clone(),
to: bob.clone(),
amount: 10_000,
}
);
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mod metadata;
mod rpc;
mod runtimes;
mod signer;
mod subscription;

pub use crate::{
error::Error,
Expand All @@ -87,6 +88,7 @@ pub use crate::{
},
runtimes::*,
signer::*,
subscription::*,
substrate_subxt_proc_macro::*,
};
use crate::{
Expand Down
104 changes: 34 additions & 70 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ use crate::{
RawEvent,
},
frame::{
system::{
Phase,
System,
},
system::System,
Event,
},
metadata::Metadata,
runtimes::Runtime,
subscription::EventSubscription,
};

pub type ChainBlock<T> =
Expand Down Expand Up @@ -107,12 +106,12 @@ where
}

/// Client for substrate rpc interfaces
pub struct Rpc<T: System> {
pub struct Rpc<T: Runtime> {
client: Client,
marker: PhantomData<T>,
}

impl<T: System> Clone for Rpc<T> {
impl<T: Runtime> Clone for Rpc<T> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
Expand All @@ -121,7 +120,7 @@ impl<T: System> Clone for Rpc<T> {
}
}

impl<T: System> Rpc<T> {
impl<T: Runtime> Rpc<T> {
pub fn new(client: Client) -> Self {
Self {
client,
Expand Down Expand Up @@ -256,7 +255,7 @@ impl<T: System> Rpc<T> {
/// Subscribe to substrate System Events
pub async fn subscribe_events(
&self,
) -> Result<Subscription<StorageChangeSet<<T as System>::Hash>>, Error> {
) -> Result<Subscription<StorageChangeSet<T::Hash>>, Error> {
let mut storage_key = twox_128(b"System").to_vec();
storage_key.extend(twox_128(b"Events").to_vec());
log::debug!("Events storage key {:?}", hex::encode(&storage_key));
Expand Down Expand Up @@ -360,14 +359,24 @@ impl<T: System> Rpc<T> {
block_hash,
signed_block.block.extrinsics.len()
);
wait_for_block_events(
decoder,
ext_hash,
signed_block,
block_hash,
events_sub,
)
.await
let ext_index = find_extrinsic::<T>(&signed_block, &ext_hash)
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
}
None => {
Err(format!("Failed to find block {:?}", block_hash).into())
Expand Down Expand Up @@ -425,58 +434,13 @@ impl<T: System> ExtrinsicSuccess<T> {
}
}

/// Waits for events for the block triggered by the extrinsic
pub async fn wait_for_block_events<T: System>(
decoder: EventsDecoder<T>,
ext_hash: T::Hash,
signed_block: ChainBlock<T>,
block_hash: T::Hash,
events_subscription: Subscription<StorageChangeSet<T::Hash>>,
) -> Result<ExtrinsicSuccess<T>, Error> {
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash))
})?;

let mut subscription = events_subscription;
while let change_set = subscription.next().await {
// only interested in events for the given block
if change_set.block != block_hash {
continue
}
let mut events = Vec::new();
for (_key, data) in change_set.changes {
if let Some(data) = data {
match decoder.decode_events(&mut &data.0[..]) {
Ok(raw_events) => {
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if i as usize == ext_index {
events.push(event)
}
}
}
}
Err(err) => return Err(err),
}
}
}
return if !events.is_empty() {
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("No events found for block {}", block_hash).into())
}
}
unreachable!()
/// Returns the index of an extrinsic in a block.
pub fn find_extrinsic<T: Runtime>(
signed_block: &ChainBlock<T>,
extrinsic: &T::Hash,
) -> Option<usize> {
signed_block.block.extrinsics.iter().position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == *extrinsic
})
}
121 changes: 121 additions & 0 deletions src/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of substrate-subxt.
//
// subxt is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// subxt is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with substrate-subxt. If not, see <http://www.gnu.org/licenses/>.

use jsonrpsee::client::Subscription;
use sp_core::storage::StorageChangeSet;
use std::collections::VecDeque;

use crate::{
error::Error,
events::{
EventsDecoder,
RawEvent,
},
frame::{
system::Phase,
Event,
},
runtimes::Runtime,
};

/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<T: Runtime> {
subscription: Subscription<StorageChangeSet<T::Hash>>,
decoder: EventsDecoder<T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<RawEvent>,
finished: bool,
}

impl<T: Runtime> EventSubscription<T> {
/// Creates a new event subscription.
pub fn new(
subscription: Subscription<StorageChangeSet<T::Hash>>,
decoder: EventsDecoder<T>,
) -> Self {
Self {
subscription,
decoder,
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
}
}

/// Only returns events contained in the block with the given hash.
pub fn filter_block(&mut self, block: T::Hash) {
self.block = Some(block);
}

/// Only returns events from block emitted by extrinsic with index.
pub fn filter_extrinsic(&mut self, block: T::Hash, ext_index: usize) {
self.block = Some(block);
self.extrinsic = Some(ext_index);
}

/// Filters events by type.
pub fn filter_event<E: Event<T>>(&mut self) {
self.event = Some((E::MODULE, E::EVENT));
}

/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
loop {
if let Some(event) = self.events.pop_front() {
return Some(Ok(event))
}
if self.finished {
return None
}
let change_set = self.subscription.next().await;
if let Some(hash) = self.block.as_ref() {
if &change_set.block == hash {
self.finished = true;
} else {
continue
}
}
for (_key, data) in change_set.changes {
if let Some(data) = data {
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
Ok(events) => events,
Err(error) => return Some(Err(error)),
};
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if let Some(ext_index) = self.extrinsic {
if i as usize != ext_index {
continue
}
}
if let Some((module, variant)) = self.event {
if event.module != module || event.variant != variant {
continue
}
}
self.events.push_back(event);
}
}
}
}
}
}
}

0 comments on commit d229489

Please sign in to comment.