Skip to content

Commit

Permalink
refactor: named closures for the 3 callbacks
Browse files Browse the repository at this point in the history
also remove comment that is no longer relevant
  • Loading branch information
rklaehn committed Jan 31, 2023
1 parent da7c156 commit 6617d39
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 90 deletions.
2 changes: 0 additions & 2 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ where
let blob_reader =
on_blob(blob.hash, blob_reader, Some(blob.name)).await?;
reader = blob_reader.into_inner();
// await the completion of the copying. Only then can we get back the reader.
// reader = reader1.await??;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod blobs;
pub mod blobs;
pub mod get;
pub mod protocol;
pub mod provider;
Expand Down
169 changes: 82 additions & 87 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,96 +266,91 @@ async fn get_interactive(
.await;

let pb = ProgressBar::hidden();
let stats = get::run(
hash,
token,
opts,
|| {
let out_writer = &out_writer;
async move {
out_writer
.println(format!("{} Requesting ...", style("[2/3]").bold().dim()))
.await;
Ok(())
}
},
|collection| {
let pb = &pb;
let out_writer = &out_writer;
async move {
let name = collection.name();
let total_entries = collection.total_entries();
let size = collection.total_blobs_size();
out_writer
.println(format!(
"{} Downloading {name}...",
style("[3/3]").bold().dim()
))
.await;
out_writer
.println(format!(
" {total_entries} file(s) with total transfer size {size}"
))
.await;
pb.set_style(
ProgressStyle::with_template(PROGRESS_STYLE)
.unwrap()
.with_key(
"eta",
|state: &ProgressState, w: &mut dyn std::fmt::Write| {
write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
},
)
.progress_chars("#>-"),
);
pb.set_length(size);
pb.set_draw_target(ProgressDrawTarget::stderr());

Ok(())
}
},
|hash, mut reader, name| {
let out = &out;
let pb = &pb;
async move {
let name = name.map_or_else(|| hash.to_string(), |n| n);
pb.set_message(format!("Receiving {name}..."));
let on_connected = || {
let out_writer = &out_writer;
async move {
out_writer
.println(format!("{} Requesting ...", style("[2/3]").bold().dim()))
.await;
Ok(())
}
};
let on_collection = |collection: sendme::blobs::Collection| {
let pb = &pb;
let out_writer = &out_writer;
async move {
let name = collection.name();
let total_entries = collection.total_entries();
let size = collection.total_blobs_size();
out_writer
.println(format!(
"{} Downloading {name}...",
style("[3/3]").bold().dim()
))
.await;
out_writer
.println(format!(
" {total_entries} file(s) with total transfer size {size}"
))
.await;
pb.set_style(
ProgressStyle::with_template(PROGRESS_STYLE)
.unwrap()
.with_key(
"eta",
|state: &ProgressState, w: &mut dyn std::fmt::Write| {
write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
},
)
.progress_chars("#>-"),
);
pb.set_length(size);
pb.set_draw_target(ProgressDrawTarget::stderr());

if let Some(ref outpath) = out {
tokio::fs::create_dir_all(outpath)
.await
.context("Unable to create directory {outpath}")?;
let dirpath = std::path::PathBuf::from(outpath);
let filepath = dirpath.join(name);
let (temp_file, dup) = tokio::task::spawn_blocking(|| {
let temp_file = tempfile::Builder::new()
.prefix("sendme-tmp-")
.tempfile_in(dirpath)
.context("Failed to create temporary output file")?;
let dup = temp_file.as_file().try_clone()?;
Ok::<_, anyhow::Error>((temp_file, dup))
})
.await??;
let file = tokio::fs::File::from_std(dup);
let out = tokio::io::BufWriter::new(file);
// wrap for progress bar
let mut wrapped_out = pb.wrap_async_write(out);
tokio::io::copy(&mut reader, &mut wrapped_out).await?;
let filepath2 = filepath.clone();
tokio::task::spawn_blocking(|| temp_file.persist(filepath2))
.await?
.context("Failed to write output file")?;
} else {
// Write to OUT_WRITER
let mut stdout = tokio::io::stdout();
tokio::io::copy(&mut reader, &mut stdout).await?;
}
Ok(())
}
};
let on_blob = |hash: blake3::Hash, mut reader, name: Option<String>| {
let out = &out;
let pb = &pb;
async move {
let name = name.map_or_else(|| hash.to_string(), |n| n);
pb.set_message(format!("Receiving {name}..."));

Ok(reader)
if let Some(ref outpath) = out {
tokio::fs::create_dir_all(outpath)
.await
.context("Unable to create directory {outpath}")?;
let dirpath = std::path::PathBuf::from(outpath);
let filepath = dirpath.join(name);
let (temp_file, dup) = tokio::task::spawn_blocking(|| {
let temp_file = tempfile::Builder::new()
.prefix("sendme-tmp-")
.tempfile_in(dirpath)
.context("Failed to create temporary output file")?;
let dup = temp_file.as_file().try_clone()?;
Ok::<_, anyhow::Error>((temp_file, dup))
})
.await??;
let file = tokio::fs::File::from_std(dup);
let out = tokio::io::BufWriter::new(file);
// wrap for progress bar
let mut wrapped_out = pb.wrap_async_write(out);
tokio::io::copy(&mut reader, &mut wrapped_out).await?;
let filepath2 = filepath.clone();
tokio::task::spawn_blocking(|| temp_file.persist(filepath2))
.await?
.context("Failed to write output file")?;
} else {
// Write to OUT_WRITER
let mut stdout = tokio::io::stdout();
tokio::io::copy(&mut reader, &mut stdout).await?;
}
},
)
.await?;

Ok(reader)
}
};
let stats = get::run(hash, token, opts, on_connected, on_collection, on_blob).await?;

pb.finish_and_clear();
out_writer
Expand Down

0 comments on commit 6617d39

Please sign in to comment.