diff --git a/iroh/examples/sync.rs b/iroh/examples/sync.rs index 474ab699f2..f7bbe356a8 100644 --- a/iroh/examples/sync.rs +++ b/iroh/examples/sync.rs @@ -7,7 +7,10 @@ //! `cargo run --bin derper -- --dev` //! and then set the `-d http://localhost:3340` flag on this example. -use std::{collections::HashSet, fmt, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc}; +use std::{ + collections::HashSet, fmt, net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc, + time::Instant, +}; use anyhow::{anyhow, bail}; use clap::{CommandFactory, FromArgMatches, Parser}; @@ -35,6 +38,7 @@ use serde::{Deserialize, Serialize}; use tokio::{ io::AsyncWriteExt, sync::{mpsc, oneshot}, + task::JoinHandle, }; use tracing::warn; use tracing_subscriber::{EnvFilter, Registry}; @@ -360,56 +364,58 @@ async fn handle_command( "> Hammering with prefix \"{prefix}\" for {threads} x {count} messages of size {size} bytes in {mode} mode", mode = format!("{mode:?}").to_lowercase() ); - let start = std::time::Instant::now(); - let mut handles = Vec::new(); + let start = Instant::now(); + let mut handles: Vec>> = Vec::new(); match mode { HammerMode::Set => { let mut bytes = vec![0; size]; + // TODO: Add a flag to fill content differently per entry to be able to + // test downloading too bytes.fill(97); for t in 0..threads { - let p = prefix.clone(); - let t_doc = doc.clone(); - let b = bytes.clone(); - let h = tokio::spawn(async move { + let prefix = prefix.clone(); + let doc = doc.clone(); + let bytes = bytes.clone(); + let handle = tokio::spawn(async move { for i in 0..count { - let value = String::from_utf8(b.clone()).unwrap(); - let key = format!("{}/{}/{}", p, t, i); - t_doc - .insert_bytes(key, value.into_bytes().into()) - .await - .unwrap(); + let value = String::from_utf8(bytes.clone()).unwrap(); + let key = format!("{}/{}/{}", prefix, t, i); + doc.insert_bytes(key, value.into_bytes().into()).await?; } + Ok(()) }); - handles.push(h); + handles.push(handle); } } HammerMode::Get => { for t in 0..threads { - let p = prefix.clone(); - let t_doc = doc.clone(); - let h = tokio::spawn(async move { + let prefix = prefix.clone(); + let doc = doc.clone(); + let handle = tokio::spawn(async move { for i in 0..count { - let key = format!("{}/{}/{}", p, t, i); - let entries = t_doc.replica().all_for_key(key.as_bytes()); + let key = format!("{}/{}/{}", prefix, t, i); + let entries = doc.replica().all_for_key(key.as_bytes()); for (_id, entry) in entries { - let _content = fmt_content(&t_doc, &entry).await; + let _content = fmt_content(&doc, &entry).await; } } + Ok(()) }); - handles.push(h); + handles.push(handle); } } } - let _result = futures::future::join_all(handles).await; + for result in futures::future::join_all(handles).await { + // Check that no errors ocurred + result??; + } let diff = start.elapsed().as_secs_f64(); let total_count = threads as u64 * count as u64; println!( - "> Hammering done in {:.2}s for {} messages with total of {}", - diff, - total_count, - HumanBytes(total_count * size as u64), + "> Hammering done in {diff:.2}s for {total_count} messages with total of {size}", + size = HumanBytes(total_count * size as u64), ); } Cmd::Exit => {}