diff --git a/rust/agents/processor/src/processor.rs b/rust/agents/processor/src/processor.rs index 83efa2a4f..464ec2c0d 100644 --- a/rust/agents/processor/src/processor.rs +++ b/rust/agents/processor/src/processor.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, info, info_span, instrument, instrument::Instrumente use optics_base::{cancel_task, decl_agent, AgentCore, Homes, OpticsAgent, Replicas}; use optics_core::{ - accumulator::merkle::Proof, db::HomeDB, CommittedMessage, Common, Home, MessageStatus, + accumulator::merkle::Proof, db::OpticsDB, CommittedMessage, Common, Home, MessageStatus, }; use crate::{ @@ -38,7 +38,7 @@ pub(crate) struct Replica { interval: u64, replica: Arc, home: Arc, - home_db: HomeDB, + db: OpticsDB, allowed: Option>>, denied: Option>>, next_message_nonce: Arc, @@ -72,8 +72,8 @@ impl Replica { // 4. Check if the proof is valid under the replica // 5. Submit the proof to the replica let mut next_message_nonce: u32 = self - .home_db - .retrieve_latest_nonce(domain)? + .db + .retrieve_latest_nonce(self.home.name(), domain)? .map(|n: u32| n + 1) .unwrap_or_default(); @@ -107,8 +107,8 @@ impl Replica { .await { Ok(Flow::Advance) => { - self.home_db - .store_latest_nonce(domain, next_message_nonce)?; + self.db + .store_latest_nonce(self.home.name(), domain, next_message_nonce)?; next_message_nonce += 1; self.next_message_nonce @@ -199,7 +199,10 @@ impl Replica { return Ok(Flow::Advance); } - let proof = match self.home_db.proof_by_leaf_index(message.leaf_index) { + let proof = match self + .db + .proof_by_leaf_index(self.home.name(), message.leaf_index) + { Ok(Some(p)) => p, Ok(None) => { info!( @@ -356,7 +359,7 @@ impl OpticsAgent for Processor { let home = self.home(); let next_message_nonce = self.next_message_nonce.clone(); let interval = self.interval; - let home_db = self.home_db(); + let db = OpticsDB::new(self.db()); let replica_opt = self.replica_by_name(name); let name = name.to_owned(); @@ -371,7 +374,7 @@ impl OpticsAgent for Processor { interval, replica, home, - home_db, + db, allowed, denied, next_message_nonce, @@ -391,7 +394,8 @@ impl OpticsAgent for Processor { // tree sync info!("Starting ProverSync"); - let sync = ProverSync::from_disk(self.home_db()); + let sync = + ProverSync::from_disk(self.home().name().to_owned(), OpticsDB::new(self.db())); let sync_task = sync.spawn(); info!("Starting indexer"); @@ -430,7 +434,7 @@ impl OpticsAgent for Processor { self.core.home.name(), &config.bucket, config.region.parse().expect("invalid s3 region"), - self.home_db(), + OpticsDB::new(self.db()), ) .spawn(), ) diff --git a/rust/agents/processor/src/prover_sync.rs b/rust/agents/processor/src/prover_sync.rs index 26549b496..09496bf9a 100644 --- a/rust/agents/processor/src/prover_sync.rs +++ b/rust/agents/processor/src/prover_sync.rs @@ -3,7 +3,7 @@ use color_eyre::eyre::{bail, Result}; use ethers::core::types::H256; use optics_core::{ accumulator::{incremental::IncrementalMerkle, INITIAL_ROOT}, - db::{DbError, HomeDB}, + db::{DbError, OpticsDB}, ChainCommunicationError, }; use std::{fmt::Display, ops::Range, time::Duration}; @@ -13,7 +13,8 @@ use tracing::{debug, error, info, info_span, instrument, instrument::Instrumente /// Struct to sync prover. #[derive(Debug)] pub struct ProverSync { - db: HomeDB, + home_name: String, + db: OpticsDB, prover: Prover, incremental: IncrementalMerkle, } @@ -70,7 +71,7 @@ impl ProverSync { fn store_proof(&self, leaf_index: u32) -> Result<(), ProverSyncError> { match self.prover.prove(leaf_index as usize) { Ok(proof) => { - self.db.store_proof(leaf_index, &proof)?; + self.db.store_proof(&self.home_name, leaf_index, &proof)?; info!( leaf_index, root = ?self.prover.root(), @@ -90,14 +91,14 @@ impl ProverSync { /// Given rocksdb handle `db` containing merkle tree leaves, /// instantiates new `ProverSync` and fills prover's merkle tree #[instrument(level = "debug", skip(db))] - pub fn from_disk(db: HomeDB) -> Self { + pub fn from_disk(home_name: String, db: OpticsDB) -> Self { // Ingest all leaves in db into prover tree let mut prover = Prover::default(); let mut incremental = IncrementalMerkle::default(); - if let Some(root) = db.retrieve_latest_root().expect("db error") { + if let Some(root) = db.retrieve_latest_root(&home_name).expect("db error") { for i in 0.. { - match db.leaf_by_leaf_index(i) { + match db.leaf_by_leaf_index(&home_name, i) { Ok(Some(leaf)) => { debug!(leaf_index = i, "Ingesting leaf from_disk"); prover.ingest(leaf).expect("!tree full"); @@ -118,6 +119,7 @@ impl ProverSync { } let sync = Self { + home_name: home_name.to_owned(), prover, incremental, db, @@ -126,8 +128,10 @@ impl ProverSync { // Ensure proofs exist for all leaves for i in 0..sync.prover.count() as u32 { match ( - sync.db.leaf_by_leaf_index(i).expect("db error"), - sync.db.proof_by_leaf_index(i).expect("db error"), + sync.db.leaf_by_leaf_index(&home_name, i).expect("db error"), + sync.db + .proof_by_leaf_index(&home_name, i) + .expect("db error"), ) { (Some(_), None) => sync.store_proof(i).expect("db error"), (None, _) => break, @@ -154,7 +158,7 @@ impl ProverSync { let mut leaves = vec![]; for i in range { - let leaf = self.db.wait_for_leaf(i as u32).await?; + let leaf = self.db.wait_for_leaf(&self.home_name, i as u32).await?; if leaf.is_none() { break; } @@ -215,7 +219,11 @@ impl ProverSync { // store all calculated proofs in the db // TODO(luke): refactor prover_sync so we dont have to iterate over every leaf (match from_disk implementation) for idx in 0..self.prover.count() { - if self.db.proof_by_leaf_index(idx as u32)?.is_none() { + if self + .db + .proof_by_leaf_index(&self.home_name, idx as u32)? + .is_none() + { self.store_proof(idx as u32)?; } } @@ -262,7 +270,11 @@ impl ProverSync { // As we fill the incremental merkle, its tree_size will always be // equal to the index of the next leaf we want (e.g. if tree_size // is 3, we want the 4th leaf, which is at index 3) - if let Some(leaf) = self.db.wait_for_leaf(tree_size as u32).await? { + if let Some(leaf) = self + .db + .wait_for_leaf(&self.home_name, tree_size as u32) + .await? + { info!( index = tree_size, leaf = ?leaf, @@ -305,7 +317,9 @@ impl ProverSync { tokio::spawn(async move { loop { let local_root = self.local_root(); - let signed_update_opt = self.db.update_by_previous_root(local_root)?; + let signed_update_opt = self + .db + .update_by_previous_root(&self.home_name, local_root)?; // This if block is somewhat ugly. // First we check if there is a signed update with the local root. @@ -321,7 +335,11 @@ impl ProverSync { ); self.update_full(local_root, signed_update.update.new_root) .await?; - } else if !local_root.is_zero() && self.db.update_by_new_root(local_root)?.is_none() + } else if !local_root.is_zero() + && self + .db + .update_by_new_root(&self.home_name, local_root)? + .is_none() { bail!(ProverSyncError::InvalidLocalRoot { local_root }); } diff --git a/rust/agents/processor/src/push.rs b/rust/agents/processor/src/push.rs index 52ede3744..e42edfc41 100644 --- a/rust/agents/processor/src/push.rs +++ b/rust/agents/processor/src/push.rs @@ -5,7 +5,7 @@ use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3 use color_eyre::eyre::{bail, eyre, Result}; -use optics_core::{accumulator::merkle::Proof, db::HomeDB, Encode}; +use optics_core::{accumulator::merkle::Proof, db::OpticsDB, Encode}; use tokio::{task::JoinHandle, time::sleep}; use tracing::{debug, info, info_span, instrument::Instrumented, Instrument}; @@ -20,7 +20,7 @@ pub struct Pusher { name: String, bucket: String, region: Region, - db: HomeDB, + db: OpticsDB, client: S3Client, } @@ -36,7 +36,7 @@ impl std::fmt::Debug for Pusher { impl Pusher { /// Instantiate a new pusher with a region - pub fn new(name: &str, bucket: &str, region: Region, db: HomeDB) -> Self { + pub fn new(name: &str, bucket: &str, region: Region, db: OpticsDB) -> Self { let client = S3Client::new_with( HttpClient::new().unwrap(), EnvironmentProvider::default(), @@ -112,12 +112,12 @@ impl Pusher { tokio::spawn(async move { let mut index = 0; loop { - let proof = self.db.proof_by_leaf_index(index)?; + let proof = self.db.proof_by_leaf_index(&self.name, index)?; match proof { Some(proof) => { let message = self .db - .message_by_leaf_index(index)? + .message_by_leaf_index(&self.name, index)? .ok_or_else(|| eyre!("Missing message for known proof"))?; let proven = ProvenMessage { proof, diff --git a/rust/agents/updater/src/updater.rs b/rust/agents/updater/src/updater.rs index 1954d1667..698fba30e 100644 --- a/rust/agents/updater/src/updater.rs +++ b/rust/agents/updater/src/updater.rs @@ -20,7 +20,7 @@ use tracing::{error, info, instrument::Instrumented, Instrument}; use crate::settings::UpdaterSettings as Settings; use optics_base::{AgentCore, Homes, OpticsAgent}; -use optics_core::{db::HomeDB, Common, Home, SignedUpdate, Signers, Update}; +use optics_core::{db::OpticsDB, Common, Home, SignedUpdate, Signers, Update}; #[derive(Debug)] struct UpdateHandler { @@ -29,7 +29,7 @@ struct UpdateHandler { rx: Receiver, update_pause: u64, signer: Arc, - home_db: HomeDB, + db: OpticsDB, mutex: Arc>, signed_attestation_count: IntCounterVec, } @@ -50,7 +50,7 @@ impl UpdateHandler { rx: Receiver, update_pause: u64, signer: Arc, - home_db: HomeDB, + db: OpticsDB, mutex: Arc>, signed_attestation_count: IntCounterVec, ) -> Self { @@ -59,15 +59,15 @@ impl UpdateHandler { rx, update_pause, signer, - home_db, + db, mutex, signed_attestation_count, } } fn check_conflict(&self, update: &Update) -> Option { - self.home_db - .update_by_previous_root(update.previous_root) + self.db + .update_by_previous_root(self.home.name(), update.previous_root) .expect("db failure") } @@ -150,7 +150,7 @@ impl UpdateHandler { self.home.update(&signed).await?; info!("Storing signed update in db"); - self.home_db.store_latest_update(&signed)?; + self.db.store_latest_update(self.home.name(), &signed)?; Ok(()) // guard dropped here } @@ -266,7 +266,7 @@ impl OpticsAgent for Updater { rx, self.update_pause, self.signer.clone(), - HomeDB::new(self.db(), self.home().name().to_owned()), + OpticsDB::new(self.db()), Default::default(), self.signed_attestation_count.clone(), ); diff --git a/rust/agents/watcher/src/watcher.rs b/rust/agents/watcher/src/watcher.rs index d6ba2c08e..2f385e479 100644 --- a/rust/agents/watcher/src/watcher.rs +++ b/rust/agents/watcher/src/watcher.rs @@ -14,7 +14,7 @@ use tracing::{info, info_span, instrument::Instrumented, Instrument}; use optics_base::{cancel_task, AgentCore, ConnectionManagers, Homes, OpticsAgent}; use optics_core::{ - db::HomeDB, ChainCommunicationError, Common, ConnectionManager, DoubleUpdate, + db::OpticsDB, ChainCommunicationError, Common, ConnectionManager, DoubleUpdate, FailureNotification, Home, SignedUpdate, Signers, TxOutcome, }; @@ -158,13 +158,13 @@ where #[derive(Debug)] pub struct UpdateHandler { rx: mpsc::Receiver, - home_db: HomeDB, + db: OpticsDB, home: Arc, } impl UpdateHandler { - pub fn new(rx: mpsc::Receiver, home_db: HomeDB, home: Arc) -> Self { - Self { rx, home_db, home } + pub fn new(rx: mpsc::Receiver, db: OpticsDB, home: Arc) -> Self { + Self { rx, db, home } } fn check_double_update(&mut self, update: &SignedUpdate) -> Result<(), DoubleUpdate> { @@ -172,8 +172,8 @@ impl UpdateHandler { let new_root = update.update.new_root; match self - .home_db - .update_by_previous_root(old_root) + .db + .update_by_previous_root(self.home.name(), old_root) .expect("!db_get") { Some(existing) => { @@ -182,7 +182,9 @@ impl UpdateHandler { } } None => { - self.home_db.store_latest_update(update).expect("!db_put"); + self.db + .store_latest_update(self.home.name(), update) + .expect("!db_put"); } } @@ -306,7 +308,7 @@ impl Watcher { &self, double_update_tx: oneshot::Sender, ) -> Instrumented>> { - let home_db = HomeDB::new(self.db(), self.home().name().to_owned()); + let db = OpticsDB::new(self.db()); let home = self.home(); let replicas = self.replicas().clone(); let interval_seconds = self.interval_seconds; @@ -316,7 +318,7 @@ impl Watcher { tokio::spawn(async move { // Spawn update handler let (tx, rx) = mpsc::channel(200); - let handler = UpdateHandler::new(rx, home_db, home.clone()).spawn(); + let handler = UpdateHandler::new(rx, db, home.clone()).spawn(); // For each replica, spawn polling and history syncing tasks for (name, replica) in replicas { @@ -672,12 +674,15 @@ mod test { .await .expect("!sign"); + let mut mock_home = MockHomeContract::new(); + mock_home.expect__name().return_const("home_1".to_owned()); + { let (_tx, rx) = mpsc::channel(200); let mut handler = UpdateHandler { rx, - home_db: HomeDB::new(db, "home_1".to_owned()), - home: Arc::new(MockHomeContract::new().into()), + db: OpticsDB::new(db), + home: Arc::new(mock_home.into()), }; let _first_update_ret = handler diff --git a/rust/chains/optics-ethereum/src/home.rs b/rust/chains/optics-ethereum/src/home.rs index 456466122..7e06e7cbb 100644 --- a/rust/chains/optics-ethereum/src/home.rs +++ b/rust/chains/optics-ethereum/src/home.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use color_eyre::Result; use ethers::contract::abigen; use ethers::core::types::{Signature, H256}; -use optics_core::db::{HomeDB, DB}; +use optics_core::db::{OpticsDB, DB}; /* use optics_core::traits::CommittedMessage; use optics_core::SignedUpdateWithMeta; @@ -43,9 +43,10 @@ struct HomeIndexer where M: ethers::providers::Middleware, { + home_name: String, contract: Arc>, provider: Arc, - home_db: HomeDB, + db: OpticsDB, from_height: u32, chunk_size: u32, indexed_height: prometheus::IntGauge, @@ -93,9 +94,10 @@ where }); for update_with_meta in updates_with_meta { - self.home_db - .store_latest_update(&update_with_meta.signed_update)?; - self.home_db.store_update_metadata( + self.db + .store_latest_update(&self.home_name, &update_with_meta.signed_update)?; + self.db.store_update_metadata( + &self.home_name, update_with_meta.signed_update.update.new_root, update_with_meta.metadata, )?; @@ -128,7 +130,8 @@ where }); for message in messages { - self.home_db.store_raw_committed_message(&message)?; + self.db + .store_raw_committed_message(&self.home_name, &message)?; let committed_message: CommittedMessage = message.try_into()?; info!( @@ -148,8 +151,8 @@ where tokio::spawn(async move { let mut next_height: u32 = self - .home_db - .retrieve_decodable("", LAST_INSPECTED) + .db + .retrieve_decodable(&self.home_name, "", LAST_INSPECTED) .expect("db failure") .unwrap_or(self.from_height); info!( @@ -177,8 +180,8 @@ where self.sync_leaves(next_height, to) )?; - self.home_db - .store_encodable("", LAST_INSPECTED, &next_height)?; + self.db + .store_encodable(&self.home_name, "", LAST_INSPECTED, &next_height)?; next_height = to; // sleep here if we've caught up if to == tip { @@ -197,7 +200,7 @@ where M: ethers::providers::Middleware, { contract: Arc>, - home_db: HomeDB, + db: OpticsDB, domain: u32, name: String, provider: Arc, @@ -222,7 +225,7 @@ where contract: Arc::new(EthereumHomeInternal::new(address, provider.clone())), domain: *domain, name: name.to_owned(), - home_db: HomeDB::new(db, name.to_owned()), + db: OpticsDB::new(db), provider, } } @@ -275,7 +278,7 @@ where old_root: H256, ) -> Result, ChainCommunicationError> { loop { - if let Some(update) = self.home_db.update_by_previous_root(old_root)? { + if let Some(update) = self.db.update_by_previous_root(&self.name, old_root)? { return Ok(Some(update)); } sleep(Duration::from_millis(500)).await; @@ -288,7 +291,7 @@ where new_root: H256, ) -> Result, ChainCommunicationError> { loop { - if let Some(update) = self.home_db.update_by_new_root(new_root)? { + if let Some(update) = self.db.update_by_new_root(&self.name, new_root)? { return Ok(Some(update)); } sleep(Duration::from_millis(500)).await; @@ -343,8 +346,9 @@ where indexed_height: prometheus::IntGauge, ) -> Instrumented>> { let indexer = HomeIndexer { + home_name: self.name.to_owned(), contract: self.contract.clone(), - home_db: self.home_db.clone(), + db: self.db.clone(), from_height, provider: self.provider.clone(), chunk_size, @@ -360,7 +364,7 @@ where nonce: u32, ) -> Result, ChainCommunicationError> { loop { - if let Some(update) = self.home_db.message_by_nonce(destination, nonce)? { + if let Some(update) = self.db.message_by_nonce(&self.name, destination, nonce)? { return Ok(Some(update)); } sleep(Duration::from_millis(500)).await; @@ -373,7 +377,7 @@ where leaf: H256, ) -> Result, ChainCommunicationError> { loop { - if let Some(update) = self.home_db.message_by_leaf(leaf)? { + if let Some(update) = self.db.message_by_leaf(&self.name, leaf)? { return Ok(Some(update)); } sleep(Duration::from_millis(500)).await; @@ -385,7 +389,7 @@ where tree_index: usize, ) -> Result, ChainCommunicationError> { loop { - if let Some(update) = self.home_db.leaf_by_leaf_index(tree_index as u32)? { + if let Some(update) = self.db.leaf_by_leaf_index(&self.name, tree_index as u32)? { return Ok(Some(update)); } sleep(Duration::from_millis(500)).await; diff --git a/rust/optics-base/src/agent.rs b/rust/optics-base/src/agent.rs index 25416210a..77dfb5a01 100644 --- a/rust/optics-base/src/agent.rs +++ b/rust/optics-base/src/agent.rs @@ -8,10 +8,7 @@ use crate::{ use async_trait::async_trait; use color_eyre::{eyre::WrapErr, Result}; use futures_util::future::select_all; -use optics_core::{ - db::{HomeDB, DB}, - Common, Home, -}; +use optics_core::{db::DB, Common, Home}; use tracing::instrument::Instrumented; use tracing::{info_span, Instrument}; @@ -61,11 +58,6 @@ pub trait OpticsAgent: Send + Sync + std::fmt::Debug + AsRef { self.as_ref().db.clone() } - /// Return a handle to the DB with the home schema - fn home_db(&self) -> HomeDB { - HomeDB::new(self.as_ref().db.clone(), self.home().name().to_owned()) - } - /// Return a reference to a home contract fn home(&self) -> Arc { self.as_ref().home.clone() diff --git a/rust/optics-core/src/db/mod.rs b/rust/optics-core/src/db/mod.rs index ff078aeda..7a5d6bdbe 100644 --- a/rust/optics-core/src/db/mod.rs +++ b/rust/optics-core/src/db/mod.rs @@ -11,8 +11,8 @@ mod typed_db; pub use typed_db::*; /// DB operations tied to specific home -mod home_db; -pub use home_db::*; +mod optics_db; +pub use optics_db::*; use crate::{Decode, Encode, OpticsError}; diff --git a/rust/optics-core/src/db/home_db.rs b/rust/optics-core/src/db/optics_db.rs similarity index 56% rename from rust/optics-core/src/db/home_db.rs rename to rust/optics-core/src/db/optics_db.rs index 964c2b4a2..b59e821af 100644 --- a/rust/optics-core/src/db/home_db.rs +++ b/rust/optics-core/src/db/optics_db.rs @@ -29,50 +29,54 @@ static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_"; /// /// Key structure: ```__``` #[derive(Debug, Clone)] -pub struct HomeDB(TypedDB); +pub struct OpticsDB(TypedDB); -impl HomeDB { - /// Instantiated new `HomeDB` - pub fn new(db: DB, home_name: String) -> Self { - Self(TypedDB::new(db, home_name)) +impl OpticsDB { + /// Instantiated new `OpticsDB` + pub fn new(db: DB) -> Self { + Self(TypedDB::new(db)) } /// Store encodable value pub fn store_encodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: impl AsRef<[u8]>, value: &V, ) -> Result<(), DbError> { - self.0.store_encodable(prefix, key, value) + self.0.store_encodable(entity, prefix, key, value) } /// Retrieve decodable value pub fn retrieve_decodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: impl AsRef<[u8]>, ) -> Result, DbError> { - self.0.retrieve_decodable(prefix, key) + self.0.retrieve_decodable(entity, prefix, key) } /// Store encodable kv pair pub fn store_keyed_encodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: &K, value: &V, ) -> Result<(), DbError> { - self.0.store_encodable(prefix, key.to_vec(), value) + self.0.store_keyed_encodable(entity, prefix, key, value) } /// Retrieve decodable value given encodable key pub fn retrieve_keyed_decodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: &K, ) -> Result, DbError> { - self.0.retrieve_decodable(prefix, key.to_vec()) + self.0.retrieve_keyed_decodable(entity, prefix, key) } /// Store a raw committed message @@ -81,7 +85,11 @@ impl HomeDB { /// - `destination_and_nonce` --> `leaf` /// - `leaf_index` --> `leaf` /// - `leaf` --> `message` - pub fn store_raw_committed_message(&self, message: &RawCommittedMessage) -> Result<()> { + pub fn store_raw_committed_message( + &self, + home_name: impl AsRef<[u8]>, + message: &RawCommittedMessage, + ) -> Result<()> { let parsed = OpticsMessage::read_from(&mut message.message.clone().as_slice())?; let destination_and_nonce = parsed.destination_and_nonce(); @@ -96,31 +104,39 @@ impl HomeDB { leaf_index = message.leaf_index, "storing raw committed message in db" ); - self.store_leaf(message.leaf_index, destination_and_nonce, leaf)?; - self.store_keyed_encodable(MESSAGE, &leaf, message)?; + self.store_leaf(&home_name, message.leaf_index, destination_and_nonce, leaf)?; + self.store_keyed_encodable(&home_name, MESSAGE, &leaf, message)?; Ok(()) } /// Store the latest known leaf_index /// /// Key --> value: `LATEST_LEAF_INDEX` --> `leaf_index` - pub fn update_latest_leaf_index(&self, leaf_index: u32) -> Result<(), DbError> { - if let Ok(Some(idx)) = self.retrieve_latest_leaf_index() { + pub fn update_latest_leaf_index( + &self, + home_name: impl AsRef<[u8]>, + leaf_index: u32, + ) -> Result<(), DbError> { + if let Ok(Some(idx)) = self.retrieve_latest_leaf_index(&home_name) { if leaf_index <= idx { return Ok(()); } } - self.store_encodable("", LATEST_LEAF_INDEX, &leaf_index) + self.store_encodable(&home_name, "", LATEST_LEAF_INDEX, &leaf_index) } /// Retrieve the highest known leaf_index - pub fn retrieve_latest_leaf_index(&self) -> Result, DbError> { - self.retrieve_decodable("", LATEST_LEAF_INDEX) + pub fn retrieve_latest_leaf_index( + &self, + home_name: impl AsRef<[u8]>, + ) -> Result, DbError> { + self.retrieve_decodable(home_name, "", LATEST_LEAF_INDEX) } /// Store the leaf keyed by leaf_index fn store_leaf( &self, + home_name: impl AsRef<[u8]>, leaf_index: u32, destination_and_nonce: u64, leaf: H256, @@ -130,49 +146,64 @@ impl HomeDB { leaf = ?leaf, "storing leaf hash keyed by index and dest+nonce" ); - self.store_keyed_encodable(LEAF, &destination_and_nonce, &leaf)?; - self.store_keyed_encodable(LEAF, &leaf_index, &leaf)?; - self.update_latest_leaf_index(leaf_index) + self.store_keyed_encodable(&home_name, LEAF, &destination_and_nonce, &leaf)?; + self.store_keyed_encodable(&home_name, LEAF, &leaf_index, &leaf)?; + self.update_latest_leaf_index(&home_name, leaf_index) } /// Retrieve a raw committed message by its leaf hash - pub fn message_by_leaf(&self, leaf: H256) -> Result, DbError> { - self.retrieve_keyed_decodable(MESSAGE, &leaf) + pub fn message_by_leaf( + &self, + home_name: impl AsRef<[u8]>, + leaf: H256, + ) -> Result, DbError> { + self.retrieve_keyed_decodable(home_name, MESSAGE, &leaf) } /// Retrieve the leaf hash keyed by leaf index - pub fn leaf_by_leaf_index(&self, leaf_index: u32) -> Result, DbError> { - self.retrieve_keyed_decodable(LEAF, &leaf_index) + pub fn leaf_by_leaf_index( + &self, + home_name: impl AsRef<[u8]>, + leaf_index: u32, + ) -> Result, DbError> { + self.retrieve_keyed_decodable(home_name, LEAF, &leaf_index) } /// Retrieve the leaf hash keyed by destination and nonce - pub fn leaf_by_nonce(&self, destination: u32, nonce: u32) -> Result, DbError> { + pub fn leaf_by_nonce( + &self, + home_name: impl AsRef<[u8]>, + destination: u32, + nonce: u32, + ) -> Result, DbError> { let dest_and_nonce = utils::destination_and_nonce(destination, nonce); - self.retrieve_keyed_decodable(LEAF, &dest_and_nonce) + self.retrieve_keyed_decodable(home_name, LEAF, &dest_and_nonce) } /// Retrieve a raw committed message by its leaf hash pub fn message_by_nonce( &self, + home_name: impl AsRef<[u8]>, destination: u32, nonce: u32, ) -> Result, DbError> { - let leaf = self.leaf_by_nonce(destination, nonce)?; + let leaf = self.leaf_by_nonce(&home_name, destination, nonce)?; match leaf { None => Ok(None), - Some(leaf) => self.message_by_leaf(leaf), + Some(leaf) => self.message_by_leaf(&home_name, leaf), } } /// Retrieve a raw committed message by its leaf index pub fn message_by_leaf_index( &self, + home_name: impl AsRef<[u8]>, index: u32, ) -> Result, DbError> { - let leaf: Option = self.leaf_by_leaf_index(index)?; + let leaf: Option = self.leaf_by_leaf_index(&home_name, index)?; match leaf { None => Ok(None), - Some(leaf) => self.message_by_leaf(leaf), + Some(leaf) => self.message_by_leaf(&home_name, leaf), } } @@ -180,25 +211,35 @@ impl HomeDB { /// /// Keys --> Values: /// - `replica_domain` --> `nonce` - pub fn store_latest_nonce(&self, replica_domain: u32, nonce: u32) -> Result<(), DbError> { - self.store_keyed_encodable(LATEST_NONCE, &replica_domain, &nonce)?; + pub fn store_latest_nonce( + &self, + home_name: impl AsRef<[u8]>, + replica_domain: u32, + nonce: u32, + ) -> Result<(), DbError> { + self.store_keyed_encodable(home_name, LATEST_NONCE, &replica_domain, &nonce)?; Ok(()) } /// Retrieves the latest inspected nonce for a given replica domain - pub fn retrieve_latest_nonce(&self, replica_domain: u32) -> Result, DbError> { - self.retrieve_keyed_decodable(LATEST_NONCE, &replica_domain) + pub fn retrieve_latest_nonce( + &self, + home_name: impl AsRef<[u8]>, + replica_domain: u32, + ) -> Result, DbError> { + self.retrieve_keyed_decodable(home_name, LATEST_NONCE, &replica_domain) } - /// Retrieve the latest committed - pub fn retrieve_latest_root(&self) -> Result, DbError> { - self.retrieve_decodable("", LATEST_ROOT) + /// Store the latest committed + fn store_latest_root(&self, entity: impl AsRef<[u8]>, root: H256) -> Result<(), DbError> { + debug!(root = ?root, "storing new latest root in DB"); + self.store_encodable(entity, "", LATEST_ROOT, &root) } - fn store_latest_root(&self, root: H256) -> Result<(), DbError> { - debug!(root = ?root, "storing new latest root in DB"); - self.store_encodable("", LATEST_ROOT, &root) + /// Retrieve the latest committed + pub fn retrieve_latest_root(&self, entity: impl AsRef<[u8]>) -> Result, DbError> { + self.retrieve_decodable(entity, "", LATEST_ROOT) } /// Store update metadata (by update's new root) @@ -207,17 +248,22 @@ impl HomeDB { /// - `update_new_root` --> `update_metadata` pub fn store_update_metadata( &self, + entity: impl AsRef<[u8]>, new_root: H256, metadata: UpdateMeta, ) -> Result<(), DbError> { debug!(new_root = ?new_root, metadata = ?metadata, "storing update metadata in DB"); - self.store_keyed_encodable(UPDATE_META, &new_root, &metadata) + self.store_keyed_encodable(entity, UPDATE_META, &new_root, &metadata) } /// Retrieve update metadata (by update's new root) - pub fn retrieve_update_metadata(&self, new_root: H256) -> Result, DbError> { - self.retrieve_keyed_decodable(UPDATE_META, &new_root) + pub fn retrieve_update_metadata( + &self, + entity: impl AsRef<[u8]>, + new_root: H256, + ) -> Result, DbError> { + self.retrieve_keyed_decodable(entity, UPDATE_META, &new_root) } /// Store a signed update building off latest root @@ -226,7 +272,11 @@ impl HomeDB { /// - `LATEST_ROOT` --> `root` /// - `new_root` --> `prev_root` /// - `prev_root` --> `update` - pub fn store_latest_update(&self, update: &SignedUpdate) -> Result<(), DbError> { + pub fn store_latest_update( + &self, + entity: impl AsRef<[u8]>, + update: &SignedUpdate, + ) -> Result<(), DbError> { debug!( previous_root = ?update.update.previous_root, new_root = ?update.update.new_root, @@ -235,10 +285,10 @@ impl HomeDB { // If there is no latest root, or if this update is on the latest root // update latest root - match self.retrieve_latest_root()? { + match self.retrieve_latest_root(&entity)? { Some(root) => { if root == update.update.previous_root { - self.store_latest_root(update.update.new_root)?; + self.store_latest_root(&entity, update.update.new_root)?; } else { warn!( "Attempted to store update not building off latest root: {:?}", @@ -246,11 +296,12 @@ impl HomeDB { ) } } - None => self.store_latest_root(update.update.new_root)?, + None => self.store_latest_root(&entity, update.update.new_root)?, } - self.store_keyed_encodable(UPDATE, &update.update.previous_root, update)?; + self.store_keyed_encodable(&entity, UPDATE, &update.update.previous_root, update)?; self.store_keyed_encodable( + &entity, PREV_ROOT, &update.update.new_root, &update.update.previous_root, @@ -260,17 +311,23 @@ impl HomeDB { /// Retrieve an update by its previous root pub fn update_by_previous_root( &self, + entity: impl AsRef<[u8]>, previous_root: H256, ) -> Result, DbError> { - self.retrieve_keyed_decodable(UPDATE, &previous_root) + self.retrieve_keyed_decodable(entity, UPDATE, &previous_root) } /// Retrieve an update by its new root - pub fn update_by_new_root(&self, new_root: H256) -> Result, DbError> { - let prev_root: Option = self.retrieve_keyed_decodable(PREV_ROOT, &new_root)?; + pub fn update_by_new_root( + &self, + entity: impl AsRef<[u8]>, + new_root: H256, + ) -> Result, DbError> { + let prev_root: Option = + self.retrieve_keyed_decodable(&entity, PREV_ROOT, &new_root)?; match prev_root { - Some(prev_root) => self.update_by_previous_root(prev_root), + Some(prev_root) => self.update_by_previous_root(&entity, prev_root), None => Ok(None), } } @@ -284,26 +341,36 @@ impl HomeDB { /// /// Keys --> Values: /// - `leaf_index` --> `proof` - pub fn store_proof(&self, leaf_index: u32, proof: &Proof) -> Result<(), DbError> { + pub fn store_proof( + &self, + home_name: impl AsRef<[u8]>, + leaf_index: u32, + proof: &Proof, + ) -> Result<(), DbError> { debug!(leaf_index, "storing proof in DB"); - self.store_keyed_encodable(PROOF, &leaf_index, proof) + self.store_keyed_encodable(home_name, PROOF, &leaf_index, proof) } /// Retrieve a proof by its leaf index - pub fn proof_by_leaf_index(&self, leaf_index: u32) -> Result, DbError> { - self.retrieve_keyed_decodable(PROOF, &leaf_index) + pub fn proof_by_leaf_index( + &self, + home_name: impl AsRef<[u8]>, + leaf_index: u32, + ) -> Result, DbError> { + self.retrieve_keyed_decodable(home_name, PROOF, &leaf_index) } // TODO(james): this is a quick-fix for the prover_sync and I don't like it /// poll db ever 100 milliseconds waitinf for a leaf. pub fn wait_for_leaf( &self, + home_name: impl AsRef<[u8]>, leaf_index: u32, ) -> impl Future, DbError>> { let slf = self.clone(); async move { loop { - if let Some(leaf) = slf.leaf_by_leaf_index(leaf_index)? { + if let Some(leaf) = slf.leaf_by_leaf_index(&home_name, leaf_index)? { return Ok(Some(leaf)); } sleep(Duration::from_millis(100)).await diff --git a/rust/optics-core/src/db/typed_db.rs b/rust/optics-core/src/db/typed_db.rs index b84fcd6bc..f67b3a1eb 100644 --- a/rust/optics-core/src/db/typed_db.rs +++ b/rust/optics-core/src/db/typed_db.rs @@ -6,28 +6,22 @@ use color_eyre::Result; /// /// Key structure: ```__``` #[derive(Debug, Clone)] -pub struct TypedDB { - db: DB, - type_prefix: Vec, -} +pub struct TypedDB(DB); impl TypedDB { /// Instantiate new `TypedDB` - pub fn new(db: DB, type_prefix: impl Into>) -> Self { - Self { - db, - type_prefix: type_prefix.into(), - } + pub fn new(db: DB) -> Self { + Self(db) } /// Return reference to raw db pub fn db(&self) -> &DB { - &self.db + &self.0 } - fn full_prefix(&self, prefix: impl AsRef<[u8]>) -> Vec { + fn full_prefix(entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>) -> Vec { let mut full_prefix = vec![]; - full_prefix.extend(self.type_prefix.as_ref() as &[u8]); + full_prefix.extend(entity.as_ref()); full_prefix.extend("_".as_bytes()); full_prefix.extend(prefix.as_ref()); full_prefix @@ -36,39 +30,46 @@ impl TypedDB { /// Store encodable value pub fn store_encodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: impl AsRef<[u8]>, value: &V, ) -> Result<(), DbError> { - self.db - .store_encodable(&self.full_prefix(prefix), key, value) + self.0 + .store_encodable(TypedDB::full_prefix(entity, prefix), key, value) } /// Retrieve decodable value pub fn retrieve_decodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: impl AsRef<[u8]>, ) -> Result, DbError> { - self.db.retrieve_decodable(&self.full_prefix(prefix), key) + self.0 + .retrieve_decodable(TypedDB::full_prefix(entity, prefix), key) } /// Store encodable kv pair pub fn store_keyed_encodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: &K, value: &V, ) -> Result<(), DbError> { - self.store_encodable(prefix, key.to_vec(), value) + self.0 + .store_keyed_encodable(TypedDB::full_prefix(entity, prefix), key, value) } /// Retrieve decodable value given encodable key pub fn retrieve_keyed_decodable( &self, + entity: impl AsRef<[u8]>, prefix: impl AsRef<[u8]>, key: &K, ) -> Result, DbError> { - self.retrieve_decodable(prefix, key.to_vec()) + self.0 + .retrieve_keyed_decodable(TypedDB::full_prefix(entity, prefix), key) } } diff --git a/rust/optics-test/src/test_utils.rs b/rust/optics-test/src/test_utils.rs index 22fa044a6..5f55a3f00 100644 --- a/rust/optics-test/src/test_utils.rs +++ b/rust/optics-test/src/test_utils.rs @@ -42,13 +42,14 @@ mod test { use super::*; use ethers::types::H256; use optics_core::{ - accumulator::merkle::Proof, db::HomeDB, Encode, OpticsMessage, RawCommittedMessage, + accumulator::merkle::Proof, db::OpticsDB, Encode, OpticsMessage, RawCommittedMessage, }; #[tokio::test] - async fn home_db_stores_and_retrieves_messages() { + async fn db_stores_and_retrieves_messages() { run_test_db(|db| async move { - let home_db = HomeDB::new(db, "home_1".to_owned()); + let home_name = "home_1".to_owned(); + let db = OpticsDB::new(db); let m = OpticsMessage { origin: 10, @@ -66,19 +67,23 @@ mod test { }; assert_eq!(m.to_leaf(), message.leaf()); - home_db.store_raw_committed_message(&message).unwrap(); + db.store_raw_committed_message(&home_name, &message) + .unwrap(); - let by_nonce = home_db - .message_by_nonce(m.destination, m.nonce) + let by_nonce = db + .message_by_nonce(&home_name, m.destination, m.nonce) .unwrap() .unwrap(); assert_eq!(by_nonce, message); - let by_leaf = home_db.message_by_leaf(message.leaf()).unwrap().unwrap(); + let by_leaf = db + .message_by_leaf(&home_name, message.leaf()) + .unwrap() + .unwrap(); assert_eq!(by_leaf, message); - let by_index = home_db - .message_by_leaf_index(message.leaf_index) + let by_index = db + .message_by_leaf_index(&home_name, message.leaf_index) .unwrap() .unwrap(); assert_eq!(by_index, message); @@ -87,18 +92,19 @@ mod test { } #[tokio::test] - async fn home_db_stores_and_retrieves_proofs() { + async fn db_stores_and_retrieves_proofs() { run_test_db(|db| async move { - let home_db = HomeDB::new(db, "home_1".to_owned()); + let home_name = "home_1".to_owned(); + let db = OpticsDB::new(db); let proof = Proof { leaf: H256::from_low_u64_be(15), index: 32, path: Default::default(), }; - home_db.store_proof(13, &proof).unwrap(); + db.store_proof(&home_name, 13, &proof).unwrap(); - let by_index = home_db.proof_by_leaf_index(13).unwrap().unwrap(); + let by_index = db.proof_by_leaf_index(&home_name, 13).unwrap().unwrap(); assert_eq!(by_index, proof); }) .await; diff --git a/rust/tools/optics-cli/src/subcommands/db_state.rs b/rust/tools/optics-cli/src/subcommands/db_state.rs index 01dab216e..de4352629 100644 --- a/rust/tools/optics-cli/src/subcommands/db_state.rs +++ b/rust/tools/optics-cli/src/subcommands/db_state.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, convert::TryInto, fs::OpenOptions, io::Write}; use structopt::StructOpt; use optics_core::{ - db::{HomeDB, DB}, + db::{OpticsDB, DB}, CommittedMessage, }; @@ -29,11 +29,11 @@ type OutputVec = Vec<((H256, u64), Vec)>; impl DbStateCommand { pub async fn run(&self) -> Result<()> { - let db = HomeDB::new(DB::from_path(&self.db_path)?, self.home_name.clone()); + let db = OpticsDB::new(DB::from_path(&self.db_path)?); - let messages_by_committed_roots = DbStateCommand::create_comitted_root_to_message_map(&db)?; + let messages_by_committed_roots = self.create_comitted_root_to_message_map(&db)?; - let output_vec = DbStateCommand::create_output_vec(&db, messages_by_committed_roots)?; + let output_vec = self.create_output_vec(&db, messages_by_committed_roots)?; if self.json { DbStateCommand::save_to_json(output_vec)?; @@ -45,13 +45,14 @@ impl DbStateCommand { } fn create_comitted_root_to_message_map( - db: &HomeDB, + &self, + db: &OpticsDB, ) -> Result>> { let mut messages_by_committed_roots: HashMap> = HashMap::new(); for index in 0.. { - match db.message_by_leaf_index(index)? { + match db.message_by_leaf_index(&self.home_name, index)? { Some(message) => { - if db.proof_by_leaf_index(index)?.is_none() { + if db.proof_by_leaf_index(&self.home_name, index)?.is_none() { println!("Failed to find proof for leaf index {}!", index); } @@ -81,19 +82,22 @@ impl DbStateCommand { } fn create_output_vec( - db: &HomeDB, + &self, + db: &OpticsDB, messages_by_committed_roots: HashMap>, ) -> Result { // Create mapping of (update root, block_number) to [messages] let mut output_map: HashMap<(H256, u64), Vec> = HashMap::new(); for (committed_root, bucket) in messages_by_committed_roots { - let containing_update_opt = db.update_by_previous_root(committed_root)?; + let containing_update_opt = + db.update_by_previous_root(&self.home_name, committed_root)?; match containing_update_opt { Some(containing_update) => { let new_root = containing_update.update.new_root; - let update_metadata = - db.retrieve_update_metadata(new_root)?.unwrap_or_else(|| { + let update_metadata = db + .retrieve_update_metadata(&self.home_name, new_root)? + .unwrap_or_else(|| { panic!("Couldn't find metadata for update {:?}", containing_update) }); diff --git a/rust/tools/optics-cli/src/subcommands/prove.rs b/rust/tools/optics-cli/src/subcommands/prove.rs index 7f98eafcc..9650ef880 100644 --- a/rust/tools/optics-cli/src/subcommands/prove.rs +++ b/rust/tools/optics-cli/src/subcommands/prove.rs @@ -5,7 +5,7 @@ use crate::{replicas, rpc}; use optics_core::{ accumulator::merkle::Proof, - db::{HomeDB, DB}, + db::{OpticsDB, DB}, ContractLocator, Decode, MessageStatus, OpticsMessage, Replica, Signers, }; use optics_ethereum::EthereumReplica; @@ -110,19 +110,23 @@ impl ProveCommand { } fn fetch_proof(&self) -> Result<(OpticsMessage, Proof)> { - let db = HomeDB::new(DB::from_path(&self.db_path)?, self.home_name.clone()); + let db = OpticsDB::new(DB::from_path(&self.db_path)?); let idx = match (self.leaf_index, self.leaf) { (Some(idx), _) => idx, - (None, Some(digest)) => match db.message_by_leaf(digest)? { + (None, Some(digest)) => match db.message_by_leaf(&self.home_name, digest)? { Some(leaf) => leaf.leaf_index, None => bail!("No leaf index or "), }, (None, None) => bail!("Must provide leaf index or leaf hash"), }; - let proof = db.proof_by_leaf_index(idx)?.expect("no proof"); - let message = db.message_by_leaf_index(idx)?.expect("no message"); + let proof = db + .proof_by_leaf_index(&self.home_name, idx)? + .expect("no proof"); + let message = db + .message_by_leaf_index(&self.home_name, idx)? + .expect("no message"); let message = OpticsMessage::read_from(&mut message.message.as_slice())?; Ok((message, proof))