diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 24df5ff7777..9e235b53a5f 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -27,12 +27,20 @@ use alloy::{ use anyhow::{Context as _, Result}; use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent}; use ethexe_db::BlockHeader; -use ethexe_service_utils::AsyncFnStream; use ethexe_signer::Address; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{FusedStream, FuturesUnordered}, + Stream, StreamExt, +}; use gprimitives::{CodeId, H256}; use parity_scale_codec::{Decode, Encode}; -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; pub(crate) type Provider = RootProvider; @@ -77,29 +85,31 @@ pub struct ObserverService { codes_futures: FuturesUnordered, } -impl AsyncFnStream for ObserverService { +impl Stream for ObserverService { type Item = Result; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some((hash, header, events))) = self.stream.poll_next_unpin(cx) { + let event = Ok(self.handle_stream_next(hash, header, events)); -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for ObserverService { -// type Item = Result; + return Poll::Ready(Some(event)); + }; -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + if let Poll::Ready(Some(res)) = self.codes_futures.poll_next_unpin(cx) { + let event = res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code }); -// impl FusedStream for ObserverService { -// fn is_terminated(&self) -> bool { -// false -// } -// } + return Poll::Ready(Some(event)); + } + + Poll::Pending + } +} + +impl FusedStream for ObserverService { + fn is_terminated(&self) -> bool { + false + } +} impl ObserverService { pub async fn new(config: &EthereumConfig) -> Result { @@ -171,7 +181,7 @@ impl ObserverService { router: Address, ) -> impl Stream)> { async_stream::stream! { - while let Some(header) = stream.like_next().await { + while let Some(header) = stream.next().await { let hash = (*header.hash).into(); let parent_hash = (*header.parent_hash).into(); let block_number = header.number as u32; @@ -190,31 +200,31 @@ impl ObserverService { } } - pub async fn next(&mut self) -> Result { - tokio::select! { - Some((hash, header, events)) = self.stream.next() => { - // TODO (breathx): set in db? - log::trace!("Received block: {hash:?}"); - - self.last_block_number = header.height; - - // TODO: replace me with proper processing of all events, including commitments. - for event in &events { - if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = event { - self.lookup_code(*code_id, *tx_hash); - } - } - - Ok(ObserverEvent::Block(BlockData { - hash, - header, - events, - })) - }, - Some(res) = self.codes_futures.next() => { - res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code }) + fn handle_stream_next( + &mut self, + hash: H256, + header: BlockHeader, + events: Vec, + ) -> ObserverEvent { + // TODO (breathx): set in db? + log::trace!("Received block: {hash:?}"); + + self.last_block_number = header.height; + + // TODO: replace me with proper processing of all events, including commitments. + for event in &events { + if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = + event + { + self.lookup_code(*code_id, *tx_hash); } } + + ObserverEvent::Block(BlockData { + hash, + header, + events, + }) } } diff --git a/ethexe/observer/src/tests.rs b/ethexe/observer/src/tests.rs index 96968c3e631..1aefad464c9 100644 --- a/ethexe/observer/src/tests.rs +++ b/ethexe/observer/src/tests.rs @@ -92,14 +92,16 @@ async fn test_deployment() -> Result<()> { let event = observer .next() .await - .expect("observer did not receive event"); + .expect("observer did not receive event") + .expect("received error instead of event"); assert!(matches!(event, ObserverEvent::Block(..))); let event = observer .next() .await - .expect("observer did not receive event"); + .expect("observer did not receive event") + .expect("received error instead of event"); assert_eq!( event, diff --git a/ethexe/prometheus/src/lib.rs b/ethexe/prometheus/src/lib.rs index c36b7e46f81..9fa75d5cd35 100644 --- a/ethexe/prometheus/src/lib.rs +++ b/ethexe/prometheus/src/lib.rs @@ -17,8 +17,7 @@ // along with this program. If not, see . use anyhow::{Context as _, Result}; -use ethexe_service_utils::AsyncFnStream; -use futures::FutureExt; +use futures::{ready, stream::FusedStream, FutureExt, Stream}; use hyper::{ http::StatusCode, server::conn::AddrIncoming, @@ -35,7 +34,9 @@ use prometheus::{ }; use std::{ net::SocketAddr, + pin::Pin, sync::LazyLock, + task::{Context, Poll}, time::{Duration, Instant, SystemTime}, }; use tokio::{ @@ -102,36 +103,28 @@ pub struct PrometheusService { metrics: PrometheusMetrics, updated: Instant, - #[allow(unused)] // to be used in stream impl. server: JoinHandle<()>, - - interval: Interval, + interval: Pin>, } -impl AsyncFnStream for PrometheusService { +impl Stream for PrometheusService { type Item = PrometheusEvent; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let instant = ready!(self.interval.poll_tick(cx)); -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for PrometheusService { -// type Item = PrometheusEvent; + self.updated = instant.into(); -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + Poll::Ready(Some(PrometheusEvent::CollectMetrics)) + } +} -// impl FusedStream for PrometheusService { -// fn is_terminated(&self) -> bool { -// self.server.is_finished() -// } -// } +impl FusedStream for PrometheusService { + fn is_terminated(&self) -> bool { + self.server.is_finished() + } +} impl PrometheusService { pub fn new(config: PrometheusConfig) -> Result { @@ -140,7 +133,7 @@ impl PrometheusService { let server = tokio::spawn(init_prometheus(config.addr, config.registry).map(drop)); - let interval = time::interval(Duration::from_secs(6)); + let interval = Box::pin(time::interval(Duration::from_secs(6))); Ok(Self { metrics, @@ -168,14 +161,6 @@ impl PrometheusService { .submitted_block_commitments .set(submitted_block_commitments as u64); } - - pub async fn next(&mut self) -> PrometheusEvent { - let instant = self.interval.tick().await; - - self.updated = instant.into(); - - PrometheusEvent::CollectMetrics - } } struct PrometheusMetrics { diff --git a/ethexe/sequencer/src/lib.rs b/ethexe/sequencer/src/lib.rs index 04fce1e79fb..26886f247c3 100644 --- a/ethexe/sequencer/src/lib.rs +++ b/ethexe/sequencer/src/lib.rs @@ -25,14 +25,20 @@ use ethexe_common::{ gear::{BlockCommitment, CodeCommitment}, }; use ethexe_ethereum::{router::Router, Ethereum}; -use ethexe_service_utils::{AsyncFnStream, Timer}; +use ethexe_service_utils::Timer; use ethexe_signer::{Address, Digest, PublicKey, Signature, Signer, ToDigest}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{FusedStream, FuturesUnordered}, + FutureExt, Stream, StreamExt, +}; use gprimitives::H256; use indexmap::IndexSet; use std::{ collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, ops::Not, + pin::Pin, + task::{Context, Poll}, time::Duration, }; @@ -100,29 +106,45 @@ pub struct SequencerService { submissions: FuturesUnordered, } -impl AsyncFnStream for SequencerService { +impl Stream for SequencerService { type Item = SequencerEvent; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(block_hash) = self.collection_round.poll_unpin(cx) { + let event = self.handle_collection_round_end(block_hash); + + return Poll::Ready(Some(event)); + } + + if let Poll::Ready(block_hash) = self.validation_round.poll_unpin(cx) { + let event = self.handle_validation_round_end(block_hash); + + return Poll::Ready(Some(event)); + } -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for SequencerService { -// type Item = SequencerEvent; + if let Poll::Ready(Some((res, commit_type))) = self.submissions.poll_next_unpin(cx) { + let tx_hash = res + .inspect(|tx_hash| { + log::debug!("Successfully submitted commitment {commit_type:?} in tx {tx_hash}") + }) + .inspect_err(|err| log::warn!("Failed to submit commitment {commit_type:?}: {err}")) + .ok(); + + return Poll::Ready(Some(SequencerEvent::CommitmentSubmitted { + tx_hash, + commit_type, + })); + } -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + Poll::Pending + } +} -// impl FusedStream for SequencerService { -// fn is_terminated(&self) -> bool { -// false -// } -// } +impl FusedStream for SequencerService { + fn is_terminated(&self) -> bool { + false + } +} impl SequencerService { pub async fn new( @@ -324,65 +346,60 @@ impl SequencerService { router.commit_blocks(blocks, signatures).await } - pub async fn next(&mut self) -> SequencerEvent { - tokio::select! { - block_hash = self.collection_round.rings() => { - // If chain head is not yet processed by this node, this is normal situation, - // so we just skip this round for sequencer. - let Some(block_is_empty) = self.db.block_is_empty(block_hash) else { - log::warn!("Failed to get block emptiness status for {block_hash}"); - return SequencerEvent::CollectionRoundEnded { block_hash }; - }; - - let last_non_empty_block = if block_is_empty { - let Some(prev_commitment) = self.db.previous_committed_block(block_hash) else { - return SequencerEvent::CollectionRoundEnded { block_hash }; - }; - - prev_commitment - } else { - block_hash - }; - - self.blocks_candidate = - Self::blocks_commitment_candidate(&self.block_commitments, last_non_empty_block, self.threshold); - self.codes_candidate = - Self::codes_commitment_candidate(&self.code_commitments, self.threshold); - - let to_start_validation = self.blocks_candidate.is_some() || self.codes_candidate.is_some(); - - if to_start_validation { - log::debug!("Validation round for {block_hash} started"); - self.validation_round.start(block_hash); - } - - SequencerEvent::CollectionRoundEnded { block_hash } - } - block_hash = self.validation_round.rings() => { - log::debug!("Validation round for {block_hash} ended"); + fn handle_collection_round_end(&mut self, block_hash: H256) -> SequencerEvent { + // If chain head is not yet processed by this node, this is normal situation, + // so we just skip this round for sequencer. + let Some(block_is_empty) = self.db.block_is_empty(block_hash) else { + log::warn!("Failed to get block emptiness status for {block_hash}"); + return SequencerEvent::CollectionRoundEnded { block_hash }; + }; + + let last_non_empty_block = if block_is_empty { + let Some(prev_commitment) = self.db.previous_committed_block(block_hash) else { + return SequencerEvent::CollectionRoundEnded { block_hash }; + }; - let mut submitted = false; + prev_commitment + } else { + block_hash + }; - if self.blocks_candidate.is_some() || self.codes_candidate.is_some() { - log::debug!("Submitting commitments"); - self.submit_multisigned_commitments(); - submitted = true; - } else { - log::debug!("No commitments to submit, skipping"); - } + self.blocks_candidate = Self::blocks_commitment_candidate( + &self.block_commitments, + last_non_empty_block, + self.threshold, + ); + self.codes_candidate = + Self::codes_commitment_candidate(&self.code_commitments, self.threshold); - log::debug!("Validation round ended: block {block_hash}, submitted: {submitted}"); + let to_start_validation = self.blocks_candidate.is_some() || self.codes_candidate.is_some(); - SequencerEvent::ValidationRoundEnded { block_hash, submitted } - } - Some((res, commit_type)) = self.submissions.next() => { - let tx_hash = res - .inspect(|tx_hash| log::debug!("Successfully submitted commitment {commit_type:?} in tx {tx_hash}")) - .inspect_err(|err| log::warn!("Failed to submit commitment {commit_type:?}: {err}")) - .ok(); + if to_start_validation { + log::debug!("Validation round for {block_hash} started"); + self.validation_round.start(block_hash); + } - SequencerEvent::CommitmentSubmitted { tx_hash, commit_type } - } + SequencerEvent::CollectionRoundEnded { block_hash } + } + + fn handle_validation_round_end(&mut self, block_hash: H256) -> SequencerEvent { + log::debug!("Validation round for {block_hash} ended"); + + let mut submitted = false; + + if self.blocks_candidate.is_some() || self.codes_candidate.is_some() { + log::debug!("Submitting commitments"); + self.submit_multisigned_commitments(); + submitted = true; + } else { + log::debug!("No commitments to submit, skipping"); + } + + log::debug!("Validation round ended: block {block_hash}, submitted: {submitted}"); + + SequencerEvent::ValidationRoundEnded { + block_hash, + submitted, } } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 795b857a67f..77e04ae23d3 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -35,6 +35,7 @@ use ethexe_sequencer::{ use ethexe_service_utils::{OptionFuture as _, OptionStreamNext as _}; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_validator::BlockCommitmentValidationRequest; +use futures::StreamExt; use gprimitives::H256; use parity_scale_codec::{Decode, Encode}; use std::{ops::Not, sync::Arc}; @@ -453,7 +454,7 @@ impl Service { loop { tokio::select! { - event = observer.next() => { + event = observer.select_next_some() => { match event? { ObserverEvent::Blob { code_id, code } => { // TODO: spawn blocking here? @@ -538,7 +539,7 @@ impl Service { } } }, - Some(event) = sequencer.maybe_next() => { + event = sequencer.maybe_next_some() => { let Some(s) = sequencer.as_mut() else { unreachable!("couldn't produce event without sequencer"); }; @@ -606,7 +607,7 @@ impl Service { SequencerEvent::CommitmentSubmitted { .. } => {}, } }, - Some(event) = network.maybe_next() => { + event = network.maybe_next_some() => { match event { NetworkEvent::Message { source, data } => { log::trace!("Received a network message from peer {source:?}"); @@ -687,7 +688,7 @@ impl Service { } _ => {} }}, - Some(event) = prometheus.maybe_next() => { + event = prometheus.maybe_next_some() => { let Some(p) = prometheus.as_mut() else { unreachable!("couldn't produce event without prometheus"); }; diff --git a/ethexe/service/src/tests.rs b/ethexe/service/src/tests.rs index 107fd1157eb..aa7c302b5a2 100644 --- a/ethexe/service/src/tests.rs +++ b/ethexe/service/src/tests.rs @@ -968,6 +968,7 @@ mod utils { use ethexe_network::export::Multiaddr; use ethexe_observer::{ObserverEvent, ObserverService, SimpleBlockData}; use ethexe_sequencer::{SequencerConfig, SequencerService}; + use futures::StreamExt; use gear_core::message::ReplyCode; use std::{ ops::Mul, @@ -1104,7 +1105,7 @@ mod utils { let handle = task::spawn(async move { send_subscription_created.send(()).unwrap(); - while let Ok(event) = observer.next().await { + while let Ok(event) = observer.select_next_some().await { log::trace!(target: "test-event", "📗 Event: {:?}", event); cloned_sender diff --git a/ethexe/service/utils/src/lib.rs b/ethexe/service/utils/src/lib.rs index 31fad0023e8..0314779378f 100644 --- a/ethexe/service/utils/src/lib.rs +++ b/ethexe/service/utils/src/lib.rs @@ -18,7 +18,7 @@ #![allow(async_fn_in_trait)] -use futures::{future, StreamExt}; +use futures::{future, stream::FusedStream, StreamExt}; use std::future::Future; pub use timer::Timer; @@ -46,27 +46,19 @@ impl OptionFuture for Option { } } -pub trait AsyncFnStream { - type Item; - - async fn like_next(&mut self) -> Option; -} - -impl AsyncFnStream for T { - type Item = T::Item; - - async fn like_next(&mut self) -> Option { - StreamExt::next(self).await - } -} - pub trait OptionStreamNext: private::Sealed { async fn maybe_next(self) -> Option; + + async fn maybe_next_some(self) -> T; } -impl OptionStreamNext for &mut Option { +impl OptionStreamNext for &mut Option { async fn maybe_next(self) -> Option { - self.as_mut().map(AsyncFnStream::like_next).maybe().await + self.as_mut().map(StreamExt::next).maybe().await + } + + async fn maybe_next_some(self) -> S::Item { + self.as_mut().map(StreamExt::select_next_some).maybe().await } } diff --git a/ethexe/service/utils/src/timer.rs b/ethexe/service/utils/src/timer.rs index 30d5d931cd3..0b34d4dc005 100644 --- a/ethexe/service/utils/src/timer.rs +++ b/ethexe/service/utils/src/timer.rs @@ -16,12 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::OptionFuture; +use futures::{ready, FutureExt}; use std::{ fmt::Debug, - time::{Duration, Instant}, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, }; -use tokio::time; +use tokio::time::{self, Sleep}; /// Asynchronous timer with inner data kept. pub struct Timer @@ -35,7 +38,7 @@ where duration: Duration, /// Moment of time when the timer was started and applied data. - inner: Option<(Instant, T)>, + inner: Option<(Pin>, T)>, } impl Timer { @@ -65,22 +68,13 @@ impl Timer { self.inner.is_some() } - /// Get the remaining time until the timer will be ready to ring if started. - pub fn remaining(&self) -> Option { - self.inner.as_ref().map(|(start, _)| { - self.duration - .checked_sub(start.elapsed()) - .unwrap_or(Duration::ZERO) - }) - } - /// Start the timer from the beginning with new data. /// Returns the previous data if the timer was already started. pub fn start(&mut self, data: T) -> Option { log::trace!("Started timer '{}' with data {data:?}", self.name); self.inner - .replace((Instant::now(), data)) + .replace((Box::pin(time::sleep(self.duration)), data)) .map(|(_, data)| data) } @@ -90,25 +84,22 @@ impl Timer { self.inner.take().map(|(_, data)| data) } +} + +impl Future for Timer { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some((sleep, _)) = self.inner.as_mut() { + ready!(sleep.poll_unpin(cx)); + + let data = self.inner.take().map(|(_, data)| data).unwrap(); + + log::debug!("Timer '{}' with data {:?} rings", self.name, data); + + return Poll::Ready(data); + } - /// Result of time passed - timer's ring. - pub async fn rings(&mut self) -> T { - self.remaining() - .map(async |dur| { - if !dur.is_zero() { - log::trace!("Timer {} will ring in {dur:?}", self.name); - } - - time::sleep(dur).await; - - log::trace!("Timer {} rings!", self.name); - - self.inner - .take() - .map(|(_, data)| data) - .expect("stopped or not started timer cannot ring;") - }) - .maybe() - .await + Poll::Pending } }