From 89b83757af1ae8db279836f0b0f6b0707967d2ae Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Fri, 28 Jan 2022 20:44:53 +0100 Subject: [PATCH 1/9] implement next_context --- subxt/src/subscription.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 307bfea222..9138a8ee14 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -46,7 +46,7 @@ pub struct EventSubscription<'a, T: Config> { block: Option, extrinsic: Option, event: Option<(&'static str, &'static str)>, - events: VecDeque, + events: VecDeque<(::Hash, usize, RawEvent)>, finished: bool, } @@ -63,7 +63,11 @@ enum BlockReader<'a, T: Config> { impl<'a, T: Config> BlockReader<'a, T> { async fn next( &mut self, +<<<<<<< HEAD ) -> Option<(T::Hash, Result, BasicError>)> { +======= + ) -> Option<(T::Hash, Result, Error>)> { +>>>>>>> ae51587... implement next_context match self { BlockReader::Decoder { subscription, @@ -78,7 +82,8 @@ impl<'a, T: Config> BlockReader<'a, T> { }) .collect(); - let flattened_events = events.map(|x| x.into_iter().flatten().collect()); + let flattened_events = + events.map(|x| x.into_iter().flatten().enumerate().collect()); Some((change_set.block, flattened_events)) } #[cfg(test)] @@ -124,6 +129,15 @@ impl<'a, T: Config> EventSubscription<'a, T> { /// Gets the next event. pub async fn next(&mut self) -> Option> { + self.next_context() + .await + .map(|res| res.map(|(_, _, event)| event)) + } + /// Gets the next event with the associated block hash and its index + /// indicating where it occurred. + pub async fn next_context( + &mut self, + ) -> Option::Hash, usize, RawEvent), BasicError>> { loop { if let Some(raw_event) = self.events.pop_front() { return Some(Ok(raw_event)) @@ -144,7 +158,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { match events { Err(err) => return Some(Err(err)), Ok(raw_events) => { - for (phase, raw) in raw_events { + for (idx, (phase, raw)) in raw_events { if let Some(ext_index) = self.extrinsic { if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index) { @@ -156,7 +170,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { continue } } - self.events.push_back(raw); + self.events.push_back((received_hash, idx, raw)); } } } From 14f45f20ca336d6fd7e7b551ddc2252b7599ed1f Mon Sep 17 00:00:00 2001 From: Fabio Lama Date: Tue, 1 Feb 2022 19:20:12 +0100 Subject: [PATCH 2/9] write test_context for method next_context --- subxt/src/subscription.rs | 152 +++++++++++++++++++++++++++++++------- 1 file changed, 125 insertions(+), 27 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 9138a8ee14..fa56722b1d 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -57,17 +57,13 @@ enum BlockReader<'a, T: Config> { }, /// Mock event listener for unit tests #[cfg(test)] - Mock(Box, BasicError>)>>), + Mock(Box, BasicError>)>>), } impl<'a, T: Config> BlockReader<'a, T> { async fn next( &mut self, -<<<<<<< HEAD - ) -> Option<(T::Hash, Result, BasicError>)> { -======= - ) -> Option<(T::Hash, Result, Error>)> { ->>>>>>> ae51587... implement next_context + ) -> Option<(T::Hash, Result, BasicError>)> { match self { BlockReader::Decoder { subscription, @@ -83,7 +79,15 @@ impl<'a, T: Config> BlockReader<'a, T> { .collect(); let flattened_events = - events.map(|x| x.into_iter().flatten().enumerate().collect()); + events.map(|x| { + x.into_iter() + .flatten() + .enumerate() + .map(|(idx, (phase, raw))| { + (phase, idx, raw) + }) + .collect() + }); Some((change_set.block, flattened_events)) } #[cfg(test)] @@ -133,8 +137,8 @@ impl<'a, T: Config> EventSubscription<'a, T> { .await .map(|res| res.map(|(_, _, event)| event)) } - /// Gets the next event with the associated block hash and its index - /// indicating where it occurred. + /// Gets the next event with the associated block hash and its corresponding + /// event index. pub async fn next_context( &mut self, ) -> Option::Hash, usize, RawEvent), BasicError>> { @@ -158,7 +162,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { match events { Err(err) => return Some(Err(err)), Ok(raw_events) => { - for (idx, (phase, raw)) in raw_events { + for (phase, idx, raw) in raw_events { if let Some(ext_index) = self.extrinsic { if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index) { @@ -290,7 +294,7 @@ mod tests { #[async_std::test] /// test that filters work correctly, and are independent of each other async fn test_filters() { - let mut events = vec![]; + let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![]; // create all events for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { for phase in [ @@ -299,15 +303,26 @@ mod tests { Phase::ApplyExtrinsic(1), Phase::Finalization, ] { - for event in [named_event("a"), named_event("b")] { - events.push((block_hash, phase.clone(), event)) - } + [named_event("a"), named_event("b")] + .iter() + .enumerate() + .for_each(|(idx, event)| { + events.push(( + block_hash, + phase.clone(), + // The event index + idx, + { + // set variant index so we can uniquely identify + // the event, independently from the event index + let mut event = event.clone(); + event.variant_index = (idx * 2) as u8; + event + } + )) + }); } } - // set variant index so we can uniquely identify the event - events.iter_mut().enumerate().for_each(|(idx, event)| { - event.2.variant_index = idx as u8; - }); let half_len = events.len() / 2; @@ -323,8 +338,8 @@ mod tests { Ok(events .iter() .take(half_len) - .map(|(_, phase, event)| { - (phase.clone(), event.clone()) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) }) .collect()), ), @@ -333,8 +348,8 @@ mod tests { Ok(events .iter() .skip(half_len) - .map(|(_, phase, event)| { - (phase.clone(), event.clone()) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) }) .collect()), ), @@ -347,21 +362,23 @@ mod tests { events: Default::default(), finished: false, }; - let mut expected_events = events.clone(); + + let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> = events.clone(); + if let Some(hash) = block_filter { - expected_events.retain(|(h, _, _)| h == &hash); + expected_events.retain(|(h, _, _, _)| h == &hash); } if let Some(idx) = extrinsic_filter { - expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx)); + expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx)); } if let Some(name) = event_filter { - expected_events.retain(|(_, _, event)| event.pallet == name.0); + expected_events.retain(|(_, _, _, event)| event.pallet == name.0); } for expected_event in expected_events { assert_eq!( subscription.next().await.unwrap().unwrap(), - expected_event.2 + expected_event.3 ); } assert!(subscription.next().await.is_none()); @@ -369,4 +386,85 @@ mod tests { } } } + + #[async_std::test] + async fn test_context() { + let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![]; + // create all events + for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { + for phase in [ + Phase::Initialization, + Phase::ApplyExtrinsic(0), + Phase::ApplyExtrinsic(1), + Phase::Finalization, + ] { + [named_event("a"), named_event("b")] + .iter() + .enumerate() + .for_each(|(idx, event)| { + events.push(( + block_hash, + phase.clone(), + // The event index + idx, + { + // set variant index so we can uniquely identify + // the event, independently from the event index + let mut event = event.clone(); + event.variant_index = (idx * 2) as u8; + event + } + )) + }); + } + } + + let half_len = events.len() / 2; + + let mut subscription: EventSubscription = + EventSubscription { + block_reader: BlockReader::Mock(Box::new( + vec![ + ( + events[0].0, + Ok(events + .iter() + .take(half_len) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) + }) + .collect()), + ), + ( + events[half_len].0, + Ok(events + .iter() + .skip(half_len) + .map(|(_, phase, idx, event)| { + (phase.clone(), *idx, event.clone()) + }) + .collect()), + ), + ] + .into_iter(), + )), + block: None, + extrinsic: None, + event: None, + events: Default::default(), + finished: false, + }; + + let expected_events: Vec<(H256, Phase, usize, RawEvent)> = events.clone(); + + for exp in expected_events { + assert_eq!( + subscription.next_context().await.unwrap().unwrap(), + // (block_hash, event_idx, event) + (exp.0, exp.2, exp.3) + ); + } + assert!(subscription.next().await.is_none()); + } + } From 066b8751fca3a3f01431e5ce59eb195e49a79e3d Mon Sep 17 00:00:00 2001 From: Fabio Lama Date: Tue, 1 Feb 2022 20:02:11 +0100 Subject: [PATCH 3/9] change how events are uniquely identified --- subxt/src/subscription.rs | 27 +++++++++++++-------------- test-runtime/build.rs | 3 +-- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index fa56722b1d..4bf1705324 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -312,18 +312,18 @@ mod tests { phase.clone(), // The event index idx, - { - // set variant index so we can uniquely identify - // the event, independently from the event index - let mut event = event.clone(); - event.variant_index = (idx * 2) as u8; - event - } + event.clone(), )) }); } } + // set variant index so we can uniquely identify the event + events.iter_mut().enumerate().for_each(|(idx, event)| { + event.3.variant_index = idx as u8; + }); + + let half_len = events.len() / 2; for block_filter in [None, Some(H256::from([1; 32]))] { @@ -407,18 +407,17 @@ mod tests { phase.clone(), // The event index idx, - { - // set variant index so we can uniquely identify - // the event, independently from the event index - let mut event = event.clone(); - event.variant_index = (idx * 2) as u8; - event - } + event.clone(), )) }); } } + // set variant index so we can uniquely identify the event + events.iter_mut().enumerate().for_each(|(idx, event)| { + event.3.variant_index = idx as u8; + }); + let half_len = events.len() / 2; let mut subscription: EventSubscription = diff --git a/test-runtime/build.rs b/test-runtime/build.rs index 71a075a37a..a93c1c5ac8 100644 --- a/test-runtime/build.rs +++ b/test-runtime/build.rs @@ -45,8 +45,7 @@ async fn run() { env::var(SUBSTRATE_BIN_ENV_VAR).unwrap_or_else(|_| "substrate".to_owned()); // Run binary. - let port = next_open_port() - .expect("Cannot spawn substrate: no available ports in the given port range"); + let port = 8833; let cmd = Command::new(&substrate_bin) .arg("--dev") .arg("--tmp") From c22a91500903bdb4328c27a87aea7b2bbf3f9c2d Mon Sep 17 00:00:00 2001 From: Fabio Lama Date: Tue, 1 Feb 2022 20:03:55 +0100 Subject: [PATCH 4/9] undo local changes for test-runtime --- test-runtime/build.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test-runtime/build.rs b/test-runtime/build.rs index a93c1c5ac8..71a075a37a 100644 --- a/test-runtime/build.rs +++ b/test-runtime/build.rs @@ -45,7 +45,8 @@ async fn run() { env::var(SUBSTRATE_BIN_ENV_VAR).unwrap_or_else(|_| "substrate".to_owned()); // Run binary. - let port = 8833; + let port = next_open_port() + .expect("Cannot spawn substrate: no available ports in the given port range"); let cmd = Command::new(&substrate_bin) .arg("--dev") .arg("--tmp") From 3a78bc069686b4d0a7d811cf7d1f4bf02c18930f Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Wed, 2 Feb 2022 12:29:30 +0100 Subject: [PATCH 5/9] introduce EventContext struct --- subxt/src/subscription.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 4bf1705324..273d1c7d3b 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -39,6 +39,13 @@ use sp_core::{ use sp_runtime::traits::Header; use std::collections::VecDeque; + +pub struct EventContext { + pub block: T, + pub event_idx: usize, + pub event: RawEvent, +} + /// Event subscription simplifies filtering a storage change set stream for /// events of interest. pub struct EventSubscription<'a, T: Config> { @@ -46,7 +53,7 @@ pub struct EventSubscription<'a, T: Config> { block: Option, extrinsic: Option, event: Option<(&'static str, &'static str)>, - events: VecDeque<(::Hash, usize, RawEvent)>, + events: VecDeque>, finished: bool, } @@ -135,7 +142,9 @@ impl<'a, T: Config> EventSubscription<'a, T> { pub async fn next(&mut self) -> Option> { self.next_context() .await - .map(|res| res.map(|(_, _, event)| event)) + .map(|res| { + res.map(|ctx| ctx.event) + }) } /// Gets the next event with the associated block hash and its corresponding /// event index. @@ -162,7 +171,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { match events { Err(err) => return Some(Err(err)), Ok(raw_events) => { - for (phase, idx, raw) in raw_events { + for (phase, event_idx, raw) in raw_events { if let Some(ext_index) = self.extrinsic { if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index) { @@ -174,7 +183,13 @@ impl<'a, T: Config> EventSubscription<'a, T> { continue } } - self.events.push_back((received_hash, idx, raw)); + self.events.push_back( + EventContext { + block: received_hash, + event_idx: event_idx, + event: raw + } + ); } } } From 0a147ac3c81bcc00f1f77b175916e7b7101b2839 Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Wed, 2 Feb 2022 14:07:50 +0100 Subject: [PATCH 6/9] adjust test_context to EventContext struct --- subxt/src/subscription.rs | 43 +++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 273d1c7d3b..28e2872e2a 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -40,6 +40,8 @@ use sp_runtime::traits::Header; use std::collections::VecDeque; +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq, Clone))] pub struct EventContext { pub block: T, pub event_idx: usize, @@ -404,7 +406,7 @@ mod tests { #[async_std::test] async fn test_context() { - let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![]; + let mut events = vec![]; // create all events for block_hash in [H256::from([0; 32]), H256::from([1; 32])] { for phase in [ @@ -417,20 +419,23 @@ mod tests { .iter() .enumerate() .for_each(|(idx, event)| { - events.push(( - block_hash, - phase.clone(), - // The event index - idx, - event.clone(), - )) + events.push( + ( + phase.clone(), + EventContext { + block: block_hash, + event_idx: idx, + event: event.clone(), + } + ) + ); }); } } // set variant index so we can uniquely identify the event - events.iter_mut().enumerate().for_each(|(idx, event)| { - event.3.variant_index = idx as u8; + events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| { + ctx.event.variant_index = idx as u8; }); let half_len = events.len() / 2; @@ -440,22 +445,22 @@ mod tests { block_reader: BlockReader::Mock(Box::new( vec![ ( - events[0].0, + events[0].1.block, Ok(events .iter() .take(half_len) - .map(|(_, phase, idx, event)| { - (phase.clone(), *idx, event.clone()) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) }) .collect()), ), ( - events[half_len].0, + events[half_len].1.block, Ok(events .iter() .skip(half_len) - .map(|(_, phase, idx, event)| { - (phase.clone(), *idx, event.clone()) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) }) .collect()), ), @@ -469,16 +474,14 @@ mod tests { finished: false, }; - let expected_events: Vec<(H256, Phase, usize, RawEvent)> = events.clone(); + let expected_events = events.clone(); for exp in expected_events { assert_eq!( subscription.next_context().await.unwrap().unwrap(), - // (block_hash, event_idx, event) - (exp.0, exp.2, exp.3) + exp.1 ); } assert!(subscription.next().await.is_none()); } - } From d3b010cd87e5c67348f3bb80ba84f5d4fe369455 Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Wed, 2 Feb 2022 14:17:28 +0100 Subject: [PATCH 7/9] fix return type for next_context --- subxt/src/subscription.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 28e2872e2a..719107bf17 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -39,7 +39,8 @@ use sp_core::{ use sp_runtime::traits::Header; use std::collections::VecDeque; - +/// Raw bytes for an Event, including the block hash where it occurred and its +/// corresponding event index. #[derive(Debug)] #[cfg_attr(test, derive(PartialEq, Clone))] pub struct EventContext { @@ -92,8 +93,8 @@ impl<'a, T: Config> BlockReader<'a, T> { x.into_iter() .flatten() .enumerate() - .map(|(idx, (phase, raw))| { - (phase, idx, raw) + .map(|(event_idx, (phase, raw))| { + (phase, event_idx, raw) }) .collect() }); @@ -152,7 +153,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { /// event index. pub async fn next_context( &mut self, - ) -> Option::Hash, usize, RawEvent), BasicError>> { + ) -> Option, BasicError>> { loop { if let Some(raw_event) = self.events.pop_front() { return Some(Ok(raw_event)) From a615b790360d5631d90eb6aa89578135218ac0c2 Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Wed, 2 Feb 2022 14:22:28 +0100 Subject: [PATCH 8/9] add suggestions by jsdw --- subxt/src/subscription.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index 719107bf17..ea2b85caa5 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -43,8 +43,8 @@ use std::collections::VecDeque; /// corresponding event index. #[derive(Debug)] #[cfg_attr(test, derive(PartialEq, Clone))] -pub struct EventContext { - pub block: T, +pub struct EventContext { + pub block_hash: Hash, pub event_idx: usize, pub event: RawEvent, } @@ -188,7 +188,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { } self.events.push_back( EventContext { - block: received_hash, + block_hash: received_hash, event_idx: event_idx, event: raw } @@ -424,7 +424,7 @@ mod tests { ( phase.clone(), EventContext { - block: block_hash, + block_hash: block_hash, event_idx: idx, event: event.clone(), } @@ -446,7 +446,7 @@ mod tests { block_reader: BlockReader::Mock(Box::new( vec![ ( - events[0].1.block, + events[0].1.block_hash, Ok(events .iter() .take(half_len) @@ -456,7 +456,7 @@ mod tests { .collect()), ), ( - events[half_len].1.block, + events[half_len].1.block_hash, Ok(events .iter() .skip(half_len) From 60d64765ffa8b73c124be9d08ce41d0f89281c1b Mon Sep 17 00:00:00 2001 From: lamafab <42901763+lamafab@users.noreply.github.com> Date: Thu, 3 Feb 2022 16:37:49 +0100 Subject: [PATCH 9/9] ran cargo fmt and clippy --- subxt/src/subscription.rs | 133 ++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 70 deletions(-) diff --git a/subxt/src/subscription.rs b/subxt/src/subscription.rs index ea2b85caa5..0adc6f929a 100644 --- a/subxt/src/subscription.rs +++ b/subxt/src/subscription.rs @@ -67,7 +67,13 @@ enum BlockReader<'a, T: Config> { }, /// Mock event listener for unit tests #[cfg(test)] - Mock(Box, BasicError>)>>), + Mock( + Box< + dyn Iterator< + Item = (T::Hash, Result, BasicError>), + >, + >, + ), } impl<'a, T: Config> BlockReader<'a, T> { @@ -88,16 +94,13 @@ impl<'a, T: Config> BlockReader<'a, T> { }) .collect(); - let flattened_events = - events.map(|x| { - x.into_iter() - .flatten() - .enumerate() - .map(|(event_idx, (phase, raw))| { - (phase, event_idx, raw) - }) - .collect() - }); + let flattened_events = events.map(|x| { + x.into_iter() + .flatten() + .enumerate() + .map(|(event_idx, (phase, raw))| (phase, event_idx, raw)) + .collect() + }); Some((change_set.block, flattened_events)) } #[cfg(test)] @@ -145,9 +148,7 @@ impl<'a, T: Config> EventSubscription<'a, T> { pub async fn next(&mut self) -> Option> { self.next_context() .await - .map(|res| { - res.map(|ctx| ctx.event) - }) + .map(|res| res.map(|ctx| ctx.event)) } /// Gets the next event with the associated block hash and its corresponding /// event index. @@ -186,13 +187,11 @@ impl<'a, T: Config> EventSubscription<'a, T> { continue } } - self.events.push_back( - EventContext { - block_hash: received_hash, - event_idx: event_idx, - event: raw - } - ); + self.events.push_back(EventContext { + block_hash: received_hash, + event_idx, + event: raw, + }); } } } @@ -341,7 +340,6 @@ mod tests { event.3.variant_index = idx as u8; }); - let half_len = events.len() / 2; for block_filter in [None, Some(H256::from([1; 32]))] { @@ -381,7 +379,8 @@ mod tests { finished: false, }; - let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> = events.clone(); + let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> = + events.clone(); if let Some(hash) = block_filter { expected_events.retain(|(h, _, _, _)| h == &hash); @@ -420,16 +419,14 @@ mod tests { .iter() .enumerate() .for_each(|(idx, event)| { - events.push( - ( - phase.clone(), - EventContext { - block_hash: block_hash, - event_idx: idx, - event: event.clone(), - } - ) - ); + events.push(( + phase.clone(), + EventContext { + block_hash, + event_idx: idx, + event: event.clone(), + }, + )); }); } } @@ -441,47 +438,43 @@ mod tests { let half_len = events.len() / 2; - let mut subscription: EventSubscription = - EventSubscription { - block_reader: BlockReader::Mock(Box::new( - vec![ - ( - events[0].1.block_hash, - Ok(events - .iter() - .take(half_len) - .map(|(phase, ctx)| { - (phase.clone(), ctx.event_idx, ctx.event.clone()) - }) - .collect()), - ), - ( - events[half_len].1.block_hash, - Ok(events - .iter() - .skip(half_len) - .map(|(phase, ctx)| { - (phase.clone(), ctx.event_idx, ctx.event.clone()) - }) - .collect()), - ), - ] - .into_iter(), - )), - block: None, - extrinsic: None, - event: None, - events: Default::default(), - finished: false, - }; + let mut subscription: EventSubscription = EventSubscription { + block_reader: BlockReader::Mock(Box::new( + vec![ + ( + events[0].1.block_hash, + Ok(events + .iter() + .take(half_len) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) + }) + .collect()), + ), + ( + events[half_len].1.block_hash, + Ok(events + .iter() + .skip(half_len) + .map(|(phase, ctx)| { + (phase.clone(), ctx.event_idx, ctx.event.clone()) + }) + .collect()), + ), + ] + .into_iter(), + )), + block: None, + extrinsic: None, + event: None, + events: Default::default(), + finished: false, + }; let expected_events = events.clone(); for exp in expected_events { - assert_eq!( - subscription.next_context().await.unwrap().unwrap(), - exp.1 - ); + assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1); } assert!(subscription.next().await.is_none()); }