Skip to content

Commit

Permalink
Merge branch 'master' into revert-4457-rmasl-fix-windows-ub-corosense…
Browse files Browse the repository at this point in the history
…i-stack-precommit
  • Loading branch information
ByteNacked authored Jan 28, 2025
2 parents 9f4c7a2 + a1c7e83 commit c246633
Show file tree
Hide file tree
Showing 25 changed files with 418 additions and 265 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ ethexe-runtime-common = { path = "ethexe/runtime/common", default-features = fal
ethexe-prometheus = { path = "ethexe/prometheus", default-features = false }
ethexe-validator = { path = "ethexe/validator", default-features = false }
ethexe-rpc = { path = "ethexe/rpc", default-features = false }
ethexe-common = { path = "ethexe/common" }
ethexe-common = { path = "ethexe/common", default-features = false }

# Common executor between `sandbox-host` and `lazy-pages-fuzzer`
wasmi = { package = "wasmi", version = "0.38"}
wasmi = { package = "wasmi", version = "0.38" }

# Substrate deps
binary-merkle-tree = { version = "15.0.1", git = "https://github.com/gear-tech/polkadot-sdk.git", branch = "gear-polkadot-stable2409", default-features = false }
Expand Down
4 changes: 3 additions & 1 deletion ethexe/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ repository.workspace = true
[dependencies]
gear-core.workspace = true
gprimitives = { workspace = true, features = ["serde"] }

parity-scale-codec.workspace = true
derive_more.workspace = true
hex.workspace = true
anyhow.workspace = true
serde.workspace = true

[features]
std = ["gear-core/std", "gprimitives/serde"]
3 changes: 2 additions & 1 deletion ethexe/common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub type Sum = ProgramId;
/// NOTE: generic keys differs to Vara and have been chosen dependent on storage organization of ethexe.
pub type ScheduledTask = gear_core::tasks::ScheduledTask<Rfm, Sd, Sum>;

#[derive(Debug, Clone, Default, Encode, Decode, serde::Serialize, PartialEq, Eq)]
#[derive(Debug, Clone, Default, Encode, Decode, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct BlockHeader {
pub height: u32,
pub timestamp: u64,
Expand Down
4 changes: 2 additions & 2 deletions ethexe/common/src/events/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use alloc::vec::Vec;
use gear_core::message::ReplyCode;
use gprimitives::{ActorId, MessageId, H256};
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)]
pub enum Event {
Expand Down Expand Up @@ -103,7 +102,8 @@ impl Event {
}
}

#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)]
#[derive(Clone, Debug, Encode, Decode)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub enum RequestEvent {
ExecutableBalanceTopUpRequested {
value: u128,
Expand Down
4 changes: 2 additions & 2 deletions ethexe/common/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use gprimitives::ActorId;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

mod mirror;
mod router;
Expand Down Expand Up @@ -73,7 +72,8 @@ impl From<WVaraEvent> for BlockEvent {
}
}

#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)]
#[derive(Clone, Debug, Encode, Decode)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub enum BlockRequestEvent {
Router(RouterRequestEvent),
Mirror {
Expand Down
4 changes: 2 additions & 2 deletions ethexe/common/src/events/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use gprimitives::{ActorId, CodeId, H256};
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)]
pub enum Event {
Expand Down Expand Up @@ -72,7 +71,8 @@ impl Event {
}
}

#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub enum RequestEvent {
CodeValidationRequested {
code_id: CodeId,
Expand Down
4 changes: 2 additions & 2 deletions ethexe/common/src/events/wvara.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use gprimitives::{ActorId, U256};
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Encode, Decode, PartialEq, Eq, PartialOrd, Ord)]
pub enum Event {
Expand All @@ -43,7 +42,8 @@ impl Event {
}
}

#[derive(Clone, Debug, Encode, Decode, Serialize, Deserialize)]
#[derive(Clone, Debug, Encode, Decode)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub enum RequestEvent {
Transfer {
/// Never router, wvara or zero address.
Expand Down
3 changes: 3 additions & 0 deletions ethexe/common/src/gear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub struct ComputationSettings {
}

#[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct Message {
pub id: MessageId,
pub destination: ActorId,
Expand Down Expand Up @@ -130,6 +131,7 @@ pub struct ProtocolData {
}

#[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct StateTransition {
pub actor_id: ActorId,
pub new_state_hash: H256,
Expand All @@ -147,6 +149,7 @@ pub struct ValidationSettings {
}

#[derive(Clone, Debug, Default, Encode, Decode, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct ValueClaim {
pub message_id: MessageId,
pub destination: ActorId,
Expand Down
2 changes: 1 addition & 1 deletion ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl CodesStorage for Database {

self.kv
.iter_prefix(&key_prefix)
.map(|#[allow(unused_variables)] (key, code_id)| {
.map(|(key, code_id)| {
let (split_key_prefix, program_id) = key.split_at(key_prefix.len());
debug_assert_eq!(split_key_prefix, key_prefix);
let program_id =
Expand Down
52 changes: 26 additions & 26 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub mod export {
use anyhow::{anyhow, Context};
use ethexe_db::Database;
use ethexe_signer::{PublicKey, Signer};
use futures::{future::Either, stream::FusedStream, Stream};
use futures::{future::Either, ready, stream::FusedStream, Stream};
use libp2p::{
connection_limits,
core::{muxing::StreamMuxerBox, upgrade},
Expand Down Expand Up @@ -121,6 +121,31 @@ pub struct NetworkService {
swarm: Swarm<Behaviour>,
}

impl Stream for NetworkService {
type Item = NetworkEvent;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
let Some(event) = ready!(self.swarm.poll_next_unpin(cx)) else {
return Poll::Ready(None);
};

if let Some(event) = self.handle_swarm_event(event) {
return Poll::Ready(Some(event));
}
}
}
}

impl FusedStream for NetworkService {
fn is_terminated(&self) -> bool {
self.swarm.is_terminated()
}
}

impl NetworkService {
pub fn new(
config: NetworkConfig,
Expand Down Expand Up @@ -407,31 +432,6 @@ impl NetworkService {
}
}

impl Stream for NetworkService {
type Item = NetworkEvent;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(event)) = self.swarm.poll_next_unpin(cx) {
if let Some(event) = self.get_mut().handle_swarm_event(event) {
return Poll::Ready(Some(event));
} else {
cx.waker().wake_by_ref();
}
}

Poll::Pending
}
}

impl FusedStream for NetworkService {
fn is_terminated(&self) -> bool {
self.swarm.is_terminated()
}
}

#[cfg(test)]
impl NetworkService {
async fn connect(&mut self, service: &mut Self) {
Expand Down
100 changes: 55 additions & 45 deletions ethexe/observer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ use alloy::{
use anyhow::{Context as _, Result};
use ethexe_common::events::{BlockEvent, BlockRequestEvent, RouterEvent};
use ethexe_db::BlockHeader;
use ethexe_service_utils::AsyncFnStream;
use ethexe_signer::Address;
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures::{
future::BoxFuture,
stream::{FusedStream, FuturesUnordered},
Stream, StreamExt,
};
use gprimitives::{CodeId, H256};
use parity_scale_codec::{Decode, Encode};
use std::{pin::Pin, sync::Arc, time::Duration};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

pub(crate) type Provider = RootProvider<BoxTransport>;

Expand Down Expand Up @@ -77,29 +85,31 @@ pub struct ObserverService {
codes_futures: FuturesUnordered<BlobDownloadFuture>,
}

impl AsyncFnStream for ObserverService {
impl Stream for ObserverService {
type Item = Result<ObserverEvent>;

async fn like_next(&mut self) -> Option<Self::Item> {
Some(self.next().await)
}
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some((hash, header, events))) = self.stream.poll_next_unpin(cx) {
let event = Ok(self.handle_stream_next(hash, header, events));

// TODO: fix it by some wrapper. It's not possible to implement Stream for SequencerService like this.
// impl Stream for ObserverService {
// type Item = Result<ObserverEvent>;
return Poll::Ready(Some(event));
};

// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let e = ready!(pin!(self.next_event()).poll(cx));
// Poll::Ready(Some(e))
// }
// }
if let Poll::Ready(Some(res)) = self.codes_futures.poll_next_unpin(cx) {
let event = res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code });

// impl FusedStream for ObserverService {
// fn is_terminated(&self) -> bool {
// false
// }
// }
return Poll::Ready(Some(event));
}

Poll::Pending
}
}

impl FusedStream for ObserverService {
fn is_terminated(&self) -> bool {
false
}
}

impl ObserverService {
pub async fn new(config: &EthereumConfig) -> Result<Self> {
Expand Down Expand Up @@ -171,7 +181,7 @@ impl ObserverService {
router: Address,
) -> impl Stream<Item = (H256, BlockHeader, Vec<BlockEvent>)> {
async_stream::stream! {
while let Some(header) = stream.like_next().await {
while let Some(header) = stream.next().await {
let hash = (*header.hash).into();
let parent_hash = (*header.parent_hash).into();
let block_number = header.number as u32;
Expand All @@ -190,31 +200,31 @@ impl ObserverService {
}
}

pub async fn next(&mut self) -> Result<ObserverEvent> {
tokio::select! {
Some((hash, header, events)) = self.stream.next() => {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) = event {
self.lookup_code(*code_id, *tx_hash);
}
}

Ok(ObserverEvent::Block(BlockData {
hash,
header,
events,
}))
},
Some(res) = self.codes_futures.next() => {
res.map(|(code_id, code)| ObserverEvent::Blob { code_id, code })
fn handle_stream_next(
&mut self,
hash: H256,
header: BlockHeader,
events: Vec<BlockEvent>,
) -> ObserverEvent {
// TODO (breathx): set in db?
log::trace!("Received block: {hash:?}");

self.last_block_number = header.height;

// TODO: replace me with proper processing of all events, including commitments.
for event in &events {
if let BlockEvent::Router(RouterEvent::CodeValidationRequested { code_id, tx_hash }) =
event
{
self.lookup_code(*code_id, *tx_hash);
}
}

ObserverEvent::Block(BlockData {
hash,
header,
events,
})
}
}

Expand Down
6 changes: 4 additions & 2 deletions ethexe/observer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ async fn test_deployment() -> Result<()> {
let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert!(matches!(event, ObserverEvent::Block(..)));

let event = observer
.next()
.await
.expect("observer did not receive event");
.expect("observer did not receive event")
.expect("received error instead of event");

assert_eq!(
event,
Expand Down
Loading

0 comments on commit c246633

Please sign in to comment.