diff --git a/Cargo.lock b/Cargo.lock index 3fdbc90f710e..aad5aa964d3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8060,9 +8060,11 @@ dependencies = [ "circuit_sequencer_api 0.1.40", "circuit_sequencer_api 0.1.41", "circuit_sequencer_api 0.1.50", + "futures 0.3.28", "itertools 0.10.5", - "jsonrpsee", "multivm", + "num_cpus", + "rand 0.8.5", "serde_json", "tokio", "tracing", @@ -8075,6 +8077,8 @@ dependencies = [ "zksync_eth_client", "zksync_health_check", "zksync_l1_contract_interface", + "zksync_node_genesis", + "zksync_node_test_utils", "zksync_types", "zksync_utils", "zksync_web3_decl", diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index 63c0433eda0d..1cc09bc32cbd 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -745,6 +745,11 @@ pub(crate) struct ExperimentalENConfig { /// Maximum number of files concurrently opened by state keeper cache RocksDB. Useful to fit into OS limits; can be used /// as a rudimentary way to control RAM usage of the cache. pub state_keeper_db_max_open_files: Option, + + // Commitment generator + /// Maximum degree of parallelism during commitment generation, i.e., the maximum number of L1 batches being processed in parallel. + /// If not specified, commitment generator will use a value roughly equal to the number of CPU cores with some clamping applied. + pub commitment_generator_max_parallelism: Option, } impl ExperimentalENConfig { @@ -758,6 +763,7 @@ impl ExperimentalENConfig { state_keeper_db_block_cache_capacity_mb: Self::default_state_keeper_db_block_cache_capacity_mb(), state_keeper_db_max_open_files: None, + commitment_generator_max_parallelism: None, } } diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 0d8adc067e8a..18a0ab173aa7 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -359,14 +359,13 @@ async fn run_core( ); app_health.insert_component(batch_status_updater.health_check())?; - let commitment_generator_pool = singleton_pool_builder - .build() - .await - .context("failed to build a commitment_generator_pool")?; - let commitment_generator = CommitmentGenerator::new( - commitment_generator_pool, + let mut commitment_generator = CommitmentGenerator::new( + connection_pool.clone(), config.optional.l1_batch_commit_data_generator_mode, ); + if let Some(parallelism) = config.experimental.commitment_generator_max_parallelism { + commitment_generator.set_max_parallelism(parallelism); + } app_health.insert_component(commitment_generator.health_check())?; let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone())); diff --git a/core/lib/dal/.sqlx/query-148dd243ab476724a430e74406119a148b59a79b03dacf3b1c32223c5ebf8d4b.json b/core/lib/dal/.sqlx/query-148dd243ab476724a430e74406119a148b59a79b03dacf3b1c32223c5ebf8d4b.json new file mode 100644 index 000000000000..4f14d753fd6f --- /dev/null +++ b/core/lib/dal/.sqlx/query-148dd243ab476724a430e74406119a148b59a79b03dacf3b1c32223c5ebf8d4b.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n number\n FROM\n l1_batches\n WHERE\n hash IS NOT NULL\n AND commitment IS NULL\n ORDER BY\n number DESC\n LIMIT\n 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "148dd243ab476724a430e74406119a148b59a79b03dacf3b1c32223c5ebf8d4b" +} diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 3e805e92f5f1..2633e04e3839 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -164,6 +164,8 @@ impl BlocksDal<'_, '_> { Ok(row.number.map(|num| L1BatchNumber(num as u32))) } + /// Gets a number of the earliest L1 batch that is ready for commitment generation (i.e., doesn't have commitment + /// yet, and has tree data). pub async fn get_next_l1_batch_ready_for_commitment_generation( &mut self, ) -> DalResult> { @@ -190,6 +192,34 @@ impl BlocksDal<'_, '_> { Ok(row.map(|row| L1BatchNumber(row.number as u32))) } + /// Gets a number of the last L1 batch that is ready for commitment generation (i.e., doesn't have commitment + /// yet, and has tree data). + pub async fn get_last_l1_batch_ready_for_commitment_generation( + &mut self, + ) -> DalResult> { + let row = sqlx::query!( + r#" + SELECT + number + FROM + l1_batches + WHERE + hash IS NOT NULL + AND commitment IS NULL + ORDER BY + number DESC + LIMIT + 1 + "# + ) + .instrument("get_last_l1_batch_ready_for_commitment_generation") + .report_latency() + .fetch_optional(self.storage) + .await?; + + Ok(row.map(|row| L1BatchNumber(row.number as u32))) + } + /// Returns the number of the earliest L1 batch with metadata (= state hash) present in the DB, /// or `None` if there are no such L1 batches. pub async fn get_earliest_l1_batch_number_with_metadata( diff --git a/core/lib/zksync_core_leftovers/src/lib.rs b/core/lib/zksync_core_leftovers/src/lib.rs index 01358e05a8cf..6d839f5c9dc0 100644 --- a/core/lib/zksync_core_leftovers/src/lib.rs +++ b/core/lib/zksync_core_leftovers/src/lib.rs @@ -765,8 +765,9 @@ pub async fn initialize_components( } if components.contains(&Component::CommitmentGenerator) { + let pool_size = CommitmentGenerator::default_parallelism().get(); let commitment_generator_pool = - ConnectionPool::::singleton(database_secrets.master_url()?) + ConnectionPool::::builder(database_secrets.master_url()?, pool_size) .build() .await .context("failed to build commitment_generator_pool")?; diff --git a/core/node/commitment_generator/Cargo.toml b/core/node/commitment_generator/Cargo.toml index 45c62161e3f2..24752691348b 100644 --- a/core/node/commitment_generator/Cargo.toml +++ b/core/node/commitment_generator/Cargo.toml @@ -28,11 +28,16 @@ zk_evm_1_4_1.workspace = true zk_evm_1_3_3.workspace = true tokio = { workspace = true, features = ["time"] } +futures.workspace = true +num_cpus.workspace = true anyhow.workspace = true tracing.workspace = true itertools.workspace = true serde_json.workspace = true [dev-dependencies] -jsonrpsee.workspace = true zksync_web3_decl.workspace = true +zksync_node_genesis.workspace = true +zksync_node_test_utils.workspace = true + +rand.workspace = true diff --git a/core/node/commitment_generator/src/lib.rs b/core/node/commitment_generator/src/lib.rs index 866ef572b065..cbb6279481ca 100644 --- a/core/node/commitment_generator/src/lib.rs +++ b/core/node/commitment_generator/src/lib.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZeroU32, ops, sync::Arc, time::Duration}; use anyhow::Context; use itertools::Itertools; @@ -11,7 +11,7 @@ use zksync_types::{ blob::num_blobs_required, commitment::{ AuxCommitments, CommitmentCommonInput, CommitmentInput, L1BatchAuxiliaryOutput, - L1BatchCommitment, L1BatchCommitmentMode, + L1BatchCommitment, L1BatchCommitmentArtifacts, L1BatchCommitmentMode, }, event::convert_vm_events_to_log_queries, writes::{InitialStorageWrite, RepeatedStorageWrite, StateDiffRecord}, @@ -21,34 +21,56 @@ use zksync_utils::h256_to_u256; use crate::{ metrics::{CommitmentStage, METRICS}, - utils::{bootloader_initial_content_commitment, events_queue_commitment}, + utils::{CommitmentComputer, RealCommitmentComputer}, }; mod metrics; +#[cfg(test)] +mod tests; mod utils; pub mod validation_task; const SLEEP_INTERVAL: Duration = Duration::from_millis(100); +/// Component responsible for generating commitments for L1 batches. #[derive(Debug)] pub struct CommitmentGenerator { + computer: Arc, connection_pool: ConnectionPool, health_updater: HealthUpdater, commitment_mode: L1BatchCommitmentMode, + parallelism: NonZeroU32, } impl CommitmentGenerator { + /// Creates a commitment generator with the provided mode. pub fn new( connection_pool: ConnectionPool, commitment_mode: L1BatchCommitmentMode, ) -> Self { Self { + computer: Arc::new(RealCommitmentComputer), connection_pool, health_updater: ReactiveHealthCheck::new("commitment_generator").1, commitment_mode, + parallelism: Self::default_parallelism(), } } + /// Returns default parallelism for commitment generation based on the number of CPU cores available. + pub fn default_parallelism() -> NonZeroU32 { + // Leave at least one core free to handle other blocking tasks. `unwrap()`s are safe by design. + let cpus = u32::try_from(num_cpus::get().saturating_sub(1).clamp(1, 16)).unwrap(); + NonZeroU32::new(cpus).unwrap() + } + + /// Sets the degree of parallelism to be used by this generator. A reasonable value can be obtained + /// using [`Self::default_parallelism()`]. + pub fn set_max_parallelism(&mut self, parallelism: NonZeroU32) { + self.parallelism = parallelism; + } + + /// Returns a health check for this generator. pub fn health_check(&self) -> ReactiveHealthCheck { self.health_updater.subscribe() } @@ -82,25 +104,26 @@ impl CommitmentGenerator { })?; drop(connection); + let computer = self.computer.clone(); let events_commitment_task: JoinHandle> = tokio::task::spawn_blocking(move || { let latency = METRICS.events_queue_commitment_latency.start(); let events_queue_commitment = - events_queue_commitment(&events_queue, protocol_version) - .context("Events queue commitment is required for post-boojum batch")?; + computer.events_queue_commitment(&events_queue, protocol_version)?; latency.observe(); Ok(events_queue_commitment) }); + let computer = self.computer.clone(); let bootloader_memory_commitment_task: JoinHandle> = tokio::task::spawn_blocking(move || { let latency = METRICS.bootloader_content_commitment_latency.start(); - let bootloader_initial_content_commitment = bootloader_initial_content_commitment( - &initial_bootloader_contents, - protocol_version, - ) - .context("Bootloader content commitment is required for post-boojum batch")?; + let bootloader_initial_content_commitment = computer + .bootloader_initial_content_commitment( + &initial_bootloader_contents, + protocol_version, + )?; latency.observe(); Ok(bootloader_initial_content_commitment) @@ -262,7 +285,10 @@ impl CommitmentGenerator { Ok(input) } - async fn step(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<()> { + async fn process_batch( + &self, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result { let latency = METRICS.generate_commitment_latency_stage[&CommitmentStage::PrepareInput].start(); let input = self.prepare_input(l1_batch_number).await?; @@ -278,22 +304,45 @@ impl CommitmentGenerator { tracing::debug!( "Generated commitment artifacts for L1 batch #{l1_batch_number} in {latency:?}" ); + Ok(artifacts) + } - let latency = - METRICS.generate_commitment_latency_stage[&CommitmentStage::SaveResults].start(); - self.connection_pool + async fn step( + &self, + l1_batch_numbers: ops::RangeInclusive, + ) -> anyhow::Result<()> { + let iterable_numbers = + (l1_batch_numbers.start().0..=l1_batch_numbers.end().0).map(L1BatchNumber); + let batch_futures = iterable_numbers.map(|number| async move { + let artifacts = self + .process_batch(number) + .await + .with_context(|| format!("failed processing L1 batch #{number}"))?; + anyhow::Ok((number, artifacts)) + }); + let artifacts = futures::future::try_join_all(batch_futures).await?; + + let mut connection = self + .connection_pool .connection_tagged("commitment_generator") - .await? - .blocks_dal() - .save_l1_batch_commitment_artifacts(l1_batch_number, &artifacts) .await?; - let latency = latency.observe(); - tracing::debug!( - "Stored commitment artifacts for L1 batch #{l1_batch_number} in {latency:?}" - ); + // Saving changes atomically is not required here; since we save batches in order, if we encounter a DB error, + // the commitment generator will be able to recover gracefully. + for (l1_batch_number, artifacts) in artifacts { + let latency = + METRICS.generate_commitment_latency_stage[&CommitmentStage::SaveResults].start(); + connection + .blocks_dal() + .save_l1_batch_commitment_artifacts(l1_batch_number, &artifacts) + .await?; + let latency = latency.observe(); + tracing::debug!( + "Stored commitment artifacts for L1 batch #{l1_batch_number} in {latency:?}" + ); + } let health_details = serde_json::json!({ - "l1_batch_number": l1_batch_number, + "l1_batch_number": *l1_batch_numbers.end(), }); self.health_updater .update(Health::from(HealthStatus::Ready).with_details(health_details)); @@ -335,29 +384,72 @@ impl CommitmentGenerator { } } + async fn next_batch_range(&self) -> anyhow::Result>> { + let mut connection = self + .connection_pool + .connection_tagged("commitment_generator") + .await?; + let Some(next_batch_number) = connection + .blocks_dal() + .get_next_l1_batch_ready_for_commitment_generation() + .await? + else { + return Ok(None); + }; + + let Some(last_batch_number) = connection + .blocks_dal() + .get_last_l1_batch_ready_for_commitment_generation() + .await? + else { + return Ok(None); + }; + anyhow::ensure!( + next_batch_number <= last_batch_number, + "Unexpected node state: next L1 batch ready for commitment generation (#{next_batch_number}) is greater than \ + the last L1 batch ready for commitment generation (#{last_batch_number})" + ); + let last_batch_number = + last_batch_number.min(next_batch_number + self.parallelism.get() - 1); + Ok(Some(next_batch_number..=last_batch_number)) + } + + /// Runs this commitment generator indefinitely. It will process L1 batches added to the database + /// processed by the Merkle tree (or a tree fetcher), with a previously configured max parallelism. pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + tracing::info!( + "Starting commitment generator with mode {:?} and parallelism {}", + self.commitment_mode, + self.parallelism + ); + if self.connection_pool.max_size() < self.parallelism.get() { + tracing::warn!( + "Connection pool for commitment generation has fewer connections ({pool_size}) than \ + configured max parallelism ({parallelism}); commitment generation may be slowed down as a result", + pool_size = self.connection_pool.max_size(), + parallelism = self.parallelism.get() + ); + } self.health_updater.update(HealthStatus::Ready.into()); + loop { if *stop_receiver.borrow() { tracing::info!("Stop signal received, commitment generator is shutting down"); break; } - let Some(l1_batch_number) = self - .connection_pool - .connection_tagged("commitment_generator") - .await? - .blocks_dal() - .get_next_l1_batch_ready_for_commitment_generation() - .await? - else { + let Some(l1_batch_numbers) = self.next_batch_range().await? else { tokio::time::sleep(SLEEP_INTERVAL).await; continue; }; - tracing::info!("Started commitment generation for L1 batch #{l1_batch_number}"); - self.step(l1_batch_number).await?; - tracing::info!("Finished commitment generation for L1 batch #{l1_batch_number}"); + tracing::info!("Started commitment generation for L1 batches #{l1_batch_numbers:?}"); + let step_latency = METRICS.step_latency.start(); + self.step(l1_batch_numbers.clone()).await?; + let step_latency = step_latency.observe(); + let batch_count = l1_batch_numbers.end().0 - l1_batch_numbers.start().0 + 1; + METRICS.step_batch_count.observe(batch_count.into()); + tracing::info!("Finished commitment generation for L1 batches #{l1_batch_numbers:?} in {step_latency:?} ({:?} per batch)", step_latency / batch_count); } Ok(()) } diff --git a/core/node/commitment_generator/src/metrics.rs b/core/node/commitment_generator/src/metrics.rs index 78cb82fff2bd..767e2874915b 100644 --- a/core/node/commitment_generator/src/metrics.rs +++ b/core/node/commitment_generator/src/metrics.rs @@ -10,19 +10,28 @@ pub(super) enum CommitmentStage { SaveResults, } +const BATCH_COUNT_BUCKETS: Buckets = Buckets::linear(1.0..=16.0, 1.0); + /// Metrics for the commitment generator. #[derive(Debug, Metrics)] #[metrics(prefix = "server_commitment_generator")] pub(super) struct CommitmentGeneratorMetrics { - /// Latency of generating commitment per stage. + /// Latency of generating commitment for a single L1 batch per stage. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub generate_commitment_latency_stage: Family>, - /// Latency of generating bootloader content commitment. + /// Latency of generating bootloader content commitment for a single L1 batch. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub bootloader_content_commitment_latency: Histogram, - /// Latency of generating events queue commitment. + /// Latency of generating events queue commitment for a single L1 batch. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub events_queue_commitment_latency: Histogram, + + /// Latency of processing a continuous chunk of L1 batches during a single step of the generator. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub step_latency: Histogram, + /// Number of L1 batches processed during a single step. + #[metrics(buckets = BATCH_COUNT_BUCKETS)] + pub step_batch_count: Histogram, } #[vise::register] diff --git a/core/node/commitment_generator/src/tests.rs b/core/node/commitment_generator/src/tests.rs new file mode 100644 index 000000000000..7f3c3eb2e2b1 --- /dev/null +++ b/core/node/commitment_generator/src/tests.rs @@ -0,0 +1,301 @@ +//! Tests for `CommitmentGenerator`. + +use std::thread; + +use rand::{thread_rng, Rng}; +use zksync_dal::Connection; +use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; +use zksync_node_test_utils::{create_l1_batch, create_l2_block}; +use zksync_types::{ + block::L1BatchTreeData, zk_evm_types::LogQuery, AccountTreeId, Address, StorageLog, +}; + +use super::*; + +async fn seal_l1_batch(storage: &mut Connection<'_, Core>, number: L1BatchNumber) { + let l2_block = create_l2_block(number.0); + storage + .blocks_dal() + .insert_l2_block(&l2_block) + .await + .unwrap(); + let storage_key = StorageKey::new( + AccountTreeId::new(Address::repeat_byte(1)), + H256::from_low_u64_be(number.0.into()), + ); + let storage_log = StorageLog::new_write_log(storage_key, H256::repeat_byte(0xff)); + storage + .storage_logs_dal() + .insert_storage_logs(l2_block.number, &[(H256::zero(), vec![storage_log])]) + .await + .unwrap(); + storage + .storage_logs_dedup_dal() + .insert_initial_writes(number, &[storage_key]) + .await + .unwrap(); + + let header = create_l1_batch(number.0); + storage + .blocks_dal() + .insert_mock_l1_batch(&header) + .await + .unwrap(); + storage + .blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(number) + .await + .unwrap(); +} + +async fn save_l1_batch_tree_data(storage: &mut Connection<'_, Core>, number: L1BatchNumber) { + let tree_data = L1BatchTreeData { + hash: H256::from_low_u64_be(number.0.into()), + rollup_last_leaf_index: 20 + 10 * u64::from(number.0), + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(number, &tree_data) + .await + .unwrap(); +} + +#[derive(Debug)] +struct MockCommitmentComputer { + delay: Duration, +} + +impl MockCommitmentComputer { + const EVENTS_QUEUE_COMMITMENT: H256 = H256::repeat_byte(1); + const BOOTLOADER_COMMITMENT: H256 = H256::repeat_byte(2); +} + +impl CommitmentComputer for MockCommitmentComputer { + fn events_queue_commitment( + &self, + _events_queue: &[LogQuery], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result { + assert_eq!(protocol_version, ProtocolVersionId::latest()); + thread::sleep(self.delay); + Ok(Self::EVENTS_QUEUE_COMMITMENT) + } + + fn bootloader_initial_content_commitment( + &self, + _initial_bootloader_contents: &[(usize, U256)], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result { + assert_eq!(protocol_version, ProtocolVersionId::latest()); + thread::sleep(self.delay); + Ok(Self::BOOTLOADER_COMMITMENT) + } +} + +fn create_commitment_generator(pool: ConnectionPool) -> CommitmentGenerator { + let mut generator = CommitmentGenerator::new(pool, L1BatchCommitmentMode::Rollup); + generator.computer = Arc::new(MockCommitmentComputer { + delay: Duration::from_millis(20), + }); + generator +} + +fn processed_batch(health: &Health, expected_number: L1BatchNumber) -> bool { + if !matches!(health.status(), HealthStatus::Ready) { + return false; + } + let Some(details) = health.details() else { + return false; + }; + *details == serde_json::json!({ "l1_batch_number": expected_number }) +} + +#[tokio::test] +async fn determining_batch_range() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut generator = create_commitment_generator(pool.clone()); + generator.parallelism = NonZeroU32::new(4).unwrap(); // to be deterministic + assert_eq!(generator.next_batch_range().await.unwrap(), None); + + seal_l1_batch(&mut storage, L1BatchNumber(1)).await; + assert_eq!(generator.next_batch_range().await.unwrap(), None); // No tree data for L1 batch #1 + + save_l1_batch_tree_data(&mut storage, L1BatchNumber(1)).await; + assert_eq!( + generator.next_batch_range().await.unwrap(), + Some(L1BatchNumber(1)..=L1BatchNumber(1)) + ); + + seal_l1_batch(&mut storage, L1BatchNumber(2)).await; + assert_eq!( + generator.next_batch_range().await.unwrap(), + Some(L1BatchNumber(1)..=L1BatchNumber(1)) + ); + + save_l1_batch_tree_data(&mut storage, L1BatchNumber(2)).await; + assert_eq!( + generator.next_batch_range().await.unwrap(), + Some(L1BatchNumber(1)..=L1BatchNumber(2)) + ); + + for number in 3..=5 { + seal_l1_batch(&mut storage, L1BatchNumber(number)).await; + } + assert_eq!( + generator.next_batch_range().await.unwrap(), + Some(L1BatchNumber(1)..=L1BatchNumber(2)) + ); + + for number in 3..=5 { + save_l1_batch_tree_data(&mut storage, L1BatchNumber(number)).await; + } + // L1 batch #5 is excluded because of the parallelism limit + assert_eq!( + generator.next_batch_range().await.unwrap(), + Some(L1BatchNumber(1)..=L1BatchNumber(4)) + ); +} + +#[tokio::test] +async fn commitment_generator_normal_operation() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let generator = create_commitment_generator(pool.clone()); + let mut health_check = generator.health_check(); + let (stop_sender, stop_receiver) = watch::channel(false); + let generator_handle = tokio::spawn(generator.run(stop_receiver)); + + for number in 1..=5 { + let number = L1BatchNumber(number); + seal_l1_batch(&mut storage, number).await; + save_l1_batch_tree_data(&mut storage, number).await; + // Wait until the batch is processed by the generator + health_check + .wait_for(|health| processed_batch(health, number)) + .await; + // Check data in Postgres + let metadata = storage + .blocks_dal() + .get_l1_batch_metadata(number) + .await + .unwrap() + .expect("no batch metadata"); + assert_eq!( + metadata.metadata.events_queue_commitment, + Some(MockCommitmentComputer::EVENTS_QUEUE_COMMITMENT) + ); + assert_eq!( + metadata.metadata.bootloader_initial_content_commitment, + Some(MockCommitmentComputer::BOOTLOADER_COMMITMENT) + ); + } + + stop_sender.send_replace(true); + generator_handle.await.unwrap().unwrap(); +} + +#[tokio::test] +async fn commitment_generator_bulk_processing() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + for number in 1..=5 { + seal_l1_batch(&mut storage, L1BatchNumber(number)).await; + save_l1_batch_tree_data(&mut storage, L1BatchNumber(number)).await; + } + + let mut generator = create_commitment_generator(pool.clone()); + generator.parallelism = NonZeroU32::new(10).unwrap(); // enough to process all batches at once + let mut health_check = generator.health_check(); + let (stop_sender, stop_receiver) = watch::channel(false); + let generator_handle = tokio::spawn(generator.run(stop_receiver)); + + health_check + .wait_for(|health| processed_batch(health, L1BatchNumber(5))) + .await; + for number in 1..=5 { + let metadata = storage + .blocks_dal() + .get_l1_batch_metadata(L1BatchNumber(number)) + .await + .unwrap() + .expect("no batch metadata"); + assert_eq!( + metadata.metadata.events_queue_commitment, + Some(MockCommitmentComputer::EVENTS_QUEUE_COMMITMENT) + ); + assert_eq!( + metadata.metadata.bootloader_initial_content_commitment, + Some(MockCommitmentComputer::BOOTLOADER_COMMITMENT) + ); + } + + stop_sender.send_replace(true); + generator_handle.await.unwrap().unwrap(); +} + +#[tokio::test] +async fn commitment_generator_with_tree_emulation() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + drop(storage); + + // Emulates adding new batches to the storage. + let new_batches_pool = pool.clone(); + let new_batches_handle = tokio::spawn(async move { + for number in 1..=10 { + let sleep_delay = Duration::from_millis(thread_rng().gen_range(1..20)); + tokio::time::sleep(sleep_delay).await; + let mut storage = new_batches_pool.connection().await.unwrap(); + seal_l1_batch(&mut storage, L1BatchNumber(number)).await; + } + }); + + let tree_emulator_pool = pool.clone(); + let tree_emulator_handle = tokio::spawn(async move { + for number in 1..=10 { + let mut storage = tree_emulator_pool.connection().await.unwrap(); + while storage + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap() + < Some(L1BatchNumber(number)) + { + let sleep_delay = Duration::from_millis(thread_rng().gen_range(5..10)); + tokio::time::sleep(sleep_delay).await; + } + save_l1_batch_tree_data(&mut storage, L1BatchNumber(number)).await; + } + }); + + let mut generator = create_commitment_generator(pool.clone()); + generator.parallelism = NonZeroU32::new(10).unwrap(); // enough to process all batches at once + let mut health_check = generator.health_check(); + let (stop_sender, stop_receiver) = watch::channel(false); + let generator_handle = tokio::spawn(generator.run(stop_receiver)); + + health_check + .wait_for(|health| processed_batch(health, L1BatchNumber(10))) + .await; + + new_batches_handle.await.unwrap(); + tree_emulator_handle.await.unwrap(); + stop_sender.send_replace(true); + generator_handle.await.unwrap().unwrap(); +} diff --git a/core/node/commitment_generator/src/utils.rs b/core/node/commitment_generator/src/utils.rs index 433d1345903e..9a12f0c43165 100644 --- a/core/node/commitment_generator/src/utils.rs +++ b/core/node/commitment_generator/src/utils.rs @@ -1,4 +1,7 @@ //! Utils for commitment calculation. + +use std::fmt; + use multivm::utils::get_used_bootloader_memory_bytes; use zk_evm_1_3_3::{ aux_structures::Timestamp as Timestamp_1_3_3, @@ -15,73 +18,96 @@ use zk_evm_1_5_0::{ use zksync_types::{zk_evm_types::LogQuery, ProtocolVersionId, VmVersion, H256, U256}; use zksync_utils::expand_memory_contents; -pub fn events_queue_commitment( - events_queue: &[LogQuery], - protocol_version: ProtocolVersionId, -) -> Option { - match VmVersion::from(protocol_version) { - VmVersion::VmBoojumIntegration => Some(H256( - circuit_sequencer_api_1_4_0::commitments::events_queue_commitment_fixed( - &events_queue - .iter() - .map(|x| to_log_query_1_3_3(*x)) - .collect(), - ), - )), - VmVersion::Vm1_4_1 | VmVersion::Vm1_4_2 => Some(H256( - circuit_sequencer_api_1_4_1::commitments::events_queue_commitment_fixed( - &events_queue - .iter() - .map(|x| to_log_query_1_4_1(*x)) - .collect(), - ), - )), - VmVersion::Vm1_5_0SmallBootloaderMemory | VmVersion::Vm1_5_0IncreasedBootloaderMemory => { - Some(H256( +/// Encapsulates computations of commitment components. +/// +/// - All methods are considered to be blocking. +/// - Returned errors are considered unrecoverable (i.e., they bubble up and lead to commitment generator termination). +pub(crate) trait CommitmentComputer: fmt::Debug + Send + Sync + 'static { + fn events_queue_commitment( + &self, + events_queue: &[LogQuery], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result; + + fn bootloader_initial_content_commitment( + &self, + initial_bootloader_contents: &[(usize, U256)], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result; +} + +#[derive(Debug)] +pub(crate) struct RealCommitmentComputer; + +impl CommitmentComputer for RealCommitmentComputer { + fn events_queue_commitment( + &self, + events_queue: &[LogQuery], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result { + match VmVersion::from(protocol_version) { + VmVersion::VmBoojumIntegration => Ok(H256( + circuit_sequencer_api_1_4_0::commitments::events_queue_commitment_fixed( + &events_queue + .iter() + .map(|x| to_log_query_1_3_3(*x)) + .collect(), + ), + )), + VmVersion::Vm1_4_1 | VmVersion::Vm1_4_2 => Ok(H256( + circuit_sequencer_api_1_4_1::commitments::events_queue_commitment_fixed( + &events_queue + .iter() + .map(|x| to_log_query_1_4_1(*x)) + .collect(), + ), + )), + VmVersion::Vm1_5_0SmallBootloaderMemory + | VmVersion::Vm1_5_0IncreasedBootloaderMemory => Ok(H256( circuit_sequencer_api_1_5_0::commitments::events_queue_commitment_fixed( &events_queue .iter() .map(|x| to_log_query_1_5_0(*x)) .collect(), ), - )) + )), + _ => anyhow::bail!("Unsupported protocol version: {protocol_version:?}"), } - _ => None, } -} -pub fn bootloader_initial_content_commitment( - initial_bootloader_contents: &[(usize, U256)], - protocol_version: ProtocolVersionId, -) -> Option { - let expanded_memory_size = if protocol_version.is_pre_boojum() { - return None; - } else { - get_used_bootloader_memory_bytes(protocol_version.into()) - }; + fn bootloader_initial_content_commitment( + &self, + initial_bootloader_contents: &[(usize, U256)], + protocol_version: ProtocolVersionId, + ) -> anyhow::Result { + let expanded_memory_size = if protocol_version.is_pre_boojum() { + anyhow::bail!("Unsupported protocol version: {protocol_version:?}"); + } else { + get_used_bootloader_memory_bytes(protocol_version.into()) + }; - let full_bootloader_memory = - expand_memory_contents(initial_bootloader_contents, expanded_memory_size); + let full_bootloader_memory = + expand_memory_contents(initial_bootloader_contents, expanded_memory_size); - match VmVersion::from(protocol_version) { - VmVersion::VmBoojumIntegration => Some(H256( - circuit_sequencer_api_1_4_0::commitments::initial_heap_content_commitment_fixed( - &full_bootloader_memory, - ), - )), - VmVersion::Vm1_4_1 | VmVersion::Vm1_4_2 => Some(H256( - circuit_sequencer_api_1_4_1::commitments::initial_heap_content_commitment_fixed( - &full_bootloader_memory, - ), - )), - VmVersion::Vm1_5_0SmallBootloaderMemory | VmVersion::Vm1_5_0IncreasedBootloaderMemory => { - Some(H256( + match VmVersion::from(protocol_version) { + VmVersion::VmBoojumIntegration => Ok(H256( + circuit_sequencer_api_1_4_0::commitments::initial_heap_content_commitment_fixed( + &full_bootloader_memory, + ), + )), + VmVersion::Vm1_4_1 | VmVersion::Vm1_4_2 => Ok(H256( + circuit_sequencer_api_1_4_1::commitments::initial_heap_content_commitment_fixed( + &full_bootloader_memory, + ), + )), + VmVersion::Vm1_5_0SmallBootloaderMemory + | VmVersion::Vm1_5_0IncreasedBootloaderMemory => Ok(H256( circuit_sequencer_api_1_5_0::commitments::initial_heap_content_commitment_fixed( &full_bootloader_memory, ), - )) + )), + _ => unreachable!(), } - _ => unreachable!(), } } diff --git a/core/node/commitment_generator/src/validation_task.rs b/core/node/commitment_generator/src/validation_task.rs index 8724408f14d3..4488e0c2c56e 100644 --- a/core/node/commitment_generator/src/validation_task.rs +++ b/core/node/commitment_generator/src/validation_task.rs @@ -124,10 +124,9 @@ impl L1BatchCommitmentModeValidationTask { mod tests { use std::{mem, sync::Mutex}; - use jsonrpsee::types::ErrorObject; use zksync_eth_client::clients::MockEthereum; use zksync_types::{ethabi, U256}; - use zksync_web3_decl::client::MockClient; + use zksync_web3_decl::{client::MockClient, jsonrpsee::types::ErrorObject}; use super::*;