From ae10eaef968d83d7265477c95d9fb1e940e4986d Mon Sep 17 00:00:00 2001 From: Dmitryii Osipov Date: Tue, 28 Jan 2025 16:50:36 +0700 Subject: [PATCH 1/2] feat(ethexe): implement more rpc methods (#4470) --- Cargo.toml | 4 +- ethexe/common/Cargo.toml | 4 +- ethexe/common/src/db.rs | 3 +- ethexe/common/src/events/mirror.rs | 4 +- ethexe/common/src/events/mod.rs | 4 +- ethexe/common/src/events/router.rs | 4 +- ethexe/common/src/events/wvara.rs | 4 +- ethexe/common/src/gear.rs | 3 + ethexe/db/src/database.rs | 2 +- ethexe/rpc/Cargo.toml | 2 +- ethexe/rpc/src/apis/block.rs | 13 +++- ethexe/rpc/src/apis/code.rs | 63 ++++++++++++++++++++ ethexe/rpc/src/apis/mod.rs | 2 + ethexe/rpc/src/apis/program.rs | 96 +++++++++++++++++++++++++----- ethexe/rpc/src/lib.rs | 5 +- ethexe/runtime/common/src/state.rs | 1 + 16 files changed, 183 insertions(+), 31 deletions(-) create mode 100644 ethexe/rpc/src/apis/code.rs diff --git a/Cargo.toml b/Cargo.toml index 5d9165e9494..c47f9789453 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -309,10 +309,10 @@ ethexe-runtime-common = { path = "ethexe/runtime/common", default-features = fal ethexe-prometheus = { path = "ethexe/prometheus", default-features = false } ethexe-validator = { path = "ethexe/validator", default-features = false } ethexe-rpc = { path = "ethexe/rpc", default-features = false } -ethexe-common = { path = "ethexe/common" } +ethexe-common = { path = "ethexe/common", default-features = false } # Common executor between `sandbox-host` and `lazy-pages-fuzzer` -wasmi = { package = "wasmi", version = "0.38"} +wasmi = { package = "wasmi", version = "0.38" } # Substrate deps binary-merkle-tree = { version = "15.0.1", git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409", default-features = false } diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index 9e0f167594c..65acb6142be 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -12,9 +12,11 @@ repository.workspace = true [dependencies] gear-core.workspace = true gprimitives = { workspace = true, features = ["serde"] } - parity-scale-codec.workspace = true derive_more.workspace = true hex.workspace = true anyhow.workspace = true serde.workspace = true + +[features] +std = ["gear-core/std", "gprimitives/serde"] diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index fe333e6a279..37db645b5ae 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -42,7 +42,8 @@ pub type Sum = ProgramId; /// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe. pub type ScheduledTask = gear_core::tasks::ScheduledTask; -#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Default, Encode, Decode, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct BlockHeader { pub height: u32, pub timestamp: u64, diff --git a/ethexe/common/src/events/mirror.rs b/ethexe/common/src/events/mirror.rs index 473a004a222..103187af005 100644 --- a/ethexe/common/src/events/mirror.rs +++ b/ethexe/common/src/events/mirror.rs @@ -20,7 +20,6 @@ use alloc::vec::Vec; use gear_core::message::ReplyCode; use gprimitives::{ActorId, MessageId, H256}; use parity_scale_codec::{Decode, Encode}; -use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)] pub enum Event { @@ -103,7 +102,8 @@ impl Event { } } -#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)] +#[derive(Clone, Debug, Encode, Decode)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub enum RequestEvent { ExecutableBalanceTopUpRequested { value: u128, diff --git a/ethexe/common/src/events/mod.rs b/ethexe/common/src/events/mod.rs index f154817bfb8..a631df269d6 100644 --- a/ethexe/common/src/events/mod.rs +++ b/ethexe/common/src/events/mod.rs @@ -18,7 +18,6 @@ use gprimitives::ActorId; use parity_scale_codec::{Decode, Encode}; -use serde::{Deserialize, Serialize}; mod mirror; mod router; @@ -73,7 +72,8 @@ impl From for BlockEvent { } } -#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)] +#[derive(Clone, Debug, Encode, Decode)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub enum BlockRequestEvent { Router(RouterRequestEvent), Mirror { diff --git a/ethexe/common/src/events/router.rs b/ethexe/common/src/events/router.rs index 68985642f54..ebb8f7c95a8 100644 --- a/ethexe/common/src/events/router.rs +++ b/ethexe/common/src/events/router.rs @@ -18,7 +18,6 @@ use gprimitives::{ActorId, CodeId, H256}; use parity_scale_codec::{Decode, Encode}; -use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)] pub enum Event { @@ -72,7 +71,8 @@ impl Event { } } -#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub enum RequestEvent { CodeValidationRequested { code_id: CodeId, diff --git a/ethexe/common/src/events/wvara.rs b/ethexe/common/src/events/wvara.rs index 22b6a916cfe..e459ceb73b4 100644 --- a/ethexe/common/src/events/wvara.rs +++ b/ethexe/common/src/events/wvara.rs @@ -18,7 +18,6 @@ use gprimitives::{ActorId, U256}; use parity_scale_codec::{Decode, Encode}; -use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)] pub enum Event { @@ -43,7 +42,8 @@ impl Event { } } -#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)] +#[derive(Clone, Debug, Encode, Decode)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub enum RequestEvent { Transfer { /// Never router, wvara or zero address. diff --git a/ethexe/common/src/gear.rs b/ethexe/common/src/gear.rs index 287df1b0ef4..e23d79079af 100644 --- a/ethexe/common/src/gear.rs +++ b/ethexe/common/src/gear.rs @@ -100,6 +100,7 @@ pub struct ComputationSettings { } #[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct Message { pub id: MessageId, pub destination: ActorId, @@ -130,6 +131,7 @@ pub struct ProtocolData { } #[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct StateTransition { pub actor_id: ActorId, pub new_state_hash: H256, @@ -147,6 +149,7 @@ pub struct ValidationSettings { } #[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct ValueClaim { pub message_id: MessageId, pub destination: ActorId, diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index a714df1652c..dc2c1d99d64 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -340,7 +340,7 @@ impl CodesStorage for Database { self.kv .iter_prefix(&key_prefix) - .map(|#[allow(unused_variables)] (key, code_id)| { + .map(|(key, code_id)| { let (split_key_prefix, program_id) = key.split_at(key_prefix.len()); debug_assert_eq!(split_key_prefix, key_prefix); let program_id = diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index e1a6403f7e5..0757b75a0af 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -23,7 +23,7 @@ hyper = { workspace = true, features = ["server"] } log.workspace = true parity-scale-codec.workspace = true hex.workspace = true -ethexe-common.workspace = true +ethexe-common = { workspace = true, features = ["std"] } ethexe-runtime-common = { workspace = true, features = ["std"] } sp-core = { workspace = true, features = ["serde"] } gear-core = { workspace = true, features = ["std"] } diff --git a/ethexe/rpc/src/apis/block.rs b/ethexe/rpc/src/apis/block.rs index 70bd33753b3..4d5653e8f66 100644 --- a/ethexe/rpc/src/apis/block.rs +++ b/ethexe/rpc/src/apis/block.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use crate::{common::block_header_at_or_latest, errors}; -use ethexe_common::events::BlockRequestEvent; +use ethexe_common::{events::BlockRequestEvent, gear::StateTransition}; use ethexe_db::{BlockHeader, BlockMetaStorage, Database}; use gprimitives::H256; use jsonrpsee::{ @@ -36,6 +36,9 @@ pub trait Block { #[method(name = "block_events")] async fn block_events(&self, block_hash: Option) -> RpcResult>; + + #[method(name = "block_outcome")] + async fn block_outcome(&self, block_hash: Option) -> RpcResult>; } #[derive(Clone)] @@ -70,4 +73,12 @@ impl BlockServer for BlockApi { .block_events(block_hash) .ok_or_else(|| errors::db("Block events weren't found")) } + + async fn block_outcome(&self, hash: Option) -> RpcResult> { + let block_hash = block_header_at_or_latest(&self.db, hash)?.0; + + self.db + .block_outcome(block_hash) + .ok_or_else(|| errors::db("Block outcome wasn't found")) + } } diff --git a/ethexe/rpc/src/apis/code.rs b/ethexe/rpc/src/apis/code.rs new file mode 100644 index 00000000000..960076021f3 --- /dev/null +++ b/ethexe/rpc/src/apis/code.rs @@ -0,0 +1,63 @@ +// This file is part of Gear. +// +// Copyright (C) 2024-2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::errors; +use ethexe_db::{CodesStorage, Database}; +use gprimitives::H256; +use jsonrpsee::{ + core::{async_trait, RpcResult}, + proc_macros::rpc, +}; +use parity_scale_codec::Encode; +use sp_core::Bytes; + +#[rpc(server)] +pub trait Code { + #[method(name = "code_getOriginal")] + async fn get_original_code(&self, id: H256) -> RpcResult; + + #[method(name = "code_getInstrumented")] + async fn get_instrumented_code(&self, runtime_id: u32, code_id: H256) -> RpcResult; +} + +pub struct CodeApi { + db: Database, +} + +impl CodeApi { + pub fn new(db: Database) -> Self { + Self { db } + } +} + +#[async_trait] +impl CodeServer for CodeApi { + async fn get_original_code(&self, id: H256) -> RpcResult { + self.db + .original_code(id.into()) + .map(|bytes| bytes.encode().into()) + .ok_or_else(|| errors::db("Failed to get code by supplied id")) + } + + async fn get_instrumented_code(&self, runtime_id: u32, code_id: H256) -> RpcResult { + self.db + .instrumented_code(runtime_id, code_id.into()) + .map(|bytes| bytes.encode().into()) + .ok_or_else(|| errors::db("Failed to get code by supplied id")) + } +} diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index eb86f52f66d..ebe699a238c 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -17,9 +17,11 @@ // along with this program. If not, see . mod block; +mod code; mod dev; mod program; pub use block::{BlockApi, BlockServer}; +pub use code::{CodeApi, CodeServer}; pub use dev::{DevApi, DevServer}; pub use program::{ProgramApi, ProgramServer}; diff --git a/ethexe/rpc/src/apis/program.rs b/ethexe/rpc/src/apis/program.rs index 7ecdbd5c65e..a0527c14e9f 100644 --- a/ethexe/rpc/src/apis/program.rs +++ b/ethexe/rpc/src/apis/program.rs @@ -20,7 +20,8 @@ use crate::{common::block_header_at_or_latest, errors}; use ethexe_db::{CodesStorage, Database}; use ethexe_processor::Processor; use ethexe_runtime_common::state::{ - HashOf, Mailbox, MemoryPages, MessageQueue, ProgramState, Storage, Waitlist, + DispatchStash, HashOf, Mailbox, MemoryPages, MessageQueue, Program, ProgramState, Storage, + Waitlist, }; use gear_core::message::ReplyInfo; use gprimitives::{H160, H256}; @@ -29,8 +30,20 @@ use jsonrpsee::{ proc_macros::rpc, }; use parity_scale_codec::Encode; +use serde::{Deserialize, Serialize}; use sp_core::Bytes; +#[derive(Clone, Serialize, Deserialize)] +pub struct FullProgramState { + pub program: Program, + pub queue: Option, + pub waitlist: Option, + pub stash: Option, + pub mailbox: Option, + pub balance: u128, + pub executable_balance: u128, +} + #[rpc(server)] pub trait Program { #[method(name = "program_calculateReplyForHandle")] @@ -55,15 +68,21 @@ pub trait Program { #[method(name = "program_readQueue")] async fn read_queue(&self, hash: H256) -> RpcResult; + #[method(name = "program_readWaitlist")] + async fn read_waitlist(&self, hash: H256) -> RpcResult; + + #[method(name = "program_readStash")] + async fn read_stash(&self, hash: H256) -> RpcResult; + #[method(name = "program_readMailbox")] async fn read_mailbox(&self, hash: H256) -> RpcResult; + #[method(name = "program_readFullState")] + async fn read_full_state(&self, hash: H256) -> RpcResult; + #[method(name = "program_readPages")] async fn read_pages(&self, hash: H256) -> RpcResult; - #[method(name = "program_readWaitlist")] - async fn read_waitlist(&self, hash: H256) -> RpcResult; - #[method(name = "program_readPageData")] async fn read_page_data(&self, hash: H256) -> RpcResult; } @@ -76,6 +95,22 @@ impl ProgramApi { pub fn new(db: Database) -> Self { Self { db } } + + fn read_queue(&self, hash: H256) -> Option { + self.db.read_queue(unsafe { HashOf::new(hash) }) + } + + fn read_waitlist(&self, hash: H256) -> Option { + self.db.read_waitlist(unsafe { HashOf::new(hash) }) + } + + fn read_stash(&self, hash: H256) -> Option { + self.db.read_stash(unsafe { HashOf::new(hash) }) + } + + fn read_mailbox(&self, hash: H256) -> Option { + self.db.read_mailbox(unsafe { HashOf::new(hash) }) + } } #[async_trait] @@ -130,30 +165,61 @@ impl ProgramServer for ProgramApi { } async fn read_queue(&self, hash: H256) -> RpcResult { - self.db - .read_queue(unsafe { HashOf::new(hash) }) + self.read_queue(hash) .ok_or_else(|| errors::db("Failed to read queue by hash")) } + async fn read_waitlist(&self, hash: H256) -> RpcResult { + self.read_waitlist(hash) + .ok_or_else(|| errors::db("Failed to read waitlist by hash")) + } + + async fn read_stash(&self, hash: H256) -> RpcResult { + self.read_stash(hash) + .ok_or_else(|| errors::db("Failed to read stash by hash")) + } + async fn read_mailbox(&self, hash: H256) -> RpcResult { - self.db - .read_mailbox(unsafe { HashOf::new(hash) }) + self.read_mailbox(hash) .ok_or_else(|| errors::db("Failed to read mailbox by hash")) } + async fn read_full_state(&self, hash: H256) -> RpcResult { + let Some(ProgramState { + program, + queue_hash, + waitlist_hash, + stash_hash, + mailbox_hash, + balance, + executable_balance, + }) = self.db.read_state(hash) + else { + return Err(errors::db("Failed to read state by hash")); + }; + + let queue = queue_hash.query(&self.db).ok(); + let waitlist = waitlist_hash.query(&self.db).ok(); + let stash = stash_hash.query(&self.db).ok(); + let mailbox = mailbox_hash.query(&self.db).ok(); + + Ok(FullProgramState { + program, + queue, + waitlist, + stash, + mailbox, + balance, + executable_balance, + }) + } + async fn read_pages(&self, hash: H256) -> RpcResult { self.db .read_pages(unsafe { HashOf::new(hash) }) .ok_or_else(|| errors::db("Failed to read pages by hash")) } - // TODO: read the whole program state in a single query - async fn read_waitlist(&self, hash: H256) -> RpcResult { - self.db - .read_waitlist(unsafe { HashOf::new(hash) }) - .ok_or_else(|| errors::db("Failed to read waitlist by hash")) - } - async fn read_page_data(&self, hash: H256) -> RpcResult { self.db .read_page_data(unsafe { HashOf::new(hash) }) diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index c923670b225..26b960b5795 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -17,7 +17,9 @@ // along with this program. If not, see . use anyhow::{anyhow, Result}; -use apis::{BlockApi, BlockServer, DevApi, DevServer, ProgramApi, ProgramServer}; +use apis::{ + BlockApi, BlockServer, CodeApi, CodeServer, DevApi, DevServer, ProgramApi, ProgramServer, +}; use ethexe_db::Database; use ethexe_observer::MockBlobReader; use futures::FutureExt; @@ -89,6 +91,7 @@ impl RpcService { let mut module = JsonrpcModule::new(()); module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?; module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?; + module.merge(CodeServer::into_rpc(CodeApi::new(self.db.clone())))?; if self.config.dev { module.merge(DevServer::into_rpc(DevApi::new( diff --git a/ethexe/runtime/common/src/state.rs b/ethexe/runtime/common/src/state.rs index 101843b57b3..e7c02fdc38c 100644 --- a/ethexe/runtime/common/src/state.rs +++ b/ethexe/runtime/common/src/state.rs @@ -700,6 +700,7 @@ impl Waitlist { } #[derive(Clone, Default, Debug, Encode, Decode, PartialEq, Eq, derive_more::Into)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub struct DispatchStash(BTreeMap)>>); impl DispatchStash { From a1c7e83f45b8b106b2010fbbf81065a6cd0d678e Mon Sep 17 00:00:00 2001 From: Dmitrii Novikov Date: Tue, 28 Jan 2025 17:07:01 +0700 Subject: [PATCH 2/2] refactor(ethexe): impl proper Streams on services (#4480) --- ethexe/network/src/lib.rs | 52 +++++----- ethexe/observer/src/lib.rs | 100 ++++++++++-------- ethexe/observer/src/tests.rs | 6 +- ethexe/prometheus/src/lib.rs | 49 +++------ ethexe/sequencer/src/lib.rs | 167 +++++++++++++++++------------- ethexe/service/src/lib.rs | 9 +- ethexe/service/src/tests.rs | 3 +- ethexe/service/utils/src/lib.rs | 26 ++--- ethexe/service/utils/src/timer.rs | 57 +++++----- 9 files changed, 235 insertions(+), 234 deletions(-) diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index e8291727e84..8421da4706a 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -28,7 +28,7 @@ pub mod export { use anyhow::{anyhow, Context}; use ethexe_db::Database; use ethexe_signer::{PublicKey, Signer}; -use futures::{future::Either, stream::FusedStream, Stream}; +use futures::{future::Either, ready, stream::FusedStream, Stream}; use libp2p::{ connection_limits, core::{muxing::StreamMuxerBox, upgrade}, @@ -121,6 +121,31 @@ pub struct NetworkService { swarm: Swarm, } +impl Stream for NetworkService { + type Item = NetworkEvent; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + loop { + let Some(event) = ready!(self.swarm.poll_next_unpin(cx)) else { + return Poll::Ready(None); + }; + + if let Some(event) = self.handle_swarm_event(event) { + return Poll::Ready(Some(event)); + } + } + } +} + +impl FusedStream for NetworkService { + fn is_terminated(&self) -> bool { + self.swarm.is_terminated() + } +} + impl NetworkService { pub fn new( config: NetworkConfig, @@ -407,31 +432,6 @@ impl NetworkService { } } -impl Stream for NetworkService { - type Item = NetworkEvent; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Poll::Ready(Some(event)) = self.swarm.poll_next_unpin(cx) { - if let Some(event) = self.get_mut().handle_swarm_event(event) { - return Poll::Ready(Some(event)); - } else { - cx.waker().wake_by_ref(); - } - } - - Poll::Pending - } -} - -impl FusedStream for NetworkService { - fn is_terminated(&self) -> bool { - self.swarm.is_terminated() - } -} - #[cfg(test)] impl NetworkService { async fn connect(&mut self, service: &mut Self) { diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 24df5ff7777..9e235b53a5f 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -27,12 +27,20 @@ use alloy::{ use anyhow::{Context as _, Result}; use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent}; use ethexe_db::BlockHeader; -use ethexe_service_utils::AsyncFnStream; use ethexe_signer::Address; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{FusedStream, FuturesUnordered}, + Stream, StreamExt, +}; use gprimitives::{CodeId, H256}; use parity_scale_codec::{Decode, Encode}; -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; pub(crate) type Provider = RootProvider; @@ -77,29 +85,31 @@ pub struct ObserverService { codes_futures: FuturesUnordered, } -impl AsyncFnStream for ObserverService { +impl Stream for ObserverService { type Item = Result; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some((hash, header, events))) = self.stream.poll_next_unpin(cx) { + let event = Ok(self.handle_stream_next(hash, header, events)); -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for ObserverService { -// type Item = Result; + return Poll::Ready(Some(event)); + }; -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + if let Poll::Ready(Some(res)) = self.codes_futures.poll_next_unpin(cx) { + let event = res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code }); -// impl FusedStream for ObserverService { -// fn is_terminated(&self) -> bool { -// false -// } -// } + return Poll::Ready(Some(event)); + } + + Poll::Pending + } +} + +impl FusedStream for ObserverService { + fn is_terminated(&self) -> bool { + false + } +} impl ObserverService { pub async fn new(config: &EthereumConfig) -> Result { @@ -171,7 +181,7 @@ impl ObserverService { router: Address, ) -> impl Stream)> { async_stream::stream! { - while let Some(header) = stream.like_next().await { + while let Some(header) = stream.next().await { let hash = (*header.hash).into(); let parent_hash = (*header.parent_hash).into(); let block_number = header.number as u32; @@ -190,31 +200,31 @@ impl ObserverService { } } - pub async fn next(&mut self) -> Result { - tokio::select! { - Some((hash, header, events)) = self.stream.next() => { - // TODO (breathx): set in db? - log::trace!("Received block: {hash:?}"); - - self.last_block_number = header.height; - - // TODO: replace me with proper processing of all events, including commitments. - for event in &events { - if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = event { - self.lookup_code(*code_id, *tx_hash); - } - } - - Ok(ObserverEvent::Block(BlockData { - hash, - header, - events, - })) - }, - Some(res) = self.codes_futures.next() => { - res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code }) + fn handle_stream_next( + &mut self, + hash: H256, + header: BlockHeader, + events: Vec, + ) -> ObserverEvent { + // TODO (breathx): set in db? + log::trace!("Received block: {hash:?}"); + + self.last_block_number = header.height; + + // TODO: replace me with proper processing of all events, including commitments. + for event in &events { + if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = + event + { + self.lookup_code(*code_id, *tx_hash); } } + + ObserverEvent::Block(BlockData { + hash, + header, + events, + }) } } diff --git a/ethexe/observer/src/tests.rs b/ethexe/observer/src/tests.rs index 96968c3e631..1aefad464c9 100644 --- a/ethexe/observer/src/tests.rs +++ b/ethexe/observer/src/tests.rs @@ -92,14 +92,16 @@ async fn test_deployment() -> Result<()> { let event = observer .next() .await - .expect("observer did not receive event"); + .expect("observer did not receive event") + .expect("received error instead of event"); assert!(matches!(event, ObserverEvent::Block(..))); let event = observer .next() .await - .expect("observer did not receive event"); + .expect("observer did not receive event") + .expect("received error instead of event"); assert_eq!( event, diff --git a/ethexe/prometheus/src/lib.rs b/ethexe/prometheus/src/lib.rs index c36b7e46f81..9fa75d5cd35 100644 --- a/ethexe/prometheus/src/lib.rs +++ b/ethexe/prometheus/src/lib.rs @@ -17,8 +17,7 @@ // along with this program. If not, see . use anyhow::{Context as _, Result}; -use ethexe_service_utils::AsyncFnStream; -use futures::FutureExt; +use futures::{ready, stream::FusedStream, FutureExt, Stream}; use hyper::{ http::StatusCode, server::conn::AddrIncoming, @@ -35,7 +34,9 @@ use prometheus::{ }; use std::{ net::SocketAddr, + pin::Pin, sync::LazyLock, + task::{Context, Poll}, time::{Duration, Instant, SystemTime}, }; use tokio::{ @@ -102,36 +103,28 @@ pub struct PrometheusService { metrics: PrometheusMetrics, updated: Instant, - #[allow(unused)] // to be used in stream impl. server: JoinHandle<()>, - - interval: Interval, + interval: Pin>, } -impl AsyncFnStream for PrometheusService { +impl Stream for PrometheusService { type Item = PrometheusEvent; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let instant = ready!(self.interval.poll_tick(cx)); -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for PrometheusService { -// type Item = PrometheusEvent; + self.updated = instant.into(); -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + Poll::Ready(Some(PrometheusEvent::CollectMetrics)) + } +} -// impl FusedStream for PrometheusService { -// fn is_terminated(&self) -> bool { -// self.server.is_finished() -// } -// } +impl FusedStream for PrometheusService { + fn is_terminated(&self) -> bool { + self.server.is_finished() + } +} impl PrometheusService { pub fn new(config: PrometheusConfig) -> Result { @@ -140,7 +133,7 @@ impl PrometheusService { let server = tokio::spawn(init_prometheus(config.addr, config.registry).map(drop)); - let interval = time::interval(Duration::from_secs(6)); + let interval = Box::pin(time::interval(Duration::from_secs(6))); Ok(Self { metrics, @@ -168,14 +161,6 @@ impl PrometheusService { .submitted_block_commitments .set(submitted_block_commitments as u64); } - - pub async fn next(&mut self) -> PrometheusEvent { - let instant = self.interval.tick().await; - - self.updated = instant.into(); - - PrometheusEvent::CollectMetrics - } } struct PrometheusMetrics { diff --git a/ethexe/sequencer/src/lib.rs b/ethexe/sequencer/src/lib.rs index 04fce1e79fb..ec4b8a6d488 100644 --- a/ethexe/sequencer/src/lib.rs +++ b/ethexe/sequencer/src/lib.rs @@ -25,14 +25,20 @@ use ethexe_common::{ gear::{BlockCommitment, CodeCommitment}, }; use ethexe_ethereum::{router::Router, Ethereum}; -use ethexe_service_utils::{AsyncFnStream, Timer}; +use ethexe_service_utils::Timer; use ethexe_signer::{Address, Digest, PublicKey, Signature, Signer, ToDigest}; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{ + future::BoxFuture, + stream::{FusedStream, FuturesUnordered}, + FutureExt, Stream, StreamExt, +}; use gprimitives::H256; use indexmap::IndexSet; use std::{ collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, ops::Not, + pin::Pin, + task::{Context, Poll}, time::Duration, }; @@ -100,29 +106,47 @@ pub struct SequencerService { submissions: FuturesUnordered, } -impl AsyncFnStream for SequencerService { +impl Stream for SequencerService { type Item = SequencerEvent; - async fn like_next(&mut self) -> Option { - Some(self.next().await) - } -} + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(block_hash) = self.collection_round.poll_unpin(cx) { + let event = self.handle_collection_round_end(block_hash); -// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this. -// impl Stream for SequencerService { -// type Item = SequencerEvent; + return Poll::Ready(Some(event)); + } -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// let e = ready!(pin!(self.next_event()).poll(cx)); -// Poll::Ready(Some(e)) -// } -// } + if let Poll::Ready(block_hash) = self.validation_round.poll_unpin(cx) { + let event = self.handle_validation_round_end(block_hash); -// impl FusedStream for SequencerService { -// fn is_terminated(&self) -> bool { -// false -// } -// } + return Poll::Ready(Some(event)); + } + + if let Poll::Ready(Some((res, commit_type))) = self.submissions.poll_next_unpin(cx) { + let tx_hash = res + .inspect(|tx_hash| { + log::debug!("Successfully submitted commitment {commit_type:?} in tx {tx_hash}") + }) + .inspect_err(|err| log::warn!("Failed to submit commitment {commit_type:?}: {err}")) + .ok(); + + let event = SequencerEvent::CommitmentSubmitted { + tx_hash, + commit_type, + }; + + return Poll::Ready(Some(event)); + } + + Poll::Pending + } +} + +impl FusedStream for SequencerService { + fn is_terminated(&self) -> bool { + false + } +} impl SequencerService { pub async fn new( @@ -324,65 +348,60 @@ impl SequencerService { router.commit_blocks(blocks, signatures).await } - pub async fn next(&mut self) -> SequencerEvent { - tokio::select! { - block_hash = self.collection_round.rings() => { - // If chain head is not yet processed by this node, this is normal situation, - // so we just skip this round for sequencer. - let Some(block_is_empty) = self.db.block_is_empty(block_hash) else { - log::warn!("Failed to get block emptiness status for {block_hash}"); - return SequencerEvent::CollectionRoundEnded { block_hash }; - }; - - let last_non_empty_block = if block_is_empty { - let Some(prev_commitment) = self.db.previous_committed_block(block_hash) else { - return SequencerEvent::CollectionRoundEnded { block_hash }; - }; - - prev_commitment - } else { - block_hash - }; - - self.blocks_candidate = - Self::blocks_commitment_candidate(&self.block_commitments, last_non_empty_block, self.threshold); - self.codes_candidate = - Self::codes_commitment_candidate(&self.code_commitments, self.threshold); - - let to_start_validation = self.blocks_candidate.is_some() || self.codes_candidate.is_some(); - - if to_start_validation { - log::debug!("Validation round for {block_hash} started"); - self.validation_round.start(block_hash); - } - - SequencerEvent::CollectionRoundEnded { block_hash } - } - block_hash = self.validation_round.rings() => { - log::debug!("Validation round for {block_hash} ended"); + fn handle_collection_round_end(&mut self, block_hash: H256) -> SequencerEvent { + // If chain head is not yet processed by this node, this is normal situation, + // so we just skip this round for sequencer. + let Some(block_is_empty) = self.db.block_is_empty(block_hash) else { + log::warn!("Failed to get block emptiness status for {block_hash}"); + return SequencerEvent::CollectionRoundEnded { block_hash }; + }; - let mut submitted = false; + let last_non_empty_block = if block_is_empty { + let Some(prev_commitment) = self.db.previous_committed_block(block_hash) else { + return SequencerEvent::CollectionRoundEnded { block_hash }; + }; - if self.blocks_candidate.is_some() || self.codes_candidate.is_some() { - log::debug!("Submitting commitments"); - self.submit_multisigned_commitments(); - submitted = true; - } else { - log::debug!("No commitments to submit, skipping"); - } + prev_commitment + } else { + block_hash + }; - log::debug!("Validation round ended: block {block_hash}, submitted: {submitted}"); + self.blocks_candidate = Self::blocks_commitment_candidate( + &self.block_commitments, + last_non_empty_block, + self.threshold, + ); + self.codes_candidate = + Self::codes_commitment_candidate(&self.code_commitments, self.threshold); - SequencerEvent::ValidationRoundEnded { block_hash, submitted } - } - Some((res, commit_type)) = self.submissions.next() => { - let tx_hash = res - .inspect(|tx_hash| log::debug!("Successfully submitted commitment {commit_type:?} in tx {tx_hash}")) - .inspect_err(|err| log::warn!("Failed to submit commitment {commit_type:?}: {err}")) - .ok(); + let to_start_validation = self.blocks_candidate.is_some() || self.codes_candidate.is_some(); - SequencerEvent::CommitmentSubmitted { tx_hash, commit_type } - } + if to_start_validation { + log::debug!("Validation round for {block_hash} started"); + self.validation_round.start(block_hash); + } + + SequencerEvent::CollectionRoundEnded { block_hash } + } + + fn handle_validation_round_end(&mut self, block_hash: H256) -> SequencerEvent { + log::debug!("Validation round for {block_hash} ended"); + + let mut submitted = false; + + if self.blocks_candidate.is_some() || self.codes_candidate.is_some() { + log::debug!("Submitting commitments"); + self.submit_multisigned_commitments(); + submitted = true; + } else { + log::debug!("No commitments to submit, skipping"); + } + + log::debug!("Validation round ended: block {block_hash}, submitted: {submitted}"); + + SequencerEvent::ValidationRoundEnded { + block_hash, + submitted, } } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 795b857a67f..77e04ae23d3 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -35,6 +35,7 @@ use ethexe_sequencer::{ use ethexe_service_utils::{OptionFuture as _, OptionStreamNext as _}; use ethexe_signer::{Digest, PublicKey, Signature, Signer}; use ethexe_validator::BlockCommitmentValidationRequest; +use futures::StreamExt; use gprimitives::H256; use parity_scale_codec::{Decode, Encode}; use std::{ops::Not, sync::Arc}; @@ -453,7 +454,7 @@ impl Service { loop { tokio::select! { - event = observer.next() => { + event = observer.select_next_some() => { match event? { ObserverEvent::Blob { code_id, code } => { // TODO: spawn blocking here? @@ -538,7 +539,7 @@ impl Service { } } }, - Some(event) = sequencer.maybe_next() => { + event = sequencer.maybe_next_some() => { let Some(s) = sequencer.as_mut() else { unreachable!("couldn't produce event without sequencer"); }; @@ -606,7 +607,7 @@ impl Service { SequencerEvent::CommitmentSubmitted { .. } => {}, } }, - Some(event) = network.maybe_next() => { + event = network.maybe_next_some() => { match event { NetworkEvent::Message { source, data } => { log::trace!("Received a network message from peer {source:?}"); @@ -687,7 +688,7 @@ impl Service { } _ => {} }}, - Some(event) = prometheus.maybe_next() => { + event = prometheus.maybe_next_some() => { let Some(p) = prometheus.as_mut() else { unreachable!("couldn't produce event without prometheus"); }; diff --git a/ethexe/service/src/tests.rs b/ethexe/service/src/tests.rs index 107fd1157eb..aa7c302b5a2 100644 --- a/ethexe/service/src/tests.rs +++ b/ethexe/service/src/tests.rs @@ -968,6 +968,7 @@ mod utils { use ethexe_network::export::Multiaddr; use ethexe_observer::{ObserverEvent, ObserverService, SimpleBlockData}; use ethexe_sequencer::{SequencerConfig, SequencerService}; + use futures::StreamExt; use gear_core::message::ReplyCode; use std::{ ops::Mul, @@ -1104,7 +1105,7 @@ mod utils { let handle = task::spawn(async move { send_subscription_created.send(()).unwrap(); - while let Ok(event) = observer.next().await { + while let Ok(event) = observer.select_next_some().await { log::trace!(target: "test-event", "📗 Event: {:?}", event); cloned_sender diff --git a/ethexe/service/utils/src/lib.rs b/ethexe/service/utils/src/lib.rs index 31fad0023e8..0314779378f 100644 --- a/ethexe/service/utils/src/lib.rs +++ b/ethexe/service/utils/src/lib.rs @@ -18,7 +18,7 @@ #![allow(async_fn_in_trait)] -use futures::{future, StreamExt}; +use futures::{future, stream::FusedStream, StreamExt}; use std::future::Future; pub use timer::Timer; @@ -46,27 +46,19 @@ impl OptionFuture for Option { } } -pub trait AsyncFnStream { - type Item; - - async fn like_next(&mut self) -> Option; -} - -impl AsyncFnStream for T { - type Item = T::Item; - - async fn like_next(&mut self) -> Option { - StreamExt::next(self).await - } -} - pub trait OptionStreamNext: private::Sealed { async fn maybe_next(self) -> Option; + + async fn maybe_next_some(self) -> T; } -impl OptionStreamNext for &mut Option { +impl OptionStreamNext for &mut Option { async fn maybe_next(self) -> Option { - self.as_mut().map(AsyncFnStream::like_next).maybe().await + self.as_mut().map(StreamExt::next).maybe().await + } + + async fn maybe_next_some(self) -> S::Item { + self.as_mut().map(StreamExt::select_next_some).maybe().await } } diff --git a/ethexe/service/utils/src/timer.rs b/ethexe/service/utils/src/timer.rs index 30d5d931cd3..0b34d4dc005 100644 --- a/ethexe/service/utils/src/timer.rs +++ b/ethexe/service/utils/src/timer.rs @@ -16,12 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::OptionFuture; +use futures::{ready, FutureExt}; use std::{ fmt::Debug, - time::{Duration, Instant}, + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, }; -use tokio::time; +use tokio::time::{self, Sleep}; /// Asynchronous timer with inner data kept. pub struct Timer @@ -35,7 +38,7 @@ where duration: Duration, /// Moment of time when the timer was started and applied data. - inner: Option<(Instant, T)>, + inner: Option<(Pin>, T)>, } impl Timer { @@ -65,22 +68,13 @@ impl Timer { self.inner.is_some() } - /// Get the remaining time until the timer will be ready to ring if started. - pub fn remaining(&self) -> Option { - self.inner.as_ref().map(|(start, _)| { - self.duration - .checked_sub(start.elapsed()) - .unwrap_or(Duration::ZERO) - }) - } - /// Start the timer from the beginning with new data. /// Returns the previous data if the timer was already started. pub fn start(&mut self, data: T) -> Option { log::trace!("Started timer '{}' with data {data:?}", self.name); self.inner - .replace((Instant::now(), data)) + .replace((Box::pin(time::sleep(self.duration)), data)) .map(|(_, data)| data) } @@ -90,25 +84,22 @@ impl Timer { self.inner.take().map(|(_, data)| data) } +} + +impl Future for Timer { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some((sleep, _)) = self.inner.as_mut() { + ready!(sleep.poll_unpin(cx)); + + let data = self.inner.take().map(|(_, data)| data).unwrap(); + + log::debug!("Timer '{}' with data {:?} rings", self.name, data); + + return Poll::Ready(data); + } - /// Result of time passed - timer's ring. - pub async fn rings(&mut self) -> T { - self.remaining() - .map(async |dur| { - if !dur.is_zero() { - log::trace!("Timer {} will ring in {dur:?}", self.name); - } - - time::sleep(dur).await; - - log::trace!("Timer {} rings!", self.name); - - self.inner - .take() - .map(|(_, data)| data) - .expect("stopped or not started timer cannot ring;") - }) - .maybe() - .await + Poll::Pending } }