diff --git a/Cargo.lock b/Cargo.lock index 58f83030c7c3..69184b04f394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8925,6 +8925,7 @@ dependencies = [ "tokio", "tracing", "vlog", + "zksync_block_reverter", "zksync_circuit_breaker", "zksync_commitment_generator", "zksync_concurrency", @@ -8949,6 +8950,7 @@ dependencies = [ "zksync_proof_data_handler", "zksync_protobuf_config", "zksync_queued_job_processor", + "zksync_reorg_detector", "zksync_state", "zksync_state_keeper", "zksync_storage", diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index f95500a3836d..ed7d37c876de 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -33,6 +33,7 @@ zksync_commitment_generator.workspace = true zksync_house_keeper.workspace = true zksync_node_fee_model.workspace = true zksync_eth_sender.workspace = true +zksync_block_reverter.workspace = true zksync_state_keeper.workspace = true zksync_consistency_checker.workspace = true zksync_metadata_calculator.workspace = true @@ -42,6 +43,7 @@ zksync_node_consensus.workspace = true zksync_contract_verification_server.workspace = true zksync_tee_verifier_input_producer.workspace = true zksync_queued_job_processor.workspace = true +zksync_reorg_detector.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index cee9a0b6906d..da6e76377d1f 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -15,6 +15,8 @@ pub mod pools_layer; pub mod prometheus_exporter; pub mod proof_data_handler; pub mod query_eth_client; +pub mod reorg_detector_checker; +pub mod reorg_detector_runner; pub mod sigint; pub mod state_keeper; pub mod tee_verifier_input_producer; diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs new file mode 100644 index 000000000000..64454b63998b --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use anyhow::Context; +use zksync_reorg_detector::{self, ReorgDetector}; + +use crate::{ + implementations::resources::{ + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + }, + precondition::Precondition, + service::{ServiceContext, StopReceiver}, + task::TaskId, + wiring_layer::{WiringError, WiringLayer}, +}; + +const REORG_DETECTED_SLEEP_INTERVAL: Duration = Duration::from_secs(1); + +/// The layer is responsible for integrating reorg checking into the system. +/// When a reorg is detected, the system will not start running until it is fixed. +#[derive(Debug)] +pub struct ReorgDetectorCheckerLayer; + +#[async_trait::async_trait] +impl WiringLayer for ReorgDetectorCheckerLayer { + fn layer_name(&self) -> &'static str { + "reorg_detector_checker_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Get resources. + let main_node_client = context.get_resource::().await?.0; + + let pool_resource = context.get_resource::>().await?; + let pool = pool_resource.get().await?; + + // Create and insert precondition. + context.add_precondition(Box::new(CheckerPrecondition { + reorg_detector: ReorgDetector::new(main_node_client, pool), + })); + + Ok(()) + } +} + +pub struct CheckerPrecondition { + reorg_detector: ReorgDetector, +} + +#[async_trait::async_trait] +impl Precondition for CheckerPrecondition { + fn id(&self) -> TaskId { + "reorg_detector_checker".into() + } + + async fn check(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + loop { + match self.reorg_detector.run_once(stop_receiver.0.clone()).await { + Ok(()) => return Ok(()), + Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + tracing::warn!( + "Reorg detected, last correct L1 batch #{}. Waiting till it will be resolved. Sleep for {} seconds and retry", + last_correct_l1_batch, REORG_DETECTED_SLEEP_INTERVAL.as_secs() + ); + tokio::time::sleep(REORG_DETECTED_SLEEP_INTERVAL).await; + } + Err(err) => return Err(err).context("reorg_detector.check_consistency()"), + } + } + } +} diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs new file mode 100644 index 000000000000..55ee621c15b0 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use anyhow::Context; +use zksync_block_reverter::BlockReverter; +use zksync_reorg_detector::{self, ReorgDetector}; + +use crate::{ + implementations::resources::{ + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + reverter::BlockReverterResource, + }, + service::{ServiceContext, StopReceiver}, + task::{TaskId, UnconstrainedOneshotTask}, + wiring_layer::{WiringError, WiringLayer}, +}; + +/// Layer responsible for detecting reorg and reverting blocks in case it was found. +#[derive(Debug)] +pub struct ReorgDetectorRunnerLayer; + +#[async_trait::async_trait] +impl WiringLayer for ReorgDetectorRunnerLayer { + fn layer_name(&self) -> &'static str { + "reorg_detector_runner_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Get resources. + let main_node_client = context.get_resource::().await?.0; + + let pool_resource = context.get_resource::>().await?; + let pool = pool_resource.get().await?; + + let reverter = context.get_resource::().await?.0; + + // Create and insert task. + context.add_unconstrained_oneshot_task(Box::new(RunnerUnconstrainedOneshotTask { + reorg_detector: ReorgDetector::new(main_node_client, pool), + reverter, + })); + + Ok(()) + } +} + +pub struct RunnerUnconstrainedOneshotTask { + reorg_detector: ReorgDetector, + reverter: Arc, +} + +#[async_trait::async_trait] +impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask { + fn id(&self) -> TaskId { + "reorg_detector_runner".into() + } + + async fn run_unconstrained_oneshot( + mut self: Box, + stop_receiver: StopReceiver, + ) -> anyhow::Result<()> { + match self.reorg_detector.run_once(stop_receiver.0.clone()).await { + Ok(()) => {} + Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); + self.reverter.roll_back(last_correct_l1_batch).await?; + tracing::info!("Revert successfully completed"); + } + Err(err) => return Err(err).context("reorg_detector.check_consistency()"), + } + Ok(()) + } +} diff --git a/core/node/node_framework/src/implementations/resources/mod.rs b/core/node/node_framework/src/implementations/resources/mod.rs index 17c939419985..edfb280d4db7 100644 --- a/core/node/node_framework/src/implementations/resources/mod.rs +++ b/core/node/node_framework/src/implementations/resources/mod.rs @@ -7,6 +7,7 @@ pub mod l1_tx_params; pub mod main_node_client; pub mod object_store; pub mod pools; +pub mod reverter; pub mod state_keeper; pub mod sync_state; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/resources/reverter.rs b/core/node/node_framework/src/implementations/resources/reverter.rs new file mode 100644 index 000000000000..2a2bdb142a85 --- /dev/null +++ b/core/node/node_framework/src/implementations/resources/reverter.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use zksync_block_reverter::BlockReverter; + +use crate::resource::Resource; + +/// Wrapper for the block reverter. +#[derive(Debug, Clone)] +pub struct BlockReverterResource(pub Arc); + +impl Resource for BlockReverterResource { + fn name() -> String { + "common/block_reverter".into() + } +}