Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Fixed an issue with forked counters #363

Merged
merged 6 commits into from
Feb 7, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 65 additions & 24 deletions util/src/journaldb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ impl JournalDB {

/// Commit all recent insert operations and historical removals from the old era
/// to the backing database.
#[allow(cyclomatic_complexity)]
pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
Expand All @@ -105,6 +104,17 @@ impl JournalDB {
// for each end_era that we journaled that we are no passing by,
// we remove all of its removes assuming it is canonical and all
// of its inserts otherwise.
//
// We also keep reference counters for each key inserted in the journal to handle
// the following cases where key K must not be deleted from the DB when processing removals :
// Given H is the journal size in eras, 0 <= C <= H.
// Key K is removed in era A(N) and re-inserted in canonical era B(N + C).
// Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C).
// Key K is added in non-canonical era A'(N) canonical B(N + C).
//
// The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions
// is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased
// and the key is safe to delete.

// record new commit's details.
let batch = WriteBatch::new();
Expand All @@ -125,6 +135,7 @@ impl JournalDB {

let mut r = RlpStream::new_list(3);
let inserts: Vec<H256> = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect();
// Increase counter for each inserted key no matter if the block is canonical or not.
for i in &inserts {
*counters.entry(i.clone()).or_insert(0) += 1;
}
Expand All @@ -139,46 +150,42 @@ impl JournalDB {
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
let mut to_remove: Vec<H256> = Vec::new();
let mut canon_inserts: Vec<H256> = Vec::new();
while let Some(rlp_data) = try!(self.backing.get({
let mut r = RlpStream::new_list(2);
r.append(&end_era);
r.append(&index);
last = r.drain();
&last
})) {
let to_add;
let rlp = Rlp::new(&rlp_data);
{
to_add = rlp.val_at(1);
for i in &to_add {
let delete_counter = {
if let Some(mut cnt) = counters.get_mut(i) {
*cnt -= 1;
*cnt == 0
}
else { false }

};
if delete_counter {
counters.remove(i);
}
}
let inserts: Vec<H256> = rlp.val_at(1);
JournalDB::decrease_counters(&inserts, &mut counters);
// Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical
if canon_id == rlp.val_at(0) {
to_remove.extend(rlp.at(2).iter().map(|r| r.as_val::<H256>()));
canon_inserts = inserts;
}
let to_remove: Vec<H256> = if canon_id == rlp.val_at(0) {rlp.val_at(2)} else {to_add};
for i in &to_remove {
if !counters.contains_key(i) {
batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?");
}
else {
to_remove.extend(inserts);
}

try!(batch.delete(&last));
trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len());
index += 1;
}

let canon_inserts = canon_inserts.drain(..).collect::<HashSet<_>>();
// Purge removed keys if they are not referenced and not re-inserted in the canon commit
let mut deletes = 0;
for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.contains -> O(n)

better to use a hash_set for canon_inserts?

try!(batch.delete(&h));
deletes += 1;
}
try!(batch.put(&LAST_ERA_KEY, &encode(&end_era)));
trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes);
}

// Commit overlay insertions
let mut ret = 0u32;
let mut deletes = 0usize;
for i in self.overlay.drain().into_iter() {
Expand All @@ -200,6 +207,23 @@ impl JournalDB {
Ok(ret)
}


// Decrease counters for given keys. Deletes obsolete counters
fn decrease_counters(keys: &[H256], counters: &mut HashMap<H256, i32>) {
for i in keys.iter() {
let delete_counter = {
if let Some(mut cnt) = counters.get_mut(i) {
*cnt -= 1;
*cnt == 0
}
else { false }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would this be the case?

};
if delete_counter {
counters.remove(i);
}
}
}

fn payload(&self, key: &H256) -> Option<Bytes> {
self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
}
Expand Down Expand Up @@ -387,4 +411,21 @@ mod tests {
jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
assert!(jdb.exists(&foo));
}

#[test]
fn fork_same_key() {
// history is 1
let mut jdb = JournalDB::new_temp();
jdb.commit(0, &b"0".sha3(), None).unwrap();

let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();

jdb.insert(b"foo");
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.exists(&foo));

jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
assert!(jdb.exists(&foo));
}
}