Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-bytes): Weak entry map #2080

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-bytes/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Implementations of blob stores
use crate::{BlobFormat, Hash, HashAndFormat};
pub mod bao_file;
mod bao_file;
pub mod mem;
pub mod readonly_mem;

Expand Down
127 changes: 52 additions & 75 deletions iroh-bytes/src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
io,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, RwLock},
sync::{Arc, RwLock, Weak},
};

use bao_tree::{
Expand Down Expand Up @@ -82,35 +82,41 @@ struct DataPaths {
/// For the memory variant, it does reading in a zero copy way, since storage
/// is already a `Bytes`.
#[derive(Default, derive_more::Debug)]
pub(crate) struct CompleteMemOrFileStorage {
pub struct CompleteMemOrFileStorage {
/// data part, which can be in memory or on disk.
#[debug("{:?}", data.as_ref().map_mem(|x| x.len()))]
pub data: MemOrFile<Bytes, (File, u64)>,
/// outboard part, which can be in memory or on disk.
#[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))]
pub outboard: MemOrFile<Bytes, (File, u64)>,
}

impl CompleteMemOrFileStorage {
/// Read from the data file at the given offset, until end of file or max bytes.
///
/// The in-memory variant is served by `get_limited_slice`, the file variant
/// by `read_to_end` on the stored file handle.
///
/// # Panics
///
/// Panics if `read_to_end` fails for the file-backed variant, because its
/// result is unwrapped.
pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes {
match &self.data {
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
}
}

/// Read from the outboard file at the given offset, until end of file or max bytes.
///
/// The in-memory variant is served by `get_limited_slice`, the file variant
/// by `read_to_end` on the stored file handle.
///
/// # Panics
///
/// Panics if `read_to_end` fails for the file-backed variant, because its
/// result is unwrapped.
pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes {
match &self.outboard {
MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len),
MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(),
}
}

/// The size of the data file.
///
/// For the in-memory variant this is the length of the buffer. For the file
/// variant it is the size stored next to the file handle; the file itself is
/// not queried here.
pub fn data_size(&self) -> u64 {
match &self.data {
MemOrFile::Mem(mem) => mem.len() as u64,
MemOrFile::File((_file, size)) => *size,
}
}

/// The size of the outboard file.
pub fn outboard_size(&self) -> u64 {
match &self.outboard {
MemOrFile::Mem(mem) => mem.len() as u64,
Expand Down Expand Up @@ -324,15 +330,16 @@ fn max_offset(batch: &[BaoContentItem]) -> u64 {
.unwrap_or(0)
}

/// A file storage for an incomplete bao file.
#[derive(Debug)]
pub(crate) struct FileStorage {
pub struct FileStorage {
data: std::fs::File,
outboard: std::fs::File,
sizes: std::fs::File,
}

impl FileStorage {
#[allow(dead_code)]
/// Split into data, outboard and sizes files.
pub fn into_parts(self) -> (File, File, File) {
(self.data, self.outboard, self.sizes)
}
Expand Down Expand Up @@ -393,7 +400,7 @@ impl FileStorage {

/// The storage for a bao file. This can be either in memory or on disk.
#[derive(Debug)]
pub(crate) enum BaoFileStorage {
pub enum BaoFileStorage {
/// The entry is incomplete and in memory.
///
/// Since it is incomplete, it must be writeable.
Expand Down Expand Up @@ -447,6 +454,7 @@ impl BaoFileStorage {
}
}

/// True if the storage is in memory.
pub fn is_mem(&self) -> bool {
match self {
Self::IncompleteMem(_) => true,
Expand All @@ -456,38 +464,28 @@ impl BaoFileStorage {
}
}

#[cfg(test)]
fn new_handle_id() -> u64 {
static HANDLE_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
HANDLE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
/// A weak reference to a bao file handle.
///
/// Wraps a `Weak<BaoFileHandleInner>`; use [`BaoFileHandleWeak::upgrade`] to
/// try to obtain a strong [`BaoFileHandle`] again.
#[derive(Debug, Clone)]
pub struct BaoFileHandleWeak(Weak<BaoFileHandleInner>);

impl BaoFileHandleWeak {
/// Try to turn this weak reference back into a strong [`BaoFileHandle`].
///
/// Returns `None` when the inner `Weak::upgrade` returns `None`, i.e. when
/// the referenced `BaoFileHandleInner` has already been dropped.
pub fn upgrade(&self) -> Option<BaoFileHandle> {
self.0.upgrade().map(BaoFileHandle)
}
}

/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
#[derive(Debug, Clone)]
pub struct BaoFileHandle {
pub(crate) storage: Arc<RwLock<BaoFileStorage>>,
#[derive(Debug)]
pub struct BaoFileHandleInner {
pub(crate) storage: RwLock<BaoFileStorage>,
config: Arc<BaoFileConfig>,
hash: Hash,
/// A unique id for the handle, used for ensuring that a handle is dropped
/// and recreated in tests.
#[cfg(test)]
pub(crate) id: u64,
}

impl Drop for BaoFileHandle {
fn drop(&mut self) {
if let Some(cb) = self.config.on_drop.as_ref() {
let strong_count = Arc::strong_count(&self.storage);
tracing::trace!("dropping BaoFileHandle {}", strong_count);
cb(&self.hash, strong_count)
}
}
}
/// A cheaply cloneable handle to a bao file, including the hash and the configuration.
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);

pub(crate) type CreateCb = Arc<dyn Fn(&Hash) -> io::Result<()> + Send + Sync>;

pub(crate) type DropCb = Arc<dyn Fn(&Hash, usize) + Send + Sync>;

/// Configuration for the deferred batch writer. It will start writing to memory,
/// and then switch to a file when the memory limit is reached.
#[derive(derive_more::Debug, Clone)]
Expand All @@ -501,24 +499,15 @@ pub struct BaoFileConfig {
/// Todo: make this async.
#[debug("{:?}", on_file_create.as_ref().map(|_| ()))]
on_file_create: Option<CreateCb>,

#[debug("{:?}", on_drop.as_ref().map(|_| ()))]
on_drop: Option<DropCb>,
}

impl BaoFileConfig {
/// Create a new deferred batch writer configuration.
pub fn new(
dir: Arc<PathBuf>,
max_mem: usize,
on_file_create: Option<CreateCb>,
on_drop: Option<DropCb>,
) -> Self {
pub fn new(dir: Arc<PathBuf>, max_mem: usize, on_file_create: Option<CreateCb>) -> Self {
Self {
dir,
max_mem,
on_file_create,
on_drop,
}
}

Expand Down Expand Up @@ -642,13 +631,11 @@ impl BaoFileHandle {
/// Since there are very likely to be many of these, we use an arc rwlock
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
let storage = BaoFileStorage::incomplete_mem();
Self {
storage: Arc::new(RwLock::new(storage)),
Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
#[cfg(test)]
id: new_handle_id(),
}
}))
}

/// Create a new bao file handle with a partial file.
Expand All @@ -659,13 +646,26 @@ impl BaoFileHandle {
outboard: create_read_write(&paths.outboard)?,
sizes: create_read_write(&paths.sizes)?,
});
Ok(Self {
storage: Arc::new(RwLock::new(storage)),
Ok(Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
#[cfg(test)]
id: new_handle_id(),
})
})))
}

/// Create a new complete bao file handle.
///
/// The given `data` and `outboard` parts are wrapped in a
/// `CompleteMemOrFileStorage` and stored as `BaoFileStorage::Complete`
/// behind an `RwLock`, together with `config` and `hash`, in a freshly
/// allocated `Arc<BaoFileHandleInner>`.
pub fn new_complete(
config: Arc<BaoFileConfig>,
hash: Hash,
data: MemOrFile<Bytes, (File, u64)>,
outboard: MemOrFile<Bytes, (File, u64)>,
) -> Self {
let storage = BaoFileStorage::Complete(CompleteMemOrFileStorage { data, outboard });
Self(Arc::new(BaoFileHandleInner {
storage: RwLock::new(storage),
config,
hash,
}))
}

/// Transform the storage in place. If the transform fails, the storage will
Expand All @@ -681,23 +681,6 @@ impl BaoFileHandle {
Ok(())
}

/// Create a new complete bao file handle.
pub fn new_complete(
config: Arc<BaoFileConfig>,
hash: Hash,
data: MemOrFile<Bytes, (File, u64)>,
outboard: MemOrFile<Bytes, (File, u64)>,
) -> Self {
let storage = BaoFileStorage::Complete(CompleteMemOrFileStorage { data, outboard });
Self {
storage: Arc::new(RwLock::new(storage)),
config,
hash,
#[cfg(test)]
id: new_handle_id(),
}
}

/// True if the file is complete.
pub fn is_complete(&self) -> bool {
matches!(
Expand Down Expand Up @@ -788,6 +771,10 @@ impl BaoFileHandle {
}
}
}

/// Create a [`BaoFileHandleWeak`] that points at the same inner value.
///
/// Built with `Arc::downgrade`, so the returned weak handle does not by
/// itself keep the inner value alive.
pub fn downgrade(&self) -> BaoFileHandleWeak {
BaoFileHandleWeak(Arc::downgrade(&self.0))
}
}

/// This is finally the thing for which we can implement BaoPairMut.
Expand Down Expand Up @@ -972,13 +959,6 @@ pub mod test_support {
res
}

pub fn create_test_data(n: usize) -> Bytes {
let mut rng = rand::thread_rng();
let mut data = vec![0; n];
rng.fill_bytes(&mut data);
data.into()
}

/// Take some data and encode it
pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
Expand Down Expand Up @@ -1080,7 +1060,6 @@ mod tests {
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
None,
)),
hash.into(),
);
Expand Down Expand Up @@ -1137,7 +1116,6 @@ mod tests {
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
None,
)),
hash.into(),
);
Expand Down Expand Up @@ -1193,7 +1171,6 @@ mod tests {
Arc::new(temp_dir.as_ref().to_owned()),
1024 * 16,
None,
None,
)),
hash,
);
Expand Down
Loading
Loading