From 636af2b33d7400a9e85cff8f3b0602b1bc1aaa89 Mon Sep 17 00:00:00 2001 From: anastasiiaVashchuk Date: Thu, 2 May 2024 20:20:49 +0300 Subject: [PATCH 1/5] skeleton --- .../src/implementations/layers/mod.rs | 1 + .../implementations/layers/reorg_detector.rs | 93 +++++++++++++++++++ .../src/implementations/resources/mod.rs | 1 + .../src/implementations/resources/reverter.rs | 15 +++ 4 files changed, 110 insertions(+) create mode 100644 core/node/node_framework/src/implementations/layers/reorg_detector.rs create mode 100644 core/node/node_framework/src/implementations/resources/reverter.rs diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index 9a55cf18d24d..f538e9bc7fa4 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -16,6 +16,7 @@ pub mod pools_layer; pub mod prometheus_exporter; pub mod proof_data_handler; pub mod query_eth_client; +pub mod reorg_detector; pub mod sigint; pub mod state_keeper; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector.rs b/core/node/node_framework/src/implementations/layers/reorg_detector.rs new file mode 100644 index 000000000000..bb08c700372a --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/reorg_detector.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; + +use anyhow::Context; +use zksync_block_reverter::BlockReverter; +use zksync_core::reorg_detector::{self, ReorgDetector}; + +use crate::{ + implementations::resources::{ + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + reverter::BlockReverterResource, + }, + precondition::Precondition, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, +}; + +#[derive(Debug)] +pub struct ReorgDetectorLayer; + +#[async_trait::async_trait] +impl WiringLayer for ReorgDetectorLayer { + fn layer_name(&self) -> &'static str { + "reorg_detector_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Get resources. + let main_node_client = context.get_resource::().await?.0; + + let pool_resource = context.get_resource::>().await?; + let pool = pool_resource.get().await?; + + let reverter = context.get_resource::().await?.0; + + // Create and insert precondition. + context.add_precondition(Box::new(ReorgDetectorPrecondition { + reorg_detector: ReorgDetector::new(main_node_client.clone(), pool.clone()), + reverter, + })); + + // Create and insert task. + context.add_task(Box::new(ReorgDetectorTask { + reorg_detector: ReorgDetector::new(main_node_client, pool), + })); + + Ok(()) + } +} + +pub struct ReorgDetectorPrecondition { + reorg_detector: ReorgDetector, + reverter: Arc, +} + +#[async_trait::async_trait] +impl Precondition for ReorgDetectorPrecondition { + fn name(&self) -> &'static str { + "reorg_detector" + } + + async fn check(mut self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { + match self.reorg_detector.check_consistency().await { + Ok(()) => {} + Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); + self.reverter.roll_back(last_correct_l1_batch).await?; + tracing::info!("Revert successfully completed"); + } + Err(err) => return Err(err).context("reorg_detector.check_consistency()"), + } + Ok(()) + } +} + +pub struct ReorgDetectorTask { + reorg_detector: ReorgDetector, +} + +#[async_trait::async_trait] +impl Task for ReorgDetectorTask { + fn name(&self) -> &'static str { + "reorg_detector" + } + + async fn run(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.reorg_detector + .run(stop_receiver.0) + .await + .context("reorg_detector.run()") + } +} diff --git a/core/node/node_framework/src/implementations/resources/mod.rs b/core/node/node_framework/src/implementations/resources/mod.rs index 2225fcd2f4c9..85a3a5d775f5 100644 --- a/core/node/node_framework/src/implementations/resources/mod.rs +++ b/core/node/node_framework/src/implementations/resources/mod.rs @@ -8,6 +8,7 @@ pub mod l1_tx_params; pub mod main_node_client; pub mod object_store; pub mod pools; +pub mod reverter; pub mod state_keeper; pub mod sync_state; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/resources/reverter.rs b/core/node/node_framework/src/implementations/resources/reverter.rs new file mode 100644 index 000000000000..2a2bdb142a85 --- /dev/null +++ b/core/node/node_framework/src/implementations/resources/reverter.rs @@ -0,0 +1,15 @@ +use std::sync::Arc; + +use zksync_block_reverter::BlockReverter; + +use crate::resource::Resource; + +/// Wrapper for the block reverter. +#[derive(Debug, Clone)] +pub struct BlockReverterResource(pub Arc); + +impl Resource for BlockReverterResource { + fn name() -> String { + "common/block_reverter".into() + } +} From 3ea3126a8433ce36c6fecb4cd1b00a2fe6f1ee13 Mon Sep 17 00:00:00 2001 From: anastasiiaVashchuk Date: Tue, 14 May 2024 16:11:48 +0300 Subject: [PATCH 2/5] address suggestion: fix logic by adding 2 separate layers --- .../src/implementations/layers/mod.rs | 3 +- .../layers/reorg_detector_checker.rs | 71 +++++++++++++++++++ ...g_detector.rs => reorg_detector_runner.rs} | 48 ++++--------- 3 files changed, 87 insertions(+), 35 deletions(-) create mode 100644 core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs rename core/node/node_framework/src/implementations/layers/{reorg_detector.rs => reorg_detector_runner.rs} (62%) diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs index f538e9bc7fa4..86f755787a1e 100644 --- a/core/node/node_framework/src/implementations/layers/mod.rs +++ b/core/node/node_framework/src/implementations/layers/mod.rs @@ -16,7 +16,8 @@ pub mod pools_layer; pub mod prometheus_exporter; pub mod proof_data_handler; pub mod query_eth_client; -pub mod reorg_detector; +pub mod reorg_detector_checker; +pub mod reorg_detector_runner; pub mod sigint; pub mod state_keeper; pub mod web3_api; diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs new file mode 100644 index 000000000000..f7ec53ada2a1 --- /dev/null +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use anyhow::Context; +use zksync_core::reorg_detector::{self, ReorgDetector}; + +use crate::{ + implementations::resources::{ + main_node_client::MainNodeClientResource, + pools::{MasterPool, PoolResource}, + }, + precondition::Precondition, + service::{ServiceContext, StopReceiver}, + task::Task, + wiring_layer::{WiringError, WiringLayer}, +}; + +const REORG_DETECTED_SLEEP_INTERVAL: Duration = Duration::from_secs(1); + +/// The layer is responsible for integrating reorg checking into the system. +/// When a reorg is detected, the system will not start running until it is fixed. +#[derive(Debug)] +pub struct ReorgDetectorCheckerLayer; + +#[async_trait::async_trait] +impl WiringLayer for ReorgDetectorCheckerLayer { + fn layer_name(&self) -> &'static str { + "reorg_detector_checker_layer" + } + + async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { + // Get resources. + let main_node_client = context.get_resource::().await?.0; + + let pool_resource = context.get_resource::>().await?; + let pool = pool_resource.get().await?; + + // Create and insert precondition. + context.add_precondition(Box::new(CheckerPrecondition { + reorg_detector: ReorgDetector::new(main_node_client, pool), + })); + + Ok(()) + } +} + +pub struct CheckerPrecondition { + reorg_detector: ReorgDetector, +} + +#[async_trait::async_trait] +impl Precondition for CheckerPrecondition { + fn name(&self) -> &'static str { + "reorg_detector_checker" + } + + async fn check(mut self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { + loop { + match self.reorg_detector.check_consistency().await { + Ok(()) => return Ok(()), + Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + tracing::warn!( + "Reorg detected, last correct L1 batch #{}. Waiting till it will be resolved. Sleep for {} seconds and retry", + last_correct_l1_batch, REORG_DETECTED_SLEEP_INTERVAL.as_secs() + ); + tokio::time::sleep(REORG_DETECTED_SLEEP_INTERVAL).await; + } + Err(err) => return Err(err).context("reorg_detector.check_consistency()"), + } + } + } +} diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs similarity index 62% rename from core/node/node_framework/src/implementations/layers/reorg_detector.rs rename to core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs index bb08c700372a..f935e1c65df7 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs @@ -10,19 +10,19 @@ use crate::{ pools::{MasterPool, PoolResource}, reverter::BlockReverterResource, }, - precondition::Precondition, service::{ServiceContext, StopReceiver}, - task::Task, + task::UnconstrainedOneshotTask, wiring_layer::{WiringError, WiringLayer}, }; +/// Layer responsible for detecting reorgs and reverting blocks in case it was found. #[derive(Debug)] -pub struct ReorgDetectorLayer; +pub struct ReorgDetectorRunnerLayer; #[async_trait::async_trait] -impl WiringLayer for ReorgDetectorLayer { +impl WiringLayer for ReorgDetectorRunnerLayer { fn layer_name(&self) -> &'static str { - "reorg_detector_layer" + "reorg_detector_runner_layer" } async fn wire(self: Box, mut context: ServiceContext<'_>) -> Result<(), WiringError> { @@ -34,33 +34,31 @@ impl WiringLayer for ReorgDetectorLayer { let reverter = context.get_resource::().await?.0; - // Create and insert precondition. - context.add_precondition(Box::new(ReorgDetectorPrecondition { - reorg_detector: ReorgDetector::new(main_node_client.clone(), pool.clone()), - reverter, - })); - // Create and insert task. - context.add_task(Box::new(ReorgDetectorTask { + context.add_unconstrained_oneshot_task(Box::new(RunnerUnconstrainedOneshotTask { reorg_detector: ReorgDetector::new(main_node_client, pool), + reverter, })); Ok(()) } } -pub struct ReorgDetectorPrecondition { +pub struct RunnerUnconstrainedOneshotTask { reorg_detector: ReorgDetector, reverter: Arc, } #[async_trait::async_trait] -impl Precondition for ReorgDetectorPrecondition { +impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask { fn name(&self) -> &'static str { - "reorg_detector" + "reorg_detector_runner" } - async fn check(mut self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn run_unconstrained_oneshot( + mut self: Box, + _stop_receiver: StopReceiver, + ) -> anyhow::Result<()> { match self.reorg_detector.check_consistency().await { Ok(()) => {} Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { @@ -73,21 +71,3 @@ impl Precondition for ReorgDetectorPrecondition { Ok(()) } } - -pub struct ReorgDetectorTask { - reorg_detector: ReorgDetector, -} - -#[async_trait::async_trait] -impl Task for ReorgDetectorTask { - fn name(&self) -> &'static str { - "reorg_detector" - } - - async fn run(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - self.reorg_detector - .run(stop_receiver.0) - .await - .context("reorg_detector.run()") - } -} From da43942a5b1655ad4340701305953ae06b29724c Mon Sep 17 00:00:00 2001 From: anastasiiaVashchuk Date: Tue, 14 May 2024 16:24:20 +0300 Subject: [PATCH 3/5] add missed dep - zksync_block_reverter --- Cargo.lock | 1 + contracts | 2 +- core/node/node_framework/Cargo.toml | 1 + .../src/implementations/layers/reorg_detector_checker.rs | 1 - .../src/implementations/layers/reorg_detector_runner.rs | 2 +- 5 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f05b0b2d56f..8e7a4a037701 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8744,6 +8744,7 @@ dependencies = [ "tokio", "tracing", "vlog", + "zksync_block_reverter", "zksync_circuit_breaker", "zksync_commitment_generator", "zksync_concurrency", diff --git a/contracts b/contracts index 452a54f67243..d89e406cd20c 160000 --- a/contracts +++ b/contracts @@ -1 +1 @@ -Subproject commit 452a54f6724347b7e517be1a3d948299ab827d8c +Subproject commit d89e406cd20c6d6e9052ba2321334b71ef53c54e diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index e02266c16546..1670fe22e390 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -34,6 +34,7 @@ zksync_commitment_generator.workspace = true zksync_house_keeper.workspace = true zksync_node_fee_model.workspace = true zksync_eth_sender.workspace = true +zksync_block_reverter.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs index f7ec53ada2a1..066b12da8ae8 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -10,7 +10,6 @@ use crate::{ }, precondition::Precondition, service::{ServiceContext, StopReceiver}, - task::Task, wiring_layer::{WiringError, WiringLayer}, }; diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs index f935e1c65df7..e80012d6123e 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs @@ -15,7 +15,7 @@ use crate::{ wiring_layer::{WiringError, WiringLayer}, }; -/// Layer responsible for detecting reorgs and reverting blocks in case it was found. +/// Layer responsible for detecting reorg and reverting blocks in case it was found. #[derive(Debug)] pub struct ReorgDetectorRunnerLayer; From d38701605a47ca1b037088d96067d7de6176e998 Mon Sep 17 00:00:00 2001 From: anastasiiaVashchuk Date: Tue, 14 May 2024 16:31:12 +0300 Subject: [PATCH 4/5] set correct contracts submodule version --- contracts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contracts b/contracts index d89e406cd20c..452a54f67243 160000 --- a/contracts +++ b/contracts @@ -1 +1 @@ -Subproject commit d89e406cd20c6d6e9052ba2321334b71ef53c54e +Subproject commit 452a54f6724347b7e517be1a3d948299ab827d8c From 657d19c75c2b2ad0b2625d23214a4efc7093eb40 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Mon, 3 Jun 2024 11:30:04 +0400 Subject: [PATCH 5/5] Update reorg layers --- Cargo.lock | 1 + core/node/node_framework/Cargo.toml | 1 + .../layers/reorg_detector_checker.rs | 13 +++++++------ .../layers/reorg_detector_runner.rs | 14 +++++++------- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2d9cd893db6..69184b04f394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8950,6 +8950,7 @@ dependencies = [ "zksync_proof_data_handler", "zksync_protobuf_config", "zksync_queued_job_processor", + "zksync_reorg_detector", "zksync_state", "zksync_state_keeper", "zksync_storage", diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index a4194f2a815c..ed7d37c876de 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -43,6 +43,7 @@ zksync_node_consensus.workspace = true zksync_contract_verification_server.workspace = true zksync_tee_verifier_input_producer.workspace = true zksync_queued_job_processor.workspace = true +zksync_reorg_detector.workspace = true tracing.workspace = true thiserror.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs index 066b12da8ae8..64454b63998b 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_checker.rs @@ -1,7 +1,7 @@ use std::time::Duration; use anyhow::Context; -use zksync_core::reorg_detector::{self, ReorgDetector}; +use zksync_reorg_detector::{self, ReorgDetector}; use crate::{ implementations::resources::{ @@ -10,6 +10,7 @@ use crate::{ }, precondition::Precondition, service::{ServiceContext, StopReceiver}, + task::TaskId, wiring_layer::{WiringError, WiringLayer}, }; @@ -48,15 +49,15 @@ pub struct CheckerPrecondition { #[async_trait::async_trait] impl Precondition for CheckerPrecondition { - fn name(&self) -> &'static str { - "reorg_detector_checker" + fn id(&self) -> TaskId { + "reorg_detector_checker".into() } - async fn check(mut self: Box, _stop_receiver: StopReceiver) -> anyhow::Result<()> { + async fn check(mut self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { loop { - match self.reorg_detector.check_consistency().await { + match self.reorg_detector.run_once(stop_receiver.0.clone()).await { Ok(()) => return Ok(()), - Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { tracing::warn!( "Reorg detected, last correct L1 batch #{}. Waiting till it will be resolved. Sleep for {} seconds and retry", last_correct_l1_batch, REORG_DETECTED_SLEEP_INTERVAL.as_secs() diff --git a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs index e80012d6123e..55ee621c15b0 100644 --- a/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs +++ b/core/node/node_framework/src/implementations/layers/reorg_detector_runner.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use anyhow::Context; use zksync_block_reverter::BlockReverter; -use zksync_core::reorg_detector::{self, ReorgDetector}; +use zksync_reorg_detector::{self, ReorgDetector}; use crate::{ implementations::resources::{ @@ -11,7 +11,7 @@ use crate::{ reverter::BlockReverterResource, }, service::{ServiceContext, StopReceiver}, - task::UnconstrainedOneshotTask, + task::{TaskId, UnconstrainedOneshotTask}, wiring_layer::{WiringError, WiringLayer}, }; @@ -51,17 +51,17 @@ pub struct RunnerUnconstrainedOneshotTask { #[async_trait::async_trait] impl UnconstrainedOneshotTask for RunnerUnconstrainedOneshotTask { - fn name(&self) -> &'static str { - "reorg_detector_runner" + fn id(&self) -> TaskId { + "reorg_detector_runner".into() } async fn run_unconstrained_oneshot( mut self: Box, - _stop_receiver: StopReceiver, + stop_receiver: StopReceiver, ) -> anyhow::Result<()> { - match self.reorg_detector.check_consistency().await { + match self.reorg_detector.run_once(stop_receiver.0.clone()).await { Ok(()) => {} - Err(reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { + Err(zksync_reorg_detector::Error::ReorgDetected(last_correct_l1_batch)) => { tracing::info!("Reverting to l1 batch number {last_correct_l1_batch}"); self.reverter.roll_back(last_correct_l1_batch).await?; tracing::info!("Revert successfully completed");