Skip to content

Commit

Permalink
Add ability to run stress for a finite duration or count
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood committed Sep 16, 2022
1 parent 2a03a5c commit 1a48cd2
Show file tree
Hide file tree
Showing 8 changed files with 662 additions and 354 deletions.
813 changes: 490 additions & 323 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions crates/sui-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ crossterm = "0.23.2"
rand = "0.8.5"
base64 = "0.13.0"
rand_distr = "0.4.3"
indicatif = "0.17.0"
duration-str = "0.4.0"

bcs = "0.1.3"
sui-core = { path = "../sui-core" }
Expand Down
22 changes: 21 additions & 1 deletion crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::time::Duration;
use strum_macros::EnumString;
use sui_benchmark::drivers::bench_driver::BenchDriver;
use sui_benchmark::drivers::driver::Driver;
use sui_benchmark::drivers::Interval;
use sui_benchmark::workloads::shared_counter::SharedCounterWorkload;
use sui_benchmark::workloads::transfer_object::TransferObjectWorkload;
use sui_benchmark::workloads::workload::get_latest;
Expand Down Expand Up @@ -113,6 +114,17 @@ struct Opts {
/// one (or some) is slow.
#[clap(long, parse(try_from_str), default_value = "true", global = true)]
pub disjoint_mode: bool,
/// Number of transactions or duration to
/// run the benchmark for. Default set to
/// "unbounded" i.e. benchmark runs forever
/// until terminated with a ctrl-c. However,
/// if we wanted to run the test for
/// 60 seconds, this could be set as "60s".
/// And if we wanted to run the test for
/// 10,000 transactions we could set it to
/// "10000"
#[clap(long, global = true, default_value = "unbounded")]
pub run_duration: Interval,
}

#[derive(Debug, Clone, Parser, Eq, PartialEq, EnumString)]
Expand Down Expand Up @@ -571,8 +583,16 @@ async fn main() -> Result<()> {
}
workloads
};
let interval = opts.run_duration;
// We only show the progress in stderr
// if benchmark is running in unbounded mode,
// otherwise summarized benchmark results are
// published in the end
let show_progress = interval.is_unbounded();
let driver = BenchDriver::new(stat_collection_interval);
driver.run(workloads, aggregator, &registry).await
driver
.run(workloads, aggregator, &registry, show_progress, interval)
.await
}
}
})
Expand Down
66 changes: 60 additions & 6 deletions crates/sui-benchmark/src/drivers/bench_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::future::try_join_all;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{stream::FuturesUnordered, StreamExt};
use indicatif::ProgressBar;
use prometheus::register_gauge_vec_with_registry;
use prometheus::register_histogram_vec_with_registry;
use prometheus::register_int_counter_vec_with_registry;
Expand Down Expand Up @@ -34,6 +35,8 @@ use tokio::sync::Barrier;
use tokio::time;
use tokio::time::Instant;
use tracing::{debug, error};

use super::Interval;
pub struct BenchMetrics {
pub num_success: IntCounterVec,
pub num_error: IntCounterVec,
Expand Down Expand Up @@ -107,13 +110,14 @@ enum NextOp {
Retry(RetryType),
}

async fn print_start_benchmark() {
static ONCE: OnceCell<bool> = OnceCell::const_new();
async fn start_benchmark(pb: Arc<ProgressBar>) -> &'static Instant {
static ONCE: OnceCell<Instant> = OnceCell::const_new();
ONCE.get_or_init(|| async move {
pb.finish();
eprintln!("Starting benchmark!");
true
Instant::now()
})
.await;
.await
}

pub struct BenchWorker {
Expand All @@ -124,12 +128,36 @@ pub struct BenchWorker {

pub struct BenchDriver {
pub stat_collection_interval: u64,
pub start_time: Instant,
}

impl BenchDriver {
pub fn new(stat_collection_interval: u64) -> BenchDriver {
BenchDriver {
stat_collection_interval,
start_time: Instant::now(),
}
}
pub fn update_progress(
start_time: Instant,
interval: Interval,
progress_bar: Arc<ProgressBar>,
) {
match interval {
Interval::Count(count) => {
progress_bar.inc(1);
if progress_bar.position() >= count {
progress_bar.finish();
}
}
Interval::Time(Duration::MAX) => progress_bar.inc(1),
Interval::Time(duration) => {
let elapsed_secs = (Instant::now() - start_time).as_secs();
progress_bar.set_position(std::cmp::min(duration.as_secs(), elapsed_secs));
if progress_bar.position() >= duration.as_secs() {
progress_bar.finish();
}
}
}
}
pub async fn make_workers(
Expand Down Expand Up @@ -167,6 +195,8 @@ impl Driver<()> for BenchDriver {
workloads: Vec<WorkloadInfo>,
aggregator: AuthorityAggregator<NetworkAuthorityClient>,
registry: &Registry,
show_progress: bool,
run_duration: Interval,
) -> Result<(), anyhow::Error> {
let mut tasks = Vec::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
Expand All @@ -181,9 +211,18 @@ impl Driver<()> for BenchDriver {
let stat_delay_micros = 1_000_000 * self.stat_collection_interval;
let metrics = Arc::new(BenchMetrics::new(registry));
let barrier = Arc::new(Barrier::new(num_workers as usize));
let pb = Arc::new(ProgressBar::new(num_workers));
eprintln!("Setting up workers...");
let progress = Arc::new(match run_duration {
Interval::Count(count) => ProgressBar::new(count),
Interval::Time(Duration::MAX) => ProgressBar::hidden(),
Interval::Time(duration) => ProgressBar::new(duration.as_secs()),
});
for (i, worker) in bench_workers.into_iter().enumerate() {
let request_delay_micros = 1_000_000 / worker.target_qps;
let mut free_pool = worker.payload;
let progress = progress.clone();
let pb = pb.clone();
let tx_cloned = tx.clone();
let cloned_barrier = barrier.clone();
let metrics_cloned = metrics.clone();
Expand All @@ -192,8 +231,9 @@ impl Driver<()> for BenchDriver {
QuorumDriverHandler::new(aggregator.clone(), QuorumDriverMetrics::new_for_tests());
let qd = quorum_driver_handler.clone_quorum_driver();
let runner = tokio::spawn(async move {
pb.inc(1);
cloned_barrier.wait().await;
print_start_benchmark().await;
let start_time = start_benchmark(pb).await;
let mut num_success = 0;
let mut num_error = 0;
let mut min_latency = Duration::MAX;
Expand Down Expand Up @@ -335,6 +375,10 @@ impl Driver<()> for BenchDriver {
match op {
NextOp::Retry(b) => {
retry_queue.push_back(b);
BenchDriver::update_progress(*start_time, run_duration, progress.clone());
if progress.is_finished() {
break;
}
}
NextOp::Response(Some((latency, new_payload))) => {
num_success += 1;
Expand All @@ -346,6 +390,10 @@ impl Driver<()> for BenchDriver {
if latency < min_latency {
min_latency = latency;
}
BenchDriver::update_progress(*start_time, run_duration, progress.clone());
if progress.is_finished() {
break;
}
}
NextOp::Response(None) => {
// num_in_flight -= 1;
Expand All @@ -362,6 +410,7 @@ impl Driver<()> for BenchDriver {
tasks.push(tokio::spawn(async move {
let mut stat_collection: BTreeMap<usize, Stats> = BTreeMap::new();
let mut counter = 0;
let mut stat = "".to_string();
while let Some(s @ Stats {
id,
num_success: _,
Expand Down Expand Up @@ -408,10 +457,15 @@ impl Driver<()> for BenchDriver {
};
counter += 1;
if counter % num_workers == 0 {
eprintln!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight);
stat = format!("Throughput = {}, min_latency_ms = {}, max_latency_ms = {}, num_success = {}, num_error = {}, no_gas = {}, submitted = {}, in_flight = {}", total_qps, min_latency.as_millis(), max_latency.as_millis(), num_success, num_error, num_no_gas, num_submitted, num_in_flight);
if show_progress {
eprintln!("{}", stat);
}
}
}
eprintln!("{}", stat);
}));
drop(tx);
let _res: Vec<_> = try_join_all(tasks).await.unwrap().into_iter().collect();
Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-benchmark/src/drivers/driver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::drivers::Interval;
use async_trait::async_trait;
use prometheus::Registry;
use sui_core::authority_aggregator::AuthorityAggregator;
Expand All @@ -14,5 +16,7 @@ pub trait Driver<T> {
workload: Vec<WorkloadInfo>,
aggregator: AuthorityAggregator<NetworkAuthorityClient>,
registry: &Registry,
show_progress: bool,
run_duration: Interval,
) -> Result<T, anyhow::Error>;
}
33 changes: 33 additions & 0 deletions crates/sui-benchmark/src/drivers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use duration_str::parse;
use serde::{Deserialize, Serialize};

pub mod bench_driver;
pub mod driver;

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Interval {
Count(u64),
Time(tokio::time::Duration),
}

impl Interval {
pub fn is_unbounded(&self) -> bool {
matches!(self, Interval::Time(tokio::time::Duration::MAX))
}
}

impl FromStr for Interval {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(i) = s.parse() {
Ok(Interval::Count(i))
} else if let Ok(d) = parse(s) {
Ok(Interval::Time(d))
} else if "unbounded" == s {
Ok(Interval::Time(tokio::time::Duration::MAX))
} else {
Err("Required integer number of cycles or time duration".to_string())
}
}
}
3 changes: 1 addition & 2 deletions crates/sui-benchmark/src/workloads/shared_counter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::workload::{submit_transaction, Gas, Payload, Workload, WorkloadType};
use crate::workloads::workload::{get_latest, transfer_sui_for_testing, MAX_GAS_FOR_TESTING};
use async_trait::async_trait;
use futures::future::join_all;
Expand All @@ -19,8 +20,6 @@ use test_utils::{
messages::create_publish_move_package_transaction, transaction::parse_package_ref,
};

use super::workload::{submit_transaction, Gas, Payload, Workload, WorkloadType};

pub struct SharedCounterTestPayload {
package_ref: ObjectRef,
counter_id: ObjectID,
Expand Down
Loading

0 comments on commit 1a48cd2

Please sign in to comment.