refactor(iroh-bytes): remove explicitly passing the runtime to the flat store (#1829)

## Description

The flat store now assumes there is an ambient tokio runtime and picks it up to run `spawn_blocking` for all async ops. I hate this, but this is how tokio wants to be used, so what can you do.
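
A minimal sketch of the pattern (not code from this PR, just plain tokio): `spawn_blocking` resolves the runtime from the task it is called on, so nothing has to be threaded through, and calling it outside a runtime panics.

```rust
// Sketch only: spawn_blocking picks up the ambient runtime of the calling
// task, so no Handle needs to be passed around. Outside a runtime it panics.
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Stand-in for one of the store's synchronous I/O routines.
    let bytes = tokio::task::spawn_blocking(|| std::fs::read("Cargo.toml"))
        .await
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))??;
    println!("read {} bytes on the blocking pool", bytes.len());
    Ok(())
}
```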

It makes creating a store much simpler.
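
Before/after for callers, adapted from the `iroh/examples/client.rs` hunk below (the doc and author calls from that example are elided):

```rust
// Adapted from the iroh/examples/client.rs change in this diff: no runtime
// handle is created or passed to the store or the node builder anymore.
use iroh::node::Node;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // before: let rt = runtime::Handle::from_current(1)?;
    //         let db = iroh_bytes::store::mem::Store::new(rt.clone());
    let db = iroh_bytes::store::mem::Store::new();
    let store = iroh_sync::store::memory::Store::default();
    // before: Node::builder(db.clone(), store).runtime(&rt).spawn().await?
    let node = Node::builder(db.clone(), store).spawn().await?;
    let _client = node.client();
    Ok(())
}
```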

## Notes & open questions

-

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
rklaehn authored Nov 25, 2023
1 parent dfe7c0a commit 3d2e118
Showing 24 changed files with 212 additions and 384 deletions.
5 changes: 3 additions & 2 deletions iroh-bytes/src/provider.rs
@@ -12,6 +12,7 @@ use iroh_io::stats::{
};
use iroh_io::{AsyncStreamWriter, TokioStreamWriter};
use serde::{Deserialize, Serialize};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, debug_span, info, trace, warn};
use tracing_futures::Instrument;

@@ -351,7 +352,7 @@ pub async fn handle_connection<D: Map, E: EventSender>(
connecting: quinn::Connecting,
db: D,
events: E,
rt: crate::util::runtime::Handle,
rt: LocalPoolHandle,
) {
let remote_addr = connecting.remote_address();
let connection = match connecting.await {
@@ -376,7 +377,7 @@ };
};
events.send(Event::ClientConnected { connection_id }).await;
let db = db.clone();
rt.local_pool().spawn_pinned(|| {
rt.spawn_pinned(|| {
async move {
if let Err(err) = handle_stream(db, reader, writer).await {
warn!("error: {err:#?}",);
59 changes: 17 additions & 42 deletions iroh-bytes/src/store/flat.rs
@@ -362,10 +362,7 @@ impl PartialMap for Store {

fn insert_complete(&self, entry: Self::PartialEntry) -> BoxFuture<'_, io::Result<()>> {
let this = self.clone();
self.0
.options
.rt
.spawn_blocking(move || this.insert_complete_sync(entry))
tokio::task::spawn_blocking(move || this.insert_complete_sync(entry))
.map(flatten_to_io)
.boxed()
}
@@ -378,7 +375,6 @@ struct Options {
meta_path: PathBuf,
move_threshold: u64,
inline_threshold: u64,
rt: tokio::runtime::Handle,
}

impl Options {
@@ -678,8 +674,7 @@ impl ReadableStore for Store {
progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
) -> BoxFuture<'_, io::Result<()>> {
let this = self.clone();
self.rt()
.spawn_blocking(move || this.export_sync(hash, target, mode, progress))
tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress))
.map(flatten_to_io)
.boxed()
}
@@ -694,16 +689,14 @@ impl super::Store for Store {
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
let this = self.clone();
self.rt()
.spawn_blocking(move || this.import_file_sync(path, mode, format, progress))
tokio::task::spawn_blocking(move || this.import_file_sync(path, mode, format, progress))
.map(flatten_to_io)
.boxed()
}

fn import_bytes(&self, data: Bytes, format: BlobFormat) -> BoxFuture<'_, io::Result<TempTag>> {
let this = self.clone();
self.rt()
.spawn_blocking(move || this.import_bytes_sync(data, format))
tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format))
.map(flatten_to_io)
.boxed()
}
@@ -714,7 +707,6 @@
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
let rt = self.rt().clone();
let this = self.clone();
async move {
let id = progress.new_id();
@@ -737,29 +729,25 @@ impl super::Store for Store {
writer.flush().await?;
drop(writer);
let file = ImportFile::TempFile(temp_data_path);
rt.spawn_blocking(move || this.finalize_import_sync(file, format, id, progress))
.map(flatten_to_io)
.await
tokio::task::spawn_blocking(move || {
this.finalize_import_sync(file, format, id, progress)
})
.map(flatten_to_io)
.await
}
.boxed()
}

fn create_tag(&self, value: HashAndFormat) -> BoxFuture<'_, io::Result<Tag>> {
let this = self.clone();
self.0
.options
.rt
.spawn_blocking(move || this.create_tag_sync(value))
tokio::task::spawn_blocking(move || this.create_tag_sync(value))
.map(flatten_to_io)
.boxed()
}

fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> BoxFuture<'_, io::Result<()>> {
let this = self.clone();
self.0
.options
.rt
.spawn_blocking(move || this.set_tag_sync(name, value))
tokio::task::spawn_blocking(move || this.set_tag_sync(name, value))
.map(flatten_to_io)
.boxed()
}
@@ -788,10 +776,7 @@ impl super::Store for Store {
tracing::debug!("delete: {:?}", hash);
let this = self.clone();
let hash = *hash;
self.0
.options
.rt
.spawn_blocking(move || this.delete_sync(hash))
tokio::task::spawn_blocking(move || this.delete_sync(hash))
.map(flatten_to_io)
.boxed()
}
@@ -839,10 +824,6 @@ impl ImportFile {
}

impl Store {
fn rt(&self) -> &tokio::runtime::Handle {
&self.0.options.rt
}

fn temp_path(&self) -> PathBuf {
self.0.options.partial_path.join(temp_name())
}
@@ -1197,7 +1178,6 @@ impl Store {
complete_path: PathBuf,
partial_path: PathBuf,
meta_path: PathBuf,
rt: crate::util::runtime::Handle,
) -> anyhow::Result<Self> {
tracing::info!(
"loading database from {} {}",
@@ -1456,7 +1436,6 @@ impl Store {
meta_path,
move_threshold: 1024 * 128,
inline_threshold: 1024 * 16,
rt: rt.main().clone(),
},
complete_io_mutex: Mutex::new(()),
})))
@@ -1467,13 +1446,11 @@
complete_path: impl AsRef<Path>,
partial_path: impl AsRef<Path>,
meta_path: impl AsRef<Path>,
rt: &crate::util::runtime::Handle,
) -> anyhow::Result<Self> {
let complete_path = complete_path.as_ref().to_path_buf();
let partial_path = partial_path.as_ref().to_path_buf();
let meta_path = meta_path.as_ref().to_path_buf();
let rt = rt.clone();
let db = Self::load_sync(complete_path, partial_path, meta_path, rt)?;
let db = Self::load_sync(complete_path, partial_path, meta_path)?;
Ok(db)
}

@@ -1482,16 +1459,14 @@
complete_path: impl AsRef<Path>,
partial_path: impl AsRef<Path>,
meta_path: impl AsRef<Path>,
rt: &crate::util::runtime::Handle,
) -> anyhow::Result<Self> {
let complete_path = complete_path.as_ref().to_path_buf();
let partial_path = partial_path.as_ref().to_path_buf();
let meta_path = meta_path.as_ref().to_path_buf();
let rtc = rt.clone();
let db = rt
.main()
.spawn_blocking(move || Self::load_sync(complete_path, partial_path, meta_path, rtc))
.await??;
let db = tokio::task::spawn_blocking(move || {
Self::load_sync(complete_path, partial_path, meta_path)
})
.await??;
Ok(db)
}

67 changes: 27 additions & 40 deletions iroh-bytes/src/store/mem.rs
@@ -22,7 +22,7 @@ use crate::{
},
util::{
progress::{IdGenerator, IgnoreProgressSender, ProgressSender},
runtime, LivenessTracker,
LivenessTracker,
},
BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
};
@@ -182,13 +182,12 @@ impl AsyncSliceWriter for MemFile {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
/// A full in memory database for iroh-bytes.
pub struct Store(Arc<Inner>);

#[derive(Debug)]
#[derive(Debug, Default)]
struct Inner {
rt: runtime::Handle,
state: RwLock<State>,
}

@@ -373,10 +372,7 @@ impl ReadableStore for Store {
progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
) -> BoxFuture<'_, io::Result<()>> {
let this = self.clone();
self.0
.rt
.main()
.spawn_blocking(move || this.export_sync(hash, target, mode, progress))
tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress))
.map(flatten_to_io)
.boxed()
}
@@ -458,25 +454,22 @@ impl super::Store for Store {
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> BoxFuture<'_, io::Result<(TempTag, u64)>> {
let this = self.clone();
self.0
.rt
.main()
.spawn_blocking(move || {
let id = progress.new_id();
progress.blocking_send(ImportProgress::Found {
id,
name: path.to_string_lossy().to_string(),
})?;
progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
// todo: provide progress for reading into mem
let bytes: Bytes = std::fs::read(path)?.into();
let size = bytes.len() as u64;
progress.blocking_send(ImportProgress::Size { id, size })?;
let tag = this.import_bytes_sync(id, bytes, format, progress)?;
Ok((tag, size))
})
.map(flatten_to_io)
.boxed()
tokio::task::spawn_blocking(move || {
let id = progress.new_id();
progress.blocking_send(ImportProgress::Found {
id,
name: path.to_string_lossy().to_string(),
})?;
progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
// todo: provide progress for reading into mem
let bytes: Bytes = std::fs::read(path)?.into();
let size = bytes.len() as u64;
progress.blocking_send(ImportProgress::Size { id, size })?;
let tag = this.import_bytes_sync(id, bytes, format, progress)?;
Ok((tag, size))
})
.map(flatten_to_io)
.boxed()
}

fn import_stream(
@@ -511,14 +504,11 @@

fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> BoxFuture<'_, io::Result<TempTag>> {
let this = self.clone();
self.0
.rt
.main()
.spawn_blocking(move || {
this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
})
.map(flatten_to_io)
.boxed()
tokio::task::spawn_blocking(move || {
this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
})
.map(flatten_to_io)
.boxed()
}

fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> BoxFuture<'_, io::Result<()>> {
@@ -582,11 +572,8 @@ impl LivenessTracker for Inner {

impl Store {
/// Create a new in memory database, using the given runtime.
pub fn new(rt: runtime::Handle) -> Self {
Self(Arc::new(Inner {
rt,
state: RwLock::new(State::default()),
}))
pub fn new() -> Self {
Self::default()
}

fn import_bytes_sync(
1 change: 0 additions & 1 deletion iroh-bytes/src/util.rs
@@ -8,7 +8,6 @@ use crate::{BlobFormat, Hash, HashAndFormat};

pub mod io;
pub mod progress;
pub mod runtime;

/// A tag
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
44 changes: 0 additions & 44 deletions iroh-bytes/src/util/runtime.rs

This file was deleted.

9 changes: 2 additions & 7 deletions iroh/examples/client.rs
@@ -8,19 +8,14 @@
use indicatif::HumanBytes;
use iroh::node::Node;
use iroh_base::base32;
use iroh_bytes::util::runtime;
use iroh_sync::{store::Query, Entry};
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let rt = runtime::Handle::from_current(1)?;
let db = iroh_bytes::store::mem::Store::new(rt.clone());
let db = iroh_bytes::store::mem::Store::new();
let store = iroh_sync::store::memory::Store::default();
let node = Node::builder(db.clone(), store)
.runtime(&rt)
.spawn()
.await?;
let node = Node::builder(db.clone(), store).spawn().await?;
let client = node.client();
let doc = client.docs.create().await?;
let author = client.authors.create().await?;