Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get event context on EventSubscription #423

Merged
merged 9 commits into from
Feb 3, 2022
163 changes: 143 additions & 20 deletions subxt/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,24 @@ use sp_core::{
use sp_runtime::traits::Header;
use std::collections::VecDeque;

/// Raw bytes for an Event, including the block hash where it occurred and its
/// corresponding event index.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Clone))]
pub struct EventContext<Hash> {
pub block_hash: Hash,
pub event_idx: usize,
pub event: RawEvent,
}

/// Event subscription simplifies filtering a storage change set stream for
/// events of interest.
pub struct EventSubscription<'a, T: Config> {
block_reader: BlockReader<'a, T>,
block: Option<T::Hash>,
extrinsic: Option<usize>,
event: Option<(&'static str, &'static str)>,
events: VecDeque<RawEvent>,
events: VecDeque<EventContext<T::Hash>>,
finished: bool,
}

Expand All @@ -57,13 +67,19 @@ enum BlockReader<'a, T: Config> {
},
/// Mock event listener for unit tests
#[cfg(test)]
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)>>),
Mock(
Box<
dyn Iterator<
Item = (T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>),
>,
>,
),
}

impl<'a, T: Config> BlockReader<'a, T> {
async fn next(
&mut self,
) -> Option<(T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)> {
) -> Option<(T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>)> {
match self {
BlockReader::Decoder {
subscription,
Expand All @@ -78,7 +94,13 @@ impl<'a, T: Config> BlockReader<'a, T> {
})
.collect();

let flattened_events = events.map(|x| x.into_iter().flatten().collect());
let flattened_events = events.map(|x| {
x.into_iter()
.flatten()
.enumerate()
.map(|(event_idx, (phase, raw))| (phase, event_idx, raw))
.collect()
});
Some((change_set.block, flattened_events))
}
#[cfg(test)]
Expand Down Expand Up @@ -124,6 +146,15 @@ impl<'a, T: Config> EventSubscription<'a, T> {

/// Gets the next event.
pub async fn next(&mut self) -> Option<Result<RawEvent, BasicError>> {
self.next_context()
.await
.map(|res| res.map(|ctx| ctx.event))
}
/// Gets the next event with the associated block hash and its corresponding
/// event index.
pub async fn next_context(
&mut self,
) -> Option<Result<EventContext<T::Hash>, BasicError>> {
loop {
if let Some(raw_event) = self.events.pop_front() {
return Some(Ok(raw_event))
Expand All @@ -144,7 +175,7 @@ impl<'a, T: Config> EventSubscription<'a, T> {
match events {
Err(err) => return Some(Err(err)),
Ok(raw_events) => {
for (phase, raw) in raw_events {
for (phase, event_idx, raw) in raw_events {
if let Some(ext_index) = self.extrinsic {
if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index)
{
Expand All @@ -156,7 +187,11 @@ impl<'a, T: Config> EventSubscription<'a, T> {
continue
}
}
self.events.push_back(raw);
self.events.push_back(EventContext {
block_hash: received_hash,
event_idx,
event: raw,
});
}
}
}
Expand Down Expand Up @@ -276,7 +311,7 @@ mod tests {
#[async_std::test]
/// test that filters work correctly, and are independent of each other
async fn test_filters() {
let mut events = vec![];
let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Expand All @@ -285,14 +320,24 @@ mod tests {
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
for event in [named_event("a"), named_event("b")] {
events.push((block_hash, phase.clone(), event))
}
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
block_hash,
phase.clone(),
// The event index
idx,
event.clone(),
))
});
}
}

// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, event)| {
event.2.variant_index = idx as u8;
event.3.variant_index = idx as u8;
});

let half_len = events.len() / 2;
Expand All @@ -309,8 +354,8 @@ mod tests {
Ok(events
.iter()
.take(half_len)
.map(|(_, phase, event)| {
(phase.clone(), event.clone())
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
Expand All @@ -319,8 +364,8 @@ mod tests {
Ok(events
.iter()
.skip(half_len)
.map(|(_, phase, event)| {
(phase.clone(), event.clone())
.map(|(_, phase, idx, event)| {
(phase.clone(), *idx, event.clone())
})
.collect()),
),
Expand All @@ -333,26 +378,104 @@ mod tests {
events: Default::default(),
finished: false,
};
let mut expected_events = events.clone();

let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> =
events.clone();

if let Some(hash) = block_filter {
expected_events.retain(|(h, _, _)| h == &hash);
expected_events.retain(|(h, _, _, _)| h == &hash);
}
if let Some(idx) = extrinsic_filter {
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
}
if let Some(name) = event_filter {
expected_events.retain(|(_, _, event)| event.pallet == name.0);
expected_events.retain(|(_, _, _, event)| event.pallet == name.0);
}

for expected_event in expected_events {
assert_eq!(
subscription.next().await.unwrap().unwrap(),
expected_event.2
expected_event.3
);
}
assert!(subscription.next().await.is_none());
}
}
}
}

#[async_std::test]
async fn test_context() {
let mut events = vec![];
// create all events
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
for phase in [
Phase::Initialization,
Phase::ApplyExtrinsic(0),
Phase::ApplyExtrinsic(1),
Phase::Finalization,
] {
[named_event("a"), named_event("b")]
.iter()
.enumerate()
.for_each(|(idx, event)| {
events.push((
phase.clone(),
EventContext {
block_hash,
event_idx: idx,
event: event.clone(),
},
));
});
}
}

// set variant index so we can uniquely identify the event
events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| {
ctx.event.variant_index = idx as u8;
});

let half_len = events.len() / 2;

let mut subscription: EventSubscription<DefaultConfig> = EventSubscription {
block_reader: BlockReader::Mock(Box::new(
vec![
(
events[0].1.block_hash,
Ok(events
.iter()
.take(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
(
events[half_len].1.block_hash,
Ok(events
.iter()
.skip(half_len)
.map(|(phase, ctx)| {
(phase.clone(), ctx.event_idx, ctx.event.clone())
})
.collect()),
),
]
.into_iter(),
)),
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
};

let expected_events = events.clone();

for exp in expected_events {
assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1);
}
assert!(subscription.next().await.is_none());
}
}