Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
More replay stage timing metrics (#10828)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored Jun 28, 2020
1 parent ea30c15 commit 17a2128
Showing 1 changed file with 111 additions and 26 deletions.
137 changes: 111 additions & 26 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use solana_ledger::{
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_measure::thread_mem_usage;
use solana_measure::{measure::Measure, thread_mem_usage};
use solana_metrics::inc_new_counter_info;
use solana_runtime::{
bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
Expand All @@ -37,7 +37,7 @@ use solana_sdk::{
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::duration_as_ms,
timing::timestamp,
transaction::Transaction,
};
use solana_vote_program::{
Expand All @@ -54,7 +54,7 @@ use std::{
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
time::Duration,
};

pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
Expand Down Expand Up @@ -110,36 +110,95 @@ pub struct ReplayStageConfig {

#[derive(Default)]
// Accumulates wall-clock timings (in microseconds, from `Measure::as_us`)
// for each phase of one replay-loop iteration; flushed and zeroed by
// `ReplayTiming::update` when a report is emitted.
pub struct ReplayTiming {
// Number of `update` calls accumulated so far.
num_iterations: u64,
// Timestamp (ms, from `timestamp()`) of the last emitted datapoint;
// used to throttle reporting.
last_print: u64,
// Time spent in `compute_bank_stats`.
compute_bank_stats_elapsed: u64,
// Time spent in `select_vote_and_reset_forks`.
select_vote_and_reset_forks_elapsed: u64,
// Time spent in `maybe_start_leader`.
start_leader_elapsed: u64,
// Time spent resetting PoH onto the chosen fork.
reset_bank_elapsed: u64,
// Time spent voting on the selected bank.
voting_elapsed: u64,
// Time spent in `select_forks` (fork choice).
select_forks_elapsed: u64,
// Time spent computing per-slot confirmation stats.
compute_slot_stats_elapsed: u64,
// Time spent in `generate_new_bank_forks`.
generate_new_bank_forks_elapsed: u64,
// Time spent in `replay_active_banks`.
replay_active_banks_elapsed: u64,
// Time spent resetting duplicate slots.
reset_duplicate_slots_elapsed: u64,
}
impl ReplayTiming {
#[allow(clippy::too_many_arguments)]
fn update(
&mut self,
compute_bank_stats_elapsed: u64,
select_vote_and_reset_forks_elapsed: u64,
start_leader_elapsed: u64,
reset_bank_elapsed: u64,
voting_elapsed: u64,
select_forks_elapsed: u64,
compute_slot_stats_elapsed: u64,
generate_new_bank_forks_elapsed: u64,
replay_active_banks_elapsed: u64,
reset_duplicate_slots_elapsed: u64,
) {
self.num_iterations += 1;
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed;
if self.num_iterations == 100 {
self.start_leader_elapsed += start_leader_elapsed;
self.reset_bank_elapsed += reset_bank_elapsed;
self.voting_elapsed += voting_elapsed;
self.select_forks_elapsed += select_forks_elapsed;
self.compute_slot_stats_elapsed += compute_slot_stats_elapsed;
self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed;
self.replay_active_banks_elapsed += replay_active_banks_elapsed;
self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed;
let now = timestamp();
let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 {
datapoint_info!(
"replay-loop-timing-stats",
("total_elapsed_us", elapsed_ms * 1000, i64),
(
"compute_bank_stats_elapsed",
self.compute_bank_stats_elapsed as i64 / 100,
self.compute_bank_stats_elapsed as i64,
i64
),
(
"select_vote_and_reset_forks_elapsed",
self.select_vote_and_reset_forks_elapsed as i64 / 100,
self.select_vote_and_reset_forks_elapsed as i64,
i64
),
(
"start_leader_elapsed",
self.start_leader_elapsed as i64,
i64
),
("reset_bank_elapsed", self.reset_bank_elapsed as i64, i64),
("voting_elapsed", self.voting_elapsed as i64, i64),
(
"select_forks_elapsed",
self.select_forks_elapsed as i64,
i64
),
(
"compute_slot_stats_elapsed",
self.compute_slot_stats_elapsed as i64,
i64
),
(
"generate_new_bank_forks_elapsed",
self.generate_new_bank_forks_elapsed as i64,
i64
),
(
"replay_active_banks_elapsed",
self.replay_active_banks_elapsed as i64,
i64
),
(
"reset_duplicate_slots_elapsed",
self.reset_duplicate_slots_elapsed as i64,
i64
),
);
self.num_iterations = 0;
self.compute_bank_stats_elapsed = 0;
self.select_vote_and_reset_forks_elapsed = 0;

*self = ReplayTiming::default();
self.last_print = now;
}
}
}
Expand Down Expand Up @@ -254,6 +313,8 @@ impl ReplayStage {
}

let start = allocated.get();
let mut generate_new_bank_forks_time =
Measure::start("generate_new_bank_forks_time");
Self::generate_new_bank_forks(
&blockstore,
&bank_forks,
Expand All @@ -263,11 +324,13 @@ impl ReplayStage {
&mut progress,
&mut all_pubkeys,
);
generate_new_bank_forks_time.stop();
Self::report_memory(&allocated, "generate_new_bank_forks", start);

let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank();

let start = allocated.get();
let mut replay_active_banks_time = Measure::start("replay_active_banks_time");
let did_complete_bank = Self::replay_active_banks(
&blockstore,
&bank_forks,
Expand All @@ -279,8 +342,10 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
&subscriptions,
);
replay_active_banks_time.stop();
Self::report_memory(&allocated, "replay_active_banks", start);

let mut reset_duplicate_slots_time = Measure::start("reset_duplicate_slots");
let mut ancestors = bank_forks.read().unwrap().ancestors();
let mut descendants = bank_forks.read().unwrap().descendants();
let forks_root = bank_forks.read().unwrap().root();
Expand All @@ -296,6 +361,9 @@ impl ReplayStage {
&mut progress,
&bank_forks,
);
reset_duplicate_slots_time.stop();

let mut collect_frozen_banks_time = Measure::start("frozen_banks");
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
Expand All @@ -304,7 +372,9 @@ impl ReplayStage {
.filter(|(slot, _)| *slot >= forks_root)
.map(|(_, bank)| bank)
.collect();
let now = Instant::now();
collect_frozen_banks_time.stop();

let mut compute_bank_stats_time = Measure::start("compute_bank_stats");
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
Expand All @@ -318,7 +388,9 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
&mut bank_weight_fork_choice,
);
let compute_bank_stats_elapsed = now.elapsed().as_micros();
compute_bank_stats_time.stop();

let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time");
for slot in newly_computed_slot_stats {
let fork_stats = progress.get_fork_stats(slot).unwrap();
let confirmed_forks = Self::confirm_forks(
Expand All @@ -337,7 +409,9 @@ impl ReplayStage {
.confirmation_reported = true;
}
}
compute_slot_stats_time.stop();

let mut select_forks_time = Measure::start("select_forks_time");
let fork_choice: &mut dyn ForkChoice =
if forks_root > unlock_heaviest_subtree_fork_choice_slot {
&mut heaviest_subtree_fork_choice
Expand All @@ -346,10 +420,12 @@ impl ReplayStage {
};
let (heaviest_bank, heaviest_bank_on_same_voted_fork) = fork_choice
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);
select_forks_time.stop();

Self::report_memory(&allocated, "select_fork", start);

let now = Instant::now();
let mut select_vote_and_reset_forks_time =
Measure::start("select_vote_and_reset_forks");
let SelectVoteAndResetForkResult {
vote_bank,
reset_bank,
Expand All @@ -362,11 +438,7 @@ impl ReplayStage {
&progress,
&tower,
);
let select_vote_and_reset_forks_elapsed = now.elapsed().as_micros();
replay_timing.update(
compute_bank_stats_elapsed as u64,
select_vote_and_reset_forks_elapsed as u64,
);
select_vote_and_reset_forks_time.stop();

if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
info!(
Expand All @@ -388,6 +460,7 @@ impl ReplayStage {

let start = allocated.get();

let mut voting_time = Measure::start("voting_time");
// Vote on a fork
if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank {
if let Some(votable_leader) =
Expand Down Expand Up @@ -421,10 +494,12 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
)?;
};
voting_time.stop();

Self::report_memory(&allocated, "votable_bank", start);
let start = allocated.get();

let mut reset_bank_time = Measure::start("reset_bank");
// Reset onto a fork
if let Some(reset_bank) = reset_bank {
if last_reset != reset_bank.last_blockhash() {
Expand Down Expand Up @@ -486,14 +561,13 @@ impl ReplayStage {
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
}
datapoint_debug!(
"replay_stage-memory",
("reset_bank", (allocated.get() - start) as i64, i64),
);
Self::report_memory(&allocated, "reset_bank", start);
}
reset_bank_time.stop();
Self::report_memory(&allocated, "reset_bank", start);

let start = allocated.get();
let mut start_leader_time = Measure::start("start_leader_time");
if !tpu_has_bank {
Self::maybe_start_leader(
&my_pubkey,
Expand All @@ -517,11 +591,22 @@ impl ReplayStage {
);
}
}
start_leader_time.stop();
Self::report_memory(&allocated, "start_leader", start);
datapoint_debug!(
"replay_stage",
("duration", duration_as_ms(&now.elapsed()) as i64, i64)

replay_timing.update(
compute_bank_stats_time.as_us(),
select_vote_and_reset_forks_time.as_us(),
start_leader_time.as_us(),
reset_bank_time.as_us(),
voting_time.as_us(),
select_forks_time.as_us(),
compute_slot_stats_time.as_us(),
generate_new_bank_forks_time.as_us(),
replay_active_banks_time.as_us(),
reset_duplicate_slots_time.as_us(),
);

if did_complete_bank {
//just processed a bank, skip the signal; maybe there's more slots available
continue;
Expand Down

0 comments on commit 17a2128

Please sign in to comment.