Skip to content

Commit

Permalink
fix append recovery bug (#251) (#253)
Browse files Browse the repository at this point in the history
Close #250

Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie authored Jul 26, 2022
1 parent f69b62a commit a5251cf
Showing 1 changed file with 82 additions and 24 deletions.
106 changes: 82 additions & 24 deletions src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,28 +343,25 @@ impl<A: AllocatorTrait> MemTable<A> {
}
}

/// Appends some entries from rewrite queue. Assumes this table has no
/// append data.
/// Appends some entries from append queue. Assumes this table has no
/// rewrite data.
///
/// This method is only used for recovery.
pub fn append_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
debug_assert_eq!(self.rewrite_count, 0);
self.prepare_append(
entry_indexes[0].index,
// Rewrite -> Compact Append -> Rewrite.
true, /* allow_hole */
// Refer to case in `merge_append_table`. They can be adapted
// to attack this path via a global rewrite without deleting
// obsolete rewrite files.
false, /* allow_hole */
// Refer to case in `merge_newer_neighbor`.
true, /* allow_overwrite */
);
self.global_stats.add(LogQueue::Rewrite, len);
self.global_stats.add(LogQueue::Append, len);
for ei in &entry_indexes {
debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
self.entry_indexes.push_back(ei.into());
}
self.rewrite_count = self.entry_indexes.len();
}
}

Expand Down Expand Up @@ -442,6 +439,31 @@ impl<A: AllocatorTrait> MemTable<A> {
self.rewrite_count = pos + rewrite_len;
}

/// Replays entries recovered from the rewrite queue by pushing them onto
/// the back of this table's entry indexes. Assumes this table holds no
/// append data.
///
/// This method is only used for recovery.
pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
    // An empty batch leaves the table untouched.
    if entry_indexes.is_empty() {
        return;
    }
    // Everything currently stored is expected to be rewrite data.
    debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());

    let replayed = entry_indexes.len();
    let first_index = entry_indexes[0].index;
    // Rewrite -> Compact Append -> Rewrite.
    let allow_hole = true;
    // Refer to case in `merge_append_table`. They can be adapted
    // to attack this path via a global rewrite without deleting
    // obsolete rewrite files.
    let allow_overwrite = true;
    self.prepare_append(first_index, allow_hole, allow_overwrite);

    self.global_stats.add(LogQueue::Rewrite, replayed);
    for entry_index in entry_indexes.iter() {
        self.entry_indexes.push_back(entry_index.into());
    }
    // The whole table now counts as rewrite data.
    self.rewrite_count = self.entry_indexes.len();
}

/// Removes all entries with index smaller than `index`. Returns the number
/// of deleted entries.
pub fn compact_to(&mut self, index: u64) -> u64 {
Expand Down Expand Up @@ -984,6 +1006,38 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
}
}

/// Applies changes from log items that are replayed from an append queue.
/// Assumes no rewrite data has been applied yet.
///
/// This method is only used for recovery.
pub fn replay_append_writes(&self, log_items: LogItemDrain) {
    for log_item in log_items {
        let region_id = log_item.raft_group_id;
        let table = self.get_or_insert(region_id);
        match log_item.content {
            LogItemContent::EntryIndexes(indexes) => {
                table.write().replay_append(indexes.0);
            }
            LogItemContent::Command(command) => match command {
                Command::Clean => {
                    self.remove(region_id, true /* record_tombstone */);
                }
                Command::Compact { index } => {
                    table.write().compact_to(index);
                }
            },
            LogItemContent::Kv(kv) => match kv.op_type {
                OpType::Put => {
                    // Unwrap the value before taking the write lock, as
                    // the put statement itself acquires it.
                    let value = kv.value.unwrap();
                    table.write().put(kv.key, value, kv.file_id.unwrap());
                }
                OpType::Del => {
                    let deleted_key = kv.key;
                    table.write().delete(deleted_key.as_slice());
                }
            },
        }
    }
}

/// Applies changes from log items that have been written to rewrite queue.
pub fn apply_rewrite_writes(
&self,
Expand Down Expand Up @@ -1015,15 +1069,16 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
/// Assumes it hasn't applied any append data.
///
/// This method is only used for recovery.
pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) {
pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
memtable.write().append_rewrite(entries_to_add.0);
memtable.write().replay_rewrite(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
// Only append tombstone needs to be recorded.
self.remove(raft, false /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
Expand Down Expand Up @@ -1109,21 +1164,17 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
}
}
match file_id.queue {
LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(item_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
}
Ok(())
}

fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
self.log_batch.merge(&mut rhs.log_batch.clone());
match queue {
LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(rhs.log_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
}
self.memtables.merge_newer_neighbor(rhs.memtables);
Ok(())
Expand Down Expand Up @@ -1924,7 +1975,7 @@ mod tests {
memtable.compact_to(7);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
Expand Down Expand Up @@ -1966,7 +2017,7 @@ mod tests {
memtable.compact_to(10);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
Expand Down Expand Up @@ -2017,7 +2068,7 @@ mod tests {
memtable.merge_newer_neighbor(&mut m1);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Rewrite, 1),
Expand Down Expand Up @@ -2099,6 +2150,13 @@ mod tests {
batches[1].add_command(last_rid, Command::Compact { index: 5 });
batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));

// entries [1, 10] => entries [11, 20][5, 10] => compact 8
last_rid += 1;
batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
batches[2].add_command(last_rid, Command::Compact { index: 8 });

for b in batches.iter_mut() {
b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
}
Expand Down

0 comments on commit a5251cf

Please sign in to comment.