Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
perekopskiy committed May 30, 2024
2 parents fd19a02 + a58a7e8 commit 70e622b
Show file tree
Hide file tree
Showing 45 changed files with 508 additions and 409 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions core/lib/basic_types/src/basic_fri_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ impl TryFrom<i32> for AggregationRound {
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)]
pub struct JobIdentifiers {
pub circuit_id: u8,
pub aggregation_round: u8,
pub protocol_version: u16,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
12 changes: 9 additions & 3 deletions core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ pub struct FriProverJobMetadata {
}

#[derive(Debug, Clone, Copy, Default)]
pub struct JobCountStatistics {
pub struct ExtendedJobCountStatistics {
pub queued: usize,
pub in_progress: usize,
pub failed: usize,
pub successful: usize,
}

impl Add for JobCountStatistics {
type Output = JobCountStatistics;
#[derive(Debug, Clone, Copy, Default)]
pub struct JobCountStatistics {
pub queued: usize,
pub in_progress: usize,
}

impl Add for ExtendedJobCountStatistics {
type Output = ExtendedJobCountStatistics;

fn add(self, rhs: Self) -> Self::Output {
Self {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use async_trait::async_trait;
use prover_dal::{Prover, ProverDal};
use zksync_dal::ConnectionPool;
Expand All @@ -24,7 +26,9 @@ impl FriProofCompressorQueueReporter {
}
}

async fn get_job_statistics(pool: &ConnectionPool<Prover>) -> JobCountStatistics {
async fn get_job_statistics(
pool: &ConnectionPool<Prover>,
) -> HashMap<ProtocolVersionId, JobCountStatistics> {
pool.connection()
.await
.unwrap()
Expand All @@ -41,25 +45,24 @@ impl PeriodicJob for FriProofCompressorQueueReporter {
async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let stats = Self::get_job_statistics(&self.pool).await;

if stats.queued > 0 {
tracing::info!(
"Found {} free {} in progress proof compressor jobs",
stats.queued,
stats.in_progress
);
}
for (protocol_version, stats) in &stats {
if stats.queued > 0 {
tracing::info!(
"Found {} free {} in progress proof compressor jobs for protocol version {}",
stats.queued,
stats.in_progress,
protocol_version
);
}

PROVER_FRI_METRICS.proof_compressor_jobs[&(
JobStatus::Queued,
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(stats.queued as u64);
PROVER_FRI_METRICS.proof_compressor_jobs
[&(JobStatus::Queued, protocol_version.to_string())]
.set(stats.queued as u64);

PROVER_FRI_METRICS.proof_compressor_jobs[&(
JobStatus::InProgress,
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(stats.in_progress as u64);
PROVER_FRI_METRICS.proof_compressor_jobs
[&(JobStatus::InProgress, protocol_version.to_string())]
.set(stats.in_progress as u64);
}

let oldest_not_compressed_batch = self
.pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,43 @@ impl PeriodicJob for FriProverQueueReporter {
let mut conn = self.prover_connection_pool.connection().await.unwrap();
let stats = conn.fri_prover_jobs_dal().get_prover_jobs_stats().await;

for ((circuit_id, aggregation_round), stats) in stats.into_iter() {
for (job_identifiers, stats) in &stats {
// BEWARE, HERE BE DRAGONS.
// In database, the `circuit_id` stored is the circuit for which the aggregation is done,
// not the circuit which is running.
// There is a single node level aggregation circuit, which is circuit 2.
// This can aggregate multiple leaf nodes (which may belong to different circuits).
// This reporting is a hacky forced way to use `circuit_id` 2 which will solve auto scalers.
// A proper fix will be later provided to solve this at database level.
let circuit_id = if aggregation_round == 2 {
let circuit_id = if job_identifiers.aggregation_round == 2 {
2
} else {
circuit_id
job_identifiers.circuit_id
};

let group_id = self
.config
.get_group_id_for_circuit_id_and_aggregation_round(circuit_id, aggregation_round)
.get_group_id_for_circuit_id_and_aggregation_round(
circuit_id,
job_identifiers.aggregation_round,
)
.unwrap_or(u8::MAX);

FRI_PROVER_METRICS.report_prover_jobs(
"queued",
circuit_id,
aggregation_round,
job_identifiers.aggregation_round,
group_id,
ProtocolVersionId::current_prover_version(),
ProtocolVersionId::try_from(job_identifiers.protocol_version).unwrap(),
stats.queued as u64,
);

FRI_PROVER_METRICS.report_prover_jobs(
"in_progress",
circuit_id,
aggregation_round,
job_identifiers.aggregation_round,
group_id,
ProtocolVersionId::current_prover_version(),
ProtocolVersionId::try_from(job_identifiers.protocol_version).unwrap(),
stats.in_progress as u64,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,63 +25,65 @@ impl FriWitnessGeneratorQueueReporter {
}
}

async fn get_job_statistics(&self) -> HashMap<AggregationRound, JobCountStatistics> {
async fn get_job_statistics(
&self,
) -> HashMap<(AggregationRound, ProtocolVersionId), JobCountStatistics> {
let mut conn = self.pool.connection().await.unwrap();
HashMap::from([
(
AggregationRound::BasicCircuits,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::BasicCircuits)
.await,
),
(
AggregationRound::LeafAggregation,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::LeafAggregation)
.await,
),
(
AggregationRound::NodeAggregation,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::NodeAggregation)
.await,
),
(
AggregationRound::RecursionTip,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::RecursionTip)
.await,
),
(
AggregationRound::Scheduler,
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::Scheduler)
.await,
),
])
let mut result = HashMap::new();
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::BasicCircuits)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::LeafAggregation)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::NodeAggregation)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::RecursionTip)
.await,
);
result.extend(
conn.fri_witness_generator_dal()
.get_witness_jobs_stats(AggregationRound::Scheduler)
.await,
);
result
}
}

fn emit_metrics_for_round(round: AggregationRound, stats: JobCountStatistics) {
fn emit_metrics_for_round(
round: AggregationRound,
protocol_version: ProtocolVersionId,
stats: &JobCountStatistics,
) {
if stats.queued > 0 || stats.in_progress > 0 {
tracing::trace!(
"Found {} free and {} in progress {:?} FRI witness generators jobs",
"Found {} free and {} in progress {:?} FRI witness generators jobs for protocol version {}",
stats.queued,
stats.in_progress,
round
round,
protocol_version
);
}

SERVER_METRICS.witness_generator_jobs_by_round[&(
"queued",
format!("{:?}", round),
ProtocolVersionId::current_prover_version().to_string(),
protocol_version.to_string(),
)]
.set(stats.queued as u64);
SERVER_METRICS.witness_generator_jobs_by_round[&(
"in_progress",
format!("{:?}", round),
ProtocolVersionId::current_prover_version().to_string(),
protocol_version.to_string(),
)]
.set(stats.in_progress as u64);
}
Expand All @@ -92,31 +94,31 @@ impl PeriodicJob for FriWitnessGeneratorQueueReporter {

async fn run_routine_task(&mut self) -> anyhow::Result<()> {
let stats_for_all_rounds = self.get_job_statistics().await;
let mut aggregated = JobCountStatistics::default();
for (round, stats) in stats_for_all_rounds {
emit_metrics_for_round(round, stats);
aggregated = aggregated + stats;
}
let mut aggregated = HashMap::<ProtocolVersionId, JobCountStatistics>::new();
for ((round, protocol_version), stats) in stats_for_all_rounds {
emit_metrics_for_round(round, protocol_version, &stats);

if aggregated.queued > 0 {
tracing::trace!(
"Found {} free {} in progress witness generators jobs",
aggregated.queued,
aggregated.in_progress
);
let entry = aggregated.entry(protocol_version).or_default();
entry.queued += stats.queued;
entry.in_progress += stats.in_progress;
}

SERVER_METRICS.witness_generator_jobs[&(
"queued",
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(aggregated.queued as u64);
for (protocol_version, stats) in &aggregated {
if stats.queued > 0 || stats.in_progress > 0 {
tracing::trace!(
"Found {} free {} in progress witness generators jobs for protocol version {}",
stats.queued,
stats.in_progress,
protocol_version
);
}

SERVER_METRICS.witness_generator_jobs[&(
"in_progress",
ProtocolVersionId::current_prover_version().to_string(),
)]
.set(aggregated.in_progress as u64);
SERVER_METRICS.witness_generator_jobs[&("queued", protocol_version.to_string())]
.set(stats.queued as u64);

SERVER_METRICS.witness_generator_jobs[&("in_progress", protocol_version.to_string())]
.set(stats.in_progress as u64);
}

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions core/node/metadata_calculator/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,10 +642,10 @@ impl L1BatchWithLogs {
});
let reads = protective_reads.into_iter().map(TreeInstruction::Read);

// `writes` and `reads` are already sorted, we only need to merge them.
writes
.merge_by(reads, |a, b| a.key() <= b.key())
.collect::<Vec<_>>()
.chain(reads)
.sorted_by_key(|tree_instruction| tree_instruction.key())
.collect()
} else {
// Otherwise, load writes' data from other tables.
Self::extract_storage_logs_from_db(storage, l1_batch_number, protective_reads).await?
Expand Down
5 changes: 4 additions & 1 deletion infrastructure/zk/src/format_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ function formatOneLineQuery(line: string): string {

return prefix + '\n' + formattedQuery + '\n' + suffix;
}

async function formatFile(filePath: string, check: boolean) {
const content = await fs.promises.readFile(filePath, { encoding: 'utf-8' });
let linesToQuery = null;
Expand Down Expand Up @@ -157,7 +158,9 @@ async function formatFile(filePath: string, check: boolean) {

export async function formatSqlxQueries(check: boolean) {
process.chdir(`${process.env.ZKSYNC_HOME}`);
const { stdout: filesRaw } = await utils.exec('find core/lib/dal -type f -name "*.rs"');
const { stdout: filesRaw } = await utils.exec(
'find core/lib/dal -type f -name "*.rs" && find prover/prover_dal -type f -name "*.rs"'
);
const files = filesRaw.trim().split('\n');
const formatResults = await Promise.all(files.map((file) => formatFile(file, check)));
if (check) {
Expand Down
8 changes: 8 additions & 0 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"prover_fri_gateway",
"proof_fri_compressor",
"prover_cli",
"prover_version",
]

resolver = "2"
Expand Down
4 changes: 2 additions & 2 deletions prover/prover_cli/src/commands/status/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use prover_dal::{Connection, ConnectionPool, Prover, ProverDal};
use zksync_types::{
basic_fri_types::AggregationRound,
prover_dal::{
BasicWitnessGeneratorJobInfo, JobCountStatistics, LeafWitnessGeneratorJobInfo,
BasicWitnessGeneratorJobInfo, ExtendedJobCountStatistics, LeafWitnessGeneratorJobInfo,
NodeWitnessGeneratorJobInfo, ProofCompressionJobInfo, ProverJobFriInfo, ProverJobStatus,
RecursionTipWitnessGeneratorJobInfo, SchedulerWitnessGeneratorJobInfo,
},
Expand Down Expand Up @@ -383,7 +383,7 @@ fn display_prover_jobs_info(prover_jobs_info: Vec<ProverJobFriInfo>) {
}

fn display_job_status_count(jobs: Vec<ProverJobFriInfo>) {
let mut jobs_counts = JobCountStatistics::default();
let mut jobs_counts = ExtendedJobCountStatistics::default();
let total_jobs = jobs.len();
jobs.iter().for_each(|job| match job.status {
ProverJobStatus::Queued => jobs_counts.queued += 1,
Expand Down
Loading

0 comments on commit 70e622b

Please sign in to comment.