Skip to content

Commit

Permalink
move wg dal methods to corresponding files
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemka374 committed Jan 31, 2025
1 parent ddc953d commit 4ac6783
Show file tree
Hide file tree
Showing 27 changed files with 2,119 additions and 1,920 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,31 @@ impl WitnessGeneratorJobRequeuer {

async fn requeue_stuck_basic_jobs(&self, connection: &mut Connection<'_, Prover>) {
let stuck_jobs = connection
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.requeue_stuck_basic_jobs(self.processing_timeouts.basic(), self.max_attempts)
.await;
self.emit_telemetry(WitnessType::WitnessInputsFri, &stuck_jobs);
}

async fn requeue_stuck_leaf_jobs(&self, connection: &mut Connection<'_, Prover>) {
let stuck_jobs = connection
.fri_witness_generator_dal()
.fri_leaf_witness_generator_dal()
.requeue_stuck_leaf_jobs(self.processing_timeouts.leaf(), self.max_attempts)
.await;
self.emit_telemetry(WitnessType::LeafAggregationJobsFri, &stuck_jobs);
}

async fn requeue_stuck_node_jobs(&self, connection: &mut Connection<'_, Prover>) {
let stuck_jobs = connection
.fri_witness_generator_dal()
.fri_node_witness_generator_dal()
.requeue_stuck_node_jobs(self.processing_timeouts.node(), self.max_attempts)
.await;
self.emit_telemetry(WitnessType::NodeAggregationJobsFri, &stuck_jobs);
}

async fn requeue_stuck_recursion_tip_jobs(&self, connection: &mut Connection<'_, Prover>) {
let stuck_jobs = connection
.fri_witness_generator_dal()
.fri_recursion_tip_witness_generator_dal()
.requeue_stuck_recursion_tip_jobs(
self.processing_timeouts.recursion_tip(),
self.max_attempts,
Expand All @@ -69,7 +69,7 @@ impl WitnessGeneratorJobRequeuer {

async fn requeue_stuck_scheduler_jobs(&self, connection: &mut Connection<'_, Prover>) {
let stuck_jobs = connection
.fri_witness_generator_dal()
.fri_scheduler_witness_generator_dal()
.requeue_stuck_scheduler_jobs(self.processing_timeouts.scheduler(), self.max_attempts)
.await;
self.emit_telemetry(WitnessType::SchedulerJobsFri, &stuck_jobs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};

use crate::{
artifacts::ArtifactsManager,
rounds::basic_circuits::{BasicCircuitArtifacts, BasicCircuits, BasicWitnessGeneratorJob},
rounds::basic_circuits::{
utils::create_aggregation_jobs, BasicCircuitArtifacts, BasicCircuits,
BasicWitnessGeneratorJob,
},
utils::SchedulerPartialInputWrapper,
};

Expand Down Expand Up @@ -77,7 +80,7 @@ impl ArtifactsManager for BasicCircuits {
.await
.expect("failed to get database transaction");
let protocol_version_id = transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(L1BatchNumber(job_id))
.await;
transaction
Expand All @@ -90,18 +93,20 @@ impl ArtifactsManager for BasicCircuits {
protocol_version_id,
)
.await;

create_aggregation_jobs(
&mut transaction,
L1BatchNumber(job_id),
&artifacts.queue_urls,
&blob_urls,
get_recursive_layer_circuit_id_for_base_layer,
protocol_version_id,
)
.await
.unwrap();

transaction
.fri_witness_generator_dal()
.create_aggregation_jobs(
L1BatchNumber(job_id),
&artifacts.queue_urls,
&blob_urls,
get_recursive_layer_circuit_id_for_base_layer,
protocol_version_id,
)
.await;
transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.mark_witness_job_as_successful(L1BatchNumber(job_id), started_at.elapsed())
.await;
transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl JobManager for BasicCircuits {
.connection()
.await
.unwrap()
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.get_next_basic_circuit_witness_job(protocol_version, &pod_name)
.await
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ use zksync_multivm::{
zk_evm_latest::ethereum_types::Address,
};
use zksync_object_store::ObjectStore;
use zksync_prover_dal::{Connection, Prover, ProverDal};
use zksync_prover_fri_types::keys::ClosedFormInputKey;
use zksync_prover_interface::inputs::WitnessInputData;
use zksync_system_constants::BOOTLOADER_ADDRESS;
use zksync_types::L1BatchNumber;
use zksync_types::{protocol_version::ProtocolSemanticVersion, L1BatchNumber};

use crate::{
precalculated_merkle_paths_provider::PrecalculatedMerklePathsProvider,
Expand Down Expand Up @@ -262,3 +263,59 @@ async fn save_recursion_queue(
let blob_url = object_store.put(key, &wrapper).await.unwrap();
(circuit_id, blob_url, basic_circuit_count)
}

pub(crate) async fn create_aggregation_jobs(
connection: &mut Connection<'_, Prover>,
block_number: L1BatchNumber,
closed_form_inputs_and_urls: &Vec<(u8, String, usize)>,
scheduler_partial_input_blob_url: &str,
base_layer_to_recursive_layer_circuit_id: fn(u8) -> u8,
protocol_version: ProtocolSemanticVersion,
) -> anyhow::Result<()> {
for (circuit_id, closed_form_inputs_url, number_of_basic_circuits) in
closed_form_inputs_and_urls
{
connection
.fri_leaf_witness_generator_dal()
.insert_leaf_aggregation_jobs(
block_number,
protocol_version,
*circuit_id,
closed_form_inputs_url.clone(),
*number_of_basic_circuits,
)
.await;

connection
.fri_node_witness_generator_dal()
.insert_node_aggregation_jobs(
block_number,
base_layer_to_recursive_layer_circuit_id(*circuit_id),
None,
0,
"",
protocol_version,
)
.await;
}

connection
.fri_recursion_tip_witness_generator_dal()
.insert_recursion_tip_aggregation_jobs(
block_number,
closed_form_inputs_and_urls,
protocol_version,
)
.await;

connection
.fri_scheduler_witness_generator_dal()
.insert_scheduler_aggregation_jobs(
block_number,
scheduler_partial_input_blob_url,
protocol_version,
)
.await;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ArtifactsManager for LeafAggregation {
let mut transaction = prover_connection.start_transaction().await.unwrap();
let number_of_dependent_jobs = blob_urls.circuit_ids_and_urls.len();
let protocol_version_id = transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(artifacts.block_number)
.await;
tracing::info!(
Expand All @@ -118,7 +118,7 @@ impl ArtifactsManager for LeafAggregation {
artifacts.circuit_id,
);
transaction
.fri_witness_generator_dal()
.fri_node_witness_generator_dal()
.update_node_aggregation_jobs_url(
artifacts.block_number,
get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id),
Expand All @@ -134,7 +134,7 @@ impl ArtifactsManager for LeafAggregation {
artifacts.circuit_id,
);
transaction
.fri_witness_generator_dal()
.fri_leaf_witness_generator_dal()
.mark_leaf_aggregation_as_successful(job_id, started_at.elapsed())
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl JobManager for LeafAggregation {
let Some(metadata) = connection_pool
.connection()
.await?
.fri_witness_generator_dal()
.fri_leaf_witness_generator_dal()
.get_next_leaf_aggregation_job(protocol_version, &pod_name)
.await
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl ArtifactsManager for NodeAggregation {
let mut transaction = prover_connection.start_transaction().await.unwrap();
let dependent_jobs = blob_urls.circuit_ids_and_urls.len();
let protocol_version_id = transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(artifacts.block_number)
.await;
match artifacts.next_aggregations.len() > 1 {
Expand All @@ -105,7 +105,7 @@ impl ArtifactsManager for NodeAggregation {
)
.await;
transaction
.fri_witness_generator_dal()
.fri_node_witness_generator_dal()
.insert_node_aggregation_jobs(
artifacts.block_number,
artifacts.circuit_id,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl ArtifactsManager for NodeAggregation {
}

transaction
.fri_witness_generator_dal()
.fri_node_witness_generator_dal()
.mark_node_aggregation_as_successful(job_id, started_at.elapsed())
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl JobManager for NodeAggregation {
let Some(metadata) = connection_pool
.connection()
.await?
.fri_witness_generator_dal()
.fri_node_witness_generator_dal()
.get_next_node_aggregation_job(protocol_version, &pod_name)
.await
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl ArtifactsManager for RecursionTip {
let mut prover_connection = connection_pool.connection().await?;
let mut transaction = prover_connection.start_transaction().await?;
let protocol_version_id = transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(L1BatchNumber(job_id))
.await;
transaction
Expand All @@ -126,7 +126,7 @@ impl ArtifactsManager for RecursionTip {
.await;

transaction
.fri_witness_generator_dal()
.fri_recursion_tip_witness_generator_dal()
.mark_recursion_tip_job_as_successful(L1BatchNumber(job_id), started_at.elapsed())
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl JobManager for RecursionTip {
let Some((l1_batch_number, number_of_final_node_jobs)) = connection_pool
.connection()
.await?
.fri_witness_generator_dal()
.fri_recursion_tip_witness_generator_dal()
.get_next_recursion_tip_witness_job(protocol_version, &pod_name)
.await
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl ArtifactsManager for Scheduler {
let mut prover_connection = connection_pool.connection().await?;
let mut transaction = prover_connection.start_transaction().await?;
let protocol_version_id = transaction
.fri_witness_generator_dal()
.fri_basic_witness_generator_dal()
.protocol_version_for_l1_batch(L1BatchNumber(job_id))
.await;
transaction
Expand All @@ -80,7 +80,7 @@ impl ArtifactsManager for Scheduler {
.await;

transaction
.fri_witness_generator_dal()
.fri_scheduler_witness_generator_dal()
.mark_scheduler_job_as_successful(L1BatchNumber(job_id), started_at.elapsed())
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl JobManager for Scheduler {
let Some(l1_batch_number) = connection_pool
.connection()
.await?
.fri_witness_generator_dal()
.fri_scheduler_witness_generator_dal()
.get_next_scheduler_witness_job(protocol_version, &pod_name)
.await
else {
Expand Down

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4ac6783

Please sign in to comment.