Skip to content

Commit

Permalink
add latency metrics and measurement (#4536)
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Sep 11, 2022
1 parent 34b1aa2 commit 2f83760
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 36 deletions.
6 changes: 3 additions & 3 deletions crates/sui-benchmark/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,21 +280,21 @@ impl FixedRateLoadGenerator {
}
}

pub async fn spawn_authority_server(
pub async fn spawn_authority_server_for_test(
listen_address: Multiaddr,
state: AuthorityState,
) -> AuthorityServerHandle {
// The following two fields are only needed for shared objects (not by this bench).
let consensus_address = "/dns/localhost/tcp/0/http".parse().unwrap();
let (tx_consensus_listener, _rx_consensus_listener) = tokio::sync::mpsc::channel(1);

let server = AuthorityServer::new(
let server = AuthorityServer::new_for_test(
listen_address,
Arc::new(state),
consensus_address,
tx_consensus_listener,
);
server.spawn().await.unwrap()
server.spawn_for_test().await.unwrap()
}

pub fn calculate_throughput(num_items: usize, elapsed_time_us: u128) -> f64 {
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-benchmark/src/benchmark/validator_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#![allow(clippy::large_enum_variant)]
use crate::benchmark::bench_types::RunningMode;
use crate::benchmark::load_generator::spawn_authority_server;
use crate::benchmark::load_generator::spawn_authority_server_for_test;
use sui_config::genesis_config::ObjectConfig;
use sui_config::NetworkConfig;

Expand Down Expand Up @@ -158,7 +158,7 @@ impl ValidatorPreparer {
thread::spawn(move || {
info!("Spawning a validator thread...");
get_multithread_runtime().block_on(async move {
let server = spawn_authority_server(address, state).await;
let server = spawn_authority_server_for_test(address, state).await;
if let Err(e) = server.join().await {
error!("Server ended with an error: {e}");
}
Expand Down
78 changes: 73 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
checkpoints::CheckpointStore,
event_handler::EventHandler,
execution_engine,
metrics::start_timer,
query_helpers::QueryHelpers,
transaction_input_checker,
transaction_streamer::TransactionStreamer,
Expand Down Expand Up @@ -64,6 +65,8 @@ use tap::TapFallible;
use thiserror::Error;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
use tracing::Instrument;
use tracing::{debug, error, instrument, warn};
use typed_store::Map;

Expand Down Expand Up @@ -122,6 +125,12 @@ pub struct AuthorityMetrics {
num_shared_objects: Histogram,
batch_size: Histogram,

prepare_certificate_latency: Histogram,
commit_certificate_latency: Histogram,
handle_transaction_latency: Histogram,
handle_certificate_latency: Histogram,
handle_node_sync_certificate_latency: Histogram,

total_consensus_txns: IntCounter,

pub follower_items_streamed: IntCounter,
Expand Down Expand Up @@ -215,6 +224,36 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
prepare_certificate_latency: register_histogram_with_registry!(
"validator_prepare_certificate_latency",
"Latency of preparing certificate",
registry,
)
.unwrap(),
commit_certificate_latency: register_histogram_with_registry!(
"validator_commit_certificate_latency",
"Latency of committing certificate",
registry,
)
.unwrap(),
// Histogram fed by the timer started at the top of the `handle_transaction` path.
// The help text must describe this metric specifically: it is what operators see
// in the exported metrics, and it previously duplicated the commit-certificate text.
handle_transaction_latency: register_histogram_with_registry!(
    "validator_handle_transaction_latency",
    "Latency of handling transaction",
    registry,
)
.unwrap(),
handle_certificate_latency: register_histogram_with_registry!(
"validator_handle_certificate_latency",
"Latency of handling certificate",
registry,
)
.unwrap(),
handle_node_sync_certificate_latency: register_histogram_with_registry!(
"fullnode_handle_node_sync_certificate_latency",
"Latency of fullnode handling certificate from node sync",
registry,
)
.unwrap(),
total_consensus_txns: register_int_counter_with_registry!(
"total_consensus_txns",
"Total number of consensus transactions received from narwhal",
Expand Down Expand Up @@ -409,6 +448,10 @@ impl AuthorityState {
) -> Result<TransactionInfoResponse, SuiError> {
let transaction_digest = *transaction.digest();
debug!(tx_digest=?transaction_digest, "handle_transaction. Tx data: {:?}", transaction.signed_data.data);
let start_ts = Instant::now();
let _metrics_guard =
start_timer!(self.metrics.handle_transaction_latency.clone(), &start_ts);

self.metrics.tx_orders.inc();
// Check the sender's signature.
transaction.verify().map_err(|e| {
Expand Down Expand Up @@ -445,6 +488,11 @@ impl AuthorityState {
// byzantine validator from giving us incorrect effects.
signed_effects: SignedTransactionEffects,
) -> SuiResult {
let start_ts = Instant::now();
let _metrics_guard = start_timer!(
self.metrics.handle_node_sync_certificate_latency.clone(),
&start_ts
);
let digest = *certificate.digest();
debug!(?digest, "handle_node_sync_transaction");
fp_ensure!(
Expand Down Expand Up @@ -488,15 +536,19 @@ impl AuthorityState {
&self,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
let start_ts = Instant::now();
let _metrics_guard =
start_timer!(self.metrics.handle_certificate_latency.clone(), &start_ts);

self.metrics.total_cert_attempts.inc();
if self.is_fullnode() {
return Err(SuiError::GenericStorageError(
"cannot execute cert without effects on fullnode".into(),
));
}

let digest = certificate.digest();
debug!(?digest, "handle_confirmation_transaction");
let tx_digest = certificate.digest();
debug!(?tx_digest, "handle_confirmation_transaction");

// This acquires a lock on the tx digest to prevent multiple concurrent executions of the
// same tx. While we don't need this for safety (tx sequencing is ultimately atomic), it is
Expand All @@ -508,11 +560,20 @@ impl AuthorityState {
// to do this, since the false contention can be made arbitrarily low (no cost for 1.0 -
// epsilon of txes) while solutions without false contention have slightly higher cost
// for every tx.
let tx_guard = self.database.acquire_tx_guard(&certificate).await?;
let span = tracing::debug_span!(
"validator_acquire_tx_guard",
?tx_digest,
tx_kind = certificate.signed_data.data.kind_as_str()
);
let tx_guard = self
.database
.acquire_tx_guard(&certificate)
.instrument(span)
.await?;

self.process_certificate(tx_guard, &certificate)
.await
.tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))
.tap_err(|e| debug!(?tx_digest, "process_certificate failed: {}", e))
}

#[instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -667,6 +728,7 @@ impl AuthorityState {
certificate: &CertifiedTransaction,
transaction_digest: TransactionDigest,
) -> SuiResult<(InnerTemporaryStore, SignedTransactionEffects)> {
let start_ts = Instant::now();
let (gas_status, input_objects) =
transaction_input_checker::check_transaction_input(&self.database, certificate).await?;

Expand Down Expand Up @@ -706,7 +768,9 @@ impl AuthorityState {

// TODO: Distribute gas charge and rebate, which can be retrieved from effects.
let signed_effects = effects.to_sign_effects(self.epoch(), &self.name, &*self.secret);

self.metrics
.prepare_certificate_latency
.observe(start_ts.elapsed().as_secs_f64());
Ok((inner_temp_store, signed_effects))
}

Expand Down Expand Up @@ -1644,6 +1708,10 @@ impl AuthorityState {
certificate: &CertifiedTransaction,
signed_effects: &SignedTransactionEffects,
) -> SuiResult {
let start_ts = Instant::now();
let _metrics_guard =
start_timer!(self.metrics.commit_certificate_latency.clone(), &start_ts);

if self.is_halted() && !certificate.signed_data.data.kind.is_system_tx() {
// TODO: Here we should allow consensus transaction to continue.
// TODO: Do we want to include the new validator set?
Expand Down
Loading

0 comments on commit 2f83760

Please sign in to comment.