Skip to content

Commit

Permalink
use future for block processing because of block_in_place in processo…
Browse files Browse the repository at this point in the history
…r; tests OK
  • Loading branch information
grishasobol committed Jan 27, 2025
1 parent 87fbc31 commit 1669129
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
32 changes: 15 additions & 17 deletions ethexe/connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ use ethexe_db::Database;
use ethexe_observer::{BlockData, ObserverEvent, Query};
use ethexe_processor::{LocalOutcome, Processor};
use ethexe_service_utils::{AsyncFnStream, OptionFuture};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::future::BoxFuture;
use gprimitives::H256;
use std::collections::VecDeque;
use tokio::task::{JoinHandle, JoinSet};
use tokio::task::JoinSet;

#[derive(Debug)]
pub struct BlockProcessed {
pub chain_head: H256,
// TODO (gsobol): remove commitments
pub commitments: Vec<BlockCommitment>,
}

Expand All @@ -50,8 +51,8 @@ pub struct ConnectService {
processor: Processor,
query: Query,
blocks_queue: VecDeque<BlockData>,
process_block_future: Option<JoinHandle<Result<BlockProcessed>>>,
process_code_futures: JoinSet<Result<CodeCommitment>>,
process_block: Option<BoxFuture<'static, Result<BlockProcessed>>>,
process_codes: JoinSet<Result<CodeCommitment>>,
}

impl AsyncFnStream for ConnectService {
Expand All @@ -69,8 +70,8 @@ impl ConnectService {
processor,
query,
blocks_queue: VecDeque::new(),
process_block_future: Default::default(),
process_code_futures: Default::default(),
process_block: Default::default(),
process_codes: Default::default(),
}
}

Expand All @@ -85,22 +86,22 @@ impl ConnectService {
block.header.parent_hash
);

if self.process_block_future.is_none() {
if self.process_block.is_none() {
let context = ChainHeadProcessContext {
db: self.db.clone(),
processor: self.processor.clone(),
query: self.query.clone(),
};

self.process_block_future = Some(tokio::task::spawn(context.process(block)));
self.process_block = Some(Box::pin(context.process(block)));
} else {
self.blocks_queue.push_back(block);
}
}
ObserverEvent::Blob { code_id, code } => {
log::info!("receive a code blob, code_id {code_id}");
let mut processor = self.processor.clone();
self.process_code_futures.spawn_blocking(move || {
self.process_codes.spawn_blocking(move || {
let valid = processor.process_upload_code_raw(code_id, code.as_slice())?;
Ok(CodeCommitment { id: code_id, valid })
});
Expand All @@ -110,25 +111,22 @@ impl ConnectService {

pub async fn next(&mut self) -> Result<ConnectEvent> {
tokio::select! {
res = self.process_block_future.as_mut().maybe() => {
res = self.process_block.as_mut().maybe() => {
if let Some(block) = self.blocks_queue.pop_front() {
let context = ChainHeadProcessContext {
db: self.db.clone(),
processor: self.processor.clone(),
query: self.query.clone(),
};

self.process_block_future = Some(tokio::task::spawn(context.process(block)));
self.process_block = Some(Box::pin(context.process(block)));
} else {
self.process_block_future = None;
self.process_block = None;
}

match res {
Ok(res) => res.map(ConnectEvent::BlockProcessed),
Err(err) => Err(err.into()),
}
res.map(ConnectEvent::BlockProcessed)
}
Some(res) = self.process_code_futures.join_next() => {
Some(res) = self.process_codes.join_next() => {
match res {
Ok(res) => res.map(ConnectEvent::CodeProcessed),
Err(err) => Err(err.into()),
Expand Down
2 changes: 1 addition & 1 deletion ethexe/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl Service {
event = connect.next() => {
match event? {
ethexe_connect::ConnectEvent::BlockProcessed(BlockProcessed { chain_head, commitments }) => {
// TODO: must be done in observer event handling
// TODO (gsobol): must be done in observer event handling
if let Some(s) = sequencer.as_mut() {
s.on_new_head(chain_head)?
}
Expand Down

0 comments on commit 1669129

Please sign in to comment.