Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Repository::to_indexed_checked and ::to_index_ids_checked() #168

Merged
merged 2 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 34 additions & 40 deletions crates/core/src/commands/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,50 +269,44 @@ fn check_packs(
IndexType::DataIds
});

let mut process_pack = |p: IndexPack, check_time: bool| {
let blob_type = p.blob_type();
let pack_size = p.pack_size();
_ = packs.insert(p.id, pack_size);
if hot_be.is_some() && blob_type == BlobType::Tree {
_ = tree_packs.insert(p.id, pack_size);
}

// Check if time is set _
if check_time && p.time.is_none() {
error!("pack {}: No time is set! Run prune to correct this!", p.id);
}

// check offsets in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.tpe != blob_type {
error!(
"pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}",
p.id, blob.id, blob.tpe, blob_type
);
}

if blob.offset != expected_offset {
error!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
};

let p = pb.progress_counter("reading index...");
for index in be.stream_all::<IndexFile>(&p)? {
let index = index?.1;
index_collector.extend(index.packs.clone());
for p in index.packs {
process_pack(p, false);
}
for p in index.packs_to_delete {
process_pack(p, true);
for (p, to_delete) in index.all_packs() {
let check_time = to_delete; // Check if time is set for packs marked to delete
let blob_type = p.blob_type();
let pack_size = p.pack_size();
_ = packs.insert(p.id, pack_size);
if hot_be.is_some() && blob_type == BlobType::Tree {
_ = tree_packs.insert(p.id, pack_size);
}

// Check if time is set _
if check_time && p.time.is_none() {
error!("pack {}: No time is set! Run prune to correct this!", p.id);
}

// check offsets in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.tpe != blob_type {
error!(
"pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}",
p.id, blob.id, blob.tpe, blob_type
);
}

if blob.offset != expected_offset {
error!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
}
}

Expand Down
164 changes: 111 additions & 53 deletions crates/core/src/commands/repair/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use crate::{
FileType, ReadBackend, WriteBackend,
},
error::{CommandErrorKind, RusticErrorKind, RusticResult},
index::indexer::Indexer,
index::{binarysorted::IndexCollector, indexer::Indexer, GlobalIndex},
progress::{Progress, ProgressBars},
repofile::{IndexFile, IndexPack, PackHeader, PackHeaderRef},
repository::{Open, Repository},
Id,
};

#[cfg_attr(feature = "clap", derive(clap::Parser))]
Expand Down Expand Up @@ -49,60 +50,14 @@ impl RepairIndexOptions {
CommandErrorKind::NotAllowedWithAppendOnly("index repair".to_string()).into(),
);
}
let be = repo.dbe();
let p = repo.pb.progress_spinner("listing packs...");
let mut packs: HashMap<_, _> = be
.list_with_size(FileType::Pack)
.map_err(RusticErrorKind::Backend)?
.into_iter()
.collect();
p.finish();

let mut pack_read_header = Vec::new();

let mut process_pack = |p: IndexPack,
to_delete: bool,
new_index: &mut IndexFile,
changed: &mut bool| {
let index_size = p.pack_size();
let id = p.id;
match packs.remove(&id) {
None => {
// this pack either does not exist or was already indexed in another index file => remove from index!
*changed = true;
debug!("removing non-existing pack {id} from index");
}
Some(size) => {
if index_size != size {
info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header");
}

if index_size != size || self.read_all {
// pack exists, but sizes do not match or we want to read all pack files
pack_read_header.push((
id,
Some(PackHeaderRef::from_index_pack(&p).size()),
size,
));
*changed = true;
} else {
new_index.add(p, to_delete);
}
}
}
};
let be = repo.dbe();
let mut checker = PackChecker::new(repo)?;

let p = repo.pb.progress_counter("reading index...");
for index in be.stream_all::<IndexFile>(&p)? {
let (index_id, index) = index?;
let mut new_index = IndexFile::default();
let mut changed = false;
for p in index.packs {
process_pack(p, false, &mut new_index, &mut changed);
}
for p in index.packs_to_delete {
process_pack(p, true, &mut new_index, &mut changed);
}
let (new_index, changed) = checker.check_pack(index, self.read_all);
match (changed, dry_run) {
(true, true) => info!("would have modified index file {index_id}"),
(true, false) => {
Expand All @@ -117,9 +72,7 @@ impl RepairIndexOptions {
}
p.finish();

// process packs which are listed but not contained in the index
pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, None, size)));

let pack_read_header = checker.into_pack_to_read();
repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _)| *id))?;

let indexer = Indexer::new(be.clone()).into_shared();
Expand Down Expand Up @@ -156,3 +109,108 @@ impl RepairIndexOptions {
Ok(())
}
}

/// Bookkeeping used to compare the packs referenced by index files against the pack
/// files that are actually listed in the backend.
struct PackChecker {
/// Pack files listed in the backend, keyed by pack id, with their listed size.
/// An entry is removed as soon as an index file references that pack, so whatever
/// remains afterwards is a pack not contained in any processed index.
packs: HashMap<Id, u32>,
/// Packs whose header has to be read: `(pack id, header size derived from the index
/// entry if there was one, listed pack size)`.
packs_to_read: Vec<(Id, Option<u32>, u32)>,
}

impl PackChecker {
    /// Creates a checker seeded with every pack file listed in the repository backend.
    ///
    /// Shows a "listing packs..." spinner while the listing runs.
    ///
    /// # Errors
    ///
    /// Returns an error if listing the pack files in the backend fails.
    fn new<P: ProgressBars, S: Open>(repo: &Repository<P, S>) -> RusticResult<Self> {
        let be = repo.dbe();
        let p = repo.pb.progress_spinner("listing packs...");
        let packs: HashMap<_, _> = be
            .list_with_size(FileType::Pack)
            .map_err(RusticErrorKind::Backend)?
            .into_iter()
            .collect();
        p.finish();

        Ok(Self {
            packs,
            packs_to_read: Vec::new(),
        })
    }

    /// Checks all packs of one index file against the listed pack files.
    ///
    /// Every pack of `indexfile` (regular and marked for deletion) is looked up in the
    /// listing and removed from it:
    ///
    /// * not listed (missing, or already claimed by a previously processed index file):
    ///   the pack is dropped from the resulting index;
    /// * listed, but the size computed from the index differs from the listed size, or
    ///   `read_all` is set: the pack is dropped from the resulting index and queued for
    ///   a header re-read;
    /// * otherwise the pack is kept unchanged.
    ///
    /// Returns the filtered index file and a flag telling whether it differs from the
    /// input, i.e. whether at least one pack was dropped.
    fn check_pack(&mut self, indexfile: IndexFile, read_all: bool) -> (IndexFile, bool) {
        let mut new_index = IndexFile::default();
        let mut changed = false;
        for (p, to_delete) in indexfile.all_packs() {
            let index_size = p.pack_size();
            let id = p.id;
            match self.packs.remove(&id) {
                None => {
                    // this pack either does not exist or was already indexed in another index file => remove from index!
                    debug!("removing non-existing pack {id} from index");
                    changed = true;
                }
                Some(size) => {
                    if index_size != size {
                        info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header");
                    }

                    if index_size != size || read_all {
                        // pack exists, but sizes do not match or we want to read all pack files
                        self.packs_to_read.push((
                            id,
                            Some(PackHeaderRef::from_index_pack(&p).size()),
                            size,
                        ));
                        // The pack is left out of `new_index`, so the index file content
                        // did change. Without this flag the caller would keep the stale
                        // entry and the re-read pack would end up indexed twice.
                        changed = true;
                    } else {
                        new_index.add(p, to_delete);
                    }
                }
            }
        }
        (new_index, changed)
    }

    /// Consumes the checker and returns all packs whose header must be read.
    ///
    /// Besides the packs queued by [`Self::check_pack`], this appends every listed pack
    /// that no processed index file referenced; those carry `None` as header size hint.
    fn into_pack_to_read(mut self) -> Vec<(Id, Option<u32>, u32)> {
        // add packs which are listed but not contained in the index
        self.packs_to_read
            .extend(self.packs.into_iter().map(|(id, size)| (id, None, size)));
        self.packs_to_read
    }
}

/// Builds a [`GlobalIndex`] from all index files, verified against the listed pack files.
///
/// Index entries that pass the [`PackChecker`] check are fed into `collector` directly.
/// Packs that are missing from the index or whose size does not match get their header
/// read from the pack file and are added afterwards.
///
/// # Errors
///
/// Returns an error if listing packs, streaming index files, warming up packs, or
/// reading a pack header fails, or if the number of packs to read does not fit the
/// progress counter's length type.
pub(crate) fn index_checked_from_collector<P: ProgressBars, S: Open>(
    repo: &Repository<P, S>,
    mut collector: IndexCollector,
) -> RusticResult<GlobalIndex> {
    let mut checker = PackChecker::new(repo)?;
    let be = repo.dbe();

    // Pass 1: take over every index entry that agrees with the pack listing.
    let index_progress = repo.pb.progress_counter("reading index...");
    for entry in be.stream_all::<IndexFile>(&index_progress)? {
        let (_, index_file) = entry?;
        let (verified, _changed) = checker.check_pack(index_file, false);
        collector.extend(verified.packs);
    }
    index_progress.finish();

    // Pass 2: everything else must be rebuilt from the pack headers themselves.
    let pending = checker.into_pack_to_read();
    repo.warm_up_wait(pending.iter().map(|(id, _, _)| *id))?;

    let header_progress = repo.pb.progress_counter("reading pack headers");
    let pending_count = pending.len();
    header_progress.set_length(
        pending_count
            .try_into()
            .map_err(CommandErrorKind::ConversionToU64Failed)?,
    );

    let mut index_packs = Vec::with_capacity(pending_count);
    for (id, size_hint, packsize) in pending {
        debug!("reading pack {id}...");
        let blobs = PackHeader::from_file(be, id, size_hint, packsize)?.into_blobs();
        index_packs.push(IndexPack {
            id,
            blobs,
            ..Default::default()
        });
        header_progress.inc(1);
    }
    header_progress.finish();

    collector.extend(index_packs);
    let index = collector.into_index();
    Ok(GlobalIndex::new_from_index(index))
}
7 changes: 7 additions & 0 deletions crates/core/src/repofile/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ impl IndexFile {
self.packs.push(p);
}
}

/// Consumes the index file and yields every pack it contains.
///
/// Each item is paired with a flag: `false` for packs from `packs`, `true` for packs
/// from `packs_to_delete`. All regular packs are yielded before the ones to delete.
pub(crate) fn all_packs(self) -> impl Iterator<Item = (IndexPack, bool)> {
    let regular = self.packs.into_iter().map(|pack| (pack, false));
    let marked_for_deletion = self.packs_to_delete.into_iter().map(|pack| (pack, true));
    regular.chain(marked_for_deletion)
}
}

#[derive(Serialize, Deserialize, Default, Debug, Clone)]
Expand Down
Loading
Loading