Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-blobs)!: implement some collection related things on the client side #2349

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-blobs/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&outpath).await?;
let collection = Collection::load(db, &hash).await?;
let collection = Collection::load_db(db, &hash).await?;
for (name, hash) in collection.into_iter() {
#[allow(clippy::needless_borrow)]
let path = outpath.join(pathbuf_from_name(&name));
Expand Down
26 changes: 23 additions & 3 deletions iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The collection type used by iroh
use std::collections::BTreeMap;
use std::{collections::BTreeMap, future::Future};

use anyhow::Context;
use bao_tree::blake3;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl IntoIterator for Collection {
}
}

/// A simple store trait for loading blobs
///
/// This is the minimal abstraction `Collection::load` needs: something that
/// can asynchronously return the bytes of a blob given its hash.
pub trait SimpleStore {
/// Load a blob from the store
///
/// Takes the `hash` of the blob to load and returns a future that resolves
/// to the blob's bytes, or to an error. The future is required to be `Send`
/// and may borrow from `self` (the `'_` bound).
fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
Expand All @@ -84,7 +90,7 @@ impl Collection {
///
/// To persist the collection, write all the blobs to storage, and use the
/// hash of the last blob as the collection hash.
pub fn to_blobs(&self) -> impl Iterator<Item = Bytes> {
pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
let meta = CollectionMeta {
header: *Self::HEADER,
names: self.names(),
Expand Down Expand Up @@ -160,11 +166,25 @@ impl Collection {
Ok((collection, res, stats))
}

/// Load a collection from a [`SimpleStore`] given its root hash.
///
/// The blob stored under `root` is parsed as a hash sequence. The first hash
/// in that sequence identifies the metadata blob, which is loaded from the
/// same store and decoded with postcard into a `CollectionMeta`.
///
/// # Errors
///
/// Fails if either blob cannot be loaded, if the root blob is not a valid
/// hash sequence, if the hash sequence is empty, if the metadata cannot be
/// decoded, or if the number of names does not equal the number of hashes
/// in the sequence minus one (the metadata hash itself).
pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
    let links_blob = store.load(root).await?;
    let links = HashSeq::try_from(links_blob)?;
    // The first entry of the hash sequence points at the metadata blob.
    let meta_hash = links.iter().next().context("empty hash seq")?;
    let meta_blob = store.load(meta_hash).await?;
    let meta = postcard::from_bytes::<CollectionMeta>(&meta_blob)?;
    // One name per link, plus one slot for the metadata hash.
    if meta.names.len() + 1 != links.len() {
        anyhow::bail!("names and links length mismatch");
    }
    Ok(Self::from_parts(links.into_iter(), meta))
}

/// Load a collection from a store given a root hash
///
/// This assumes that both the links and the metadata of the collection are stored in the store.
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
pub async fn load_db<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: crate::store::Map,
{
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ impl ListCommands {
}
}
Self::Collections => {
let mut response = iroh.blobs.list_collections().await?;
let mut response = iroh.blobs.list_collections()?;
while let Some(item) = response.next().await {
let CollectionInfo {
tag,
Expand Down
60 changes: 46 additions & 14 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use genawaiter::sync::{Co, Gen};
use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
Expand All @@ -31,13 +32,12 @@ use tracing::warn;

use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption,
};

use super::{flatten, Iroh};
use super::{flatten, tags, Iroh};

/// Iroh blobs client.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -322,18 +322,35 @@ where

/// Read the content of a collection.
pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
let BlobGetCollectionResponse { collection } =
self.rpc.rpc(BlobGetCollectionRequest { hash }).await??;
Ok(collection)
Collection::load(hash, self).await
}

/// List all collections.
pub async fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
let stream = self
.rpc
.server_streaming(BlobListCollectionsRequest)
.await?;
Ok(flatten(stream))
pub fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
let this = self.clone();
Ok(Gen::new(|co| async move {
if let Err(cause) = this.list_collections_impl(&co).await {
co.yield_(Err(cause)).await;
}
}))
}

/// Drive the collection listing, yielding one [`CollectionInfo`] per loadable collection.
///
/// Lists all tags whose format is a hash sequence, tries to load each tagged
/// hash as a collection, and yields an info record through `co` for every
/// one that loads.
///
/// # Errors
///
/// Returns an error if the tag listing cannot be started or if the tag
/// stream itself yields an error. Failing to load an individual collection
/// is not an error; that tag is skipped.
async fn list_collections_impl(&self, co: &Co<Result<CollectionInfo>>) -> Result<()> {
    // Keep the tags client in a named binding: the stream returned by
    // `list_hash_seq` may borrow from it.
    let tags = self.tags_client();
    let mut tags = tags.list_hash_seq().await?;
    while let Some(tag) = tags.next().await {
        let tag = tag?;
        // Best effort: a hash-seq tag that cannot be loaded as a collection
        // is skipped rather than aborting the whole listing.
        if let Ok(collection) = self.get_collection(tag.hash).await {
            let info = CollectionInfo {
                tag: tag.name,
                hash: tag.hash,
                // The extra one presumably accounts for the metadata blob
                // that is stored alongside the collection's entries.
                total_blobs_count: Some(collection.len() as u64 + 1),
                // The total size is never computed here, so report it as
                // unknown instead of claiming a known size of zero.
                total_blobs_size: None,
            };
            co.yield_(Ok(info)).await;
        }
    }
    Ok(())
}

/// Delete a blob.
Expand Down Expand Up @@ -366,6 +383,21 @@ where
Ok(BlobStatus::Partial { size: reader.size })
}
}

/// Build a tags client backed by a clone of this client's RPC handle.
fn tags_client(&self) -> tags::Client<C> {
    let rpc = self.rpc.clone();
    tags::Client { rpc }
}
}

/// Lets the blobs client act as a [`SimpleStore`], so that a [`Collection`]
/// can be loaded through it via `Collection::load`.
impl<C: ServiceConnection<RpcService>> SimpleStore for Client<C> {
    /// Load the blob with the given `hash` by delegating to `read_to_bytes`.
    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
        let bytes = self.read_to_bytes(hash).await?;
        Ok(bytes)
    }
}

/// Whether to wrap the added data in a collection.
Expand Down Expand Up @@ -929,7 +961,7 @@ mod tests {
.create_collection(collection, SetTagOption::Auto, tags)
.await?;

let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?;
let collections: Vec<_> = client.blobs.list_collections()?.try_collect().await?;

assert_eq!(collections.len(), 1);
{
Expand Down
11 changes: 10 additions & 1 deletion iroh/src/client/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
{
/// List all tags.
pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
let stream = self.rpc.server_streaming(ListTagsRequest).await?;
let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?;
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

/// List all tags with a hash_seq format.
pub async fn list_hash_seq(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
let stream = self
.rpc
.server_streaming(ListTagsRequest::hash_seq())
.await?;
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

Expand Down
79 changes: 6 additions & 73 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress,
use iroh_blobs::util::progress::ProgressSender;
use iroh_blobs::BlobFormat;
use iroh_blobs::{
hashseq::parse_hash_seq,
provider::AddProgress,
store::{Store as BaoStore, ValidateProgress},
util::progress::FlumeProgressSender,
Expand All @@ -33,16 +32,13 @@ use quic_rpc::{
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, info};

use crate::client::blobs::{
BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
};
use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption};
use crate::client::tags::TagInfo;
use crate::client::NodeStatus;
use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest,
BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest,
Expand Down Expand Up @@ -95,12 +91,7 @@ impl<D: BaoStore> Handler<D> {
chan.server_streaming(msg, handler, Self::blob_list_incomplete)
.await
}
BlobListCollections(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_collections)
.await
}
CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await,
BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await,
ListTags(msg) => {
chan.server_streaming(msg, handler, Self::blob_list_tags)
.await
Expand Down Expand Up @@ -348,39 +339,6 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

/// Walk all tags in the store and yield a [`CollectionInfo`] for each hash-seq tag.
///
/// Tags whose format is not a hash sequence, and tags whose hash has no
/// entry in the store, are skipped. For the remaining tags the entry's data
/// is parsed with `parse_hash_seq` on the local task pool, and the count it
/// returns is reported as `total_blobs_count`. The total size is not
/// computed and is reported as `None`.
///
/// # Errors
///
/// Returns an error if listing the tags fails, if a tag item is an error,
/// if the store lookup fails, or if reading or parsing the hash sequence
/// fails (including a failure to join the pinned task).
async fn blob_list_collections_impl(
    self,
    co: &Co<RpcResult<CollectionInfo>>,
) -> anyhow::Result<()> {
    let db = self.inner.db.clone();
    let local = self.inner.rt.clone();
    // Propagate a failure to list tags instead of panicking; this function
    // already returns a `Result` that the caller forwards to the stream.
    let tags = db.tags().await?;
    for item in tags {
        let (name, HashAndFormat { hash, format }) = item?;
        if !format.is_hash_seq() {
            continue;
        }
        // A tag may point at a hash that is not present in the store.
        let Some(entry) = db.get(&hash).await? else {
            continue;
        };
        let count = local
            .spawn_pinned(|| async move {
                let reader = entry.data_reader().await?;
                let (_collection, count) = parse_hash_seq(reader).await?;
                anyhow::Ok(count)
            })
            .await??;
        co.yield_(Ok(CollectionInfo {
            tag: name,
            hash,
            total_blobs_count: Some(count),
            total_blobs_size: None,
        }))
        .await;
    }
    Ok(())
}

fn blob_list(
self,
_msg: BlobListRequest,
Expand All @@ -403,17 +361,6 @@ impl<D: BaoStore> Handler<D> {
})
}

/// Stream the collections known to the store.
///
/// Wraps `blob_list_collections_impl` in a generator. If the implementation
/// returns an error, that error is converted and yielded as the final item
/// of the stream.
fn blob_list_collections(
    self,
    _msg: BlobListCollectionsRequest,
) -> impl Stream<Item = RpcResult<CollectionInfo>> + Send + 'static {
    Gen::new(move |co| async move {
        let outcome = self.blob_list_collections_impl(&co).await;
        if let Err(cause) = outcome {
            co.yield_(Err(cause.into())).await;
        }
    })
}

async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> {
self.inner.db.set_tag(msg.name, None).await?;
Ok(())
Expand All @@ -424,15 +371,16 @@ impl<D: BaoStore> Handler<D> {
Ok(())
}

fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
tracing::info!("blob_list_tags");
Gen::new(|co| async move {
let tags = self.inner.db.tags().await.unwrap();
#[allow(clippy::manual_flatten)]
for item in tags {
if let Ok((name, HashAndFormat { hash, format })) = item {
tracing::info!("{:?} {} {:?}", name, hash, format);
co.yield_(TagInfo { name, hash, format }).await;
if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) {
co.yield_(TagInfo { name, hash, format }).await;
}
}
}
})
Expand Down Expand Up @@ -1044,21 +992,6 @@ impl<D: BaoStore> Handler<D> {

Ok(CreateCollectionResponse { hash, tag })
}

/// Load the collection with the requested hash from the node's store.
///
/// The load runs on the pinned task pool. A failure to join that task is
/// reported as a "join failed" error; a failure of the load itself is
/// propagated as is.
async fn blob_get_collection(
    self,
    req: BlobGetCollectionRequest,
) -> RpcResult<BlobGetCollectionResponse> {
    let hash = req.hash;
    let db = self.inner.db.clone();
    let task = self
        .rt()
        .spawn_pinned(move || async move { Collection::load(&db, &hash).await });
    // First layer: did the pinned task complete at all?
    let loaded = task.await.map_err(|_| anyhow!("join failed"))?;
    // Second layer: did the load itself succeed?
    let collection = loaded?;

    Ok(BlobGetCollectionResponse { collection })
}
}

async fn download<D>(
Expand Down
Loading
Loading