Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: separate db from home and consolidate home_db and syncing #878

Closed
wants to merge 13 commits into from
24 changes: 5 additions & 19 deletions rust/agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use optics_core::{
db::HomeDB,
traits::{CommittedMessage, Common, Home, MessageStatus},
};
use std::convert::TryInto;

use crate::{
prover_sync::ProverSync,
Expand Down Expand Up @@ -162,9 +163,9 @@ impl Replica {
async fn try_msg_by_domain_and_nonce(&self, domain: u32, nonce: u32) -> Result<Flow> {
use optics_core::traits::Replica;

let message = match self.home.message_by_nonce(domain, nonce).await {
Ok(Some(m)) => m,
Ok(None) => {
let message: CommittedMessage = match self.home_db.message_by_nonce(domain, nonce)? {
Some(m) => m.try_into()?,
None => {
info!(
domain = domain,
sequence = nonce,
Expand All @@ -174,7 +175,6 @@ impl Replica {
);
return Ok(Flow::Repeat);
}
Err(e) => bail!(e),
};

info!(target: "seen_committed_messages", leaf_index = message.leaf_index);
Expand Down Expand Up @@ -403,21 +403,7 @@ impl OpticsAgent for Processor {

info!("Starting indexer");
// indexer setup
let block_height = self
.as_ref()
.metrics
.new_int_gauge(
"block_height",
"Height of a recently observed block",
&["network", "agent"],
)
.expect("failed to register block_height metric")
.with_label_values(&[self.home().name(), Self::AGENT_NAME]);
let indexer = &self.as_ref().indexer;
let index_task = self
.home()
.index(indexer.from(), indexer.chunk_size(), block_height);

let index_task = self.syncing_home_db().index();
info!("started indexer and sync");

// instantiate task array here so we can optionally push run_task
Expand Down
11 changes: 7 additions & 4 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ use optics_base::{
home::Homes,
replica::Replicas,
};
use optics_core::traits::Common;
use optics_core::{db::HomeDB, traits::Common};

use crate::settings::RelayerSettings as Settings;

#[derive(Debug)]
struct UpdatePoller {
duration: Duration,
home: Arc<Homes>,
home_db: HomeDB,
replica: Arc<Replicas>,
semaphore: Mutex<()>,
}
Expand All @@ -32,9 +33,10 @@ impl std::fmt::Display for UpdatePoller {
}

impl UpdatePoller {
fn new(home: Arc<Homes>, replica: Arc<Replicas>, duration: u64) -> Self {
fn new(home: Arc<Homes>, home_db: HomeDB, replica: Arc<Replicas>, duration: u64) -> Self {
Self {
home,
home_db,
replica,
duration: Duration::from_secs(duration),
semaphore: Mutex::new(()),
Expand All @@ -53,7 +55,7 @@ impl UpdatePoller {
);

// Check for first signed update building off of the replica's current root
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?;
let signed_update_opt = self.home_db.update_by_previous_root(old_root)?;

// If signed update exists, update replica's current root
if let Some(signed_update) = signed_update_opt {
Expand Down Expand Up @@ -134,6 +136,7 @@ impl OpticsAgent for Relayer {
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let home = self.home();
let home_db = self.home_db();
let name = name.to_owned();

let duration = self.duration;
Expand All @@ -144,7 +147,7 @@ impl OpticsAgent for Relayer {
}
let replica = replica_opt.unwrap();

let update_poller = UpdatePoller::new(home, replica.clone(), duration);
let update_poller = UpdatePoller::new(home, home_db, replica.clone(), duration);
update_poller.spawn().await?
})
.in_current_span()
Expand Down
24 changes: 2 additions & 22 deletions rust/agents/updater/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ use color_eyre::Result;

use futures_util::future::select_all;

use optics_base::{agent::OpticsAgent, cancel_task};
use optics_core::traits::{Common, Home};

use crate::{settings::UpdaterSettings as Settings, updater::Updater};
use optics_base::{agent::OpticsAgent, cancel_task};

#[allow(unused_must_use)]
async fn _main() -> Result<()> {
Expand All @@ -34,25 +32,7 @@ async fn _main() -> Result<()> {

let _ = agent.metrics().run_http_server();

// this is deliberately different from other agents because the updater
// does not run replicas. As a result, most of the contents of run_all are
// broken out here
let indexer = &agent.as_ref().indexer;

let block_height = agent
.as_ref()
.metrics
.new_int_gauge(
"block_height",
"Height of a recently observed block",
&["network", "agent"],
)
.expect("failed to register block_height metric")
.with_label_values(&[agent.home().name(), Updater::AGENT_NAME]);

let index_task = agent
.home()
.index(indexer.from(), indexer.chunk_size(), block_height);
let index_task = agent.syncing_home_db().index();
let run_task = agent.run("");

let futs = vec![index_task, run_task];
Expand Down
3 changes: 1 addition & 2 deletions rust/agents/updater/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use optics_core::{
#[derive(Debug)]
struct UpdateHandler {
home: Arc<Homes>,

rx: Receiver<Update>,
update_pause: u64,
signer: Arc<Signers>,
Expand Down Expand Up @@ -273,7 +272,7 @@ impl OpticsAgent for Updater {
rx,
self.update_pause,
self.signer.clone(),
HomeDB::new(self.db(), self.home().name().to_owned()),
self.home_db(),
Default::default(),
self.signed_attestation_count.clone(),
);
Expand Down
Loading