diff --git a/tee-worker/omni-executor/Cargo.lock b/tee-worker/omni-executor/Cargo.lock index 63b09a07f6..6d589b911f 100644 --- a/tee-worker/omni-executor/Cargo.lock +++ b/tee-worker/omni-executor/Cargo.lock @@ -4724,6 +4724,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/tee-worker/omni-executor/executor-core/src/listener.rs b/tee-worker/omni-executor/executor-core/src/listener.rs index 126c45e7a3..68e854b327 100644 --- a/tee-worker/omni-executor/executor-core/src/listener.rs +++ b/tee-worker/omni-executor/executor-core/src/listener.rs @@ -96,7 +96,6 @@ impl< log::debug!("Starting sync from {:?}", block_number_to_sync); 'main: loop { - log::info!("Syncing block: {}", block_number_to_sync); if self.stop_signal.try_recv().is_ok() { break; } @@ -135,16 +134,42 @@ impl< None => false, }; + let mut sync_error = false; + if last_finalized_block >= block_number_to_sync { - if let Ok(events) = - self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) - { - for event in events { - let event_id = event.get_event_id().clone(); - if let Some(ref checkpoint) = - self.checkpoint_repository.get().expect("Could not read checkpoint") - { - if checkpoint.lt(&event.get_event_id().clone().into()) { + log::info!("Syncing block: {}", block_number_to_sync); + match self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) { + Ok(events) => { + for event in events { + let event_id = event.get_event_id().clone(); + if let Some(ref checkpoint) = + self.checkpoint_repository.get().expect("Could not read checkpoint") + { + if checkpoint.lt(&event.get_event_id().clone().into()) { + log::info!("Handling event: {:?}", event_id); + if let Err(e) = self + .handle + .block_on(self.intent_event_handler.handle(event)) + { + log::error!("Could not handle event: {:?}", e); + match e { + Error::NonRecoverableError => { + error!("Non-recoverable intent handling error, event: {:?}", event_id); + break 'main; + }, + Error::RecoverableError => { + error!( + "Recoverable intent handling error, event: {:?}", + event_id + ); + continue 'main; + }, + } + } + } else { + log::debug!("Skipping event"); + } + } else { log::info!("Handling event: {:?}", event_id); if let Err(e) = self.handle.block_on(self.intent_event_handler.handle(event)) @@ -152,7 +177,10 @@ impl< log::error!("Could not handle event: {:?}", e); match e { Error::NonRecoverableError => { - error!("Non-recoverable intent handling error, event: {:?}", event_id); + error!( + "Non-recoverable intent handling error, event: {:?}", + event_id + ); break 'main; }, Error::RecoverableError => { @@ -164,47 +192,28 @@ impl< }, } } - } else { - log::debug!("Skipping event"); - } - } else { - log::info!("Handling event: {:?}", event_id); - if let Err(e) = - self.handle.block_on(self.intent_event_handler.handle(event)) - { - log::error!("Could not handle event: {:?}", e); - match e { - Error::NonRecoverableError => { - error!( - "Non-recoverable intent handling error, event: {:?}", - event_id - ); - break 'main; - }, - Error::RecoverableError => { - error!( - "Recoverable intent handling error, event: {:?}", - event_id - ); - continue 'main; - }, - } } + self.checkpoint_repository + .save(event_id.into()) + .expect("Could not save checkpoint"); } + // we processed block completely so store new checkpoint self.checkpoint_repository - .save(event_id.into()) + .save(CheckpointT::from(block_number_to_sync)) .expect("Could not save checkpoint"); - } - // we processed block completely so store new checkpoint - self.checkpoint_repository - .save(CheckpointT::from(block_number_to_sync)) - .expect("Could not save checkpoint"); - log::info!("Finished syncing block: {}", block_number_to_sync); - block_number_to_sync += 1; + log::info!("Finished syncing block: {}", block_number_to_sync); + block_number_to_sync += 1; + }, + Err(e) => { + log::error!("Could not get block {} events: {:?}", block_number_to_sync, e); + sync_error = true; + }, } + } else { + log::trace!("Block: {} not yet finalized", block_number_to_sync); } - if !fast { + if !fast || sync_error { sleep(Duration::from_secs(1)) } else { log::trace!("Fast sync skipping 1s wait"); diff --git a/tee-worker/omni-executor/executor-worker/Cargo.toml b/tee-worker/omni-executor/executor-worker/Cargo.toml index 5ee9b46240..5ea483d852 100644 --- a/tee-worker/omni-executor/executor-worker/Cargo.toml +++ b/tee-worker/omni-executor/executor-worker/Cargo.toml @@ -14,7 +14,7 @@ log = { workspace = true } parentchain-listener = { path = "../parentchain/listener" } scale-encode = { workspace = true } serde_json = "1.0.127" -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } [lints] workspace = true diff --git a/tee-worker/omni-executor/executor-worker/src/cli.rs b/tee-worker/omni-executor/executor-worker/src/cli.rs index 3ee590d34b..73e8c27337 100644 --- a/tee-worker/omni-executor/executor-worker/src/cli.rs +++ b/tee-worker/omni-executor/executor-worker/src/cli.rs @@ -5,4 +5,5 @@ use clap::Parser; pub struct Cli { pub parentchain_url: String, pub ethereum_url: String, + pub start_block: u64, } diff --git a/tee-worker/omni-executor/executor-worker/src/main.rs b/tee-worker/omni-executor/executor-worker/src/main.rs index 7eb630e1ff..f9742e73b1 100644 --- a/tee-worker/omni-executor/executor-worker/src/main.rs +++ b/tee-worker/omni-executor/executor-worker/src/main.rs @@ -22,6 +22,7 @@ use std::io::Write; use std::thread::JoinHandle; use std::{fs, thread}; use tokio::runtime::Handle; +use tokio::signal; use tokio::sync::oneshot; mod cli; @@ -48,17 +49,25 @@ async fn main() -> Result<(), ()> { error!("Could not create data dir: {:?}", e); })?; - listen_to_parentchain(cli.parentchain_url, cli.ethereum_url) + listen_to_parentchain(cli.parentchain_url, cli.ethereum_url, cli.start_block) .await - .unwrap() - .join() .unwrap(); + + match signal::ctrl_c().await { + Ok(()) => {}, + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + // we also shut down in case of error + }, + } + Ok(()) } async fn listen_to_parentchain( parentchain_url: String, ethereum_url: String, + start_block: u64, ) -> Result, ()> { let (_sub_stop_sender, sub_stop_receiver) = oneshot::channel(); let ethereum_intent_executor = @@ -75,6 +84,6 @@ async fn listen_to_parentchain( Ok(thread::Builder::new() .name("litentry_rococo_sync".to_string()) - .spawn(move || parentchain_listener.sync(0)) + .spawn(move || parentchain_listener.sync(start_block)) .unwrap()) } diff --git a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs index 727e5fac63..a18b3a08fc 100644 --- a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs +++ b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs @@ -16,7 +16,7 @@ use crate::primitives::{BlockEvent, EventId}; use async_trait::async_trait; -use log::error; +use log::{error, info}; use parity_scale_codec::Encode; use std::marker::PhantomData; use std::ops::Deref; @@ -67,9 +67,14 @@ impl> SubstrateRpcClient Result, ()> { - match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|_| ())? { + info!("Getting block {} events", block_num); + match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|e| { + error!("Error getting block {} hash: {:?}", block_num, e); + })? { Some(hash) => { - let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|_| ())?; + let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|e| { + error!("Error getting block {} events: {:?}", block_num, e); + })?; Ok(events .iter() .enumerate()