refactor: scrap HomeDB for generalized OpticsDB (#902)
* refactor: comment out homedb and have typeddb use arg prefixes

* refactor: prefixes all loads and stores with home_name or entity in AgentDB

* fmt: cargo fmt

* fix: keyed encodable/decodable

* refactor: use AgentDB everywhere and scrap HomeDB

* refactor: rename AgentDB OpticsDB

* fix: put watcher back into cargo.toml and fix OpticsDB there
luketchang authored Oct 23, 2021
1 parent a240fac commit ea32adf
Showing 13 changed files with 285 additions and 180 deletions.
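Taken together, the bullets above describe one change: the per-home `HomeDB` wrapper is replaced by a generalized `OpticsDB`, and every load and store is keyed by an explicit entity (home) name instead of a name baked into the wrapper at construction time. The sketch below illustrates that keying idea with hypothetical stand-in types; it is not the real `optics_core::db::OpticsDB`, whose method signatures and key layout are defined in the repo and may differ.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the rocksdb-backed store, only to illustrate
/// the entity-prefixed keying that OpticsDB introduces. This is NOT the
/// real optics_core::db::OpticsDB.
#[derive(Default)]
struct OpticsDbSketch {
    kv: HashMap<Vec<u8>, Vec<u8>>,
}

impl OpticsDbSketch {
    /// Every load/store is scoped by an entity (home) name, so one DB can
    /// hold state for many homes instead of needing one HomeDB per home.
    fn prefixed(entity: &str, key: &str) -> Vec<u8> {
        format!("{}_{}", entity, key).into_bytes()
    }

    fn store_latest_nonce(&mut self, entity: &str, domain: u32, nonce: u32) {
        let key = Self::prefixed(entity, &format!("latest_nonce_{}", domain));
        self.kv.insert(key, nonce.to_be_bytes().to_vec());
    }

    fn retrieve_latest_nonce(&self, entity: &str, domain: u32) -> Option<u32> {
        let key = Self::prefixed(entity, &format!("latest_nonce_{}", domain));
        self.kv
            .get(&key)
            .map(|b| u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
    }
}

fn main() {
    let mut db = OpticsDbSketch::default();
    db.store_latest_nonce("ethereum", 2000, 42);
    assert_eq!(db.retrieve_latest_nonce("ethereum", 2000), Some(42));
    // A different home's state lives under its own prefix in the same DB.
    assert_eq!(db.retrieve_latest_nonce("celo", 2000), None);
}
```

With this shape a single DB handle can serve every home an agent tracks, which is what lets the agents in the hunks below drop their per-home `HomeDB` fields.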
26 changes: 15 additions & 11 deletions rust/agents/processor/src/processor.rs
@@ -15,7 +15,7 @@ use tracing::{debug, error, info, info_span, instrument, instrument::Instrumente

use optics_base::{cancel_task, decl_agent, AgentCore, Homes, OpticsAgent, Replicas};
use optics_core::{
- accumulator::merkle::Proof, db::HomeDB, CommittedMessage, Common, Home, MessageStatus,
+ accumulator::merkle::Proof, db::OpticsDB, CommittedMessage, Common, Home, MessageStatus,
};

use crate::{
@@ -38,7 +38,7 @@ pub(crate) struct Replica {
interval: u64,
replica: Arc<Replicas>,
home: Arc<Homes>,
- home_db: HomeDB,
+ db: OpticsDB,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
next_message_nonce: Arc<prometheus::IntGaugeVec>,
@@ -72,8 +72,8 @@ impl Replica {
// 4. Check if the proof is valid under the replica
// 5. Submit the proof to the replica
let mut next_message_nonce: u32 = self
- .home_db
- .retrieve_latest_nonce(domain)?
+ .db
+ .retrieve_latest_nonce(self.home.name(), domain)?
.map(|n: u32| n + 1)
.unwrap_or_default();

@@ -107,8 +107,8 @@
.await
{
Ok(Flow::Advance) => {
- self.home_db
- .store_latest_nonce(domain, next_message_nonce)?;
+ self.db
+ .store_latest_nonce(self.home.name(), domain, next_message_nonce)?;
next_message_nonce += 1;

self.next_message_nonce
@@ -199,7 +199,10 @@ impl Replica {
return Ok(Flow::Advance);
}

- let proof = match self.home_db.proof_by_leaf_index(message.leaf_index) {
+ let proof = match self
+ .db
+ .proof_by_leaf_index(self.home.name(), message.leaf_index)
+ {
Ok(Some(p)) => p,
Ok(None) => {
info!(
@@ -356,7 +359,7 @@ impl OpticsAgent for Processor {
let home = self.home();
let next_message_nonce = self.next_message_nonce.clone();
let interval = self.interval;
- let home_db = self.home_db();
+ let db = OpticsDB::new(self.db());

let replica_opt = self.replica_by_name(name);
let name = name.to_owned();
@@ -371,7 +374,7 @@
interval,
replica,
home,
- home_db,
+ db,
allowed,
denied,
next_message_nonce,
@@ -391,7 +394,8 @@

// tree sync
info!("Starting ProverSync");
- let sync = ProverSync::from_disk(self.home_db());
+ let sync =
+ ProverSync::from_disk(self.home().name().to_owned(), OpticsDB::new(self.db()));
let sync_task = sync.spawn();

info!("Starting indexer");
@@ -430,7 +434,7 @@
self.core.home.name(),
&config.bucket,
config.region.parse().expect("invalid s3 region"),
- self.home_db(),
+ OpticsDB::new(self.db()),
)
.spawn(),
)
44 changes: 31 additions & 13 deletions rust/agents/processor/src/prover_sync.rs
@@ -3,7 +3,7 @@ use color_eyre::eyre::{bail, Result};
use ethers::core::types::H256;
use optics_core::{
accumulator::{incremental::IncrementalMerkle, INITIAL_ROOT},
- db::{DbError, HomeDB},
+ db::{DbError, OpticsDB},
ChainCommunicationError,
};
use std::{fmt::Display, ops::Range, time::Duration};
@@ -13,7 +13,8 @@ use tracing::{debug, error, info, info_span, instrument, instrument::Instrumente
/// Struct to sync prover.
#[derive(Debug)]
pub struct ProverSync {
- db: HomeDB,
+ home_name: String,
+ db: OpticsDB,
prover: Prover,
incremental: IncrementalMerkle,
}
@@ -70,7 +71,7 @@ impl ProverSync {
fn store_proof(&self, leaf_index: u32) -> Result<(), ProverSyncError> {
match self.prover.prove(leaf_index as usize) {
Ok(proof) => {
- self.db.store_proof(leaf_index, &proof)?;
+ self.db.store_proof(&self.home_name, leaf_index, &proof)?;
info!(
leaf_index,
root = ?self.prover.root(),
@@ -90,14 +91,14 @@ impl ProverSync {
/// Given rocksdb handle `db` containing merkle tree leaves,
/// instantiates new `ProverSync` and fills prover's merkle tree
#[instrument(level = "debug", skip(db))]
- pub fn from_disk(db: HomeDB) -> Self {
+ pub fn from_disk(home_name: String, db: OpticsDB) -> Self {
// Ingest all leaves in db into prover tree
let mut prover = Prover::default();
let mut incremental = IncrementalMerkle::default();

- if let Some(root) = db.retrieve_latest_root().expect("db error") {
+ if let Some(root) = db.retrieve_latest_root(&home_name).expect("db error") {
for i in 0.. {
- match db.leaf_by_leaf_index(i) {
+ match db.leaf_by_leaf_index(&home_name, i) {
Ok(Some(leaf)) => {
debug!(leaf_index = i, "Ingesting leaf from_disk");
prover.ingest(leaf).expect("!tree full");
@@ -118,6 +119,7 @@
}

let sync = Self {
+ home_name: home_name.to_owned(),
prover,
incremental,
db,
@@ -126,8 +128,10 @@
// Ensure proofs exist for all leaves
for i in 0..sync.prover.count() as u32 {
match (
- sync.db.leaf_by_leaf_index(i).expect("db error"),
- sync.db.proof_by_leaf_index(i).expect("db error"),
+ sync.db.leaf_by_leaf_index(&home_name, i).expect("db error"),
+ sync.db
+ .proof_by_leaf_index(&home_name, i)
+ .expect("db error"),
) {
(Some(_), None) => sync.store_proof(i).expect("db error"),
(None, _) => break,
@@ -154,7 +158,7 @@
let mut leaves = vec![];

for i in range {
- let leaf = self.db.wait_for_leaf(i as u32).await?;
+ let leaf = self.db.wait_for_leaf(&self.home_name, i as u32).await?;
if leaf.is_none() {
break;
}
@@ -215,7 +219,11 @@
// store all calculated proofs in the db
// TODO(luke): refactor prover_sync so we dont have to iterate over every leaf (match from_disk implementation)
for idx in 0..self.prover.count() {
- if self.db.proof_by_leaf_index(idx as u32)?.is_none() {
+ if self
+ .db
+ .proof_by_leaf_index(&self.home_name, idx as u32)?
+ .is_none()
+ {
self.store_proof(idx as u32)?;
}
}
@@ -262,7 +270,11 @@
// As we fill the incremental merkle, its tree_size will always be
// equal to the index of the next leaf we want (e.g. if tree_size
// is 3, we want the 4th leaf, which is at index 3)
- if let Some(leaf) = self.db.wait_for_leaf(tree_size as u32).await? {
+ if let Some(leaf) = self
+ .db
+ .wait_for_leaf(&self.home_name, tree_size as u32)
+ .await?
+ {
info!(
index = tree_size,
leaf = ?leaf,
@@ -305,7 +317,9 @@
tokio::spawn(async move {
loop {
let local_root = self.local_root();
- let signed_update_opt = self.db.update_by_previous_root(local_root)?;
+ let signed_update_opt = self
+ .db
+ .update_by_previous_root(&self.home_name, local_root)?;

// This if block is somewhat ugly.
// First we check if there is a signed update with the local root.
@@ -321,7 +335,11 @@
);
self.update_full(local_root, signed_update.update.new_root)
.await?;
- } else if !local_root.is_zero() && self.db.update_by_new_root(local_root)?.is_none()
+ } else if !local_root.is_zero()
+ && self
+ .db
+ .update_by_new_root(&self.home_name, local_root)?
+ .is_none()
{
bail!(ProverSyncError::InvalidLocalRoot { local_root });
}
10 changes: 5 additions & 5 deletions rust/agents/processor/src/push.rs
@@ -5,7 +5,7 @@ use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3

use color_eyre::eyre::{bail, eyre, Result};

- use optics_core::{accumulator::merkle::Proof, db::HomeDB, Encode};
+ use optics_core::{accumulator::merkle::Proof, db::OpticsDB, Encode};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, info, info_span, instrument::Instrumented, Instrument};

@@ -20,7 +20,7 @@ pub struct Pusher {
name: String,
bucket: String,
region: Region,
- db: HomeDB,
+ db: OpticsDB,
client: S3Client,
}

@@ -36,7 +36,7 @@ impl std::fmt::Debug for Pusher {

impl Pusher {
/// Instantiate a new pusher with a region
- pub fn new(name: &str, bucket: &str, region: Region, db: HomeDB) -> Self {
+ pub fn new(name: &str, bucket: &str, region: Region, db: OpticsDB) -> Self {
let client = S3Client::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
@@ -112,12 +112,12 @@ impl Pusher {
tokio::spawn(async move {
let mut index = 0;
loop {
- let proof = self.db.proof_by_leaf_index(index)?;
+ let proof = self.db.proof_by_leaf_index(&self.name, index)?;
match proof {
Some(proof) => {
let message = self
.db
- .message_by_leaf_index(index)?
+ .message_by_leaf_index(&self.name, index)?
.ok_or_else(|| eyre!("Missing message for known proof"))?;
let proven = ProvenMessage {
proof,
16 changes: 8 additions & 8 deletions rust/agents/updater/src/updater.rs
@@ -20,7 +20,7 @@ use tracing::{error, info, instrument::Instrumented, Instrument};

use crate::settings::UpdaterSettings as Settings;
use optics_base::{AgentCore, Homes, OpticsAgent};
- use optics_core::{db::HomeDB, Common, Home, SignedUpdate, Signers, Update};
+ use optics_core::{db::OpticsDB, Common, Home, SignedUpdate, Signers, Update};

#[derive(Debug)]
struct UpdateHandler {
@@ -29,7 +29,7 @@ struct UpdateHandler {
rx: Receiver<Update>,
update_pause: u64,
signer: Arc<Signers>,
- home_db: HomeDB,
+ db: OpticsDB,
mutex: Arc<Mutex<()>>,
signed_attestation_count: IntCounterVec,
}
@@ -50,7 +50,7 @@ impl UpdateHandler {
rx: Receiver<Update>,
update_pause: u64,
signer: Arc<Signers>,
- home_db: HomeDB,
+ db: OpticsDB,
mutex: Arc<Mutex<()>>,
signed_attestation_count: IntCounterVec,
) -> Self {
@@ -59,15 +59,15 @@
rx,
update_pause,
signer,
- home_db,
+ db,
mutex,
signed_attestation_count,
}
}

fn check_conflict(&self, update: &Update) -> Option<SignedUpdate> {
- self.home_db
- .update_by_previous_root(update.previous_root)
+ self.db
+ .update_by_previous_root(self.home.name(), update.previous_root)
.expect("db failure")
}

@@ -150,7 +150,7 @@ impl UpdateHandler {
self.home.update(&signed).await?;

info!("Storing signed update in db");
- self.home_db.store_latest_update(&signed)?;
+ self.db.store_latest_update(self.home.name(), &signed)?;
Ok(())
// guard dropped here
}
@@ -266,7 +266,7 @@ impl OpticsAgent for Updater {
rx,
self.update_pause,
self.signer.clone(),
- HomeDB::new(self.db(), self.home().name().to_owned()),
+ OpticsDB::new(self.db()),
Default::default(),
self.signed_attestation_count.clone(),
);
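The last hunk above is the clearest summary of the call-site migration: `HomeDB::new(self.db(), self.home().name().to_owned())` becomes `OpticsDB::new(self.db())`, with the home name supplied on each access (for example `store_latest_update(self.home.name(), &signed)`) instead of at construction. A small before/after sketch of that pattern, again using illustrative stand-ins rather than the repo's actual types:

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the shared raw DB handle; not the real rocksdb wrapper.
type RawDb = HashMap<String, Vec<u8>>;

/// Old shape (sketch): the home name is fixed at construction, so each
/// home needs its own wrapper around the shared handle.
struct HomeDbSketch {
    db: RawDb,
    home_name: String,
}

impl HomeDbSketch {
    fn store_latest_update(&mut self, update: &[u8]) {
        let key = format!("{}_latest_update", self.home_name);
        self.db.insert(key, update.to_vec());
    }
}

/// New shape (sketch): one wrapper over the shared handle; every call
/// names the entity it is scoped to.
struct OpticsDbSketch {
    db: RawDb,
}

impl OpticsDbSketch {
    fn store_latest_update(&mut self, entity: &str, update: &[u8]) {
        let key = format!("{}_latest_update", entity);
        self.db.insert(key, update.to_vec());
    }
}

fn main() {
    let mut old = HomeDbSketch {
        db: RawDb::new(),
        home_name: "ethereum".to_owned(),
    };
    old.store_latest_update(b"signed-update-bytes");

    let mut new = OpticsDbSketch { db: RawDb::new() };
    new.store_latest_update("ethereum", b"signed-update-bytes");
    new.store_latest_update("celo", b"signed-update-bytes");
    assert_eq!(new.db.len(), 2);
}
```

The same substitution applies to the processor and pusher call sites shown earlier in this commit.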
