Skip to content

Commit

Permalink
impl it!
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx committed Jan 28, 2025
1 parent 7c10e29 commit 59d027e
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 208 deletions.
100 changes: 55 additions & 45 deletions ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ use alloy::{
use anyhow::{Context as _, Result};
use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent};
use ethexe_db::BlockHeader;
use ethexe_service_utils::AsyncFnStream;
use ethexe_signer::Address;
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures::{
future::BoxFuture,
stream::{FusedStream, FuturesUnordered},
Stream, StreamExt,
};
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};
use std::{pin::Pin, sync::Arc, time::Duration};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

pub(crate) type Provider = RootProvider<BoxTransport>;

Expand Down Expand Up @@ -77,29 +85,31 @@ pub struct ObserverService {
codes_futures: FuturesUnordered<BlobDownloadFuture>,
}

impl AsyncFnStream for ObserverService {
impl Stream for ObserverService {
type Item = Result<ObserverEvent>;

async fn like_next(&mut self) -> Option<Self::Item> {
Some(self.next().await)
}
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some((hash, header, events))) = self.stream.poll_next_unpin(cx) {
let event = Ok(self.handle_stream_next(hash, header, events));

// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this.
// impl Stream for ObserverService {
// type Item = Result<ObserverEvent>;
return Poll::Ready(Some(event));
};

// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let e = ready!(pin!(self.next_event()).poll(cx));
// Poll::Ready(Some(e))
// }
// }
if let Poll::Ready(Some(res)) = self.codes_futures.poll_next_unpin(cx) {
let event = res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code });

// impl FusedStream for ObserverService {
// fn is_terminated(&self) -> bool {
// false
// }
// }
return Poll::Ready(Some(event));
}

Poll::Pending
}
}

impl FusedStream for ObserverService {
fn is_terminated(&self) -> bool {
false
}
}

impl ObserverService {
pub async fn new(config: &EthereumConfig) -> Result<Self> {
Expand Down Expand Up @@ -171,7 +181,7 @@ impl ObserverService {
router: Address,
) -> impl Stream<Item = (H256, BlockHeader, Vec<BlockEvent>)> {
async_stream::stream! {
while let Some(header) = stream.like_next().await {
while let Some(header) = stream.next().await {
let hash = (*header.hash).into();
let parent_hash = (*header.parent_hash).into();
let block_number = header.number as u32;
Expand All @@ -190,31 +200,31 @@ impl ObserverService {
}
}

pub async fn next(&mut self) -> Result<ObserverEvent> {
tokio::select! {
Some((hash, header, events)) = self.stream.next() => {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = event {
self.lookup_code(*code_id, *tx_hash);
}
}

Ok(ObserverEvent::Block(BlockData {
hash,
header,
events,
}))
},
Some(res) = self.codes_futures.next() => {
res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code })
fn handle_stream_next(
&mut self,
hash: H256,
header: BlockHeader,
events: Vec<BlockEvent>,
) -> ObserverEvent {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) =
event
{
self.lookup_code(*code_id, *tx_hash);
}
}

ObserverEvent::Block(BlockData {
hash,
header,
events,
})
}
}

Expand Down
6 changes: 4 additions & 2 deletions ethexe/observer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ async fn test_deployment() -> Result<()> {
let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert!(matches!(event, ObserverEvent::Block(..)));

let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert_eq!(
event,
Expand Down
49 changes: 17 additions & 32 deletions ethexe/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use anyhow::{Context as _, Result};
use ethexe_service_utils::AsyncFnStream;
use futures::FutureExt;
use futures::{ready, stream::FusedStream, FutureExt, Stream};
use hyper::{
http::StatusCode,
server::conn::AddrIncoming,
Expand All @@ -35,7 +34,9 @@ use prometheus::{
};
use std::{
net::SocketAddr,
pin::Pin,
sync::LazyLock,
task::{Context, Poll},
time::{Duration, Instant, SystemTime},
};
use tokio::{
Expand Down Expand Up @@ -102,36 +103,28 @@ pub struct PrometheusService {
metrics: PrometheusMetrics,
updated: Instant,

#[allow(unused)]
// to be used in stream impl.
server: JoinHandle<()>,

interval: Interval,
interval: Pin<Box<Interval>>,
}

impl AsyncFnStream for PrometheusService {
impl Stream for PrometheusService {
type Item = PrometheusEvent;

async fn like_next(&mut self) -> Option<Self::Item> {
Some(self.next().await)
}
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let instant = ready!(self.interval.poll_tick(cx));

// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this.
// impl Stream for PrometheusService {
// type Item = PrometheusEvent;
self.updated = instant.into();

// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let e = ready!(pin!(self.next_event()).poll(cx));
// Poll::Ready(Some(e))
// }
// }
Poll::Ready(Some(PrometheusEvent::CollectMetrics))
}
}

// impl FusedStream for PrometheusService {
// fn is_terminated(&self) -> bool {
// self.server.is_finished()
// }
// }
impl FusedStream for PrometheusService {
fn is_terminated(&self) -> bool {
self.server.is_finished()
}
}

impl PrometheusService {
pub fn new(config: PrometheusConfig) -> Result<Self> {
Expand All @@ -140,7 +133,7 @@ impl PrometheusService {

let server = tokio::spawn(init_prometheus(config.addr, config.registry).map(drop));

let interval = time::interval(Duration::from_secs(6));
let interval = Box::pin(time::interval(Duration::from_secs(6)));

Ok(Self {
metrics,
Expand Down Expand Up @@ -168,14 +161,6 @@ impl PrometheusService {
.submitted_block_commitments
.set(submitted_block_commitments as u64);
}

pub async fn next(&mut self) -> PrometheusEvent {
let instant = self.interval.tick().await;

self.updated = instant.into();

PrometheusEvent::CollectMetrics
}
}

struct PrometheusMetrics {
Expand Down
Loading

0 comments on commit 59d027e

Please sign in to comment.