diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs
index 261f5cdd03..aeb32c8631 100644
--- a/iroh-bytes/src/provider.rs
+++ b/iroh-bytes/src/provider.rs
@@ -12,6 +12,7 @@ use iroh_io::stats::{
 };
 use iroh_io::{AsyncStreamWriter, TokioStreamWriter};
 use serde::{Deserialize, Serialize};
+use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, debug_span, info, trace, warn};
 use tracing_futures::Instrument;
 
@@ -351,7 +352,7 @@ pub async fn handle_connection(
     connecting: quinn::Connecting,
     db: D,
     events: E,
-    rt: crate::util::runtime::Handle,
+    rt: LocalPoolHandle,
 ) {
     let remote_addr = connecting.remote_address();
     let connection = match connecting.await {
@@ -376,7 +377,7 @@ pub async fn handle_connection(
     };
     events.send(Event::ClientConnected { connection_id }).await;
     let db = db.clone();
-    rt.local_pool().spawn_pinned(|| {
+    rt.spawn_pinned(|| {
         async move {
             if let Err(err) = handle_stream(db, reader, writer).await {
                 warn!("error: {err:#?}",);
diff --git a/iroh-bytes/src/store/flat.rs b/iroh-bytes/src/store/flat.rs
index 6e95382f80..20ff883102 100644
--- a/iroh-bytes/src/store/flat.rs
+++ b/iroh-bytes/src/store/flat.rs
@@ -362,10 +362,7 @@ impl PartialMap for Store {
 
     fn insert_complete(&self, entry: Self::PartialEntry) -> BoxFuture<'_, io::Result<()>> {
         let this = self.clone();
-        self.0
-            .options
-            .rt
-            .spawn_blocking(move || this.insert_complete_sync(entry))
+        tokio::task::spawn_blocking(move || this.insert_complete_sync(entry))
             .map(flatten_to_io)
             .boxed()
     }
@@ -378,7 +375,6 @@ struct Options {
     meta_path: PathBuf,
     move_threshold: u64,
     inline_threshold: u64,
-    rt: tokio::runtime::Handle,
 }
 
 impl Options {
@@ -678,8 +674,7 @@ impl ReadableStore for Store {
         progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
     ) -> BoxFuture<'_, io::Result<()>> {
         let this = self.clone();
-        self.rt()
-            .spawn_blocking(move || this.export_sync(hash, target, mode, progress))
+        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress))
             .map(flatten_to_io)
             .boxed()
     }
@@ -694,16 +689,14 @@ impl super::Store for Store {
         progress: impl ProgressSender + IdGenerator,
     ) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
         let this = self.clone();
-        self.rt()
-            .spawn_blocking(move || this.import_file_sync(path, mode, format, progress))
+        tokio::task::spawn_blocking(move || this.import_file_sync(path, mode, format, progress))
             .map(flatten_to_io)
             .boxed()
     }
 
     fn import_bytes(&self, data: Bytes, format: BlobFormat) -> BoxFuture<'_, io::Result<TempTag>> {
         let this = self.clone();
-        self.rt()
-            .spawn_blocking(move || this.import_bytes_sync(data, format))
+        tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format))
             .map(flatten_to_io)
             .boxed()
     }
@@ -714,7 +707,6 @@ impl super::Store for Store {
         format: BlobFormat,
         progress: impl ProgressSender + IdGenerator,
     ) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
-        let rt = self.rt().clone();
         let this = self.clone();
         async move {
             let id = progress.new_id();
@@ -737,29 +729,25 @@ impl super::Store for Store {
             writer.flush().await?;
             drop(writer);
             let file = ImportFile::TempFile(temp_data_path);
-            rt.spawn_blocking(move || this.finalize_import_sync(file, format, id, progress))
-                .map(flatten_to_io)
-                .await
+            tokio::task::spawn_blocking(move || {
+                this.finalize_import_sync(file, format, id, progress)
+            })
+            .map(flatten_to_io)
+            .await
         }
         .boxed()
     }
 
     fn create_tag(&self, value: HashAndFormat) -> BoxFuture<'_, io::Result<Tag>> {
         let this = self.clone();
-        self.0
-            .options
-            .rt
-            .spawn_blocking(move || this.create_tag_sync(value))
+        tokio::task::spawn_blocking(move || this.create_tag_sync(value))
            .map(flatten_to_io)
            .boxed()
     }
 
     fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> BoxFuture<'_, io::Result<()>> {
         let this = self.clone();
-        self.0
-            .options
-            .rt
-            .spawn_blocking(move || this.set_tag_sync(name, value))
+        tokio::task::spawn_blocking(move || this.set_tag_sync(name, value))
             .map(flatten_to_io)
             .boxed()
     }
@@ -788,10 +776,7 @@ impl super::Store for Store {
         tracing::debug!("delete: {:?}", hash);
         let this = self.clone();
         let hash = *hash;
-        self.0
-            .options
-            .rt
-            .spawn_blocking(move || this.delete_sync(hash))
+        tokio::task::spawn_blocking(move || this.delete_sync(hash))
             .map(flatten_to_io)
             .boxed()
     }
@@ -839,10 +824,6 @@ impl ImportFile {
 }
 
 impl Store {
-    fn rt(&self) -> &tokio::runtime::Handle {
-        &self.0.options.rt
-    }
-
     fn temp_path(&self) -> PathBuf {
         self.0.options.partial_path.join(temp_name())
     }
@@ -1197,7 +1178,6 @@ impl Store {
         complete_path: PathBuf,
         partial_path: PathBuf,
         meta_path: PathBuf,
-        rt: crate::util::runtime::Handle,
     ) -> anyhow::Result<Self> {
         tracing::info!(
             "loading database from {} {}",
@@ -1456,7 +1436,6 @@ impl Store {
                 meta_path,
                 move_threshold: 1024 * 128,
                 inline_threshold: 1024 * 16,
-                rt: rt.main().clone(),
             },
             complete_io_mutex: Mutex::new(()),
         })))
@@ -1467,13 +1446,11 @@ impl Store {
         complete_path: impl AsRef<Path>,
         partial_path: impl AsRef<Path>,
         meta_path: impl AsRef<Path>,
-        rt: &crate::util::runtime::Handle,
     ) -> anyhow::Result<Self> {
         let complete_path = complete_path.as_ref().to_path_buf();
         let partial_path = partial_path.as_ref().to_path_buf();
         let meta_path = meta_path.as_ref().to_path_buf();
-        let rt = rt.clone();
-        let db = Self::load_sync(complete_path, partial_path, meta_path, rt)?;
+        let db = Self::load_sync(complete_path, partial_path, meta_path)?;
         Ok(db)
     }
 
@@ -1482,16 +1459,14 @@ impl Store {
         complete_path: impl AsRef<Path>,
         partial_path: impl AsRef<Path>,
         meta_path: impl AsRef<Path>,
-        rt: &crate::util::runtime::Handle,
     ) -> anyhow::Result<Self> {
         let complete_path = complete_path.as_ref().to_path_buf();
         let partial_path = partial_path.as_ref().to_path_buf();
         let meta_path = meta_path.as_ref().to_path_buf();
-        let rtc = rt.clone();
-        let db = rt
-            .main()
-            .spawn_blocking(move || Self::load_sync(complete_path, partial_path, meta_path, rtc))
-            .await??;
+        let db = tokio::task::spawn_blocking(move || {
+            Self::load_sync(complete_path, partial_path, meta_path)
+        })
+        .await??;
         Ok(db)
     }
 
diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs
index 9cbf5d92e1..910fd7724b 100644
--- a/iroh-bytes/src/store/mem.rs
+++ b/iroh-bytes/src/store/mem.rs
@@ -22,7 +22,7 @@ use crate::{
     },
     util::{
         progress::{IdGenerator, IgnoreProgressSender, ProgressSender},
-        runtime, LivenessTracker,
+        LivenessTracker,
     },
     BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
 };
@@ -182,13 +182,12 @@ impl AsyncSliceWriter for MemFile {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 /// A full in memory database for iroh-bytes.
 pub struct Store(Arc<Inner>);
 
-#[derive(Debug)]
+#[derive(Debug, Default)]
 struct Inner {
-    rt: runtime::Handle,
     state: RwLock<State>,
 }
@@ -373,10 +372,7 @@ impl ReadableStore for Store {
         progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
     ) -> BoxFuture<'_, io::Result<()>> {
         let this = self.clone();
-        self.0
-            .rt
-            .main()
-            .spawn_blocking(move || this.export_sync(hash, target, mode, progress))
+        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress))
             .map(flatten_to_io)
             .boxed()
     }
@@ -458,25 +454,22 @@ impl super::Store for Store {
         progress: impl ProgressSender + IdGenerator,
     ) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
         let this = self.clone();
-        self.0
-            .rt
-            .main()
-            .spawn_blocking(move || {
-                let id = progress.new_id();
-                progress.blocking_send(ImportProgress::Found {
-                    id,
-                    name: path.to_string_lossy().to_string(),
-                })?;
-                progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
-                // todo: provide progress for reading into mem
-                let bytes: Bytes = std::fs::read(path)?.into();
-                let size = bytes.len() as u64;
-                progress.blocking_send(ImportProgress::Size { id, size })?;
-                let tag = this.import_bytes_sync(id, bytes, format, progress)?;
-                Ok((tag, size))
-            })
-            .map(flatten_to_io)
-            .boxed()
+        tokio::task::spawn_blocking(move || {
+            let id = progress.new_id();
+            progress.blocking_send(ImportProgress::Found {
+                id,
+                name: path.to_string_lossy().to_string(),
+            })?;
+            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
+            // todo: provide progress for reading into mem
+            let bytes: Bytes = std::fs::read(path)?.into();
+            let size = bytes.len() as u64;
+            progress.blocking_send(ImportProgress::Size { id, size })?;
+            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
+            Ok((tag, size))
+        })
+        .map(flatten_to_io)
+        .boxed()
     }
 
     fn import_stream(
@@ -511,14 +504,11 @@ impl super::Store for Store {
 
     fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> BoxFuture<'_, io::Result<TempTag>> {
         let this = self.clone();
-        self.0
-            .rt
-            .main()
-            .spawn_blocking(move || {
-                this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
-            })
-            .map(flatten_to_io)
-            .boxed()
+        tokio::task::spawn_blocking(move || {
+            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
+        })
+        .map(flatten_to_io)
+        .boxed()
     }
 
     fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> BoxFuture<'_, io::Result<()>> {
@@ -582,11 +572,8 @@ impl LivenessTracker for Inner {
 
 impl Store {
-    /// Create a new in memory database, using the given runtime.
-    pub fn new(rt: runtime::Handle) -> Self {
-        Self(Arc::new(Inner {
-            rt,
-            state: RwLock::new(State::default()),
-        }))
+    /// Create a new in memory database.
+    pub fn new() -> Self {
+        Self::default()
     }
 
     fn import_bytes_sync(
diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs
index 12059371f8..2b96ab1163 100644
--- a/iroh-bytes/src/util.rs
+++ b/iroh-bytes/src/util.rs
@@ -8,7 +8,6 @@ use crate::{BlobFormat, Hash, HashAndFormat};
 
 pub mod io;
 pub mod progress;
-pub mod runtime;
 
 /// A tag
 #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
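The store changes above all follow one pattern: the blocking closure goes to `tokio::task::spawn_blocking`, and the resulting `JoinHandle` is mapped through `flatten_to_io` and boxed, instead of going through a stored runtime handle. A minimal sketch of that pattern, assuming a `flatten_to_io` helper along these lines (the helper name comes from the diff; this body is an assumption):

```rust
use futures::{future::BoxFuture, FutureExt};
use std::io;

/// Flatten a `JoinError` into an `io::Error`, so the nested
/// `Result<io::Result<T>, JoinError>` of a blocking task becomes a
/// plain `io::Result<T>`. (Assumed body; the crate's helper may differ.)
fn flatten_to_io<T>(e: Result<io::Result<T>, tokio::task::JoinError>) -> io::Result<T> {
    match e {
        Ok(x) => x,
        Err(cause) => Err(io::Error::new(io::ErrorKind::Other, cause)),
    }
}

/// The recurring shape after this change: no stored runtime handle, just
/// the ambient tokio runtime (so this must be called from within one).
fn example_blocking_call() -> BoxFuture<'static, io::Result<u64>> {
    tokio::task::spawn_blocking(|| Ok::<u64, io::Error>(42))
        .map(flatten_to_io)
        .boxed()
}
```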
diff --git a/iroh-bytes/src/util/runtime.rs b/iroh-bytes/src/util/runtime.rs
deleted file mode 100644
index ec64411fdc..0000000000
--- a/iroh-bytes/src/util/runtime.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-//! The runtime module provides the iroh runtime, consisting of a general purpose
-//! tokio runtime and a set of single threaded runtimes.
-use std::sync::Arc;
-
-/// A handle to the iroh runtime
-#[derive(Debug, Clone)]
-pub struct Handle {
-    inner: Arc<HandleInner>,
-}
-
-impl Handle {
-    /// Create a new iroh runtime consisting of a tokio runtime and a thread per
-    /// core runtime.
-    pub fn new(rt: tokio::runtime::Handle, tpc: tokio_util::task::LocalPoolHandle) -> Self {
-        Self {
-            inner: Arc::new(HandleInner { rt, tpc }),
-        }
-    }
-
-    /// Create a new iroh runtime using the current tokio runtime as the main
-    /// runtime, and the given number of thread per core executors.
-    pub fn from_current(size: usize) -> std::result::Result<Self, tokio::runtime::TryCurrentError> {
-        Ok(Self::new(
-            tokio::runtime::Handle::try_current()?,
-            tokio_util::task::LocalPoolHandle::new(size),
-        ))
-    }
-
-    /// Get a handle to the main tokio runtime
-    pub fn main(&self) -> &tokio::runtime::Handle {
-        &self.inner.rt
-    }
-
-    /// Get a handle to the thread pool for single threaded executors
-    pub fn local_pool(&self) -> &tokio_util::task::LocalPoolHandle {
-        &self.inner.tpc
-    }
-}
-
-#[derive(Debug)]
-struct HandleInner {
-    rt: tokio::runtime::Handle,
-    tpc: tokio_util::task::LocalPoolHandle,
-}
diff --git a/iroh/examples/client.rs b/iroh/examples/client.rs
index 37bb2e9bce..03ce45c43c 100644
--- a/iroh/examples/client.rs
+++ b/iroh/examples/client.rs
@@ -8,19 +8,14 @@
 use indicatif::HumanBytes;
 use iroh::node::Node;
 use iroh_base::base32;
-use iroh_bytes::util::runtime;
 use iroh_sync::{store::Query, Entry};
 use tokio_stream::StreamExt;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let rt = runtime::Handle::from_current(1)?;
-    let db = iroh_bytes::store::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new();
     let store = iroh_sync::store::memory::Store::default();
-    let node = Node::builder(db.clone(), store)
-        .runtime(&rt)
-        .spawn()
-        .await?;
+    let node = Node::builder(db.clone(), store).spawn().await?;
     let client = node.client();
     let doc = client.docs.create().await?;
     let author = client.authors.create().await?;
diff --git a/iroh/examples/collection.rs b/iroh/examples/collection.rs
index e996bde509..ee3fc29627 100644
--- a/iroh/examples/collection.rs
+++ b/iroh/examples/collection.rs
@@ -6,9 +6,9 @@
 //! This is using an in memory database and a random node id.
 //! run this example from the project root:
 //!     $ cargo run -p collection
-use iroh::bytes::util::runtime;
 use iroh::collection::{Blob, Collection};
 use iroh_bytes::BlobFormat;
+use tokio_util::task::LocalPoolHandle;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
@@ -39,8 +39,8 @@ async fn main() -> anyhow::Result<()> {
     // create a collection and add it to the db as well
     let collection = Collection::new(blobs, 0)?;
     let hash = db.insert_many(collection.to_blobs()).unwrap();
-    // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime
-    let rt = runtime::Handle::from_current(1)?;
+    // create a new local pool handle with 1 worker thread
+    let lp = LocalPoolHandle::new(1);
 
     // create an in-memory doc store for iroh sync (not used here)
     let doc_store = iroh_sync::store::memory::Store::default();
@@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> {
     // create a new node
     // we must configure the iroh collection parser so the node understands iroh collections
     let node = iroh::node::Node::builder(db, doc_store)
-        .runtime(&rt)
+        .local_pool(&lp)
         .spawn()
         .await?;
     // create a ticket
diff --git a/iroh/examples/hello-world.rs b/iroh/examples/hello-world.rs
index 25c47b169a..296ea008c1 100644
--- a/iroh/examples/hello-world.rs
+++ b/iroh/examples/hello-world.rs
@@ -5,8 +5,8 @@
 //! This is using an in memory database and a random node id.
 //! run this example from the project root:
 //!     $ cargo run --example hello-world
-use iroh::bytes::util::runtime;
 use iroh_bytes::BlobFormat;
+use tokio_util::task::LocalPoolHandle;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
@@ -26,12 +26,12 @@ async fn main() -> anyhow::Result<()> {
     // create an in-memory doc store (not used in the example)
     let doc_store = iroh_sync::store::memory::Store::default();
-    // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime
-    let rt = runtime::Handle::from_current(1)?;
+    // create a new local pool handle with 1 worker thread
+    let lp = LocalPoolHandle::new(1);
     // add some data and remember the hash
     let hash = db.insert(b"Hello, world!");
     // create a new node
     let node = iroh::node::Node::builder(db, doc_store)
-        .runtime(&rt)
+        .local_pool(&lp)
         .spawn()
         .await?;
     // create a ticket
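For context on the replacement API: `tokio_util::task::LocalPoolHandle` is exactly what the deleted `runtime::Handle` wrapped, and its `spawn_pinned` runs a future that may be `!Send` on one of the pool's dedicated threads. A self-contained illustration (not taken from this diff):

```rust
use std::rc::Rc;
use tokio_util::task::LocalPoolHandle;

#[tokio::main]
async fn main() {
    // One worker thread, as in the examples above.
    let lp = LocalPoolHandle::new(1);

    // spawn_pinned takes a closure producing a (possibly !Send) future
    // and pins it to one of the pool's threads.
    let task = lp.spawn_pinned(|| async {
        let not_send = Rc::new(42); // Rc is !Send
        *not_send * 2
    });

    assert_eq!(task.await.unwrap(), 84);
}
```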
diff --git a/iroh/examples/rpc.rs b/iroh/examples/rpc.rs
index d2dba0a724..ffd8787e6c 100644
--- a/iroh/examples/rpc.rs
+++ b/iroh/examples/rpc.rs
@@ -9,12 +9,13 @@
 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 
 use clap::Parser;
+use iroh::rpc_protocol::ProviderService;
 use iroh::rpc_protocol::{ProviderRequest, ProviderResponse};
-use iroh::{bytes::util::runtime, rpc_protocol::ProviderService};
 use iroh_bytes::store::Store;
 use iroh_net::key::SecretKey;
 use quic_rpc::transport::quinn::QuinnServerEndpoint;
 use quic_rpc::ServiceEndpoint;
+use tokio_util::task::LocalPoolHandle;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 // set the RUST_LOG env var to one of {debug,info,warn} to see logging info
@@ -45,7 +46,7 @@ fn make_rpc_endpoint(
 
 async fn run(db: impl Store) -> anyhow::Result<()> {
-    // create a new iroh runtime with 1 worker thread, reusing the existing tokio runtime
-    let rt = runtime::Handle::from_current(1)?;
+    // create a new local pool handle with 1 worker thread
+    let lp = LocalPoolHandle::new(1);
     // create a random secret key
     let secret_key = SecretKey::generate();
     // create a rpc endpoint
@@ -56,7 +57,7 @@ async fn run(db: impl Store) -> anyhow::Result<()> {
     let doc_store = iroh_sync::store::memory::Store::default();
     let node = iroh::node::Node::builder(db, doc_store)
         .secret_key(secret_key)
-        .runtime(&rt)
+        .local_pool(&lp)
         .rpc_endpoint(rpc_endpoint)
         .spawn()
         .await?;
@@ -86,17 +87,16 @@ struct Args {
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     setup_logging();
-    let rt = runtime::Handle::from_current(1)?;
     let args = Args::parse();
 
     match args.path {
         Some(path) => {
             tokio::fs::create_dir_all(&path).await?;
-            let db = iroh_bytes::store::flat::Store::load(&path, &path, &path, &rt).await?;
+            let db = iroh_bytes::store::flat::Store::load(&path, &path, &path).await?;
             run(db).await
         }
         None => {
-            let db = iroh_bytes::store::mem::Store::new(rt);
+            let db = iroh_bytes::store::mem::Store::new();
             run(db).await
         }
     }
diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index dffe203b74..eb40bd9e41 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -18,7 +18,6 @@ use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
 use iroh_bytes::provider::AddProgress;
 use iroh_bytes::store::ValidateProgress;
 // use iroh_bytes::util::progress::FlumeProgressSender;
-use iroh_bytes::util::runtime;
 use iroh_bytes::Hash;
 use iroh_bytes::{BlobFormat, Tag};
 use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, NodeAddr};
@@ -70,14 +69,11 @@ where
     C: ServiceConnection<ProviderService>,
 {
     /// Create a new high-level client to a Iroh node from the low-level RPC client.
-    pub fn new(rpc: RpcClient<ProviderService, C>, rt: runtime::Handle) -> Self {
+    pub fn new(rpc: RpcClient<ProviderService, C>) -> Self {
         Self {
             node: NodeClient { rpc: rpc.clone() },
             blobs: BlobsClient { rpc: rpc.clone() },
-            docs: DocsClient {
-                rpc: rpc.clone(),
-                rt,
-            },
+            docs: DocsClient { rpc: rpc.clone() },
             authors: AuthorsClient { rpc: rpc.clone() },
             tags: TagsClient { rpc },
         }
@@ -135,7 +131,6 @@ where
 #[derive(Debug, Clone)]
 pub struct DocsClient<C> {
     rpc: RpcClient<ProviderService, C>,
-    rt: runtime::Handle,
 }
 
 impl<C> DocsClient<C>
 where
     C: ServiceConnection<ProviderService>,
 {
     /// Create a new document.
     pub async fn create(&self) -> Result<Doc<C>> {
         let res = self.rpc.rpc(DocCreateRequest {}).await??;
-        let doc = Doc::new(self.rt.clone(), self.rpc.clone(), res.id);
+        let doc = Doc::new(self.rpc.clone(), res.id);
         Ok(doc)
     }
 
@@ -162,7 +157,7 @@ where
     /// Import a document from a ticket and join all peers in the ticket.
     pub async fn import(&self, ticket: DocTicket) -> Result<Doc<C>> {
         let res = self.rpc.rpc(DocImportRequest(ticket)).await??;
-        let doc = Doc::new(self.rt.clone(), self.rpc.clone(), res.doc_id);
+        let doc = Doc::new(self.rpc.clone(), res.doc_id);
         Ok(doc)
     }
 
@@ -175,7 +170,7 @@ where
     /// Get a [`Doc`] client for a single document. Return None if the document cannot be found.
     pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc<C>>> {
         self.rpc.rpc(DocOpenRequest { doc_id: id }).await??;
-        let doc = Doc::new(self.rt.clone(), self.rpc.clone(), id);
+        let doc = Doc::new(self.rpc.clone(), id);
         Ok(Some(doc))
     }
 }
@@ -537,7 +532,7 @@ struct DocInner<C: ServiceConnection<ProviderService>> {
     id: NamespaceId,
     rpc: RpcClient<ProviderService, C>,
     closed: AtomicBool,
-    rt: runtime::Handle,
+    rt: tokio::runtime::Handle,
 }
 
 impl<C> Drop for DocInner<C>
 where
     C: ServiceConnection<ProviderService>,
 {
     fn drop(&mut self) {
         let doc_id = self.id;
         let rpc = self.rpc.clone();
-        self.rt.main().spawn(async move {
+        self.rt.spawn(async move {
             rpc.rpc(DocCloseRequest { doc_id }).await.ok();
         });
     }
 }
@@ -557,12 +552,12 @@ impl<C> Doc<C>
 where
     C: ServiceConnection<ProviderService>,
 {
-    fn new(rt: runtime::Handle, rpc: RpcClient<ProviderService, C>, id: NamespaceId) -> Self {
+    fn new(rpc: RpcClient<ProviderService, C>, id: NamespaceId) -> Self {
         Self(Arc::new(DocInner {
             rpc,
             id,
             closed: AtomicBool::new(false),
-            rt,
+            rt: tokio::runtime::Handle::current(),
         }))
     }
 
@@ -958,17 +953,17 @@ where
 mod tests {
 
     use super::*;
 
-    use iroh_bytes::util::runtime;
     use rand::RngCore;
     use tokio::io::AsyncWriteExt;
+    use tokio_util::task::LocalPoolHandle;
 
     #[tokio::test]
     async fn test_drop_doc_client_sync() -> Result<()> {
         let db = iroh_bytes::store::readonly_mem::Store::default();
         let doc_store = iroh_sync::store::memory::Store::default();
-        let rt = runtime::Handle::from_current(1)?;
+        let lp = LocalPoolHandle::new(1);
         let node = crate::node::Node::builder(db, doc_store)
-            .runtime(&rt)
+            .local_pool(&lp)
             .spawn()
             .await?;
 
@@ -990,12 +985,8 @@ mod tests {
     #[tokio::test]
     async fn test_doc_import_export() -> Result<()> {
         let doc_store = iroh_sync::store::memory::Store::default();
-        let rt = runtime::Handle::from_current(1)?;
-        let db = iroh_bytes::store::mem::Store::new(rt.clone());
-        let node = crate::node::Node::builder(db, doc_store)
-            .runtime(&rt)
-            .spawn()
-            .await?;
+        let db = iroh_bytes::store::mem::Store::new();
+        let node = crate::node::Node::builder(db, doc_store).spawn().await?;
 
         // create temp file
         let temp_dir = tempfile::tempdir().context("tempdir")?;
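Note the trade-off in `Doc::new` above: instead of threading a runtime handle through the client, the constructor captures `tokio::runtime::Handle::current()`, which panics outside a runtime, so a `Doc` must now be created from within one. The captured handle lets the synchronous `Drop` fire off async cleanup. A small sketch of that pattern in isolation (illustrative, not the crate's API):

```rust
use tokio::runtime::Handle;

struct Closer {
    rt: Handle, // captured at construction, like DocInner above
}

impl Closer {
    // Panics if called outside a tokio runtime -- the same constraint
    // Doc::new inherits from Handle::current().
    fn new() -> Self {
        Self { rt: Handle::current() }
    }
}

impl Drop for Closer {
    fn drop(&mut self) {
        // Drop is synchronous; use the captured handle to run
        // asynchronous cleanup without blocking.
        self.rt.spawn(async { /* e.g. send a close request */ });
    }
}
```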
diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs
index 42ccdfa268..82e6cd2e98 100644
--- a/iroh/src/client/quic.rs
+++ b/iroh/src/client/quic.rs
@@ -6,7 +6,6 @@ use std::{
     time::Duration,
 };
 
-use iroh_bytes::util::runtime;
 use quic_rpc::transport::quinn::QuinnConnection;
 
 use crate::rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService};
@@ -27,13 +26,9 @@
 pub type Iroh = super::Iroh<QuinnConnection<ProviderResponse, ProviderRequest>>;
 pub type Doc = super::Doc<QuinnConnection<ProviderResponse, ProviderRequest>>;
 
 /// Connect to an iroh node running on the same computer, but in a different process.
-pub async fn connect(rpc_port: u16, rt: Option<runtime::Handle>) -> anyhow::Result<Iroh> {
-    let rt = match rt {
-        Some(rt) => rt,
-        None => runtime::Handle::from_current(1)?,
-    };
+pub async fn connect(rpc_port: u16) -> anyhow::Result<Iroh> {
     let client = connect_raw(rpc_port).await?;
-    Ok(Iroh::new(client, rt))
+    Ok(Iroh::new(client))
 }
 
 /// Create a raw RPC client to an iroh node running on the same computer, but in a different
diff --git a/iroh/src/commands.rs b/iroh/src/commands.rs
index c592718956..acee8b63bf 100644
--- a/iroh/src/commands.rs
+++ b/iroh/src/commands.rs
@@ -2,7 +2,7 @@ use std::path::PathBuf;
 
 use anyhow::{bail, ensure, Context, Result};
 use clap::Parser;
-use iroh_bytes::util::runtime;
+use tokio_util::task::LocalPoolHandle;
 
 use crate::config::{iroh_data_root, ConsoleEnv, NodeConfig};
 
@@ -82,7 +82,7 @@ pub enum Commands {
 }
 
 impl Cli {
-    pub async fn run(self, rt: runtime::Handle) -> Result<()> {
+    pub async fn run(self, rt: LocalPoolHandle) -> Result<()> {
         match self.command {
             Commands::Console => {
                 let env = ConsoleEnv::for_console()?;
@@ -94,7 +94,7 @@ impl Cli {
                     })
                     .await
                 } else {
-                    let iroh = iroh_quic_connect(rt).await.context("rpc connect")?;
+                    let iroh = iroh_quic_connect().await.context("rpc connect")?;
                     console::run(&iroh, &env).await
                 }
             }
@@ -108,7 +108,7 @@ impl Cli {
                     })
                     .await
                 } else {
-                    let iroh = iroh_quic_connect(rt).await.context("rpc connect")?;
+                    let iroh = iroh_quic_connect().await.context("rpc connect")?;
                     command.run(&iroh, &env).await
                 }
             }
@@ -145,7 +145,7 @@ impl Cli {
     }
 }
 
-async fn iroh_quic_connect(rt: runtime::Handle) -> Result<iroh::client::quic::Iroh> {
+async fn iroh_quic_connect() -> Result<iroh::client::quic::Iroh> {
     let root = iroh_data_root()?;
     let rpc_status = RpcStatus::load(root).await?;
     match rpc_status {
@@ -153,7 +153,7 @@ async fn iroh_quic_connect() -> Result<iroh::client::quic::Iroh> {
         RpcStatus::Running(rpc_port) => {
-            let iroh = iroh::client::quic::connect(rpc_port, Some(rt))
+            let iroh = iroh::client::quic::connect(rpc_port)
                 .await
                 .context("quic::connect")?;
             Ok(iroh)
diff --git a/iroh/src/commands/rpc.rs b/iroh/src/commands/rpc.rs
index e8dae3e7cd..1650fb9e6b 100644
--- a/iroh/src/commands/rpc.rs
+++ b/iroh/src/commands/rpc.rs
@@ -104,10 +104,7 @@ impl RpcStatus {
             .await
             .context("read rpc lock file")?;
         let running_rpc_port = u16::from_le_bytes(buffer);
-        if iroh::client::quic::connect(running_rpc_port, None)
-            .await
-            .is_ok()
-        {
+        if iroh::client::quic::connect(running_rpc_port).await.is_ok() {
             return Ok(RpcStatus::Running(running_rpc_port));
         }
     }
diff --git a/iroh/src/commands/start.rs b/iroh/src/commands/start.rs
index 2390784b99..197e6b16fa 100644
--- a/iroh/src/commands/start.rs
+++ b/iroh/src/commands/start.rs
@@ -15,12 +15,12 @@ use iroh::{
     rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService},
     util::{fs::load_secret_key, path::IrohPaths},
 };
-use iroh_bytes::util::runtime;
 use iroh_net::{
     derp::{DerpMap, DerpMode},
     key::SecretKey,
 };
 use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint};
+use tokio_util::task::LocalPoolHandle;
 use tracing::{info_span, Instrument};
 
 use crate::config::{iroh_data_root, path_with_env, NodeConfig};
@@ -52,7 +52,7 @@ pub struct StartArgs {
 impl StartArgs {
     pub async fn run_with_command<F, T>(
         self,
-        rt: &runtime::Handle,
+        rt: &LocalPoolHandle,
         config: &NodeConfig,
         run_type: RunType,
         command: F,
@@ -62,7 +62,7 @@ impl StartArgs {
         T: Future<Output = Result<()>> + 'static,
     {
         #[cfg(feature = "metrics")]
-        let metrics_fut = start_metrics_server(config.metrics_addr, rt);
+        let metrics_fut = start_metrics_server(config.metrics_addr);
 
         let res = self
             .run_with_command_inner(rt, config, run_type, command)
@@ -80,7 +80,7 @@ impl StartArgs {
 
     async fn run_with_command_inner<F, T>(
         self,
-        rt: &runtime::Handle,
+        rt: &LocalPoolHandle,
         config: &NodeConfig,
         run_type: RunType,
         command: F,
@@ -99,7 +99,7 @@ impl StartArgs {
 
         let client = node.client();
 
-        let mut command_task = rt.local_pool().spawn_pinned(move || {
+        let mut command_task = rt.spawn_pinned(move || {
             async move {
                 match command(client).await {
                     Err(err) => Err(err),
@@ -141,7 +141,7 @@ impl StartArgs {
 
     async fn start_node(
         &self,
-        rt: &runtime::Handle,
+        rt: &LocalPoolHandle,
         derp_map: Option<DerpMap>,
     ) -> Result<Node<iroh_bytes::store::flat::Store>> {
         let rpc_status = RpcStatus::load(iroh_data_root()?).await?;
@@ -161,7 +161,7 @@ impl StartArgs {
         tokio::fs::create_dir_all(&blob_dir).await?;
         tokio::fs::create_dir_all(&partial_blob_dir).await?;
         let bao_store =
-            iroh_bytes::store::flat::Store::load(&blob_dir, &partial_blob_dir, &meta_dir, rt)
+            iroh_bytes::store::flat::Store::load(&blob_dir, &partial_blob_dir, &meta_dir)
                 .await
                 .with_context(|| {
                     format!("Failed to load iroh database from {}", blob_dir.display())
@@ -179,7 +179,7 @@ impl StartArgs {
         Node::builder(bao_store, doc_store)
             .derp_mode(derp_mode)
             .peers_data_path(peers_data_path)
-            .runtime(rt)
+            .local_pool(rt)
             .rpc_endpoint(rpc_endpoint)
             .secret_key(secret_key)
             .spawn()
@@ -267,13 +267,12 @@ fn create_spinner(msg: &'static str) -> ProgressBar {
 #[cfg(feature = "metrics")]
 pub fn start_metrics_server(
     metrics_addr: Option<SocketAddr>,
-    rt: &iroh_bytes::util::runtime::Handle,
 ) -> Option<tokio::task::JoinHandle<()>> {
     // doesn't start the server if the address is None
     if let Some(metrics_addr) = metrics_addr {
         // metrics are initialized in iroh::node::Node::spawn
         // here we only start the server
-        return Some(rt.main().spawn(async move {
+        return Some(tokio::task::spawn(async move {
             if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await {
                 eprintln!("Failed to start metrics server: {e}");
             }
diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs
index 6b138d07cc..198635d24e 100644
--- a/iroh/src/downloader.rs
+++ b/iroh/src/downloader.rs
@@ -43,7 +43,7 @@ use tokio::{
     sync::{mpsc, oneshot},
     task::JoinSet,
 };
-use tokio_util::{sync::CancellationToken, time::delay_queue};
+use tokio_util::{sync::CancellationToken, task::LocalPoolHandle, time::delay_queue};
 use tracing::{debug, error_span, trace, warn, Instrument};
 
 mod get;
@@ -224,7 +224,7 @@ pub struct Downloader {
 
 impl Downloader {
     /// Create a new Downloader.
-    pub fn new<S>(store: S, endpoint: MagicEndpoint, rt: iroh_bytes::util::runtime::Handle) -> Self
+    pub fn new<S>(store: S, endpoint: MagicEndpoint, rt: LocalPoolHandle) -> Self
     where
         S: Store,
     {
@@ -240,7 +240,7 @@ impl Downloader {
             service.run().instrument(error_span!("downloader", %me))
         };
-        rt.local_pool().spawn_pinned(create_future);
+        rt.spawn_pinned(create_future);
         Self {
             next_id: Arc::new(AtomicU64::new(0)),
             msg_tx,
diff --git a/iroh/src/downloader/test.rs b/iroh/src/downloader/test.rs
index d460aa45fa..6f7029ad2f 100644
--- a/iroh/src/downloader/test.rs
+++ b/iroh/src/downloader/test.rs
@@ -16,16 +16,13 @@ impl Downloader {
     ) -> Self {
         let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY);
 
-        iroh_bytes::util::runtime::Handle::from_current(1)
-            .unwrap()
-            .local_pool()
-            .spawn_pinned(move || async move {
-                // we want to see the logs of the service
-                let _guard = iroh_test::logging::setup();
+        LocalPoolHandle::new(1).spawn_pinned(move || async move {
+            // we want to see the logs of the service
+            let _guard = iroh_test::logging::setup();
 
-                let service = Service::new(getter, dialer, concurrency_limits, msg_rx);
-                service.run().await
-            });
+            let service = Service::new(getter, dialer, concurrency_limits, msg_rx);
+            service.run().await
+        });
 
         Downloader {
             next_id: Arc::new(AtomicU64::new(0)),
diff --git a/iroh/src/main.rs b/iroh/src/main.rs
index d15a30f907..1dba2af717 100644
--- a/iroh/src/main.rs
+++ b/iroh/src/main.rs
@@ -24,14 +24,12 @@ fn main() -> Result<()> {
 }
 
 async fn main_impl() -> Result<()> {
-    let tokio = tokio::runtime::Handle::current();
-    let tpc = tokio_util::task::LocalPoolHandle::new(num_cpus::get());
-    let rt = iroh::bytes::util::runtime::Handle::new(tokio, tpc);
+    let lp = tokio_util::task::LocalPoolHandle::new(num_cpus::get());
     tracing_subscriber::registry()
         .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
         .with(EnvFilter::from_default_env())
         .init();
     let cli = Cli::parse();
-    cli.run(rt).await
+    cli.run(lp).await
 }
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index b9a8016261..c58a4ec0db 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -27,8 +27,7 @@ use iroh_bytes::store::{
 };
 use iroh_bytes::util::progress::{FlumeProgressSender, IdGenerator, ProgressSender};
 use iroh_bytes::{
-    protocol::Closed, provider::AddProgress, util::runtime, BlobFormat, Hash, HashAndFormat,
-    TempTag,
+    protocol::Closed, provider::AddProgress, BlobFormat, Hash, HashAndFormat, TempTag,
 };
 use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
 use iroh_io::AsyncSliceReader;
@@ -49,6 +48,7 @@ use serde::{Deserialize, Serialize};
 use tokio::sync::{mpsc, oneshot, RwLock};
 use tokio::task::JoinError;
 use tokio_util::sync::CancellationToken;
+use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, error, error_span, info, trace, warn, Instrument};
 
 use crate::downloader::Downloader;
@@ -128,7 +128,7 @@ where
     keylog: bool,
     derp_mode: DerpMode,
     gc_policy: GcPolicy,
-    rt: Option<runtime::Handle>,
+    rt: Option<LocalPoolHandle>,
     docs: S,
     /// Path to store peer data. If `None`, peer data will not be persisted.
     peers_data_path: Option<PathBuf>,
@@ -235,7 +235,7 @@ where
-    /// Sets the tokio runtime to use.
+    /// Sets the local pool handle to use.
     ///
-    /// If not set, the current runtime will be picked up.
-    pub fn runtime(mut self, rt: &runtime::Handle) -> Self {
+    /// If not set, a new local pool will be created on spawn.
+    pub fn local_pool(mut self, rt: &LocalPoolHandle) -> Self {
         self.rt = Some(rt.clone());
         self
     }
@@ -247,7 +249,9 @@ where
     /// get information about it.
     pub async fn spawn(self) -> Result<Node<D>> {
         trace!("spawning node");
-        let rt = self.rt.context("runtime not set")?;
+        let lp = self
+            .rt
+            .unwrap_or_else(|| LocalPoolHandle::new(num_cpus::get()));
         // Initialize the metrics collection.
         //
         // The metrics are global per process. Subsequent calls do not change the metrics
@@ -293,10 +295,9 @@ where
         let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info);
 
         // spawn the sync engine
-        let downloader = Downloader::new(self.db.clone(), endpoint.clone(), rt.clone());
+        let downloader = Downloader::new(self.db.clone(), endpoint.clone(), lp.clone());
         let ds = self.docs.clone();
         let sync = SyncEngine::spawn(
-            rt.clone(),
             endpoint.clone(),
             gossip.clone(),
             self.docs,
@@ -309,16 +310,12 @@ where
             tracing::info!("Starting GC task with interval {:?}", gc_period);
             let db = self.db.clone();
             let callbacks = callbacks.clone();
-            let task = rt
-                .local_pool()
-                .spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks));
+            let task = lp.spawn_pinned(move || Self::gc_loop(db, ds, gc_period, callbacks));
             Some(AbortingJoinHandle(task))
         } else {
             None
         };
         let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
-        let rt2 = rt.clone();
-        let rt3 = rt.clone();
         let inner = Arc::new(NodeInner {
             db: self.db,
             endpoint: endpoint.clone(),
@@ -328,7 +325,7 @@ where
             callbacks: callbacks.clone(),
             cb_sender,
             gc_task,
-            rt: rt.clone(),
+            rt: lp.clone(),
             sync,
         });
         let task = {
@@ -337,7 +334,7 @@ where
                 inner: inner.clone(),
             };
             let me = endpoint.node_id().fmt_short();
-            rt2.main().spawn(
+            tokio::task::spawn(
                 async move {
                     Self::run(
                         endpoint,
@@ -346,7 +343,6 @@ where
                         handler,
                         self.rpc_endpoint,
                         internal_rpc,
-                        rt3,
                         gossip,
                     )
                     .await
@@ -362,7 +358,7 @@ where
         // spawn a task that updates the gossip endpoints.
         let (first_endpoint_update_tx, first_endpoint_update_rx) = oneshot::channel();
         let mut first_endpoint_update_tx = Some(first_endpoint_update_tx);
-        rt.main().spawn(async move {
+        tokio::task::spawn(async move {
             while let Ok(eps) = endpoints_update_r.recv_async().await {
                 if let Err(err) = gossip.update_endpoints(&eps) {
                     warn!("Failed to update gossip endpoints: {err:?}");
@@ -390,7 +386,6 @@ where
         handler: RpcHandler,
         rpc: E,
         internal_rpc: impl ServiceEndpoint<ProviderService>,
-        rt: runtime::Handle,
         gossip: Gossip,
     ) {
         let rpc = RpcServer::new(rpc);
@@ -423,7 +418,7 @@ where
                 request = rpc.accept() => {
                     match request {
                         Ok((msg, chan)) => {
-                            handle_rpc_request(msg, chan, &handler, &rt);
+                            handle_rpc_request(msg, chan, &handler);
                         }
                         Err(e) => {
                             info!("rpc request error: {:?}", e);
@@ -434,7 +429,7 @@ where
                 request = internal_rpc.accept() => {
                     match request {
                         Ok((msg, chan)) => {
-                            handle_rpc_request(msg, chan, &handler, &rt);
+                            handle_rpc_request(msg, chan, &handler);
                         }
                         Err(_) => {
                             info!("last controller dropped, shutting down");
@@ -454,7 +449,7 @@ where
                 let gossip = gossip.clone();
                 let inner = handler.inner.clone();
                 let sync = handler.inner.sync.clone();
-                rt.main().spawn(async move {
+                tokio::task::spawn(async move {
                     if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await {
                         warn!("Handling incoming connection ended with error: {err}");
                     }
@@ -636,7 +631,8 @@ struct NodeInner {
     callbacks: Callbacks,
     #[allow(dead_code)]
     gc_task: Option<AbortingJoinHandle<()>>,
-    rt: runtime::Handle,
+    #[debug("rt")]
+    rt: LocalPoolHandle,
     pub(crate) sync: SyncEngine,
 }
 
@@ -709,7 +705,7 @@ impl Node {
     /// Return a client to control this node over an in-memory channel.
     pub fn client(&self) -> crate::client::mem::Iroh {
-        crate::client::Iroh::new(self.controller(), self.inner.rt.clone())
+        crate::client::Iroh::new(self.controller())
     }
 
     /// Return a single token containing everything needed to get a hash.
@@ -783,7 +779,7 @@ struct RpcHandler {
 }
 
 impl RpcHandler {
-    fn rt(&self) -> runtime::Handle {
+    fn rt(&self) -> LocalPoolHandle {
         self.inner.rt.clone()
     }
 
@@ -811,7 +807,7 @@ impl RpcHandler {
         _msg: BlobListIncompleteRequest,
     ) -> impl Stream + Send + 'static {
         let db = self.inner.db.clone();
-        let local = self.inner.rt.local_pool().clone();
+        let local = self.inner.rt.clone();
         futures::stream::iter(db.partial_blobs()).filter_map(move |hash| {
             let db = db.clone();
             let t = local.spawn_pinned(move || async move {
@@ -836,7 +832,7 @@ impl RpcHandler {
         _msg: BlobListCollectionsRequest,
     ) -> impl Stream + Send + 'static {
         let db = self.inner.db.clone();
-        let local = self.inner.rt.local_pool().clone();
+        let local = self.inner.rt.clone();
         let tags = db.tags();
         futures::stream::iter(tags).filter_map(move |(name, HashAndFormat { hash, format })| {
             let db = db.clone();
@@ -898,7 +894,7 @@ impl RpcHandler {
         let (tx, rx) = mpsc::channel(1);
         let tx2 = tx.clone();
         let db = self.inner.db.clone();
-        self.rt().main().spawn(async move {
+        tokio::task::spawn(async move {
             if let Err(e) = db.validate(tx).await {
                 tx2.send(ValidateProgress::Abort(e.into())).await.unwrap();
             }
@@ -913,7 +909,7 @@ impl RpcHandler {
         // provide a little buffer so that we don't slow down the sender
         let (tx, rx) = flume::bounded(32);
         let tx2 = tx.clone();
-        self.rt().local_pool().spawn_pinned(|| async move {
+        self.rt().spawn_pinned(|| async move {
             if let Err(e) = self.blob_add_from_path0(msg, tx).await {
                 tx2.send_async(AddProgress::Abort(e.into())).await.ok();
             }
@@ -928,7 +924,7 @@ impl RpcHandler {
         // provide a little buffer so that we don't slow down the sender
         let (tx, rx) = flume::bounded(32);
         let tx2 = tx.clone();
-        self.rt().local_pool().spawn_pinned(|| async move {
+        self.rt().spawn_pinned(|| async move {
             if let Err(e) = self.doc_import_file0(msg, tx).await {
                 tx2.send_async(DocImportProgress::Abort(e.into()))
                     .await
@@ -1015,7 +1011,7 @@ impl RpcHandler {
     ) -> impl Stream {
         let (tx, rx) = flume::bounded(1024);
         let tx2 = tx.clone();
-        self.rt().local_pool().spawn_pinned(|| async move {
+        self.rt().spawn_pinned(|| async move {
             if let Err(e) = self.doc_export_file0(msg, tx).await {
                 tx2.send_async(DocExportProgress::Abort(e.into()))
                     .await
@@ -1117,7 +1113,7 @@ impl RpcHandler {
         msg: BlobDownloadRequest,
         progress: impl ProgressSender + IdGenerator,
     ) -> anyhow::Result<()> {
-        let local = self.inner.rt.local_pool().clone();
+        let local = self.inner.rt.clone();
         let hash = msg.hash;
         let format = msg.format;
         let db = self.inner.db.clone();
@@ -1384,7 +1380,7 @@ impl RpcHandler {
         let (tx, rx) = flume::bounded(32);
         let this = self.clone();
 
-        self.rt().local_pool().spawn_pinned(|| async move {
+        self.rt().spawn_pinned(|| async move {
             if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await {
                 tx.send_async(AddProgress::Abort(err.into())).await.ok();
             }
@@ -1453,7 +1449,7 @@ impl RpcHandler {
     ) -> impl Stream + Send + 'static {
         let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
         let entry = self.inner.db.get(&req.hash);
-        self.inner.rt.local_pool().spawn_pinned(move || async move {
+        self.inner.rt.spawn_pinned(move || async move {
             if let Err(err) = read_loop(entry, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
                 tx.send_async(RpcResult::Err(err.into())).await.ok();
             }
@@ -1497,7 +1493,7 @@ impl RpcHandler {
     ) -> impl Stream + Send + 'static {
         // provide a little buffer so that we don't slow down the sender
         let (tx, rx) = flume::bounded(32);
-        self.rt().local_pool().spawn_pinned(|| async move {
+        self.rt().spawn_pinned(|| async move {
             match self.inner.endpoint.connection_infos().await {
                 Ok(mut conn_infos) => {
                     conn_infos.sort_by_key(|n| n.public_key.to_string());
@@ -1529,10 +1525,9 @@ fn handle_rpc_request(
     msg: ProviderRequest,
     chan: RpcChannel,
     handler: &RpcHandler,
-    rt: &runtime::Handle,
 ) {
     let handler = handler.clone();
-    rt.main().spawn(async move {
+    tokio::task::spawn(async move {
         use ProviderRequest::*;
         debug!("handling rpc request: {msg}");
         match msg {
@@ -1744,23 +1739,17 @@ mod tests {
 
     use super::*;
 
-    /// Pick up the tokio runtime from the thread local and add a
-    /// thread per core runtime.
-    fn test_runtime() -> runtime::Handle {
-        runtime::Handle::from_current(1).unwrap()
-    }
-
     #[tokio::test]
     async fn test_ticket_multiple_addrs() {
         let _guard = iroh_test::logging::setup();
-        let rt = test_runtime();
+        let lp = LocalPoolHandle::new(1);
         let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", b"hello")]);
         let doc_store = iroh_sync::store::memory::Store::default();
         let hash = hashes["test"].into();
         let node = Node::builder(db, doc_store)
             .bind_port(0)
-            .runtime(&rt)
+            .local_pool(&lp)
             .spawn()
             .await
             .unwrap();
@@ -1775,12 +1764,11 @@ mod tests {
         let _guard = iroh_test::logging::setup();
         use std::io::Cursor;
 
-        let rt = runtime::Handle::from_current(1)?;
-        let db = iroh_bytes::store::mem::Store::new(rt);
+        let db = iroh_bytes::store::mem::Store::new();
         let doc_store = iroh_sync::store::memory::Store::default();
         let node = Node::builder(db, doc_store)
             .bind_port(0)
-            .runtime(&test_runtime())
+            .local_pool(&LocalPoolHandle::new(1))
             .spawn()
             .await?;
 
@@ -1800,14 +1788,9 @@ mod tests {
     async fn test_node_add_tagged_blob_event() -> Result<()> {
         let _guard = iroh_test::logging::setup();
 
-        let rt = runtime::Handle::from_current(1)?;
-        let db = iroh_bytes::store::mem::Store::new(rt);
+        let db = iroh_bytes::store::mem::Store::new();
         let doc_store = iroh_sync::store::memory::Store::default();
-        let node = Node::builder(db, doc_store)
-            .bind_port(0)
-            .runtime(&test_runtime())
-            .spawn()
-            .await?;
+        let node = Node::builder(db, doc_store).bind_port(0).spawn().await?;
 
         let _drop_guard = node.cancel_token().drop_guard();
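With `local_pool` now optional and `spawn` falling back to `LocalPoolHandle::new(num_cpus::get())`, callers only pass a pool when they want to share or size it explicitly. A usage sketch built only from the APIs shown in this diff (assuming the builder signatures above):

```rust
use tokio_util::task::LocalPoolHandle;

async fn spawn_nodes_sketch() -> anyhow::Result<()> {
    // Explicit pool, e.g. to control the number of worker threads:
    let lp = LocalPoolHandle::new(1);
    let db = iroh_bytes::store::mem::Store::new();
    let docs = iroh_sync::store::memory::Store::default();
    let _node_a = iroh::node::Node::builder(db, docs)
        .local_pool(&lp)
        .spawn()
        .await?;

    // Or rely on the default pool created inside spawn():
    let db2 = iroh_bytes::store::mem::Store::new();
    let docs2 = iroh_sync::store::memory::Store::default();
    let _node_b = iroh::node::Node::builder(db2, docs2).spawn().await?;
    Ok(())
}
```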
diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs
index 097fe7bb46..71b7094800 100644
--- a/iroh/src/sync_engine.rs
+++ b/iroh/src/sync_engine.rs
@@ -9,7 +9,7 @@ use futures::{
     future::{BoxFuture, FutureExt, Shared},
     Stream, TryStreamExt,
 };
-use iroh_bytes::{store::EntryStatus, util::runtime::Handle, Hash};
+use iroh_bytes::{store::EntryStatus, Hash};
 use iroh_gossip::net::Gossip;
 use iroh_net::{key::PublicKey, MagicEndpoint, NodeAddr};
 use iroh_sync::{
@@ -46,7 +46,6 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
 /// implementations in [rpc].
 #[derive(derive_more::Debug, Clone)]
 pub struct SyncEngine {
-    pub(crate) rt: Handle,
     pub(crate) endpoint: MagicEndpoint,
     pub(crate) sync: SyncHandle,
     to_live_actor: mpsc::Sender<ToLiveActor>,
@@ -61,7 +60,6 @@ impl SyncEngine {
     /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
     /// thread for the [`iroh_sync::actor::SyncHandle`].
     pub fn spawn(
-        rt: Handle,
         endpoint: MagicEndpoint,
         gossip: Gossip,
         replica_store: S,
@@ -99,7 +97,7 @@ impl SyncEngine {
             downloader,
             live_actor_tx.clone(),
         );
-        let live_actor_task = rt.main().spawn(
+        let live_actor_task = tokio::task::spawn(
             async move {
                 if let Err(err) = actor.run().await {
                     error!("sync actor failed: {err:?}");
@@ -107,7 +105,7 @@ impl SyncEngine {
             }
             .instrument(error_span!("sync", %me)),
         );
-        let gossip_actor_task = rt.main().spawn(
+        let gossip_actor_task = tokio::task::spawn(
             async move {
                 if let Err(err) = gossip_actor.run().await {
                     error!("gossip recv actor failed: {err:?}");
@@ -130,7 +128,6 @@ impl SyncEngine {
             .shared();
 
         Self {
-            rt,
             endpoint,
             sync,
             to_live_actor: live_actor_tx,
diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs
index 8760d71c2a..4cbacaddd8 100644
--- a/iroh/src/sync_engine/rpc.rs
+++ b/iroh/src/sync_engine/rpc.rs
@@ -46,7 +46,7 @@ impl SyncEngine {
         let sync = self.sync.clone();
         // we need to spawn a task to send our request to the sync handle, because the method
         // itself must be sync.
-        self.rt.main().spawn(async move {
+        tokio::task::spawn(async move {
             let tx2 = tx.clone();
             if let Err(err) = sync.list_authors(tx).await {
                 tx2.send_async(Err(err)).await.ok();
@@ -78,7 +78,7 @@ impl SyncEngine {
         let sync = self.sync.clone();
         // we need to spawn a task to send our request to the sync handle, because the method
         // itself must be sync.
-        self.rt.main().spawn(async move {
+        tokio::task::spawn(async move {
             let tx2 = tx.clone();
             if let Err(err) = sync.list_replicas(tx).await {
                 tx2.send_async(Err(err)).await.ok();
@@ -215,7 +215,7 @@ impl SyncEngine {
         let sync = self.sync.clone();
         // we need to spawn a task to send our request to the sync handle, because the method
         // itself must be sync.
-        self.rt.main().spawn(async move {
+        tokio::task::spawn(async move {
             let tx2 = tx.clone();
             if let Err(err) = sync.get_many(doc_id, query, tx).await {
                 tx2.send_async(Err(err)).await.ok();
diff --git a/iroh/tests/cli.rs b/iroh/tests/cli.rs
index e6e69dbc4c..d8058c4167 100644
--- a/iroh/tests/cli.rs
+++ b/iroh/tests/cli.rs
@@ -407,20 +407,15 @@ fn cli_provide_persistence() -> anyhow::Result<()> {
         anyhow::Ok(())
     };
     provide(&foo_path)?;
-    let tokio = tokio::runtime::Builder::new_multi_thread().build()?;
-    let rt = iroh_bytes::util::runtime::Handle::new(
-        tokio.handle().clone(),
-        tokio_util::task::LocalPoolHandle::new(1),
-    );
     // should have some data now
     let db_path = IrohPaths::BaoFlatStoreComplete.with_root(&iroh_data_dir);
-    let db = Store::load_blocking(&db_path, &db_path, &db_path, &rt)?;
+    let db = Store::load_blocking(&db_path, &db_path, &db_path)?;
     let blobs = db.blobs().collect::<Vec<_>>();
     assert_eq!(blobs.len(), 3);
 
     provide(&bar_path)?;
     // should have more data now
-    let db = Store::load_blocking(&db_path, &db_path, &db_path, &rt)?;
+    let db = Store::load_blocking(&db_path, &db_path, &db_path)?;
     let blobs = db.blobs().collect::<Vec<_>>();
     assert_eq!(blobs.len(), 6);
diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs
index 8220792439..1d608f873a 100644
--- a/iroh/tests/gc.rs
+++ b/iroh/tests/gc.rs
@@ -9,15 +9,10 @@ use rand::RngCore;
 use iroh_bytes::{
     hashseq::HashSeq,
     store::{EntryStatus, Map, Store},
-    util::{runtime, Tag},
+    util::Tag,
     BlobFormat, HashAndFormat,
 };
-
-/// Pick up the tokio runtime from the thread local and add a
-/// thread per core runtime.
-fn test_runtime() -> runtime::Handle {
-    runtime::Handle::from_current(1).unwrap()
-}
+use tokio_util::task::LocalPoolHandle;
 
 fn create_test_data(n: usize) -> Bytes {
     let mut rng = rand::thread_rng();
@@ -27,18 +22,14 @@ fn create_test_data(n: usize) -> Bytes {
 }
 
 /// Wrap a bao store in a node that has gc enabled.
-async fn wrap_in_node<S>(
-    bao_store: S,
-    rt: iroh_bytes::util::runtime::Handle,
-    gc_period: Duration,
-) -> Node<S>
+async fn wrap_in_node<S>(bao_store: S, gc_period: Duration) -> Node<S>
 where
     S: iroh_bytes::store::Store,
 {
     let doc_store = iroh_sync::store::memory::Store::default();
     Node::builder(bao_store, doc_store)
-        .runtime(&rt)
         .gc_policy(iroh::node::GcPolicy::Interval(gc_period))
+        .local_pool(&LocalPoolHandle::new(1))
         .spawn()
         .await
         .unwrap()
@@ -67,9 +58,8 @@ async fn gc_test_node() -> (
     iroh_bytes::store::mem::Store,
     flume::Receiver,
 ) {
-    let rt = test_runtime();
-    let bao_store = iroh_bytes::store::mem::Store::new(rt.clone());
-    let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(50)).await;
+    let bao_store = iroh_bytes::store::mem::Store::new();
+    let node = wrap_in_node(bao_store.clone(), Duration::from_millis(50)).await;
     let db_recv = attach_db_events(&node).await;
     (node, bao_store, db_recv)
 }
@@ -275,15 +265,13 @@ mod flat {
     #[tokio::test]
     async fn gc_flat_basics() -> Result<()> {
         let _ = tracing_subscriber::fmt::try_init();
-        let rt = test_runtime();
         let dir = testdir!();
         let path = data_path(dir.clone());
         let outboard_path = outboard_path(dir.clone());
 
         let bao_store =
-            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt)
-                .await?;
-        let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(0)).await;
+            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone()).await?;
+        let node = wrap_in_node(bao_store.clone(), Duration::from_millis(0)).await;
         let evs = attach_db_events(&node).await;
         let data1 = create_test_data(123456);
         let tt1 = bao_store
@@ -433,15 +421,13 @@ mod flat {
     #[tokio::test]
     async fn gc_flat_partial() -> Result<()> {
         let _ = tracing_subscriber::fmt::try_init();
-        let rt = test_runtime();
         let dir = testdir!();
         let count_partial_data = count_partial_data(dir.clone());
         let count_partial_outboard = count_partial_outboard(dir.clone());
 
         let bao_store =
-            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt)
-                .await?;
-        let node = wrap_in_node(bao_store.clone(), rt, Duration::from_millis(0)).await;
+            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone()).await?;
+        let node = wrap_in_node(bao_store.clone(), Duration::from_millis(0)).await;
         let evs = attach_db_events(&node).await;
 
         let data1: Bytes = create_test_data(123456);
@@ -475,15 +461,13 @@ mod flat {
    #[tokio::test]
    async fn gc_flat_stress() -> Result<()> {
         let _ = tracing_subscriber::fmt::try_init();
-        let rt = test_runtime();
         let dir = testdir!();
         let count_partial_data = count_partial_data(dir.clone());
         let count_partial_outboard = count_partial_outboard(dir.clone());
 
         let bao_store =
-            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone(), &rt)
-                .await?;
-        let node = wrap_in_node(bao_store.clone(), rt, Duration::from_secs(1)).await;
+            iroh_bytes::store::flat::Store::load(dir.clone(), dir.clone(), dir.clone()).await?;
+        let node = wrap_in_node(bao_store.clone(), Duration::from_secs(1)).await;
         let evs = attach_db_events(&node).await;
 
         let mut deleted = Vec::new();
diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs
index cfdf1cb8fa..d233817b6d 100644
--- a/iroh/tests/provide.rs
+++ b/iroh/tests/provide.rs
@@ -28,15 +28,15 @@ use iroh_bytes::{
     protocol::{GetRequest, RangeSpecSeq},
     provider,
     store::{PartialMap, Store},
-    util::runtime,
     BlobFormat, Hash,
 };
 use iroh_sync::store;
+use tokio_util::task::LocalPoolHandle;
 
-/// Pick up the tokio runtime from the thread local and add a
-/// thread per core runtime.
-fn test_runtime() -> runtime::Handle {
-    runtime::Handle::from_current(1).unwrap()
+/// Create a local pool with a single worker thread.
+fn test_local_pool() -> LocalPoolHandle {
+    LocalPoolHandle::new(1)
 }
 
 fn test_node(db: D) -> Builder {
@@ -47,10 +47,10 @@
 #[tokio::test]
 async fn basics() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
     transfer_data(
         vec![("hello_world", "hello world!".as_bytes().to_vec())],
-        &rt,
+        &lp,
     )
     .await
 }
@@ -58,7 +58,7 @@ async fn basics() -> Result<()> {
 #[tokio::test]
 async fn multi_file() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
 
     let file_opts = vec![
         ("1", 10),
@@ -67,13 +67,13 @@ async fn multi_file() -> Result<()> {
         // overkill, but it works! Just annoying to wait for
         // ("4", 1024 * 1024 * 90),
     ];
-    transfer_random_data(file_opts, &rt).await
+    transfer_random_data(file_opts, &lp).await
 }
 
 #[tokio::test]
 async fn many_files() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
     let num_files = [10, 100];
     for num in num_files {
         println!("NUM_FILES: {num}");
@@ -84,7 +84,7 @@ async fn many_files() -> Result<()> {
                 (name, 10)
             })
             .collect();
-        transfer_random_data(file_opts, &rt).await?;
+        transfer_random_data(file_opts, &lp).await?;
     }
     Ok(())
 }
@@ -92,7 +92,7 @@ async fn many_files() -> Result<()> {
 #[tokio::test]
 async fn sizes() -> Result<()> {
     let _guard = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
 
     let sizes = [
         0,
@@ -108,7 +108,7 @@ async fn sizes() -> Result<()> {
 
     for size in sizes {
         let now = Instant::now();
-        transfer_random_data(vec![("hello_world", size)], &rt).await?;
+        transfer_random_data(vec![("hello_world", size)], &lp).await?;
         println!("  took {}ms", now.elapsed().as_millis());
     }
 
@@ -117,7 +117,7 @@ async fn sizes() -> Result<()> {
 
 #[tokio::test]
 async fn empty_files() -> Result<()> {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     // try to transfer as many files as possible without hitting a limit
     // booo 400 is too small :(
     let num_files = 400;
@@ -125,7 +125,7 @@ async fn empty_files() -> Result<()> {
     for i in 0..num_files {
         file_opts.push((i.to_string(), 0));
     }
-    transfer_random_data(file_opts, &rt).await
+    transfer_random_data(file_opts, &lp).await
 }
 
 /// Create new get options with the given node id and addresses, using a
@@ -155,9 +155,8 @@ async fn multiple_clients() -> Result<()> {
         0,
     )?;
     let hash = db.insert_many(collection.to_blobs()).unwrap();
-    let rt = test_runtime();
-    let node = test_node(db).runtime(&rt).spawn().await?;
-
+    let lp = test_local_pool();
+    let node = test_node(db).local_pool(&lp).spawn().await?;
     let mut tasks = Vec::new();
     for _i in 0..3 {
         let file_hash: Hash = expect_hash;
@@ -166,7 +165,7 @@ async fn multiple_clients() -> Result<()> {
         let peer_id = node.node_id();
         let content = content.to_vec();
 
-        tasks.push(rt.local_pool().spawn_pinned(move || {
+        tasks.push(lp.spawn_pinned(move || {
             async move {
                 let opts = get_options(peer_id, addrs);
                 let expected_data = &content;
@@ -190,10 +189,7 @@ async fn multiple_clients() -> Result<()> {
 
 // Run the test creating random data for each blob, using the size specified by the file
 // options
-async fn transfer_random_data<S>(
-    file_opts: Vec<(S, usize)>,
-    rt: &crate::runtime::Handle,
-) -> Result<()>
+async fn transfer_random_data<S>(file_opts: Vec<(S, usize)>, rt: &LocalPoolHandle) -> Result<()>
 where
     S: Into<String> + std::fmt::Debug + std::cmp::PartialEq + Clone,
 {
@@ -209,7 +205,7 @@ where
 }
 
 // Run the test for a vec of filenames and blob data
-async fn transfer_data<S>(file_opts: Vec<(S, Vec<u8>)>, rt: &crate::runtime::Handle) -> Result<()>
+async fn transfer_data<S>(file_opts: Vec<(S, Vec<u8>)>, rt: &LocalPoolHandle) -> Result<()>
 where
     S: Into<String> + std::fmt::Debug + std::cmp::PartialEq + Clone,
 {
@@ -245,7 +241,7 @@ where
     // sort expects by name to match the canonical order of blobs
     expects.sort_by(|a, b| a.0.cmp(&b.0));
 
-    let node = test_node(mdb.clone()).runtime(rt).spawn().await?;
+    let node = test_node(mdb.clone()).local_pool(rt).spawn().await?;
 
     let (events_sender, mut events_recv) = mpsc::unbounded_channel();
@@ -336,7 +332,7 @@ fn assert_events(events: Vec<Event>, num_blobs: usize) {
 
 #[tokio::test]
 async fn test_server_close() {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     // Prepare a Provider transferring a file.
     let _guard = iroh_test::logging::setup();
     let mut db = iroh_bytes::store::readonly_mem::Store::default();
@@ -350,7 +346,7 @@ async fn test_server_close() {
     )
     .unwrap();
     let hash = db.insert_many(collection.to_blobs()).unwrap();
-    let mut node = test_node(db).runtime(&rt).spawn().await.unwrap();
+    let mut node = test_node(db).local_pool(&lp).spawn().await.unwrap();
     let node_addr = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
@@ -419,10 +415,10 @@ fn create_test_db(
 #[tokio::test]
 async fn test_ipv6() {
     let _guard = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
 
     let (db, hash) = create_test_db([("test", b"hello")]);
-    let node = match test_node(db).runtime(&rt).spawn().await {
+    let node = match test_node(db).local_pool(&lp).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -446,11 +442,11 @@ async fn test_ipv6() {
 #[tokio::test]
 async fn test_not_found() {
     let _ = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
 
     let db = iroh_bytes::store::readonly_mem::Store::default();
     let hash = blake3::hash(b"hello").into();
-    let node = match test_node(db).runtime(&rt).spawn().await {
+    let node = match test_node(db).local_pool(&lp).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -487,13 +483,13 @@ async fn test_not_found() {
 #[tokio::test]
 async fn test_chunk_not_found_1() {
     let _ = iroh_test::logging::setup();
-    let rt = test_runtime();
+    let lp = test_local_pool();
 
-    let db = iroh_bytes::store::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new();
     let data = (0..1024 * 64).map(|i| i as u8).collect::<Vec<_>>();
     let hash = blake3::hash(&data).into();
     let _entry = db.get_or_create_partial(hash, data.len() as u64).unwrap();
-    let node = match test_node(db).runtime(&rt).spawn().await {
+    let node = match test_node(db).local_pool(&lp).spawn().await {
         Ok(provider) => provider,
         Err(_) => {
             // We assume the problem here is IPv6 on this host. If the problem is
@@ -528,9 +524,9 @@ async fn test_chunk_not_found_1() {
 
 #[tokio::test]
 async fn test_run_ticket() {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     let (db, hash) = create_test_db([("test", b"hello")]);
-    let node = test_node(db).runtime(&rt).spawn().await.unwrap();
+    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
     let _drop_guard = node.cancel_token().drop_guard();
 
     let ticket = node.ticket(hash, BlobFormat::HashSeq).await.unwrap();
@@ -577,9 +573,9 @@ async fn run_collection_get_request(
 
 #[tokio::test]
 async fn test_run_fsm() {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     let (db, hash) = create_test_db([("a", b"hello"), ("b", b"world")]);
-    let node = test_node(db).runtime(&rt).spawn().await.unwrap();
+    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
@@ -623,12 +619,12 @@ fn make_test_data(n: usize) -> Vec<u8> {
 /// The verified last chunk also verifies the size.
 #[tokio::test]
 async fn test_size_request_blob() {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     let expected = make_test_data(1024 * 64 + 1234);
     let last_chunk = last_chunk(&expected);
     let (db, hashes) = iroh_bytes::store::readonly_mem::Store::new([("test", &expected)]);
     let hash = Hash::from(*hashes.values().next().unwrap());
-    let node = test_node(db).runtime(&rt).spawn().await.unwrap();
+    let node = test_node(db).local_pool(&lp).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
@@ -651,11 +647,11 @@ async fn test_size_request_blob() {
 
 #[tokio::test]
 async fn test_collection_stat() {
-    let rt = test_runtime();
+    let lp = test_local_pool();
     let child1 = make_test_data(123456);
     let child2 = make_test_data(345678);
     let (db, hash) = create_test_db([("a", &child1), ("b", &child2)]);
-    let node = test_node(db.clone()).runtime(&rt).spawn().await.unwrap();
+    let node = test_node(db.clone()).local_pool(&lp).spawn().await.unwrap();
     let addrs = node.local_endpoint_addresses().await.unwrap();
     let peer_id = node.node_id();
     tokio::time::timeout(Duration::from_secs(10), async move {
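The test changes above and below all converge on the same shape: a one-thread `LocalPoolHandle` per test, passed to the builder or used directly for `spawn_pinned`. A minimal sketch of that test shape (helper name as in the diff; the test body is illustrative):

```rust
use tokio_util::task::LocalPoolHandle;

/// Create a local pool with a single worker thread, as the tests do.
fn test_local_pool() -> LocalPoolHandle {
    LocalPoolHandle::new(1)
}

#[tokio::test]
async fn runs_on_local_pool() -> anyhow::Result<()> {
    let lp = test_local_pool();
    // Any per-test work that needs a pinned thread goes through the pool.
    let task = lp.spawn_pinned(|| async { anyhow::Ok(()) });
    task.await??;
    Ok(())
}
```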
diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs
index ab3c38cdf5..471a84d3f7 100644
--- a/iroh/tests/sync.rs
+++ b/iroh/tests/sync.rs
@@ -16,10 +16,11 @@ use iroh::{
 };
 use iroh_net::key::{PublicKey, SecretKey};
 use quic_rpc::transport::misc::DummyServerEndpoint;
 use rand::{CryptoRng, Rng, SeedableRng};
+use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, info};
 use tracing_subscriber::{prelude::*, EnvFilter};
 
-use iroh_bytes::{util::runtime, Hash};
+use iroh_bytes::Hash;
 use iroh_net::derp::DerpMode;
 use iroh_sync::{
     store::{self, Query},
@@ -28,35 +29,27 @@ use iroh_sync::{
 
 const TIMEOUT: Duration = Duration::from_secs(60);
 
-/// Pick up the tokio runtime from the thread local and add a
-/// thread per core runtime.
-fn test_runtime() -> runtime::Handle {
-    runtime::Handle::from_current(1).unwrap()
-}
-
 fn test_node(
-    rt: runtime::Handle,
     secret_key: SecretKey,
 ) -> Builder {
-    let db = iroh_bytes::store::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new();
     let store = iroh_sync::store::memory::Store::default();
     Node::builder(db, store)
+        .local_pool(&LocalPoolHandle::new(1))
         .secret_key(secret_key)
         .derp_mode(DerpMode::Disabled)
-        .runtime(&rt)
 }
 
 // The function is not `async fn` so that we can take a `&mut` borrow on the `rng` without
 // capturing that `&mut` lifetime in the returned future. This allows calling it in a loop while
 // still collecting the futures before awaiting them altogether (see [`spawn_nodes`])
 fn spawn_node(
-    rt: runtime::Handle,
     i: usize,
     rng: &mut (impl CryptoRng + Rng),
 ) -> impl Future<Output = anyhow::Result<Node<iroh_bytes::store::mem::Store>>> + 'static {
     let secret_key = SecretKey::generate_with_rng(rng);
     async move {
-        let node = test_node(rt, secret_key);
+        let node = test_node(secret_key);
         let node = node.spawn().await?;
         info!(?i, me = %node.node_id().fmt_short(), "node spawned");
         Ok(node)
@@ -64,13 +57,12 @@ fn spawn_node(
 }
 
 async fn spawn_nodes(
-    rt: runtime::Handle,
     n: usize,
     mut rng: &mut (impl CryptoRng + Rng),
 ) -> anyhow::Result<Vec<Node<iroh_bytes::store::mem::Store>>> {
     let mut futs = vec![];
     for i in 0..n {
-        futs.push(spawn_node(rt.clone(), i, &mut rng));
+        futs.push(spawn_node(i, &mut rng));
     }
     futures::future::join_all(futs).await.into_iter().collect()
 }
@@ -84,8 +76,7 @@ pub fn test_rng(seed: &[u8]) -> rand_chacha::ChaCha12Rng {
 async fn sync_simple() -> Result<()> {
     setup_logging();
     let mut rng = test_rng(b"sync_simple");
-    let rt = test_runtime();
-    let nodes = spawn_nodes(rt, 2, &mut rng).await?;
+    let nodes = spawn_nodes(2, &mut rng).await?;
     let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
 
     // create doc on node0
@@ -140,8 +131,7 @@ async fn sync_simple() -> Result<()> {
 async fn sync_subscribe_no_sync() -> Result<()> {
     let mut rng = test_rng(b"sync_subscribe");
     setup_logging();
-    let rt = test_runtime();
-    let node = spawn_node(rt, 0, &mut rng).await?;
+    let node = spawn_node(0, &mut rng).await?;
     let client = node.client();
     let doc = client.docs.create().await?;
     let mut sub = doc.subscribe().await?;
@@ -164,8 +154,7 @@ async fn sync_gossip_bulk() -> Result<()> {
     let mut rng = test_rng(b"sync_gossip_bulk");
     setup_logging();
 
-    let rt = test_runtime();
-    let nodes = spawn_nodes(rt.clone(), 2, &mut rng).await?;
+    let nodes = spawn_nodes(2, &mut rng).await?;
     let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
     let _peer0 = nodes[0].node_id();
@@ -249,8 +238,7 @@ async fn sync_gossip_bulk() -> Result<()> {
 async fn sync_full_basic() -> Result<()> {
     let mut rng = test_rng(b"sync_full_basic");
     setup_logging();
-    let rt = test_runtime();
-    let mut nodes = spawn_nodes(rt.clone(), 2, &mut rng).await?;
+    let mut nodes = spawn_nodes(2, &mut rng).await?;
     let mut clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
 
     // peer0: create doc and ticket
@@ -338,7 +326,7 @@ async fn sync_full_basic() -> Result<()> {
     // our gossip implementation does not allow us to filter message receivers this way.
     info!("peer2: spawn");
-    nodes.push(spawn_node(rt.clone(), nodes.len(), &mut rng).await?);
+    nodes.push(spawn_node(nodes.len(), &mut rng).await?);
     clients.push(nodes.last().unwrap().client());
     let doc2 = clients[2].docs.import(ticket).await?;
     let peer2 = nodes[2].node_id();
@@ -414,8 +402,7 @@ async fn sync_full_basic() -> Result<()> {
 async fn sync_open_close() -> Result<()> {
     let mut rng = test_rng(b"sync_subscribe_stop_close");
     setup_logging();
-    let rt = test_runtime();
-    let node = spawn_node(rt, 0, &mut rng).await?;
+    let node = spawn_node(0, &mut rng).await?;
     let client = node.client();
 
     let doc = client.docs.create().await?;
@@ -439,8 +426,7 @@ async fn sync_open_close() -> Result<()> {
 async fn sync_subscribe_stop_close() -> Result<()> {
     let mut rng = test_rng(b"sync_subscribe_stop_close");
     setup_logging();
-    let rt = test_runtime();
-    let node = spawn_node(rt, 0, &mut rng).await?;
+    let node = spawn_node(0, &mut rng).await?;
     let client = node.client();
 
     let doc = client.docs.create().await?;
@@ -732,12 +718,10 @@ impl PartialEq<ExpectedEntry> for (Entry, Bytes) {
 
 #[tokio::test]
 async fn doc_delete() -> Result<()> {
-    let rt = test_runtime();
-    let db = iroh_bytes::store::mem::Store::new(rt.clone());
+    let db = iroh_bytes::store::mem::Store::new();
     let store = iroh_sync::store::memory::Store::default();
     let node = Node::builder(db, store)
         .gc_policy(iroh::node::GcPolicy::Interval(Duration::from_millis(100)))
-        .runtime(&rt)
         .spawn()
         .await?;
     let client = node.client();
@@ -766,8 +750,7 @@ async fn doc_delete() -> Result<()> {
 async fn sync_drop_doc() -> Result<()> {
     let mut rng = test_rng(b"sync_drop_doc");
     setup_logging();
-    let rt = test_runtime();
-    let node = spawn_node(rt, 0, &mut rng).await?;
+    let node = spawn_node(0, &mut rng).await?;
     let client = node.client();
 
     let doc = client.docs.create().await?;