From 1669129603ca212eea78feaba67d6b95fc3e29df Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Mon, 27 Jan 2025 13:57:04 +0700 Subject: [PATCH] use future for block processing because of block_in_place in processor; tests OK --- ethexe/connect/src/lib.rs | 32 +++++++++++++++----------------- ethexe/service/src/lib.rs | 2 +- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/ethexe/connect/src/lib.rs b/ethexe/connect/src/lib.rs index 6466c7c55b8..06b8c78843b 100644 --- a/ethexe/connect/src/lib.rs +++ b/ethexe/connect/src/lib.rs @@ -28,14 +28,15 @@ use ethexe_db::Database; use ethexe_observer::{BlockData, ObserverEvent, Query}; use ethexe_processor::{LocalOutcome, Processor}; use ethexe_service_utils::{AsyncFnStream, OptionFuture}; -use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; +use futures::future::BoxFuture; use gprimitives::H256; use std::collections::VecDeque; -use tokio::task::{JoinHandle, JoinSet}; +use tokio::task::JoinSet; #[derive(Debug)] pub struct BlockProcessed { pub chain_head: H256, + // TODO (gsobol): remove commitments pub commitments: Vec, } @@ -50,8 +51,8 @@ pub struct ConnectService { processor: Processor, query: Query, blocks_queue: VecDeque, - process_block_future: Option>>, - process_code_futures: JoinSet>, + process_block: Option>>, + process_codes: JoinSet>, } impl AsyncFnStream for ConnectService { @@ -69,8 +70,8 @@ impl ConnectService { processor, query, blocks_queue: VecDeque::new(), - process_block_future: Default::default(), - process_code_futures: Default::default(), + process_block: Default::default(), + process_codes: Default::default(), } } @@ -85,14 +86,14 @@ impl ConnectService { block.header.parent_hash ); - if self.process_block_future.is_none() { + if self.process_block.is_none() { let context = ChainHeadProcessContext { db: self.db.clone(), processor: self.processor.clone(), query: self.query.clone(), }; - self.process_block_future = Some(tokio::task::spawn(context.process(block))); + self.process_block = Some(Box::pin(context.process(block))); } else { self.blocks_queue.push_back(block); } @@ -100,7 +101,7 @@ impl ConnectService { ObserverEvent::Blob { code_id, code } => { log::info!("receive a code blob, code_id {code_id}"); let mut processor = self.processor.clone(); - self.process_code_futures.spawn_blocking(move || { + self.process_codes.spawn_blocking(move || { let valid = processor.process_upload_code_raw(code_id, code.as_slice())?; Ok(CodeCommitment { id: code_id, valid }) }); @@ -110,7 +111,7 @@ impl ConnectService { pub async fn next(&mut self) -> Result { tokio::select! { - res = self.process_block_future.as_mut().maybe() => { + res = self.process_block.as_mut().maybe() => { if let Some(block) = self.blocks_queue.pop_front() { let context = ChainHeadProcessContext { db: self.db.clone(), @@ -118,17 +119,14 @@ impl ConnectService { query: self.query.clone(), }; - self.process_block_future = Some(tokio::task::spawn(context.process(block))); + self.process_block = Some(Box::pin(context.process(block))); } else { - self.process_block_future = None; + self.process_block = None; } - match res { - Ok(res) => res.map(ConnectEvent::BlockProcessed), - Err(err) => Err(err.into()), - } + res.map(ConnectEvent::BlockProcessed) } - Some(res) = self.process_code_futures.join_next() => { + Some(res) = self.process_codes.join_next() => { match res { Ok(res) => res.map(ConnectEvent::CodeProcessed), Err(err) => Err(err.into()), diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index ba1b39c5160..42a35b079fe 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -324,7 +324,7 @@ impl Service { event = connect.next() => { match event? { ethexe_connect::ConnectEvent::BlockProcessed(BlockProcessed { chain_head, commitments }) => { - // TODO: must be done in observer event handling + // TODO (gsobol): must be done in observer event handling if let Some(s) = sequencer.as_mut() { s.on_new_head(chain_head)? }