Skip to content

Commit

Permalink
refactor: prefixes all loads and stores with home_name or entity in A…
Browse files Browse the repository at this point in the history
…gentDB
  • Loading branch information
luketchang committed Oct 15, 2021
1 parent f242a39 commit d68b084
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,50 +29,54 @@ static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_";
///
/// Key structure: ```<home_name>_<additional_prefix(es)>_<key>```
#[derive(Debug, Clone)]
pub struct HomeDB(TypedDB);
pub struct AgentDB(TypedDB);

impl HomeDB {
impl AgentDB {
/// Instantiated new `HomeDB`
pub fn new(db: DB, home_name: String) -> Self {
Self(TypedDB::new(db, home_name))
pub fn new(db: DB) -> Self {
Self(TypedDB::new(db))
}

/// Store encodable value
pub fn store_encodable<V: Encode>(
&self,
entity: String,
prefix: impl AsRef<[u8]>,
key: impl AsRef<[u8]>,
value: &V,
) -> Result<(), DbError> {
self.0.store_encodable(prefix, key, value)
self.0.store_encodable(entity, prefix, key, value)
}

/// Retrieve decodable value
pub fn retrieve_decodable<V: Decode>(
&self,
entity: String,
prefix: impl AsRef<[u8]>,
key: impl AsRef<[u8]>,
) -> Result<Option<V>, DbError> {
self.0.retrieve_decodable(prefix, key)
self.0.retrieve_decodable(entity, prefix, key)
}

/// Store encodable kv pair
pub fn store_keyed_encodable<K: Encode, V: Encode>(
&self,
entity: String,
prefix: impl AsRef<[u8]>,
key: &K,
value: &V,
) -> Result<(), DbError> {
self.0.store_encodable(prefix, key.to_vec(), value)
self.0.store_keyed_encodable(entity, prefix, key, value)
}

/// Retrieve decodable value given encodable key
pub fn retrieve_keyed_decodable<K: Encode, V: Decode>(
&self,
entity: String,
prefix: impl AsRef<[u8]>,
key: &K,
) -> Result<Option<V>, DbError> {
self.0.retrieve_decodable(prefix, key.to_vec())
self.0.retrieve_keyed_decodable(entity, prefix, key)
}

/// Store a raw committed message
Expand All @@ -81,7 +85,11 @@ impl HomeDB {
/// - `destination_and_nonce` --> `leaf`
/// - `leaf_index` --> `leaf`
/// - `leaf` --> `message`
pub fn store_raw_committed_message(&self, message: &RawCommittedMessage) -> Result<()> {
pub fn store_raw_committed_message(
&self,
home_name: String,
message: &RawCommittedMessage,
) -> Result<()> {
let parsed = OpticsMessage::read_from(&mut message.message.clone().as_slice())?;

let destination_and_nonce = parsed.destination_and_nonce();
Expand All @@ -96,31 +104,41 @@ impl HomeDB {
leaf_index = message.leaf_index,
"storing raw committed message in db"
);
self.store_leaf(message.leaf_index, destination_and_nonce, leaf)?;
self.store_keyed_encodable(MESSAGE, &leaf, message)?;
self.store_leaf(
home_name.to_owned(),
message.leaf_index,
destination_and_nonce,
leaf,
)?;
self.store_keyed_encodable(home_name.to_owned(), MESSAGE, &leaf, message)?;
Ok(())
}

/// Store the latest known leaf_index
///
/// Key --> value: `LATEST_LEAF_INDEX` --> `leaf_index`
pub fn update_latest_leaf_index(&self, leaf_index: u32) -> Result<(), DbError> {
if let Ok(Some(idx)) = self.retrieve_latest_leaf_index() {
pub fn update_latest_leaf_index(
&self,
home_name: String,
leaf_index: u32,
) -> Result<(), DbError> {
if let Ok(Some(idx)) = self.retrieve_latest_leaf_index(home_name.to_owned()) {
if leaf_index <= idx {
return Ok(());
}
}
self.store_encodable("", LATEST_LEAF_INDEX, &leaf_index)
self.store_encodable(home_name.to_owned(), "", LATEST_LEAF_INDEX, &leaf_index)
}

/// Retrieve the highest known leaf_index
pub fn retrieve_latest_leaf_index(&self) -> Result<Option<u32>, DbError> {
self.retrieve_decodable("", LATEST_LEAF_INDEX)
pub fn retrieve_latest_leaf_index(&self, home_name: String) -> Result<Option<u32>, DbError> {
self.retrieve_decodable(home_name, "", LATEST_LEAF_INDEX)
}

/// Store the leaf keyed by leaf_index
fn store_leaf(
&self,
home_name: String,
leaf_index: u32,
destination_and_nonce: u64,
leaf: H256,
Expand All @@ -130,75 +148,100 @@ impl HomeDB {
leaf = ?leaf,
"storing leaf hash keyed by index and dest+nonce"
);
self.store_keyed_encodable(LEAF, &destination_and_nonce, &leaf)?;
self.store_keyed_encodable(LEAF, &leaf_index, &leaf)?;
self.update_latest_leaf_index(leaf_index)
self.store_keyed_encodable(home_name.to_owned(), LEAF, &destination_and_nonce, &leaf)?;
self.store_keyed_encodable(home_name.to_owned(), LEAF, &leaf_index, &leaf)?;
self.update_latest_leaf_index(home_name.to_owned(), leaf_index)
}

/// Retrieve a raw committed message by its leaf hash
pub fn message_by_leaf(&self, leaf: H256) -> Result<Option<RawCommittedMessage>, DbError> {
self.retrieve_keyed_decodable(MESSAGE, &leaf)
pub fn message_by_leaf(
&self,
home_name: String,
leaf: H256,
) -> Result<Option<RawCommittedMessage>, DbError> {
self.retrieve_keyed_decodable(home_name, MESSAGE, &leaf)
}

/// Retrieve the leaf hash keyed by leaf index
pub fn leaf_by_leaf_index(&self, leaf_index: u32) -> Result<Option<H256>, DbError> {
self.retrieve_keyed_decodable(LEAF, &leaf_index)
pub fn leaf_by_leaf_index(
&self,
home_name: String,
leaf_index: u32,
) -> Result<Option<H256>, DbError> {
self.retrieve_keyed_decodable(home_name, LEAF, &leaf_index)
}

/// Retrieve the leaf hash keyed by destination and nonce
pub fn leaf_by_nonce(&self, destination: u32, nonce: u32) -> Result<Option<H256>, DbError> {
pub fn leaf_by_nonce(
&self,
home_name: String,
destination: u32,
nonce: u32,
) -> Result<Option<H256>, DbError> {
let dest_and_nonce = utils::destination_and_nonce(destination, nonce);
self.retrieve_keyed_decodable(LEAF, &dest_and_nonce)
self.retrieve_keyed_decodable(home_name, LEAF, &dest_and_nonce)
}

/// Retrieve a raw committed message by its leaf hash
pub fn message_by_nonce(
&self,
home_name: String,
destination: u32,
nonce: u32,
) -> Result<Option<RawCommittedMessage>, DbError> {
let leaf = self.leaf_by_nonce(destination, nonce)?;
let leaf = self.leaf_by_nonce(home_name.to_owned(), destination, nonce)?;
match leaf {
None => Ok(None),
Some(leaf) => self.message_by_leaf(leaf),
Some(leaf) => self.message_by_leaf(home_name.to_owned(), leaf),
}
}

/// Retrieve a raw committed message by its leaf index
pub fn message_by_leaf_index(
&self,
home_name: String,
index: u32,
) -> Result<Option<RawCommittedMessage>, DbError> {
let leaf: Option<H256> = self.leaf_by_leaf_index(index)?;
let leaf: Option<H256> = self.leaf_by_leaf_index(home_name.to_owned(), index)?;
match leaf {
None => Ok(None),
Some(leaf) => self.message_by_leaf(leaf),
Some(leaf) => self.message_by_leaf(home_name.to_owned(), leaf),
}
}

/// Stores the latest inspected nonce for a given replica domain
///
/// Keys --> Values:
/// - `replica_domain` --> `nonce`
pub fn store_latest_nonce(&self, replica_domain: u32, nonce: u32) -> Result<(), DbError> {
self.store_keyed_encodable(LATEST_NONCE, &replica_domain, &nonce)?;
pub fn store_latest_nonce(
&self,
home_name: String,
replica_domain: u32,
nonce: u32,
) -> Result<(), DbError> {
self.store_keyed_encodable(home_name, LATEST_NONCE, &replica_domain, &nonce)?;

Ok(())
}

/// Retrieves the latest inspected nonce for a given replica domain
pub fn retrieve_latest_nonce(&self, replica_domain: u32) -> Result<Option<u32>, DbError> {
self.retrieve_keyed_decodable(LATEST_NONCE, &replica_domain)
pub fn retrieve_latest_nonce(
&self,
home_name: String,
replica_domain: u32,
) -> Result<Option<u32>, DbError> {
self.retrieve_keyed_decodable(home_name, LATEST_NONCE, &replica_domain)
}

/// Retrieve the latest committed
pub fn retrieve_latest_root(&self) -> Result<Option<H256>, DbError> {
self.retrieve_decodable("", LATEST_ROOT)
/// Store the latest committed
fn store_latest_root(&self, entity: String, root: H256) -> Result<(), DbError> {
debug!(root = ?root, "storing new latest root in DB");
self.store_encodable(entity, "", LATEST_ROOT, &root)
}

fn store_latest_root(&self, root: H256) -> Result<(), DbError> {
debug!(root = ?root, "storing new latest root in DB");
self.store_encodable("", LATEST_ROOT, &root)
/// Retrieve the latest committed
pub fn retrieve_latest_root(&self, entity: String) -> Result<Option<H256>, DbError> {
self.retrieve_decodable(entity, "", LATEST_ROOT)
}

/// Store update metadata (by update's new root)
Expand All @@ -207,17 +250,22 @@ impl HomeDB {
/// - `update_new_root` --> `update_metadata`
pub fn store_update_metadata(
&self,
entity: String,
new_root: H256,
metadata: UpdateMeta,
) -> Result<(), DbError> {
debug!(new_root = ?new_root, metadata = ?metadata, "storing update metadata in DB");

self.store_keyed_encodable(UPDATE_META, &new_root, &metadata)
self.store_keyed_encodable(entity, UPDATE_META, &new_root, &metadata)
}

/// Retrieve update metadata (by update's new root)
pub fn retrieve_update_metadata(&self, new_root: H256) -> Result<Option<UpdateMeta>, DbError> {
self.retrieve_keyed_decodable(UPDATE_META, &new_root)
pub fn retrieve_update_metadata(
&self,
entity: String,
new_root: H256,
) -> Result<Option<UpdateMeta>, DbError> {
self.retrieve_keyed_decodable(entity, UPDATE_META, &new_root)
}

/// Store a signed update building off latest root
Expand All @@ -226,7 +274,11 @@ impl HomeDB {
/// - `LATEST_ROOT` --> `root`
/// - `new_root` --> `prev_root`
/// - `prev_root` --> `update`
pub fn store_latest_update(&self, update: &SignedUpdate) -> Result<(), DbError> {
pub fn store_latest_update(
&self,
entity: String,
update: &SignedUpdate,
) -> Result<(), DbError> {
debug!(
previous_root = ?update.update.previous_root,
new_root = ?update.update.new_root,
Expand All @@ -235,22 +287,28 @@ impl HomeDB {

// If there is no latest root, or if this update is on the latest root
// update latest root
match self.retrieve_latest_root()? {
match self.retrieve_latest_root(entity.to_owned())? {
Some(root) => {
if root == update.update.previous_root {
self.store_latest_root(update.update.new_root)?;
self.store_latest_root(entity.to_owned(), update.update.new_root)?;
} else {
warn!(
"Attempted to store update not building off latest root: {:?}",
update
)
}
}
None => self.store_latest_root(update.update.new_root)?,
None => self.store_latest_root(entity.to_owned(), update.update.new_root)?,
}

self.store_keyed_encodable(UPDATE, &update.update.previous_root, update)?;
self.store_keyed_encodable(
entity.to_owned(),
UPDATE,
&update.update.previous_root,
update,
)?;
self.store_keyed_encodable(
entity.to_owned(),
PREV_ROOT,
&update.update.new_root,
&update.update.previous_root,
Expand All @@ -260,17 +318,23 @@ impl HomeDB {
/// Retrieve an update by its previous root
pub fn update_by_previous_root(
&self,
entity: String,
previous_root: H256,
) -> Result<Option<SignedUpdate>, DbError> {
self.retrieve_keyed_decodable(UPDATE, &previous_root)
self.retrieve_keyed_decodable(entity, UPDATE, &previous_root)
}

/// Retrieve an update by its new root
pub fn update_by_new_root(&self, new_root: H256) -> Result<Option<SignedUpdate>, DbError> {
let prev_root: Option<H256> = self.retrieve_keyed_decodable(PREV_ROOT, &new_root)?;
pub fn update_by_new_root(
&self,
entity: String,
new_root: H256,
) -> Result<Option<SignedUpdate>, DbError> {
let prev_root: Option<H256> =
self.retrieve_keyed_decodable(entity.to_owned(), PREV_ROOT, &new_root)?;

match prev_root {
Some(prev_root) => self.update_by_previous_root(prev_root),
Some(prev_root) => self.update_by_previous_root(entity.to_owned(), prev_root),
None => Ok(None),
}
}
Expand All @@ -284,26 +348,36 @@ impl HomeDB {
///
/// Keys --> Values:
/// - `leaf_index` --> `proof`
pub fn store_proof(&self, leaf_index: u32, proof: &Proof) -> Result<(), DbError> {
pub fn store_proof(
&self,
home_name: String,
leaf_index: u32,
proof: &Proof,
) -> Result<(), DbError> {
debug!(leaf_index, "storing proof in DB");
self.store_keyed_encodable(PROOF, &leaf_index, proof)
self.store_keyed_encodable(home_name, PROOF, &leaf_index, proof)
}

/// Retrieve a proof by its leaf index
pub fn proof_by_leaf_index(&self, leaf_index: u32) -> Result<Option<Proof>, DbError> {
self.retrieve_keyed_decodable(PROOF, &leaf_index)
pub fn proof_by_leaf_index(
&self,
home_name: String,
leaf_index: u32,
) -> Result<Option<Proof>, DbError> {
self.retrieve_keyed_decodable(home_name, PROOF, &leaf_index)
}

// TODO(james): this is a quick-fix for the prover_sync and I don't like it
/// poll db ever 100 milliseconds waitinf for a leaf.
pub fn wait_for_leaf(
&self,
home_name: String,
leaf_index: u32,
) -> impl Future<Output = Result<Option<H256>, DbError>> {
let slf = self.clone();
async move {
loop {
if let Some(leaf) = slf.leaf_by_leaf_index(leaf_index)? {
if let Some(leaf) = slf.leaf_by_leaf_index(home_name.to_owned(), leaf_index)? {
return Ok(Some(leaf));
}
sleep(Duration::from_millis(100)).await
Expand Down
Loading

0 comments on commit d68b084

Please sign in to comment.