Skip to content

pipelined extraction #236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2c02588
pipelined extraction
cosmicexplorer Jun 27, 2024
a1734b1
bzip2 support needed for benchmark test
cosmicexplorer Aug 17, 2024
3f0a887
more review comments
cosmicexplorer Aug 17, 2024
ec63cdd
fix merge errors
cosmicexplorer Aug 17, 2024
c311784
correctly handle backslashes in entry names (i.e. don't)
cosmicexplorer Aug 17, 2024
8efb409
make PathSplitError avoid consing a String until necessary
cosmicexplorer Aug 17, 2024
3c19c28
add repro_old423 test for pipelining
cosmicexplorer Aug 20, 2024
6ccecd0
silence dead code warnings for windows
cosmicexplorer Aug 20, 2024
7332eb6
fix ci error
cosmicexplorer Jan 16, 2025
c9dd876
avoid erroring for top-level directory entries
cosmicexplorer Jan 16, 2025
17d611e
use num_cpus by default for parallelism
cosmicexplorer Feb 9, 2025
5bd512e
we spawn three threads per chunk
cosmicexplorer Feb 10, 2025
8dd83d3
add dynamically-generated test archive
cosmicexplorer Feb 11, 2025
7ea1b85
remove some lint ignores
cosmicexplorer Feb 12, 2025
6f9d3d6
add back default features for displaydoc and update version
cosmicexplorer Feb 12, 2025
f4c43cb
add FIXME for follow-up work to support absolute paths
cosmicexplorer Feb 12, 2025
1ae44f0
box each level of the b-tree together with its values
cosmicexplorer Feb 12, 2025
c92d018
impl From<DirEntry<...>> for FSEntry
cosmicexplorer Feb 13, 2025
d6da333
make some tests return Result
cosmicexplorer Feb 13, 2025
41381b1
simplify some btreemap creation
cosmicexplorer Feb 13, 2025
4fc051a
move handle_creation module to a separate file
cosmicexplorer Feb 13, 2025
730f18f
downgrade HandleCreationError to io::Error
cosmicexplorer Feb 13, 2025
2f8e5b4
use ByAddress over ZipDataHandle
cosmicexplorer Feb 13, 2025
0378ab9
fix lint error
cosmicexplorer Feb 13, 2025
295c7f0
make an assert into a debug assert
cosmicexplorer Feb 13, 2025
436570f
remove extraneous error case
cosmicexplorer Feb 13, 2025
a22c53b
replace unsafe transmutes with Pod methods
cosmicexplorer Feb 13, 2025
24c4425
add dead code ignore
cosmicexplorer Feb 13, 2025
11321bd
use if let for matching
cosmicexplorer Feb 13, 2025
06f7b89
fix dead code ignores
cosmicexplorer Feb 13, 2025
d6bbc9c
add note about shared future dependency task DAG
cosmicexplorer Feb 13, 2025
7bbe4ae
Merge branch 'master' into pipelined-extract-v2
Pr0methean Mar 17, 2025
1369bfe
Merge branch 'master' into pipelined-extract-v2
Pr0methean Apr 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ zstd = { version = "0.13", optional = true, default-features = false }
zopfli = { version = "0.8", optional = true }
deflate64 = { version = "0.1.9", optional = true }
lzma-rs = { version = "0.3", default-features = false, optional = true }
num_cpus = { version = "1.16", optional = true }
by_address = { version = "1.2.1", optional = true }
xz2 = { version = "0.1.7", optional = true }
proc-macro2 = { version = ">=1.0.60", optional = true } # Override transitive dep on 1.0.59 due to https://github.com/rust-lang/rust/issues/113152

[target.'cfg(any(all(target_arch = "arm", target_pointer_width = "32"), target_arch = "mips", target_arch = "powerpc"))'.dependencies]
crossbeam-utils = "0.8.21"

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.155", optional = true }

[target.'cfg(fuzzing)'.dependencies]
arbitrary = { version = "1.4.1", features = ["derive"] }

Expand All @@ -66,7 +71,10 @@ walkdir = "2.5"
time = { workspace = true, features = ["formatting", "macros"] }
anyhow = "1.0.95"
clap = { version = "=4.4.18", features = ["derive"] }
tempdir = "0.3.7"
tempfile = "3.15"
lazy_static = "1.5"
num_cpus = "1.16"

[features]
aes-crypto = ["aes", "constant_time_eq", "hmac", "pbkdf2", "sha1", "getrandom", "zeroize"]
Expand All @@ -85,6 +93,7 @@ nt-time = ["dep:nt-time"]
lzma = ["lzma-rs/stream"]
unreserved = []
xz = ["dep:xz2"]
parallelism = ["libc", "num_cpus", "by_address"]
default = [
"aes-crypto",
"bzip2",
Expand All @@ -107,3 +116,7 @@ harness = false
[[bench]]
name = "merge_archive"
harness = false

[[bench]]
name = "extract"
harness = false
149 changes: 149 additions & 0 deletions benches/extract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use bencher::{benchmark_group, benchmark_main};

use bencher::Bencher;
use lazy_static::lazy_static;
use tempdir::TempDir;
use tempfile::tempfile;

use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex};

use zip::result::ZipResult;
use zip::write::ZipWriter;
use zip::ZipArchive;

#[cfg(all(feature = "parallelism", unix))]
use zip::read::{split_extract, ExtractionParameters};

/// Opens the benchmark archive checked into the repository at
/// `tests/data/stored-and-compressed-text.zip`.
///
/// The archive holds the same pair of entries repeated 20x:
/// - 200K of random data, stored uncompressed (`CompressionMethod::Stored`)
/// - 246K of text (the Project Gutenberg html version of King Lear),
///   compressed with bzip2 at level 1 (Project Gutenberg ebooks are public domain)
///
/// The full archive file is 5.3MB on disk.
fn static_test_archive() -> ZipResult<ZipArchive<fs::File>> {
    assert!(
        cfg!(feature = "bzip2"),
        "this test archive requires bzip2 support"
    );
    let archive_path = Path::new(env!("CARGO_MANIFEST_DIR"))
        .join("tests/data/stored-and-compressed-text.zip");
    let handle = fs::File::open(archive_path)?;
    ZipArchive::new(handle)
}

// Shared, lazily-opened handle to the static on-disk test archive. The Mutex
// serializes access between benchmark functions; first use panics if the
// archive cannot be opened.
// NOTE(review): the Arc looks redundant -- a `static` is already globally
// shared, and nothing visible here clones the Arc. Confirm before removing.
lazy_static! {
    static ref STATIC_TEST_ARCHIVE: Arc<Mutex<ZipArchive<fs::File>>> = {
        let archive = static_test_archive().unwrap();
        Arc::new(Mutex::new(archive))
    };
}

/// Builds a test archive in a temp file, sized to the current machine.
///
/// The entry count scales with the number of reported CPUs:
/// - We want at least 768 files (4 per VCPU on EC2 *.48xlarge instances) to run in CI.
/// - We retain the interspersed random/text entries from [`static_test_archive`]
///   by copying its entries over (raw, without recompression) in a cycle until
///   the desired count is reached.
fn dynamic_test_archive(src_archive: &mut ZipArchive<fs::File>) -> ZipResult<ZipArchive<fs::File>> {
    let entry_target: usize = num_cpus::get() * 4;
    let mut writer = ZipWriter::new(tempfile()?);

    let recycled_src_indices = (0..src_archive.len()).cycle();
    for (src_index, dst_index) in recycled_src_indices.zip(0..entry_target) {
        let entry = src_archive.by_index_raw(src_index)?;
        // Renumber each copy so entry names stay unique in the output.
        let renamed = if entry.name().starts_with("random-") {
            format!("random-{dst_index}.dat")
        } else {
            assert!(entry.name().starts_with("text-"));
            format!("text-{dst_index}.dat")
        };
        writer.raw_copy_file_rename(entry, renamed)?;
    }

    writer.finish_into_readable()
}

// Shared, lazily-built CPU-scaled test archive (see dynamic_test_archive()).
// Initialization takes the STATIC_TEST_ARCHIVE lock, so the first access must
// not occur while that lock is already held, or it will deadlock/panic.
lazy_static! {
    static ref DYNAMIC_TEST_ARCHIVE: Arc<Mutex<ZipArchive<fs::File>>> = {
        let mut src = STATIC_TEST_ARCHIVE.lock().unwrap();
        let archive = dynamic_test_archive(&mut src).unwrap();
        Arc::new(Mutex::new(archive))
    };
}

/// Benchmarks sequential `ZipArchive::extract()` into a fresh subdirectory per
/// iteration. Throughput is reported against the archive's total decompressed
/// size via `bench.bytes`.
fn do_extract_basic(bench: &mut Bencher, archive: &mut ZipArchive<fs::File>) {
    let decompressed: u64 = archive.decompressed_size().unwrap().try_into().unwrap();
    bench.bytes = decompressed;

    // One parent temp dir owns every per-iteration output dir; it is cleaned
    // up when this function returns.
    let scratch_root = TempDir::new("zip-extract").unwrap();

    bench.bench_n(1, |b| {
        b.iter(move || {
            let target = TempDir::new_in(scratch_root.path(), "bench-subdir")
                .unwrap()
                .into_path();
            archive.extract(target).unwrap();
        });
    });
}

/// Sequential-extraction benchmark over the checked-in static archive.
fn extract_basic_static(bench: &mut Bencher) {
    let mut guard = STATIC_TEST_ARCHIVE.lock().unwrap();
    do_extract_basic(bench, &mut guard);
}

/// Sequential-extraction benchmark over the CPU-scaled dynamic archive.
fn extract_basic_dynamic(bench: &mut Bencher) {
    let mut guard = DYNAMIC_TEST_ARCHIVE.lock().unwrap();
    do_extract_basic(bench, &mut guard);
}

/// Benchmarks parallel extraction via `split_extract()` into a fresh
/// subdirectory per iteration. Throughput is reported against the archive's
/// total decompressed size via `bench.bytes`.
#[cfg(all(feature = "parallelism", unix))]
fn do_extract_split(bench: &mut Bencher, archive: &ZipArchive<fs::File>) {
    let total_size: u64 = archive.decompressed_size().unwrap().try_into().unwrap();

    let params = ExtractionParameters {
        // Budget roughly a third of the CPUs for decompression -- presumably
        // because the split pipeline spawns additional reader/writer threads
        // per chunk (TODO confirm against split_extract's internals).
        // Clamped to at least 1 so hosts with fewer than 3 CPUs don't request
        // zero decompression threads.
        decompression_threads: (num_cpus::get() / 3).max(1),
        ..Default::default()
    };

    // One parent temp dir owns every per-iteration output dir; it is cleaned
    // up when this function returns.
    let parent = TempDir::new("zip-extract").unwrap();

    bench.bytes = total_size;
    bench.bench_n(1, |bench| {
        bench.iter(move || {
            let outdir = TempDir::new_in(parent.path(), "bench-subdir")
                .unwrap()
                .into_path();
            split_extract(archive, &outdir, params.clone()).unwrap();
        });
    });
}

/// Parallel-extraction benchmark over the checked-in static archive.
#[cfg(all(feature = "parallelism", unix))]
fn extract_split_static(bench: &mut Bencher) {
    let guard = STATIC_TEST_ARCHIVE.lock().unwrap();
    do_extract_split(bench, &guard);
}

/// Parallel-extraction benchmark over the CPU-scaled dynamic archive.
#[cfg(all(feature = "parallelism", unix))]
fn extract_split_dynamic(bench: &mut Bencher) {
    let guard = DYNAMIC_TEST_ARCHIVE.lock().unwrap();
    do_extract_split(bench, &guard);
}

// The split-extraction benchmarks only compile with the "parallelism" feature
// on unix, so the benchmark group is declared twice under complementary cfgs:
// the basic benchmarks alone, or the basic plus split benchmarks.
#[cfg(not(all(feature = "parallelism", unix)))]
benchmark_group!(benches, extract_basic_static, extract_basic_dynamic);

#[cfg(all(feature = "parallelism", unix))]
benchmark_group!(
    benches,
    extract_basic_static,
    extract_basic_dynamic,
    extract_split_static,
    extract_split_dynamic
);

benchmark_main!(benches);
2 changes: 1 addition & 1 deletion benches/read_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ fn parse_large_non_zip(bench: &mut Bencher) {
let dir = TempDir::with_prefix("large-non-zip-bench").unwrap();
let file = dir.path().join("zeros");
let buf = vec![0u8; FILE_SIZE];
fs::write(&file, &buf).unwrap();
fs::write(&file, buf).unwrap();

bench.iter(|| {
assert!(zip::ZipArchive::new(std::fs::File::open(&file).unwrap()).is_err());
Expand Down
9 changes: 9 additions & 0 deletions src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ pub(crate) mod lzma;

pub(crate) mod magic_finder;

// Modules backing the pipelined/parallel extraction support; all of them are
// gated behind the "parallelism" feature.
#[cfg(feature = "parallelism")]
pub(crate) mod handle_creation;
#[cfg(feature = "parallelism")]
pub(crate) mod pipelining;
// The public split-extraction entry points additionally require unix
// (matching the cfg gates on the benchmarks that exercise them).
#[cfg(all(unix, feature = "parallelism"))]
pub use pipelining::split_extraction::{split_extract, ExtractionParameters, SplitExtractionError};
#[cfg(feature = "parallelism")]
pub(crate) mod split;

// Put the struct declaration in a private module to convince rustdoc to display ZipArchive nicely
pub(crate) mod zip_archive {
use indexmap::IndexMap;
Expand Down
Loading
Loading