Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handles bug fixes on the db iterator #2096

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ serial = [
]
test = [ "ledger-block/test" ]
test-helpers = [
"ledger-test-helpers",
"ledger-committee/test-helpers",
"ledger-narwhal/test-helpers"
]
Expand Down Expand Up @@ -93,6 +94,12 @@ package = "snarkvm-ledger-store"
path = "./store"
version = "=0.15.4"

[dependencies.ledger-test-helpers]
package = "snarkvm-ledger-test-helpers"
path = "./test-helpers"
version = "=0.15.4"
optional = true

[dependencies.synthesizer]
package = "snarkvm-synthesizer"
path = "../synthesizer"
Expand Down
3 changes: 3 additions & 0 deletions ledger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub use ledger_store as store;

pub use crate::block::*;

#[cfg(feature = "test-helpers")]
pub use ledger_test_helpers;

mod helpers;
pub use helpers::*;

Expand Down
36 changes: 29 additions & 7 deletions ledger/store/src/helpers/rocksdb/internal/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,21 @@ impl<
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB Iter iterator error: {e}");
})
.ok()?;

// Deserialize the key and value.
let key = bincode::deserialize(&key[PREFIX_LEN..])
.map_err(|e| {
error!("RocksDB Iter deserialize(key) error: {e}");
})
.ok()?;
let value = bincode::deserialize(&value)
.map_err(|e| {
error!("RocksDB Iter deserialize(value) error: {e}");
})
.ok()?;
let key = bincode::deserialize(&key[PREFIX_LEN..]).ok()?;
let value = bincode::deserialize(&value).ok()?;

Some((Cow::Owned(key), Cow::Owned(value)))
}
Expand All @@ -409,10 +419,16 @@ impl<'a, K: 'a + Clone + Debug + PartialEq + Eq + Hash + Serialize + Deserialize
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB Keys iterator error: {e}");
})
.ok()?;

// Deserialize the key.
let key = bincode::deserialize(&key[PREFIX_LEN..])
.map_err(|e| {
error!("RocksDB Keys deserialize(key) error: {e}");
})
.ok()?;
let key = bincode::deserialize(&key[PREFIX_LEN..]).ok()?;

Some(Cow::Owned(key))
}
Expand All @@ -438,10 +454,16 @@ impl<'a, V: 'a + Clone + PartialEq + Eq + Serialize + DeserializeOwned> Iterator
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB Values iterator error: {e}");
})
.ok()?;

// Deserialize the value.
let value = bincode::deserialize(&value)
.map_err(|e| {
error!("RocksDB Values deserialize(value) error: {e}");
})
.ok()?;
let value = bincode::deserialize(&value).ok()?;

Some(Cow::Owned(value))
}
Expand Down
196 changes: 129 additions & 67 deletions ledger/store/src/helpers/rocksdb/internal/nested_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,6 @@ impl<M: Serialize + DeserializeOwned, K: Serialize + DeserializeOwned, V: Serial
Ok(raw_map_key)
}

// #[inline]
// fn get_map_raw(&self, map: &M) -> Result<Option<rocksdb::DBPinnableSlice>> {
// let raw_map = self.create_prefixed_map(map)?;
// match self.database.get_pinned(&raw_map)? {
// Some(data) => Ok(Some(data)),
// None => Ok(None),
// }
// }

#[inline]
fn get_map_key_raw(&self, map: &M, key: &K) -> Result<Option<rocksdb::DBPinnableSlice>> {
let raw_map_key = self.create_prefixed_map_key(map, key)?;
Expand All @@ -88,12 +79,12 @@ impl<M: Serialize + DeserializeOwned, K: Serialize + DeserializeOwned, V: Serial
}
}
#[inline]
fn get_map_and_key(map_key: &[u8]) -> Option<(&[u8], &[u8])> {
let map_len = u32::from_bytes_le(&map_key[PREFIX_LEN..][..4]).ok()? as usize;
fn get_map_and_key(map_key: &[u8]) -> Result<(&[u8], &[u8])> {
let map_len = u32::from_bytes_le(&map_key[PREFIX_LEN..][..4])? as usize;
let map = &map_key[PREFIX_LEN + 4..][..map_len];
let key = &map_key[PREFIX_LEN + 4 + map_len..];

Some((map, key))
Ok((map, key))
}

impl<
Expand Down Expand Up @@ -132,17 +123,37 @@ impl<
true => self.atomic_batch.lock().push((*map, None, None)),
// Otherwise, remove the map directly from the map.
false => {
let map = self.create_prefixed_map(map)?;
// Serialize the map.
let serialized_map = bincode::serialize(map)?;

// Batching the delete operations to optimize the write performance and ensure atomicity.
let mut batch = rocksdb::WriteBatch::default();

// Iterate over the keys with the specified map prefix.
let entries_to_delete =
self.database.iterator(rocksdb::IteratorMode::From(&map, rocksdb::Direction::Forward));
// Construct an iterator over the DB with the specified prefix.
let iterator = self.database.iterator(rocksdb::IteratorMode::From(
&self.create_prefixed_map(map)?,
rocksdb::Direction::Forward,
));

// Now delete all the identified keys.
for entry in entries_to_delete {
// Iterate over the entries in the DB with the specified prefix.
for entry in iterator {
let (map_key, _) = entry?;
self.database.delete(&map_key).unwrap();

// Extract the bytes belonging to the map and the key.
let (entry_map, _) = get_map_and_key(&map_key)?;

// If the 'entry_map' matches 'serialized_map', delete the key.
if entry_map == serialized_map {
batch.delete(map_key);
} else {
// If the 'entry_map' no longer matches the 'serialized_map',
// we've moved past the relevant keys and can break the loop.
break;
}
}

// Deleting the batched keys atomically from RocksDB.
self.database.write(batch)?;
}
}
Ok(())
Expand All @@ -159,8 +170,8 @@ impl<
// Otherwise, remove the key-value pair directly from the map.
false => {
// Prepare the prefixed map-key.
let raw_key = self.create_prefixed_map_key(map, key)?;
self.database.delete(raw_key)?;
let map_key = self.create_prefixed_map_key(map, key)?;
self.database.delete(map_key)?;
}
}
Ok(())
Expand Down Expand Up @@ -257,16 +268,30 @@ impl<
}
(Some(key), None) => atomic_batch.delete(self.create_prefixed_map_key(&map, &key)?),
(None, None) => {
let map = self.create_prefixed_map(&map)?;
// Serialize the map.
let serialized_map = bincode::serialize(&map)?;

// Iterate over the keys with the specified map prefix.
let entries_to_delete =
self.database.iterator(rocksdb::IteratorMode::From(&map, rocksdb::Direction::Forward));
// Construct an iterator over the DB with the specified prefix.
let iterator = self.database.iterator(rocksdb::IteratorMode::From(
&self.create_prefixed_map(&map)?,
rocksdb::Direction::Forward,
));

// Now delete all the identified keys.
for entry in entries_to_delete {
// Iterate over the entries in the DB with the specified prefix.
for entry in iterator {
let (map_key, _) = entry?;
atomic_batch.delete(map_key);

// Extract the bytes belonging to the map and the key.
let (entry_map, _) = get_map_and_key(&map_key)?;

// If the 'entry_map' matches 'serialized_map', delete the key.
if entry_map == serialized_map {
atomic_batch.delete(map_key);
} else {
// If the 'entry_map' no longer matches the 'serialized_map',
// we've moved past the relevant keys and can break the loop.
break;
}
}
}
(None, Some(_)) => unreachable!("Cannot insert a value without a key"),
Expand Down Expand Up @@ -356,32 +381,38 @@ impl<
/// Returns the key-value pairs for the given map, if it exists.
///
fn get_map_confirmed(&'a self, map: &M) -> Result<Vec<(K, V)>> {
// Prepare the prefixed map.
let raw_map = self.create_prefixed_map(map)?;
// Serialize the map.
let serialized_map = bincode::serialize(map)?;

// Iterate over the keys with the specified map prefix.
let entries: Vec<_> = self
// Initialize a vector for the entries.
let mut entries = Vec::new();

// Construct an iterator over the DB with the specified prefix.
let iterator = self
.database
.iterator(rocksdb::IteratorMode::From(&raw_map, rocksdb::Direction::Forward))
.filter_map(|entry| {
let (map_key, value) = entry.ok()?;

// Extract the bytes belonging to the map and the key.
let (entry_map, entry_key) = get_map_and_key(&map_key)?;

if entry_map == serialized_map {
// Deserialize the key.
let key = bincode::deserialize(entry_key).ok()?;
// Deserialize the value.
let value = bincode::deserialize(&value).ok()?;
Some((key, value))
} else {
None
}
})
.collect();
.iterator(rocksdb::IteratorMode::From(&self.create_prefixed_map(map)?, rocksdb::Direction::Forward));

// Iterate over the entries in the DB with the specified prefix.
for entry in iterator {
let (map_key, value) = entry?;

// Extract the bytes belonging to the map and the key.
let (entry_map, entry_key) = get_map_and_key(&map_key)?;

// If the 'entry_map' matches 'serialized_map', deserialize the key and value.
if entry_map == serialized_map {
// Deserialize the key.
let key = bincode::deserialize(entry_key)?;
// Deserialize the value.
let value = bincode::deserialize(&value)?;
// Push the key-value pair to the vector.
entries.push((key, value));
} else {
// If the 'entry_map' no longer matches the 'serialized_map',
// we've moved past the relevant keys and can break the loop.
break;
}
}

Ok(entries)
}
Expand All @@ -390,11 +421,6 @@ impl<
/// Returns the speculative key-value pairs for the given map, if it exists.
///
fn get_map_speculative(&'a self, map: &M) -> Result<Vec<(K, V)>> {
// If there is no atomic batch in progress, then return the confirmed key-value pairs.
if !self.is_atomic_in_progress() {
return self.get_map_confirmed(map);
}

// Retrieve the confirmed key-value pairs for the given map.
let mut key_values = self.get_map_confirmed(map)?;

Expand Down Expand Up @@ -547,17 +573,34 @@ impl<
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB NestedIter iterator error: {e}");
})
.ok()?;

// Extract the bytes belonging to the map and the key.
let (entry_map, entry_key) = get_map_and_key(&map_key)?;
// Deserialize the map and key.
let map = bincode::deserialize(entry_map).ok()?;
let key = bincode::deserialize(entry_key).ok()?;
let (entry_map, entry_key) = get_map_and_key(&map_key)
.map_err(|e| {
error!("RocksDB NestedIter get_map_and_key error: {e}");
})
.ok()?;

// Deserialize the map, key, and value.
let map = bincode::deserialize(entry_map)
.map_err(|e| {
error!("RocksDB NestedIter deserialize(map) error: {e}");
})
.ok()?;
let key = bincode::deserialize(entry_key)
.map_err(|e| {
error!("RocksDB NestedIter deserialize(key) error: {e}");
})
.ok()?;
// Deserialize the value.
let value = bincode::deserialize(&value).ok()?;
let value = bincode::deserialize(&value)
.map_err(|e| {
error!("RocksDB NestedIter deserialize(value) error: {e}");
})
.ok()?;

Some((Cow::Owned(map), Cow::Owned(key), Cow::Owned(value)))
}
Expand Down Expand Up @@ -597,15 +640,28 @@ impl<
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB NestedKeys iterator error: {e}");
})
.ok()?;

// Extract the bytes belonging to the map and the key.
let (entry_map, entry_key) = get_map_and_key(&map_key)?;
let (entry_map, entry_key) = get_map_and_key(&map_key)
.map_err(|e| {
error!("RocksDB NestedKeys get_map_and_key error: {e}");
})
.ok()?;

// Deserialize the map and key.
let map = bincode::deserialize(entry_map).ok()?;
let key = bincode::deserialize(entry_key).ok()?;
let map = bincode::deserialize(entry_map)
.map_err(|e| {
error!("RocksDB NestedKeys deserialize(map) error: {e}");
})
.ok()?;
let key = bincode::deserialize(entry_key)
.map_err(|e| {
error!("RocksDB NestedKeys deserialize(key) error: {e}");
})
.ok()?;

Some((Cow::Owned(map), Cow::Owned(key)))
}
Expand All @@ -631,10 +687,16 @@ impl<'a, V: 'a + Clone + PartialEq + Eq + Serialize + DeserializeOwned> Iterator
.db_iter
.next()?
.map_err(|e| {
error!("RocksDB iterator error: {e}");
error!("RocksDB NestedValues iterator error: {e}");
})
.ok()?;

// Deserialize the value.
let value = bincode::deserialize(&value)
.map_err(|e| {
error!("RocksDB NestedValues deserialize(value) error: {e}");
})
.ok()?;
let value = bincode::deserialize(&value).ok()?;

Some(Cow::Owned(value))
}
Expand Down
Loading