From 5fe0dd6c59e10e2fc7f09755e7e288857445dcbc Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 12:27:58 +0000 Subject: [PATCH 01/33] Solver participation validator --- Cargo.lock | 1 + crates/autopilot/Cargo.toml | 1 + crates/autopilot/src/arguments.rs | 20 +++ crates/autopilot/src/database/competition.rs | 21 +++ .../autopilot/src/domain/competition/mod.rs | 6 +- .../competition/solver_participation_guard.rs | 157 ++++++++++++++++++ .../src/domain/settlement/observer.rs | 21 ++- crates/autopilot/src/run.rs | 19 ++- crates/autopilot/src/run_loop.rs | 33 ++-- crates/database/src/solver_competition.rs | 51 ++++++ 10 files changed, 308 insertions(+), 22 deletions(-) create mode 100644 crates/autopilot/src/domain/competition/solver_participation_guard.rs diff --git a/Cargo.lock b/Cargo.lock index 7b4421db5d..2e208f3ec9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,7 @@ dependencies = [ "clap", "contracts", "cow-amm", + "dashmap", "database", "derive_more 1.0.0", "ethcontract", diff --git a/crates/autopilot/Cargo.toml b/crates/autopilot/Cargo.toml index 9cdfb90295..7fa92c9ace 100644 --- a/crates/autopilot/Cargo.toml +++ b/crates/autopilot/Cargo.toml @@ -25,6 +25,7 @@ chrono = { workspace = true } clap = { workspace = true } contracts = { path = "../contracts" } cow-amm = { path = "../cow-amm" } +dashmap = { workspace = true } database = { path = "../database" } derive_more = { workspace = true } ethcontract = { workspace = true } diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 5d7b155cb7..ed5aa92657 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -242,6 +242,14 @@ pub struct Arguments { /// Archive node URL used to index CoW AMM #[clap(long, env)] pub archive_node_url: Option, + + #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)] + /// The time-to-live for the solver participation blacklist cache. + pub solver_blacklist_cache_ttl: Duration, + + #[clap(long, env, default_value = "3")] + /// The number of last auctions to check solver participation eligibility. + pub solver_last_auctions_participation_count: u32, } impl std::fmt::Display for Arguments { @@ -287,6 +295,8 @@ impl std::fmt::Display for Arguments { max_winners_per_auction, archive_node_url, max_solutions_per_solver, + solver_blacklist_cache_ttl, + solver_last_auctions_participation_count, } = self; write!(f, "{}", shared)?; @@ -370,6 +380,16 @@ impl std::fmt::Display for Arguments { "max_solutions_per_solver: {:?}", max_solutions_per_solver )?; + writeln!( + f, + "solver_blacklist_cache_ttl: {:?}", + solver_blacklist_cache_ttl + )?; + writeln!( + f, + "solver_last_auctions_participation_count: {:?}", + solver_last_auctions_participation_count + )?; Ok(()) } } diff --git a/crates/autopilot/src/database/competition.rs b/crates/autopilot/src/database/competition.rs index 81cc5e63ef..2411e74f0f 100644 --- a/crates/autopilot/src/database/competition.rs +++ b/crates/autopilot/src/database/competition.rs @@ -139,4 +139,25 @@ impl super::Postgres { Ok(()) } + + pub async fn find_non_settling_solvers( + &self, + last_auctions_count: u32, + current_block: u64, + ) -> anyhow::Result> { + let mut ex = self.pool.acquire().await.context("acquire")?; + + let _timer = super::Metrics::get() + .database_queries + .with_label_values(&["find_non_settling_solvers"]) + .start_timer(); + + database::solver_competition::find_non_settling_solvers( + &mut ex, + last_auctions_count, + current_block, + ) + .await + .context("solver_competition::find_non_settling_solvers") + } } diff --git a/crates/autopilot/src/domain/competition/mod.rs b/crates/autopilot/src/domain/competition/mod.rs index 52cd69df22..bcd5210536 100644 --- a/crates/autopilot/src/domain/competition/mod.rs +++ b/crates/autopilot/src/domain/competition/mod.rs @@ -6,8 +6,12 @@ use { }; mod participant; +mod solver_participation_guard; -pub use participant::{Participant, Ranked, Unranked}; +pub use { + participant::{Participant, Ranked, Unranked}, + solver_participation_guard::{DatabaseSolverParticipationValidator, SolverParticipationGuard}, +}; type SolutionId = u64; diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs new file mode 100644 index 0000000000..1ec1379098 --- /dev/null +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -0,0 +1,157 @@ +use { + crate::{database::Postgres, domain::eth, infra::Ethereum}, + ethrpc::block_stream::CurrentBlockWatcher, + futures::future::try_join_all, + std::{ + sync::Arc, + time::{Duration, Instant}, + }, +}; + +#[derive(Clone)] +pub struct SolverParticipationGuard(Arc); + +struct Inner { + validators: Vec>, +} + +impl SolverParticipationGuard { + pub fn new( + eth: Ethereum, + db: Postgres, + settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + ttl: Duration, + last_auctions_count: u32, + ) -> Self { + let current_block = eth.current_block().clone(); + let onchain_solver_participation_validator = OnchainSolverParticipationValidator { eth }; + let database_solver_participation_validator = DatabaseSolverParticipationValidator::new( + db, + current_block, + settlement_updates_receiver, + ttl, + last_auctions_count, + ); + + Self(Arc::new(Inner { + validators: vec![ + Box::new(database_solver_participation_validator), + Box::new(onchain_solver_participation_validator), + ], + })) + } + + pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + try_join_all( + self.0 + .validators + .iter() + .map(|strategy| strategy.can_participate(solver)), + ) + .await + .map(|results| results.into_iter().all(|can_participate| can_participate)) + } +} + +#[async_trait::async_trait] +trait SolverParticipationValidator: Send + Sync { + async fn can_participate(&self, solver: ð::Address) -> anyhow::Result; +} + +#[derive(Clone)] +pub struct DatabaseSolverParticipationValidator(Arc); + +struct DatabaseSolverParticipationValidatorInner { + db: Postgres, + cache: dashmap::DashMap, + ttl: Duration, + last_auctions_count: u32, +} + +impl DatabaseSolverParticipationValidator { + pub fn new( + db: Postgres, + current_block: CurrentBlockWatcher, + settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + ttl: Duration, + last_auctions_count: u32, + ) -> Self { + let self_ = Self(Arc::new(DatabaseSolverParticipationValidatorInner { + db, + cache: Default::default(), + ttl, + last_auctions_count, + })); + + self_.start_maintenance(settlement_updates_receiver, current_block); + + self_ + } + + fn start_maintenance( + &self, + mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + current_block: CurrentBlockWatcher, + ) { + let self_ = self.0.clone(); + tokio::spawn(async move { + while settlement_updates_receiver.recv().await.is_some() { + let current_block = current_block.borrow().number; + match self_ + .db + .find_non_settling_solvers(self_.last_auctions_count, current_block) + .await + { + Ok(non_settling_solvers) => { + let non_settling_solvers = non_settling_solvers + .into_iter() + .map(|solver| eth::Address(solver.0.into())) + .collect::>(); + + tracing::debug!(?non_settling_solvers, "found non-settling solvers",); + + let now = Instant::now(); + for solver in non_settling_solvers { + self_.cache.insert(solver, now); + } + } + Err(err) => { + tracing::warn!(?err, "error while searching for non-settling solvers") + } + } + } + }); + } +} + +#[async_trait::async_trait] +impl SolverParticipationValidator for DatabaseSolverParticipationValidator { + async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + if let Some(entry) = self.0.cache.get(solver) { + if Instant::now().duration_since(*entry.value()) < self.0.ttl { + return Ok(false); + } else { + self.0.cache.remove(solver); + } + } + + Ok(true) + } +} + +struct OnchainSolverParticipationValidator { + eth: Ethereum, +} + +#[async_trait::async_trait] +impl SolverParticipationValidator for OnchainSolverParticipationValidator { + async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + Ok(self + .eth + .contracts() + .authenticator() + .is_solver(solver.0) + .call() + .await?) + } +} diff --git a/crates/autopilot/src/domain/settlement/observer.rs b/crates/autopilot/src/domain/settlement/observer.rs index d142646728..0374465a1f 100644 --- a/crates/autopilot/src/domain/settlement/observer.rs +++ b/crates/autopilot/src/domain/settlement/observer.rs @@ -19,23 +19,34 @@ use { pub struct Observer { eth: infra::Ethereum, persistence: infra::Persistence, + settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, } impl Observer { /// Creates a new Observer and asynchronously schedules the first update /// run. - pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self { - Self { eth, persistence } + pub fn new( + eth: infra::Ethereum, + persistence: infra::Persistence, + settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, + ) -> Self { + Self { + eth, + persistence, + settlement_updates_sender, + } } /// Fetches all the available missing data needed for bookkeeping. /// This needs to get called after indexing a new settlement event /// since this code needs that data to already be present in the DB. pub async fn update(&self) { + let mut updated = false; loop { match self.single_update().await { Ok(true) => { tracing::debug!("on settlement event updater ran and processed event"); + updated = true; // There might be more pending updates, continue immediately. continue; } @@ -49,6 +60,12 @@ impl Observer { } } } + if updated { + // Notify the solver participation guard that a settlement has been updated. + if let Err(err) = self.settlement_updates_sender.send(()) { + tracing::error!(?err, "failed to notify solver participation guard"); + } + } } /// Update database for settlement events that have not been processed yet. diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 1ea1f6de93..ede9f38045 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -15,7 +15,7 @@ use { }, Postgres, }, - domain, + domain::{self, competition::SolverParticipationGuard}, event_updater::EventUpdater, infra, maintenance::Maintenance, @@ -367,8 +367,20 @@ pub async fn run(args: Arguments) { let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; - let settlement_observer = - crate::domain::settlement::Observer::new(eth.clone(), persistence.clone()); + let (settlement_updates_sender, settlement_updates_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let settlement_observer = crate::domain::settlement::Observer::new( + eth.clone(), + persistence.clone(), + settlement_updates_sender, + ); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + db.clone(), + settlement_updates_receiver, + args.solver_blacklist_cache_ttl, + args.solver_last_auctions_participation_count, + ); let settlement_contract_start_index = if let Some(DeploymentInformation::BlockNumber(settlement_contract_start_index)) = eth.contracts().settlement().deployment_information() @@ -574,6 +586,7 @@ pub async fn run(args: Arguments) { eth, persistence.clone(), drivers, + solver_participation_guard, solvable_orders_cache, trusted_tokens, liveness.clone(), diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index fa0d2a5c0f..492ca18751 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -4,7 +4,14 @@ use { domain::{ self, auction::Id, - competition::{self, Solution, SolutionError, TradedOrder, Unranked}, + competition::{ + self, + Solution, + SolutionError, + SolverParticipationGuard, + TradedOrder, + Unranked, + }, eth::{self, TxId}, OrderUid, }, @@ -59,6 +66,7 @@ pub struct RunLoop { eth: infra::Ethereum, persistence: infra::Persistence, drivers: Vec>, + solver_participation_guard: SolverParticipationGuard, solvable_orders_cache: Arc, trusted_tokens: AutoUpdatingTokenList, in_flight_orders: Arc>>, @@ -75,6 +83,7 @@ impl RunLoop { eth: infra::Ethereum, persistence: infra::Persistence, drivers: Vec>, + solver_participation_guard: SolverParticipationGuard, solvable_orders_cache: Arc, trusted_tokens: AutoUpdatingTokenList, liveness: Arc, @@ -85,6 +94,7 @@ impl RunLoop { eth, persistence, drivers, + solver_participation_guard, solvable_orders_cache, trusted_tokens, in_flight_orders: Default::default(), @@ -726,23 +736,14 @@ impl RunLoop { request: &solve::Request, ) -> Result>, SolveError> { - let authenticator = self.eth.contracts().authenticator(); - let is_allowed = authenticator - .is_solver(driver.submission_address.into()) - .call() - .await - .map_err(|err| { - tracing::warn!( - driver = driver.name, - ?driver.submission_address, - ?err, - "failed to check if solver is deny listed" - ); + let can_participate = self.solver_participation_guard.can_participate(&driver.submission_address).await.map_err(|err| { + tracing::error!(?err, driver = %driver.name, ?driver.submission_address, "solver participation check failed"); SolveError::SolverDenyListed - })?; + } + )?; - // Do not send the request to the driver if the solver is denied - if !is_allowed { + // Do not send the request to the driver if the solver is deny-listed + if !can_participate { return Err(SolveError::SolverDenyListed); } diff --git a/crates/database/src/solver_competition.rs b/crates/database/src/solver_competition.rs index 01e0183858..6edd700c70 100644 --- a/crates/database/src/solver_competition.rs +++ b/crates/database/src/solver_competition.rs @@ -97,6 +97,57 @@ GROUP BY sc.id sqlx::query_as(QUERY).bind(tx_hash).fetch_optional(ex).await } +pub async fn find_non_settling_solvers( + ex: &mut PgConnection, + last_auctions_count: u32, + current_block: u64, +) -> Result, sqlx::Error> { + const QUERY: &str = r#" +WITH + last_auctions AS ( + SELECT ps.auction_id, ps.solver + FROM ( + SELECT DISTINCT ca.id AS auction_id + FROM competition_auctions ca + WHERE ca.deadline <= $1 + ORDER BY ca.id DESC + LIMIT $2 + ) latest_auctions + JOIN proposed_solutions ps ON ps.auction_id = latest_auctions.auction_id + WHERE ps.is_winner = true + ), + unsuccessful_solvers AS ( + SELECT la.auction_id, la.solver + FROM last_auctions la + LEFT JOIN settlements s + ON la.auction_id = s.auction_id AND la.solver = s.solver + WHERE s.auction_id IS NULL + ), + solver_appearance_count AS ( + SELECT solver, COUNT(DISTINCT auction_id) AS appearance_count + FROM unsuccessful_solvers + GROUP BY solver + ), + auction_count AS ( + SELECT COUNT(DISTINCT auction_id) AS total_auctions + FROM last_auctions + ), + consistent_solvers AS ( + SELECT sa.solver + FROM solver_appearance_count sa, auction_count ac + WHERE sa.appearance_count = ac.total_auctions + ) +SELECT DISTINCT solver +FROM consistent_solvers; + "#; + + sqlx::query_scalar(QUERY) + .bind(sqlx::types::BigDecimal::from(current_block)) + .bind(i64::from(last_auctions_count)) + .fetch_all(ex) + .await +} + #[derive(Clone, Debug, PartialEq, Default)] pub struct Solution { // Unique Id generated by the autopilot to uniquely identify the solution within Auction From 53199457ffed674c60a66bb990bb989792bd0ade Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 13:25:22 +0000 Subject: [PATCH 02/33] Test --- crates/database/src/solver_competition.rs | 166 +++++++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/crates/database/src/solver_competition.rs b/crates/database/src/solver_competition.rs index 6edd700c70..0139610671 100644 --- a/crates/database/src/solver_competition.rs +++ b/crates/database/src/solver_competition.rs @@ -368,8 +368,10 @@ mod tests { use { super::*, crate::{ + auction, byte_array::ByteArray, - events::{EventIndex, Settlement}, + events::{self, EventIndex, Settlement}, + settlements, }, sqlx::{Connection, Row}, }; @@ -584,4 +586,166 @@ mod tests { // inserted (2 fetched from "proposed_jit_orders" and 1 from "orders" table) assert!(fetched_solutions[2].orders.len() == 3); } + + #[tokio::test] + #[ignore] + async fn postgres_non_settling_solvers_roundtrip() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + crate::clear_DANGER_(&mut db).await.unwrap(); + + let non_settling_solver = ByteArray([1u8; 20]); + + let deadline_block = 100u64; + let last_auctions_count = 3i64; + // competition_auctions + // Insert auctions within the deadline + for auction_id in 1..=4 { + let auction = auction::Auction { + id: auction_id, + block: auction_id, + deadline: i64::try_from(deadline_block).unwrap(), + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }; + auction::save(&mut db, auction).await.unwrap(); + } + + // Insert auctions outside the deadline + for auction_id in 5..=6 { + let auction = auction::Auction { + id: auction_id, + block: auction_id, + deadline: i64::try_from(deadline_block).unwrap() + auction_id, + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }; + auction::save(&mut db, auction).await.unwrap(); + } + + // proposed_solutions + // Non-settling solver wins all auctions within the deadline + for auction_id in 2..=4 { + let solutions = vec![Solution { + uid: auction_id, + id: auction_id.into(), + solver: non_settling_solver, + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // Non-settling solver wins not all the auctions within the deadline + for auction_id in 2..=4 { + let solutions = vec![Solution { + uid: auction_id, + id: auction_id.into(), + solver: ByteArray([2u8; 20]), + is_winner: auction_id != 2, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // Another non-settling solver has `last_auctions_count` winning auctions but + // not consecutive + for auction_id in 1..=4 { + // Break the sequence + if auction_id == 2 { + continue; + } + let solutions = vec![Solution { + uid: auction_id, + id: auction_id.into(), + solver: ByteArray([3u8; 20]), + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // One more non-settling solver has `last_auctions_count` winning auctions but + // some of them are outside the deadline + for auction_id in 3..=5 { + let solutions = vec![Solution { + uid: auction_id, + id: auction_id.into(), + solver: ByteArray([4u8; 20]), + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // Verify only the non-settling solver is returned + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert_eq!(result, vec![non_settling_solver]); + + // Non-settling solver settles one of the auctions + let event = EventIndex { + block_number: 4, + log_index: 0, + }; + let settlement = Settlement { + solver: non_settling_solver, + transaction_hash: ByteArray([0u8; 32]), + }; + events::insert_settlement(&mut db, &event, &settlement) + .await + .unwrap(); + + // The same result until the auction_id is updated in the settlements table + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert_eq!(result, vec![non_settling_solver]); + + settlements::update_settlement_auction(&mut db, 4, 0, 4) + .await + .unwrap(); + + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert!(result.is_empty()); + } } From e65c328fea34ad8cb59f8941db87222e9a509d6e Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 13:30:34 +0000 Subject: [PATCH 03/33] Avoid rpc calls every time --- .../competition/solver_participation_guard.rs | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs index 1ec1379098..1ee4252f99 100644 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -1,7 +1,6 @@ use { crate::{database::Postgres, domain::eth, infra::Ethereum}, ethrpc::block_stream::CurrentBlockWatcher, - futures::future::try_join_all, std::{ sync::Arc, time::{Duration, Instant}, @@ -12,7 +11,8 @@ use { pub struct SolverParticipationGuard(Arc); struct Inner { - validators: Vec>, + onchain_solver_participation_validator: OnchainSolverParticipationValidator, + database_solver_participation_validator: DatabaseSolverParticipationValidator, } impl SolverParticipationGuard { @@ -34,22 +34,31 @@ impl SolverParticipationGuard { ); Self(Arc::new(Inner { - validators: vec![ - Box::new(database_solver_participation_validator), - Box::new(onchain_solver_participation_validator), - ], + onchain_solver_participation_validator, + database_solver_participation_validator, })) } pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { - try_join_all( - self.0 - .validators - .iter() - .map(|strategy| strategy.can_participate(solver)), - ) - .await - .map(|results| results.into_iter().all(|can_participate| can_participate)) + if !self + .0 + .database_solver_participation_validator + .can_participate(solver) + .await? + { + return Ok(false); + } + + if !self + .0 + .onchain_solver_participation_validator + .can_participate(solver) + .await? + { + return Ok(false); + } + + Ok(true) } } From fc3321baf1cb4c7ed5054576cdc0d2085433e025 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 13:44:26 +0000 Subject: [PATCH 04/33] Typo --- crates/autopilot/src/database/competition.rs | 1 - .../src/domain/competition/solver_participation_guard.rs | 2 +- crates/driver/src/domain/competition/bad_tokens/metrics.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/autopilot/src/database/competition.rs b/crates/autopilot/src/database/competition.rs index 2411e74f0f..88c9a86fca 100644 --- a/crates/autopilot/src/database/competition.rs +++ b/crates/autopilot/src/database/competition.rs @@ -146,7 +146,6 @@ impl super::Postgres { current_block: u64, ) -> anyhow::Result> { let mut ex = self.pool.acquire().await.context("acquire")?; - let _timer = super::Metrics::get() .database_queries .with_label_values(&["find_non_settling_solvers"]) diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs index 1ee4252f99..8d8b961a58 100644 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -117,7 +117,7 @@ impl DatabaseSolverParticipationValidator { .map(|solver| eth::Address(solver.0.into())) .collect::>(); - tracing::debug!(?non_settling_solvers, "found non-settling solvers",); + tracing::debug!(?non_settling_solvers, "found non-settling solvers"); let now = Instant::now(); for solver in non_settling_solvers { diff --git a/crates/driver/src/domain/competition/bad_tokens/metrics.rs b/crates/driver/src/domain/competition/bad_tokens/metrics.rs index daaad1ee7c..a0444169b9 100644 --- a/crates/driver/src/domain/competition/bad_tokens/metrics.rs +++ b/crates/driver/src/domain/competition/bad_tokens/metrics.rs @@ -110,7 +110,7 @@ impl Detector { flagged_unsupported_at: None, }); - // token neeeds to be frozen as unsupported for a while + // token needs to be frozen as unsupported for a while if self.quality_based_on_stats(&stats) == Quality::Unsupported && stats .flagged_unsupported_at From 0fbd61c05a638a0dc4b221c8300e7cfa8ba623b8 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 14:28:33 +0000 Subject: [PATCH 05/33] Docs --- crates/autopilot/src/database/competition.rs | 4 ++ .../competition/solver_participation_guard.rs | 61 ++++++++++--------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/crates/autopilot/src/database/competition.rs b/crates/autopilot/src/database/competition.rs index 88c9a86fca..682ac81d1d 100644 --- a/crates/autopilot/src/database/competition.rs +++ b/crates/autopilot/src/database/competition.rs @@ -140,6 +140,10 @@ impl super::Postgres { Ok(()) } + /// Finds solvers that won `last_auctions_count` consecutive auctions but + /// never settled any of them. The current block is used to prevent + /// selecting auctions with deadline after the current block since they + /// still can be settled. pub async fn find_non_settling_solvers( &self, last_auctions_count: u32, diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs index 8d8b961a58..71eacf0e40 100644 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -7,12 +7,14 @@ use { }, }; +/// This struct checks whether a solver can participate in the competition by +/// using different validators. #[derive(Clone)] pub struct SolverParticipationGuard(Arc); struct Inner { - onchain_solver_participation_validator: OnchainSolverParticipationValidator, - database_solver_participation_validator: DatabaseSolverParticipationValidator, + /// Stores the validators in order they will be called. + validators: Vec>, } impl SolverParticipationGuard { @@ -34,28 +36,25 @@ impl SolverParticipationGuard { ); Self(Arc::new(Inner { - onchain_solver_participation_validator, - database_solver_participation_validator, + validators: vec![ + Box::new(database_solver_participation_validator), + Box::new(onchain_solver_participation_validator), + ], })) } + /// Checks if a solver can participate in the competition. + /// Sequentially asks internal validators to avoid redundant RPC calls in + /// the following order: + /// 1. DatabaseSolverParticipationValidator - operates fast since it uses + /// in-memory cache. + /// 2. OnchainSolverParticipationValidator - only then calls the + /// Authenticator contract. pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { - if !self - .0 - .database_solver_participation_validator - .can_participate(solver) - .await? - { - return Ok(false); - } - - if !self - .0 - .onchain_solver_participation_validator - .can_participate(solver) - .await? - { - return Ok(false); + for validator in &self.0.validators { + if !validator.is_allowed(solver).await? { + return Ok(false); + } } Ok(true) @@ -64,15 +63,17 @@ impl SolverParticipationGuard { #[async_trait::async_trait] trait SolverParticipationValidator: Send + Sync { - async fn can_participate(&self, solver: ð::Address) -> anyhow::Result; + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; } +/// Checks the DB by searching for solvers that won N last consecutive auctions +/// but never settled any of them. #[derive(Clone)] pub struct DatabaseSolverParticipationValidator(Arc); struct DatabaseSolverParticipationValidatorInner { db: Postgres, - cache: dashmap::DashMap, + banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, } @@ -87,7 +88,7 @@ impl DatabaseSolverParticipationValidator { ) -> Self { let self_ = Self(Arc::new(DatabaseSolverParticipationValidatorInner { db, - cache: Default::default(), + banned_solvers: Default::default(), ttl, last_auctions_count, })); @@ -97,6 +98,8 @@ impl DatabaseSolverParticipationValidator { self_ } + /// Update the internal cache only once the settlement table is updated to + /// avoid redundant DB queries. fn start_maintenance( &self, mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, @@ -121,7 +124,7 @@ impl DatabaseSolverParticipationValidator { let now = Instant::now(); for solver in non_settling_solvers { - self_.cache.insert(solver, now); + self_.banned_solvers.insert(solver, now); } } Err(err) => { @@ -135,12 +138,12 @@ impl DatabaseSolverParticipationValidator { #[async_trait::async_trait] impl SolverParticipationValidator for DatabaseSolverParticipationValidator { - async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { - if let Some(entry) = self.0.cache.get(solver) { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + if let Some(entry) = self.0.banned_solvers.get(solver) { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); } else { - self.0.cache.remove(solver); + self.0.banned_solvers.remove(solver); } } @@ -148,13 +151,15 @@ impl SolverParticipationValidator for DatabaseSolverParticipationValidator { } } +/// Calls Authenticator contract to check if a solver has a sufficient +/// permission. struct OnchainSolverParticipationValidator { eth: Ethereum, } #[async_trait::async_trait] impl SolverParticipationValidator for OnchainSolverParticipationValidator { - async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { Ok(self .eth .contracts() From b1abfa0be8a1282ad008011c546c8029e695bfc1 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 16:57:08 +0000 Subject: [PATCH 06/33] Metrics --- .../competition/solver_participation_guard.rs | 16 ++++++++++++++-- crates/autopilot/src/domain/mod.rs | 15 +++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs index 71eacf0e40..75cd0e4b82 100644 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -1,5 +1,9 @@ use { - crate::{database::Postgres, domain::eth, infra::Ethereum}, + crate::{ + database::Postgres, + domain::{eth, Metrics}, + infra::Ethereum, + }, ethrpc::block_stream::CurrentBlockWatcher, std::{ sync::Arc, @@ -117,7 +121,15 @@ impl DatabaseSolverParticipationValidator { Ok(non_settling_solvers) => { let non_settling_solvers = non_settling_solvers .into_iter() - .map(|solver| eth::Address(solver.0.into())) + .map(|solver| { + let address = eth::Address(solver.0.into()); + + Metrics::get() + .non_settling_solver + .with_label_values(&[&format!("{:#x}", address.0)]); + + address + }) .collect::>(); tracing::debug!(?non_settling_solvers, "found non-settling solvers"); diff --git a/crates/autopilot/src/domain/mod.rs b/crates/autopilot/src/domain/mod.rs index ceaae58e2a..ad7d0077af 100644 --- a/crates/autopilot/src/domain/mod.rs +++ b/crates/autopilot/src/domain/mod.rs @@ -14,3 +14,18 @@ pub use { fee::ProtocolFees, quote::Quote, }; + +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "domain")] +pub struct Metrics { + /// How many times the solver marked as non-settling based on the database + /// statistics. + #[metric(labels("solver"))] + pub non_settling_solver: prometheus::IntCounterVec, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(observe::metrics::get_storage_registry()).unwrap() + } +} From 292dcffb9538b993bba7d16ffe49c0338ad42580 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 17:42:56 +0000 Subject: [PATCH 07/33] Configurable validators --- crates/autopilot/src/arguments.rs | 53 +++++++++++++++---- .../competition/solver_participation_guard.rs | 43 ++++++++------- crates/autopilot/src/run.rs | 19 +++---- 3 files changed, 77 insertions(+), 38 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index ed5aa92657..2134bd3329 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -243,15 +243,52 @@ pub struct Arguments { #[clap(long, env)] pub archive_node_url: Option, - #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)] + /// Configuration for the solver participation guard. + #[clap(flatten)] + pub solver_participation_guard: SolverParticipationGuardConfig, +} + +#[derive(Debug, clap::Parser)] +pub struct SolverParticipationGuardConfig { + #[clap(flatten)] + pub db_based_validator: DbBasedValidatorConfig, + + #[clap(flatten)] + pub onchain_based_validator: OnchainBasedValidatorConfig, +} + +#[derive(Debug, clap::Parser)] +pub struct DbBasedValidatorConfig { + /// Enables or disables the solver participation guard + #[clap( + long, + env, + name = "db_based_solver_participation_guard_enabled", + default_value = "true" + )] + pub enabled: bool, + /// The time-to-live for the solver participation blacklist cache. + #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)] pub solver_blacklist_cache_ttl: Duration, - #[clap(long, env, default_value = "3")] /// The number of last auctions to check solver participation eligibility. + #[clap(long, env, default_value = "3")] pub solver_last_auctions_participation_count: u32, } +#[derive(Debug, Clone, clap::Parser)] +pub struct OnchainBasedValidatorConfig { + /// Enables or disables the solver participation guard + #[clap( + long, + env, + name = "onchain_based_solver_participation_guard_enabled", + default_value = "true" + )] + pub enabled: bool, +} + impl std::fmt::Display for Arguments { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let Self { @@ -295,8 +332,7 @@ impl std::fmt::Display for Arguments { max_winners_per_auction, archive_node_url, max_solutions_per_solver, - solver_blacklist_cache_ttl, - solver_last_auctions_participation_count, + solver_participation_guard, } = self; write!(f, "{}", shared)?; @@ -382,13 +418,8 @@ impl std::fmt::Display for Arguments { )?; writeln!( f, - "solver_blacklist_cache_ttl: {:?}", - solver_blacklist_cache_ttl - )?; - writeln!( - f, - "solver_last_auctions_participation_count: {:?}", - solver_last_auctions_participation_count + "solver_participation_guard: {:?}", + solver_participation_guard )?; Ok(()) } diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs index 75cd0e4b82..f02029370d 100644 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ b/crates/autopilot/src/domain/competition/solver_participation_guard.rs @@ -1,5 +1,6 @@ use { crate::{ + arguments::SolverParticipationGuardConfig, database::Postgres, domain::{eth, Metrics}, infra::Ethereum, @@ -26,25 +27,31 @@ impl SolverParticipationGuard { eth: Ethereum, db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, - ttl: Duration, - last_auctions_count: u32, + config: SolverParticipationGuardConfig, ) -> Self { - let current_block = eth.current_block().clone(); - let onchain_solver_participation_validator = OnchainSolverParticipationValidator { eth }; - let database_solver_participation_validator = DatabaseSolverParticipationValidator::new( - db, - current_block, - settlement_updates_receiver, - ttl, - last_auctions_count, - ); - - Self(Arc::new(Inner { - validators: vec![ - Box::new(database_solver_participation_validator), - Box::new(onchain_solver_participation_validator), - ], - })) + let mut validators: Vec> = Vec::new(); + + if config.db_based_validator.enabled { + let current_block = eth.current_block().clone(); + let database_solver_participation_validator = DatabaseSolverParticipationValidator::new( + db, + current_block, + settlement_updates_receiver, + config.db_based_validator.solver_blacklist_cache_ttl, + config + .db_based_validator + .solver_last_auctions_participation_count, + ); + validators.push(Box::new(database_solver_participation_validator)); + } + + if config.onchain_based_validator.enabled { + let onchain_solver_participation_validator = + OnchainSolverParticipationValidator { eth }; + validators.push(Box::new(onchain_solver_participation_validator)); + } + + Self(Arc::new(Inner { validators })) } /// Checks if a solver can participate in the competition. diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index ede9f38045..cf538343c8 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -365,21 +365,22 @@ pub async fn run(args: Arguments) { None }; - let persistence = - infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let (settlement_updates_sender, settlement_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); - let settlement_observer = crate::domain::settlement::Observer::new( - eth.clone(), - persistence.clone(), - settlement_updates_sender, - ); + let solver_participation_guard = SolverParticipationGuard::new( eth.clone(), db.clone(), settlement_updates_receiver, - args.solver_blacklist_cache_ttl, - args.solver_last_auctions_participation_count, + args.solver_participation_guard, + ); + + let persistence = + infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; + let settlement_observer = crate::domain::settlement::Observer::new( + eth.clone(), + persistence.clone(), + settlement_updates_sender, ); let settlement_contract_start_index = if let Some(DeploymentInformation::BlockNumber(settlement_contract_start_index)) = From fe9ef5b506e2b13074937b89f141d9d043f40b85 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 29 Jan 2025 17:58:13 +0000 Subject: [PATCH 08/33] Fixed clap config --- crates/autopilot/src/arguments.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 2134bd3329..e0fcb13489 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -261,9 +261,9 @@ pub struct SolverParticipationGuardConfig { pub struct DbBasedValidatorConfig { /// Enables or disables the solver participation guard #[clap( - long, - env, - name = "db_based_solver_participation_guard_enabled", + id = "db_enabled", + long = "db-based-solver-participation-guard-enabled", + env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED", default_value = "true" )] pub enabled: bool, @@ -281,9 +281,9 @@ pub struct DbBasedValidatorConfig { pub struct OnchainBasedValidatorConfig { /// Enables or disables the solver participation guard #[clap( - long, - env, - name = "onchain_based_solver_participation_guard_enabled", + id = "onchain_enabled", + long = "onchain-based-solver-participation-guard-enabled", + env = "ONCHAIN_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED", default_value = "true" )] pub enabled: bool, From c5e350235e3455e55357ed34a907801f800e8467 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 12:36:57 +0000 Subject: [PATCH 09/33] Refactoring --- .../autopilot/src/domain/competition/mod.rs | 4 +- .../competition/participation_guard/db.rs | 104 ++++++++++ .../competition/participation_guard/mod.rs | 74 +++++++ .../participation_guard/onchain.rs | 20 ++ .../competition/solver_participation_guard.rs | 190 ------------------ 5 files changed, 200 insertions(+), 192 deletions(-) create mode 100644 crates/autopilot/src/domain/competition/participation_guard/db.rs create mode 100644 crates/autopilot/src/domain/competition/participation_guard/mod.rs create mode 100644 crates/autopilot/src/domain/competition/participation_guard/onchain.rs delete mode 100644 crates/autopilot/src/domain/competition/solver_participation_guard.rs diff --git a/crates/autopilot/src/domain/competition/mod.rs b/crates/autopilot/src/domain/competition/mod.rs index bcd5210536..7ef2d6d06f 100644 --- a/crates/autopilot/src/domain/competition/mod.rs +++ b/crates/autopilot/src/domain/competition/mod.rs @@ -6,11 +6,11 @@ use { }; mod participant; -mod solver_participation_guard; +mod participation_guard; pub use { participant::{Participant, Ranked, Unranked}, - solver_participation_guard::{DatabaseSolverParticipationValidator, SolverParticipationGuard}, + participation_guard::SolverParticipationGuard, }; type SolutionId = u64; diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs new file mode 100644 index 0000000000..4eb3ce7c17 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -0,0 +1,104 @@ +use { + crate::{ + database::Postgres, + domain::{eth, Metrics}, + }, + ethrpc::block_stream::CurrentBlockWatcher, + std::{ + sync::Arc, + time::{Duration, Instant}, + }, +}; + +/// Checks the DB by searching for solvers that won N last consecutive auctions +/// but never settled any of them. +#[derive(Clone)] +pub(super) struct Validator(Arc); + +struct Inner { + db: Postgres, + banned_solvers: dashmap::DashMap, + ttl: Duration, + last_auctions_count: u32, +} + +impl Validator { + pub fn new( + db: Postgres, + current_block: CurrentBlockWatcher, + settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + ttl: Duration, + last_auctions_count: u32, + ) -> Self { + let self_ = Self(Arc::new(Inner { + db, + banned_solvers: Default::default(), + ttl, + last_auctions_count, + })); + + self_.start_maintenance(settlement_updates_receiver, current_block); + + self_ + } + + /// Update the internal cache only once the settlement table is updated to + /// avoid redundant DB queries. + fn start_maintenance( + &self, + mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + current_block: CurrentBlockWatcher, + ) { + let self_ = self.0.clone(); + tokio::spawn(async move { + while settlement_updates_receiver.recv().await.is_some() { + let current_block = current_block.borrow().number; + match self_ + .db + .find_non_settling_solvers(self_.last_auctions_count, current_block) + .await + { + Ok(non_settling_solvers) => { + let non_settling_solvers = non_settling_solvers + .into_iter() + .map(|solver| { + let address = eth::Address(solver.0.into()); + + Metrics::get() + .non_settling_solver + .with_label_values(&[&format!("{:#x}", address.0)]); + + address + }) + .collect::>(); + + tracing::debug!(?non_settling_solvers, "found non-settling solvers"); + + let now = Instant::now(); + for solver in non_settling_solvers { + self_.banned_solvers.insert(solver, now); + } + } + Err(err) => { + tracing::warn!(?err, "error while searching for non-settling solvers") + } + } + } + }); + } +} + +#[async_trait::async_trait] +impl super::Validator for Validator { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + if let Some(entry) = self.0.banned_solvers.get(solver) { + if Instant::now().duration_since(*entry.value()) < self.0.ttl { + return Ok(false); + } else { + self.0.banned_solvers.remove(solver); + } + } + + Ok(true) + } +} diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs new file mode 100644 index 0000000000..7450dc6126 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -0,0 +1,74 @@ +mod db; +mod onchain; + +use { + crate::{ + arguments::SolverParticipationGuardConfig, + database::Postgres, + domain::eth, + infra::Ethereum, + }, + std::sync::Arc, +}; + +/// This struct checks whether a solver can participate in the competition by +/// using different validators. +#[derive(Clone)] +pub struct SolverParticipationGuard(Arc); + +struct Inner { + /// Stores the validators in order they will be called. + validators: Vec>, +} + +impl SolverParticipationGuard { + pub fn new( + eth: Ethereum, + db: Postgres, + settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + config: SolverParticipationGuardConfig, + ) -> Self { + let mut validators: Vec> = Vec::new(); + + if config.db_based_validator.enabled { + let current_block = eth.current_block().clone(); + let database_solver_participation_validator = db::Validator::new( + db, + current_block, + settlement_updates_receiver, + config.db_based_validator.solver_blacklist_cache_ttl, + config + .db_based_validator + .solver_last_auctions_participation_count, + ); + validators.push(Box::new(database_solver_participation_validator)); + } + + if config.onchain_based_validator.enabled { + let onchain_solver_participation_validator = onchain::Validator { eth }; + validators.push(Box::new(onchain_solver_participation_validator)); + } + + Self(Arc::new(Inner { validators })) + } + + /// Checks if a solver can participate in the competition. + /// Sequentially asks internal validators to avoid redundant RPC calls in + /// the following order: + /// 1. DB-based validator: operates fast since it uses in-memory cache. + /// 2. Onchain-based validator: only then calls the Authenticator contract. + pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + for validator in &self.0.validators { + if !validator.is_allowed(solver).await? { + return Ok(false); + } + } + + Ok(true) + } +} + +#[async_trait::async_trait] +trait Validator: Send + Sync { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; +} diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs new file mode 100644 index 0000000000..82d0ef3fb7 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -0,0 +1,20 @@ +use crate::{domain::eth, infra::Ethereum}; + +/// Calls Authenticator contract to check if a solver has a sufficient +/// permission. +pub(super) struct Validator { + pub eth: Ethereum, +} + +#[async_trait::async_trait] +impl super::Validator for Validator { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + Ok(self + .eth + .contracts() + .authenticator() + .is_solver(solver.0) + .call() + .await?) + } +} diff --git a/crates/autopilot/src/domain/competition/solver_participation_guard.rs b/crates/autopilot/src/domain/competition/solver_participation_guard.rs deleted file mode 100644 index f02029370d..0000000000 --- a/crates/autopilot/src/domain/competition/solver_participation_guard.rs +++ /dev/null @@ -1,190 +0,0 @@ -use { - crate::{ - arguments::SolverParticipationGuardConfig, - database::Postgres, - domain::{eth, Metrics}, - infra::Ethereum, - }, - ethrpc::block_stream::CurrentBlockWatcher, - std::{ - sync::Arc, - time::{Duration, Instant}, - }, -}; - -/// This struct checks whether a solver can participate in the competition by -/// using different validators. -#[derive(Clone)] -pub struct SolverParticipationGuard(Arc); - -struct Inner { - /// Stores the validators in order they will be called. - validators: Vec>, -} - -impl SolverParticipationGuard { - pub fn new( - eth: Ethereum, - db: Postgres, - settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, - config: SolverParticipationGuardConfig, - ) -> Self { - let mut validators: Vec> = Vec::new(); - - if config.db_based_validator.enabled { - let current_block = eth.current_block().clone(); - let database_solver_participation_validator = DatabaseSolverParticipationValidator::new( - db, - current_block, - settlement_updates_receiver, - config.db_based_validator.solver_blacklist_cache_ttl, - config - .db_based_validator - .solver_last_auctions_participation_count, - ); - validators.push(Box::new(database_solver_participation_validator)); - } - - if config.onchain_based_validator.enabled { - let onchain_solver_participation_validator = - OnchainSolverParticipationValidator { eth }; - validators.push(Box::new(onchain_solver_participation_validator)); - } - - Self(Arc::new(Inner { validators })) - } - - /// Checks if a solver can participate in the competition. - /// Sequentially asks internal validators to avoid redundant RPC calls in - /// the following order: - /// 1. DatabaseSolverParticipationValidator - operates fast since it uses - /// in-memory cache. - /// 2. OnchainSolverParticipationValidator - only then calls the - /// Authenticator contract. - pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { - for validator in &self.0.validators { - if !validator.is_allowed(solver).await? { - return Ok(false); - } - } - - Ok(true) - } -} - -#[async_trait::async_trait] -trait SolverParticipationValidator: Send + Sync { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; -} - -/// Checks the DB by searching for solvers that won N last consecutive auctions -/// but never settled any of them. -#[derive(Clone)] -pub struct DatabaseSolverParticipationValidator(Arc); - -struct DatabaseSolverParticipationValidatorInner { - db: Postgres, - banned_solvers: dashmap::DashMap, - ttl: Duration, - last_auctions_count: u32, -} - -impl DatabaseSolverParticipationValidator { - pub fn new( - db: Postgres, - current_block: CurrentBlockWatcher, - settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, - ttl: Duration, - last_auctions_count: u32, - ) -> Self { - let self_ = Self(Arc::new(DatabaseSolverParticipationValidatorInner { - db, - banned_solvers: Default::default(), - ttl, - last_auctions_count, - })); - - self_.start_maintenance(settlement_updates_receiver, current_block); - - self_ - } - - /// Update the internal cache only once the settlement table is updated to - /// avoid redundant DB queries. - fn start_maintenance( - &self, - mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, - current_block: CurrentBlockWatcher, - ) { - let self_ = self.0.clone(); - tokio::spawn(async move { - while settlement_updates_receiver.recv().await.is_some() { - let current_block = current_block.borrow().number; - match self_ - .db - .find_non_settling_solvers(self_.last_auctions_count, current_block) - .await - { - Ok(non_settling_solvers) => { - let non_settling_solvers = non_settling_solvers - .into_iter() - .map(|solver| { - let address = eth::Address(solver.0.into()); - - Metrics::get() - .non_settling_solver - .with_label_values(&[&format!("{:#x}", address.0)]); - - address - }) - .collect::>(); - - tracing::debug!(?non_settling_solvers, "found non-settling solvers"); - - let now = Instant::now(); - for solver in non_settling_solvers { - self_.banned_solvers.insert(solver, now); - } - } - Err(err) => { - tracing::warn!(?err, "error while searching for non-settling solvers") - } - } - } - }); - } -} - -#[async_trait::async_trait] -impl SolverParticipationValidator for DatabaseSolverParticipationValidator { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { - if let Some(entry) = self.0.banned_solvers.get(solver) { - if Instant::now().duration_since(*entry.value()) < self.0.ttl { - return Ok(false); - } else { - self.0.banned_solvers.remove(solver); - } - } - - Ok(true) - } -} - -/// Calls Authenticator contract to check if a solver has a sufficient -/// permission. -struct OnchainSolverParticipationValidator { - eth: Ethereum, -} - -#[async_trait::async_trait] -impl SolverParticipationValidator for OnchainSolverParticipationValidator { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { - Ok(self - .eth - .contracts() - .authenticator() - .is_solver(solver.0) - .call() - .await?) - } -} From a9e6a3fe9311c524ce85672480e14a8fff67ff10 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 16:07:06 +0000 Subject: [PATCH 10/33] Config per solver --- crates/autopilot/src/arguments.rs | 92 +++++++++++++------ .../competition/participation_guard/mod.rs | 18 ++-- crates/autopilot/src/run.rs | 2 +- 3 files changed, 70 insertions(+), 42 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index e0fcb13489..e70b5ba0a3 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -245,20 +245,11 @@ pub struct Arguments { /// Configuration for the solver participation guard. #[clap(flatten)] - pub solver_participation_guard: SolverParticipationGuardConfig, + pub db_based_solver_participation_guard: DbBasedSolverParticipationGuardConfig, } #[derive(Debug, clap::Parser)] -pub struct SolverParticipationGuardConfig { - #[clap(flatten)] - pub db_based_validator: DbBasedValidatorConfig, - - #[clap(flatten)] - pub onchain_based_validator: OnchainBasedValidatorConfig, -} - -#[derive(Debug, clap::Parser)] -pub struct DbBasedValidatorConfig { +pub struct DbBasedSolverParticipationGuardConfig { /// Enables or disables the solver participation guard #[clap( id = "db_enabled", @@ -277,18 +268,6 @@ pub struct DbBasedValidatorConfig { pub solver_last_auctions_participation_count: u32, } -#[derive(Debug, Clone, clap::Parser)] -pub struct OnchainBasedValidatorConfig { - /// Enables or disables the solver participation guard - #[clap( - id = "onchain_enabled", - long = "onchain-based-solver-participation-guard-enabled", - env = "ONCHAIN_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED", - default_value = "true" - )] - pub enabled: bool, -} - impl std::fmt::Display for Arguments { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let Self { @@ -332,7 +311,7 @@ impl std::fmt::Display for Arguments { max_winners_per_auction, archive_node_url, max_solutions_per_solver, - solver_participation_guard, + db_based_solver_participation_guard, } = self; write!(f, "{}", shared)?; @@ -418,8 +397,8 @@ impl std::fmt::Display for Arguments { )?; writeln!( f, - "solver_participation_guard: {:?}", - solver_participation_guard + "db_based_solver_participation_guard: {:?}", + db_based_solver_participation_guard )?; Ok(()) } @@ -432,6 +411,7 @@ pub struct Solver { pub url: Url, pub submission_account: Account, pub fairness_threshold: Option, + pub accepts_unsettled_blocking: bool, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -480,18 +460,34 @@ impl FromStr for Solver { Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?) }; - let fairness_threshold = match parts.get(3) { - Some(value) => { - Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?) + let mut fairness_threshold: Option = Default::default(); + let mut accepts_unsettled_blocking = false; + + if let Some(value) = parts.get(3) { + match U256::from_dec_str(value) { + Ok(parsed_fairness_threshold) => { + fairness_threshold = Some(parsed_fairness_threshold); + } + Err(_) => { + accepts_unsettled_blocking = value + .parse() + .context("failed to parse solver's third arg param")? + } } - None => None, }; + if let Some(value) = parts.get(4) { + accepts_unsettled_blocking = value + .parse() + .context("failed to parse `accepts_unsettled_blocking` flag")?; + } + Ok(Self { name: name.to_owned(), url, fairness_threshold, submission_account, + accepts_unsettled_blocking, }) } } @@ -688,6 +684,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, + accepts_unsettled_blocking: false, submission_account: Account::Address(H160::from_slice(&hex!( "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), @@ -703,6 +700,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, + accepts_unsettled_blocking: false, submission_account: Account::Kms( Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(), ), @@ -721,6 +719,40 @@ mod test { "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), fairness_threshold: Some(U256::exp10(18)), + accepts_unsettled_blocking: false, + }; + assert_eq!(driver, expected); + } + + #[test] + fn parse_driver_with_accepts_unsettled_blocking_flag() { + let argument = + "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|true"; + let driver = Solver::from_str(argument).unwrap(); + let expected = Solver { + name: "name1".into(), + url: Url::parse("http://localhost:8080").unwrap(), + submission_account: Account::Address(H160::from_slice(&hex!( + "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + ))), + fairness_threshold: None, + accepts_unsettled_blocking: true, + }; + assert_eq!(driver, expected); + } + + #[test] + fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() { + let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|true"; + let driver = Solver::from_str(argument).unwrap(); + let expected = Solver { + name: "name1".into(), + url: Url::parse("http://localhost:8080").unwrap(), + submission_account: Account::Address(H160::from_slice(&hex!( + "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + ))), + fairness_threshold: Some(U256::exp10(18)), + accepts_unsettled_blocking: true, }; assert_eq!(driver, expected); } diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 7450dc6126..07f08f210e 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -3,7 +3,7 @@ mod onchain; use { crate::{ - arguments::SolverParticipationGuardConfig, + arguments::DbBasedSolverParticipationGuardConfig, database::Postgres, domain::eth, infra::Ethereum, @@ -26,28 +26,24 @@ impl SolverParticipationGuard { eth: Ethereum, db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, - config: SolverParticipationGuardConfig, + db_based_validator_config: DbBasedSolverParticipationGuardConfig, ) -> Self { let mut validators: Vec> = Vec::new(); - if config.db_based_validator.enabled { + if db_based_validator_config.enabled { let current_block = eth.current_block().clone(); let database_solver_participation_validator = db::Validator::new( db, current_block, settlement_updates_receiver, - config.db_based_validator.solver_blacklist_cache_ttl, - config - .db_based_validator - .solver_last_auctions_participation_count, + db_based_validator_config.solver_blacklist_cache_ttl, + db_based_validator_config.solver_last_auctions_participation_count, ); validators.push(Box::new(database_solver_participation_validator)); } - if config.onchain_based_validator.enabled { - let onchain_solver_participation_validator = onchain::Validator { eth }; - validators.push(Box::new(onchain_solver_participation_validator)); - } + let onchain_solver_participation_validator = onchain::Validator { eth }; + validators.push(Box::new(onchain_solver_participation_validator)); Self(Arc::new(Inner { validators })) } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index cf538343c8..e3a08c5400 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -372,7 +372,7 @@ pub async fn run(args: Arguments) { eth.clone(), db.clone(), settlement_updates_receiver, - args.solver_participation_guard, + args.db_based_solver_participation_guard, ); let persistence = From 9a55fe2fb2ab6c6ac84dafd2197448a1c0e1236d Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 16:29:03 +0000 Subject: [PATCH 11/33] Start using the new config --- .../competition/participation_guard/db.rs | 16 +++++++++++++ .../competition/participation_guard/mod.rs | 4 +++- crates/autopilot/src/infra/solvers/mod.rs | 3 +++ crates/autopilot/src/run.rs | 24 ++++++++++++------- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 4eb3ce7c17..d37849c0aa 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -5,6 +5,7 @@ use { }, ethrpc::block_stream::CurrentBlockWatcher, std::{ + collections::HashMap, sync::Arc, time::{Duration, Instant}, }, @@ -20,6 +21,7 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, + db_validator_acceptance_by_solver: HashMap, } impl Validator { @@ -29,12 +31,14 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, + db_validator_acceptance_by_solver: HashMap, ) -> Self { let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, + db_validator_acceptance_by_solver, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -91,6 +95,18 @@ impl Validator { #[async_trait::async_trait] impl super::Validator for Validator { async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + // Check if solver accepted this feature. This should be removed once a CIP is + // approved. + if !self + .0 + .db_validator_acceptance_by_solver + .get(solver) + .copied() + .unwrap_or_default() + { + return Ok(true); + } + if let Some(entry) = self.0.banned_solvers.get(solver) { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 07f08f210e..f4fb14a9a5 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -8,7 +8,7 @@ use { domain::eth, infra::Ethereum, }, - std::sync::Arc, + std::{collections::HashMap, sync::Arc}, }; /// This struct checks whether a solver can participate in the competition by @@ -27,6 +27,7 @@ impl SolverParticipationGuard { db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, + db_validator_acceptance_by_solver: HashMap, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -38,6 +39,7 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, + db_validator_acceptance_by_solver, ); validators.push(Box::new(database_solver_participation_validator)); } diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index b84dc79654..aae623ebaa 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -21,6 +21,7 @@ pub struct Driver { // another driver solved with surplus exceeding this driver's surplus by `threshold` pub fairness_threshold: Option, pub submission_address: eth::Address, + pub accepts_unsettled_blocking: bool, client: Client, } @@ -38,6 +39,7 @@ impl Driver { name: String, fairness_threshold: Option, submission_account: Account, + accepts_unsettled_blocking: bool, ) -> Result { let submission_address = match submission_account { Account::Kms(key_id) => { @@ -70,6 +72,7 @@ impl Driver { .build() .map_err(Error::FailedToBuildClient)?, submission_address: submission_address.into(), + accepts_unsettled_blocking, }) } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index e3a08c5400..cfa031f81a 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,6 +52,7 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ + collections::HashMap, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -368,13 +369,6 @@ pub async fn run(args: Arguments) { let (settlement_updates_sender, settlement_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); - let solver_participation_guard = SolverParticipationGuard::new( - eth.clone(), - db.clone(), - settlement_updates_receiver, - args.db_based_solver_participation_guard, - ); - let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let settlement_observer = crate::domain::settlement::Observer::new( @@ -561,6 +555,7 @@ pub async fn run(args: Arguments) { max_winners_per_auction: args.max_winners_per_auction, max_solutions_per_solver: args.max_solutions_per_solver, }; + let drivers_futures = args .drivers .into_iter() @@ -570,6 +565,7 @@ pub async fn run(args: Arguments) { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, + driver.accepts_unsettled_blocking, ) .await .map(Arc::new) @@ -577,11 +573,22 @@ pub async fn run(args: Arguments) { }) .collect::>(); - let drivers = futures::future::join_all(drivers_futures) + let drivers: Vec<_> = futures::future::join_all(drivers_futures) .await .into_iter() .collect(); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + db.clone(), + settlement_updates_receiver, + args.db_based_solver_participation_guard, + drivers + .iter() + .map(|driver| (driver.submission_address, driver.accepts_unsettled_blocking)) + .collect::>(), + ); + let run = RunLoop::new( run_loop_config, eth, @@ -613,6 +620,7 @@ async fn shadow_mode(args: Arguments) -> ! { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, + driver.accepts_unsettled_blocking, ) .await .map(Arc::new) From f9bdafdba9dc78295b171d3ee40e73bc8ad134a6 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 16:32:53 +0000 Subject: [PATCH 12/33] Simplify to hashset --- .../domain/competition/participation_guard/db.rs | 16 +++++----------- .../competition/participation_guard/mod.rs | 6 +++--- crates/autopilot/src/run.rs | 8 +++++--- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index d37849c0aa..f765b39418 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -5,7 +5,7 @@ use { }, ethrpc::block_stream::CurrentBlockWatcher, std::{ - collections::HashMap, + collections::HashSet, sync::Arc, time::{Duration, Instant}, }, @@ -21,7 +21,7 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, - db_validator_acceptance_by_solver: HashMap, + db_validator_accepted_solvers: HashSet, } impl Validator { @@ -31,14 +31,14 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, - db_validator_acceptance_by_solver: HashMap, + db_validator_accepted_solvers: HashSet, ) -> Self { let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, - db_validator_acceptance_by_solver, + db_validator_accepted_solvers, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -97,13 +97,7 @@ impl super::Validator for Validator { async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { // Check if solver accepted this feature. This should be removed once a CIP is // approved. - if !self - .0 - .db_validator_acceptance_by_solver - .get(solver) - .copied() - .unwrap_or_default() - { + if !self.0.db_validator_accepted_solvers.contains(solver) { return Ok(true); } diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index f4fb14a9a5..d0127e68d5 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -8,7 +8,7 @@ use { domain::eth, infra::Ethereum, }, - std::{collections::HashMap, sync::Arc}, + std::{collections::HashSet, sync::Arc}, }; /// This struct checks whether a solver can participate in the competition by @@ -27,7 +27,7 @@ impl SolverParticipationGuard { db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, - db_validator_acceptance_by_solver: HashMap, + db_validator_accepted_solvers: HashSet, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -39,7 +39,7 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, - db_validator_acceptance_by_solver, + db_validator_accepted_solvers, ); validators.push(Box::new(database_solver_participation_validator)); } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index cfa031f81a..87bfa4d942 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,7 +52,7 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ - collections::HashMap, + collections::HashSet, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -585,8 +585,10 @@ pub async fn run(args: Arguments) { args.db_based_solver_participation_guard, drivers .iter() - .map(|driver| (driver.submission_address, driver.accepts_unsettled_blocking)) - .collect::>(), + .filter_map(|driver| { + (driver.accepts_unsettled_blocking).then_some(driver.submission_address) + }) + .collect::>(), ); let run = RunLoop::new( From 5fc831ee08005a8b020f9c23f5642ef05ace29fa Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 16:37:56 +0000 Subject: [PATCH 13/33] Nit --- crates/autopilot/src/run.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 87bfa4d942..8c8f2b7834 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -586,7 +586,9 @@ pub async fn run(args: Arguments) { drivers .iter() .filter_map(|driver| { - (driver.accepts_unsettled_blocking).then_some(driver.submission_address) + driver + .accepts_unsettled_blocking + .then_some(driver.submission_address) }) .collect::>(), ); From 3154cd0827f40ab52563eac2c66f4a6ae5b99a7f Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 15:18:48 +0000 Subject: [PATCH 14/33] Use driver's name in metrics --- .../competition/participation_guard/db.rs | 45 +++++++++++-------- .../competition/participation_guard/mod.rs | 8 ++-- crates/autopilot/src/run.rs | 10 ++--- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index f765b39418..8ddbb28f5f 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -2,10 +2,11 @@ use { crate::{ database::Postgres, domain::{eth, Metrics}, + infra, }, ethrpc::block_stream::CurrentBlockWatcher, std::{ - collections::HashSet, + collections::HashMap, sync::Arc, time::{Duration, Instant}, }, @@ -21,7 +22,7 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, - db_validator_accepted_solvers: HashSet, + drivers_by_address: HashMap>, } impl Validator { @@ -31,14 +32,19 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, - db_validator_accepted_solvers: HashSet, + drivers_by_address: HashMap>, ) -> Self { + // Keep only drivers that accept unsettled blocking. + let drivers_by_address = drivers_by_address + .into_iter() + .filter(|(_, driver)| driver.accepts_unsettled_blocking) + .collect(); let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, - db_validator_accepted_solvers, + drivers_by_address, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -53,13 +59,14 @@ impl Validator { mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, current_block: CurrentBlockWatcher, ) { - let self_ = self.0.clone(); + let self_ = self.clone(); tokio::spawn(async move { while settlement_updates_receiver.recv().await.is_some() { let current_block = current_block.borrow().number; match self_ + .0 .db - .find_non_settling_solvers(self_.last_auctions_count, current_block) + .find_non_settling_solvers(self_.0.last_auctions_count, current_block) .await { Ok(non_settling_solvers) => { @@ -67,10 +74,11 @@ impl Validator { .into_iter() .map(|solver| { let address = eth::Address(solver.0.into()); - - Metrics::get() - .non_settling_solver - .with_label_values(&[&format!("{:#x}", address.0)]); + if let Some(driver) = self_.0.drivers_by_address.get(&address) { + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + } address }) @@ -79,9 +87,14 @@ impl Validator { tracing::debug!(?non_settling_solvers, "found non-settling solvers"); let now = Instant::now(); - for solver in non_settling_solvers { - self_.banned_solvers.insert(solver, now); - } + non_settling_solvers + .into_iter() + // Check if solver accepted this feature. This should be removed once a CIP is + // approved. + .filter(|solver| self_.0.drivers_by_address.contains_key(solver)) + .for_each(|solver| { + self_.0.banned_solvers.insert(solver, now); + }); } Err(err) => { tracing::warn!(?err, "error while searching for non-settling solvers") @@ -95,12 +108,6 @@ impl Validator { #[async_trait::async_trait] impl super::Validator for Validator { async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { - // Check if solver accepted this feature. This should be removed once a CIP is - // approved. - if !self.0.db_validator_accepted_solvers.contains(solver) { - return Ok(true); - } - if let Some(entry) = self.0.banned_solvers.get(solver) { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index d0127e68d5..925cf9e52e 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -6,9 +6,9 @@ use { arguments::DbBasedSolverParticipationGuardConfig, database::Postgres, domain::eth, - infra::Ethereum, + infra::{self, Ethereum}, }, - std::{collections::HashSet, sync::Arc}, + std::{collections::HashMap, sync::Arc}, }; /// This struct checks whether a solver can participate in the competition by @@ -27,7 +27,7 @@ impl SolverParticipationGuard { db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, - db_validator_accepted_solvers: HashSet, + drivers_by_address: HashMap>, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -39,7 +39,7 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, - db_validator_accepted_solvers, + drivers_by_address, ); validators.push(Box::new(database_solver_participation_validator)); } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 8c8f2b7834..a6e30dd562 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,7 +52,7 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ - collections::HashSet, + collections::HashMap, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -585,12 +585,8 @@ pub async fn run(args: Arguments) { args.db_based_solver_participation_guard, drivers .iter() - .filter_map(|driver| { - driver - .accepts_unsettled_blocking - .then_some(driver.submission_address) - }) - .collect::>(), + .map(|driver| (driver.submission_address, driver.clone())) + .collect::>(), ); let run = RunLoop::new( From 47007c100d3676f1f14699c51313503768e86b8f Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 15:19:28 +0000 Subject: [PATCH 15/33] Nit --- crates/autopilot/src/run.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index a6e30dd562..bfd3455d71 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,7 +52,6 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ - collections::HashMap, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -586,7 +585,7 @@ pub async fn run(args: Arguments) { drivers .iter() .map(|driver| (driver.submission_address, driver.clone())) - .collect::>(), + .collect(), ); let run = RunLoop::new( From bb9059eaba96859de229ce30f215ffe075c6e0fd Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 16:13:45 +0000 Subject: [PATCH 16/33] Send metrics about each found solver --- .../competition/participation_guard/db.rs | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 8ddbb28f5f..f43caa804c 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -34,11 +34,6 @@ impl Validator { last_auctions_count: u32, drivers_by_address: HashMap>, ) -> Self { - // Keep only drivers that accept unsettled blocking. - let drivers_by_address = drivers_by_address - .into_iter() - .filter(|(_, driver)| driver.accepts_unsettled_blocking) - .collect(); let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), @@ -74,10 +69,18 @@ impl Validator { .into_iter() .map(|solver| { let address = eth::Address(solver.0.into()); - if let Some(driver) = self_.0.drivers_by_address.get(&address) { - Metrics::get() - .non_settling_solver - .with_label_values(&[&driver.name]); + match self_.0.drivers_by_address.get(&address) { + Some(driver) => { + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + } + None => { + tracing::warn!( + ?address, + "unrecognized driver in non-settling solvers", + ); + } } address From 6787d3453fabe99af99374f417c8db2fd8cc6fb6 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 16:24:16 +0000 Subject: [PATCH 17/33] Cache only accepted solvers --- .../src/domain/competition/participation_guard/db.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index f43caa804c..9b2731ddec 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -94,7 +94,13 @@ impl Validator { .into_iter() // Check if solver accepted this feature. This should be removed once a CIP is // approved. - .filter(|solver| self_.0.drivers_by_address.contains_key(solver)) + .filter_map(|solver| { + self_ + .0 + .drivers_by_address + .get(&solver) + .filter(|driver| driver.accepts_unsettled_blocking).map(|_| solver) + }) .for_each(|solver| { self_.0.banned_solvers.insert(solver, now); }); From a2710c695f7c42644fdbe94a1d2d4fd3ac420458 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 17:21:47 +0000 Subject: [PATCH 18/33] Refactoring --- .../competition/participation_guard/db.rs | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 9b2731ddec..7f517debb7 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -65,41 +65,35 @@ impl Validator { .await { Ok(non_settling_solvers) => { - let non_settling_solvers = non_settling_solvers + let non_settling_drivers = non_settling_solvers .into_iter() - .map(|solver| { + .filter_map(|solver| { let address = eth::Address(solver.0.into()); - match self_.0.drivers_by_address.get(&address) { - Some(driver) => { - Metrics::get() - .non_settling_solver - .with_label_values(&[&driver.name]); - } - None => { - tracing::warn!( - ?address, - "unrecognized driver in non-settling solvers", - ); - } + if let Some(driver) = self_.0.drivers_by_address.get(&address) { + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + Some(driver.clone()) + } else { + None } - - address }) .collect::>(); - tracing::debug!(?non_settling_solvers, "found non-settling solvers"); + let non_settling_solver_names = non_settling_drivers + .iter() + .map(|driver| driver.name.clone()) + .collect::>(); + + tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers"); let now = Instant::now(); - non_settling_solvers + non_settling_drivers .into_iter() // Check if solver accepted this feature. This should be removed once a CIP is // approved. - .filter_map(|solver| { - self_ - .0 - .drivers_by_address - .get(&solver) - .filter(|driver| driver.accepts_unsettled_blocking).map(|_| solver) + .filter_map(|driver| { + driver.accepts_unsettled_blocking.then_some(driver.submission_address) }) .for_each(|solver| { self_.0.banned_solvers.insert(solver, now); From 1f43009f0d9b7df05b53d5a610e25a751c33e332 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 3 Feb 2025 11:47:18 +0000 Subject: [PATCH 19/33] Fix the tests --- crates/database/src/lib.rs | 41 +++++++++++++---------- crates/database/src/solver_competition.rs | 19 +++++++---- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index ec30cd557d..a185a90779 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -50,28 +50,35 @@ pub type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>; /// The names of tables we use in the db. pub const TABLES: &[&str] = &[ - "orders", - "trades", - "invalidations", - "last_indexed_blocks", - "quotes", - "settlements", - "presignature_events", - "order_quotes", - "solver_competitions", + "app_data", + "auction_orders", + "auction_participants", + "auction_prices", "auctions", "competition_auctions", - "onchain_placed_orders", "ethflow_orders", - "order_execution", - "interactions", "ethflow_refunds", - "settlement_scores", - "settlement_observations", - "auction_prices", - "auction_participants", - "app_data", + "fee_policies", + "interactions", + "invalidations", "jit_orders", + "last_indexed_blocks", + "onchain_order_invalidations", + "onchain_placed_orders", + "order_execution", + "order_quotes", + "orders", + "presignature_events", + "proposed_jit_orders", + "proposed_solutions", + "proposed_trade_executions", + "quotes", + "settlement_observations", + "settlement_scores", + "settlements", + "solver_competitions", + "surplus_capturing_jit_order_owners", + "trades", ]; /// The names of potentially big volume tables we use in the db. diff --git a/crates/database/src/solver_competition.rs b/crates/database/src/solver_competition.rs index 0139610671..9fc22fa561 100644 --- a/crates/database/src/solver_competition.rs +++ b/crates/database/src/solver_competition.rs @@ -596,6 +596,7 @@ mod tests { let non_settling_solver = ByteArray([1u8; 20]); + let mut solution_uid = 0; let deadline_block = 100u64; let last_auctions_count = 3i64; // competition_auctions @@ -628,11 +629,12 @@ mod tests { } // proposed_solutions - // Non-settling solver wins all auctions within the deadline + // Non-settling solver wins `last_auctions_count` auctions within the deadline for auction_id in 2..=4 { + solution_uid += 1; let solutions = vec![Solution { uid: auction_id, - id: auction_id.into(), + id: solution_uid.into(), solver: non_settling_solver, is_winner: true, score: Default::default(), @@ -645,11 +647,12 @@ mod tests { .unwrap(); } - // Non-settling solver wins not all the auctions within the deadline + // Another non-settling solver wins not all the auctions within the deadline for auction_id in 2..=4 { + solution_uid += 1; let solutions = vec![Solution { uid: auction_id, - id: auction_id.into(), + id: solution_uid.into(), solver: ByteArray([2u8; 20]), is_winner: auction_id != 2, score: Default::default(), @@ -662,16 +665,17 @@ mod tests { .unwrap(); } - // Another non-settling solver has `last_auctions_count` winning auctions but + // One more non-settling solver has `last_auctions_count` winning auctions but // not consecutive for auction_id in 1..=4 { // Break the sequence if auction_id == 2 { continue; } + solution_uid += 1; let solutions = vec![Solution { uid: auction_id, - id: auction_id.into(), + id: solution_uid.into(), solver: ByteArray([3u8; 20]), is_winner: true, score: Default::default(), @@ -687,9 +691,10 @@ mod tests { // One more non-settling solver has `last_auctions_count` winning auctions but // some of them are outside the deadline for auction_id in 3..=5 { + solution_uid += 1; let solutions = vec![Solution { uid: auction_id, - id: auction_id.into(), + id: solution_uid.into(), solver: ByteArray([4u8; 20]), is_winner: true, score: Default::default(), From 366611d2d417b9e941bc52025f9e3ba7ada7c742 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 7 Feb 2025 14:54:10 +0000 Subject: [PATCH 20/33] Nits --- crates/autopilot/src/domain/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/autopilot/src/domain/mod.rs b/crates/autopilot/src/domain/mod.rs index ad7d0077af..9d8d9b36db 100644 --- a/crates/autopilot/src/domain/mod.rs +++ b/crates/autopilot/src/domain/mod.rs @@ -18,8 +18,8 @@ pub use { #[derive(prometheus_metric_storage::MetricStorage)] #[metric(subsystem = "domain")] pub struct Metrics { - /// How many times the solver marked as non-settling based on the database - /// statistics. + /// How many times the solver was marked as non-settling based on the + /// database statistics. #[metric(labels("solver"))] pub non_settling_solver: prometheus::IntCounterVec, } From e9a70f541ac47ec0c21fffa6f3d9972dc9369d14 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 11 Feb 2025 20:16:59 +0000 Subject: [PATCH 21/33] Trigger updates on the proposed_solution table insert --- .../src/domain/settlement/observer.rs | 21 ++----------------- crates/autopilot/src/run.rs | 12 +++++------ crates/autopilot/src/run_loop.rs | 20 ++++++++++++++---- 3 files changed, 23 insertions(+), 30 deletions(-) diff --git a/crates/autopilot/src/domain/settlement/observer.rs b/crates/autopilot/src/domain/settlement/observer.rs index 0374465a1f..d142646728 100644 --- a/crates/autopilot/src/domain/settlement/observer.rs +++ b/crates/autopilot/src/domain/settlement/observer.rs @@ -19,34 +19,23 @@ use { pub struct Observer { eth: infra::Ethereum, persistence: infra::Persistence, - settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, } impl Observer { /// Creates a new Observer and asynchronously schedules the first update /// run. - pub fn new( - eth: infra::Ethereum, - persistence: infra::Persistence, - settlement_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, - ) -> Self { - Self { - eth, - persistence, - settlement_updates_sender, - } + pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self { + Self { eth, persistence } } /// Fetches all the available missing data needed for bookkeeping. /// This needs to get called after indexing a new settlement event /// since this code needs that data to already be present in the DB. pub async fn update(&self) { - let mut updated = false; loop { match self.single_update().await { Ok(true) => { tracing::debug!("on settlement event updater ran and processed event"); - updated = true; // There might be more pending updates, continue immediately. continue; } @@ -60,12 +49,6 @@ impl Observer { } } } - if updated { - // Notify the solver participation guard that a settlement has been updated. - if let Err(err) = self.settlement_updates_sender.send(()) { - tracing::error!(?err, "failed to notify solver participation guard"); - } - } } /// Update database for settlement events that have not been processed yet. diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index e2154a9e28..497fd19af8 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -365,16 +365,13 @@ pub async fn run(args: Arguments) { None }; - let (settlement_updates_sender, settlement_updates_receiver) = + let (competition_updates_sender, competition_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; - let settlement_observer = crate::domain::settlement::Observer::new( - eth.clone(), - persistence.clone(), - settlement_updates_sender, - ); + let settlement_observer = + crate::domain::settlement::Observer::new(eth.clone(), persistence.clone()); let settlement_contract_start_index = if let Some(DeploymentInformation::BlockNumber(settlement_contract_start_index)) = eth.contracts().settlement().deployment_information() @@ -580,7 +577,7 @@ pub async fn run(args: Arguments) { let solver_participation_guard = SolverParticipationGuard::new( eth.clone(), db.clone(), - settlement_updates_receiver, + competition_updates_receiver, args.db_based_solver_participation_guard, drivers .iter() @@ -598,6 +595,7 @@ pub async fn run(args: Arguments) { trusted_tokens, liveness.clone(), Arc::new(maintenance), + competition_updates_sender, ); run.run_forever().await; } diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 3cdb93bbd4..de5483feed 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -74,6 +74,7 @@ pub struct RunLoop { /// Maintenance tasks that should run before every runloop to have /// the most recent data available. maintenance: Arc, + competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, } impl RunLoop { @@ -88,6 +89,7 @@ impl RunLoop { trusted_tokens: AutoUpdatingTokenList, liveness: Arc, maintenance: Arc, + competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, ) -> Self { Self { config, @@ -100,6 +102,7 @@ impl RunLoop { in_flight_orders: Default::default(), liveness, maintenance, + competition_updates_sender, } } @@ -463,7 +466,7 @@ impl RunLoop { competition_table, }; - if let Err(err) = futures::try_join!( + match futures::try_join!( self.persistence .save_auction(auction, block_deadline) .map_err(|e| e.0.context("failed to save auction")), @@ -471,9 +474,18 @@ impl RunLoop { .save_solutions(auction.id, solutions) .map_err(|e| e.0.context("failed to save solutions")), ) { - // Don't error if saving of auction and solution fails, until stable. - // Various edge cases with JIT orders verifiable only in production. - tracing::warn!(?err, "failed to save new competition data"); + Ok(_) => { + // Notify the solver participation guard that the proposed solutions have been + // saved. + if let Err(err) = self.competition_updates_sender.send(()) { + tracing::error!(?err, "failed to notify solver participation guard"); + } + } + Err(err) => { + // Don't error if saving of auction and solution fails, until stable. + // Various edge cases with JIT orders verifiable only in production. + tracing::warn!(?err, "failed to save new competition data"); + } } tracing::trace!(?competition, "saving competition"); From e220eafa35798a74090f3181b27b647d17792f05 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 11 Feb 2025 20:30:25 +0000 Subject: [PATCH 22/33] Nit --- .../src/domain/competition/participation_guard/db.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 7f517debb7..e235c34722 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -69,14 +69,13 @@ impl Validator { .into_iter() .filter_map(|solver| { let address = eth::Address(solver.0.into()); - if let Some(driver) = self_.0.drivers_by_address.get(&address) { + self_.0.drivers_by_address.get(&address).map(|driver| { Metrics::get() .non_settling_solver .with_label_values(&[&driver.name]); - Some(driver.clone()) - } else { - None - } + + driver.clone() + }) }) .collect::>(); From 51832d49c5388687a30e5f3d45201d31826c54f0 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 11 Feb 2025 21:05:36 +0000 Subject: [PATCH 23/33] Formatting --- .../autopilot/src/domain/competition/participation_guard/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index e235c34722..f181265bf5 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -73,7 +73,7 @@ impl Validator { Metrics::get() .non_settling_solver .with_label_values(&[&driver.name]); - + driver.clone() }) }) From 17ee52c373d76e498c7efaa41bf3eff389aba873 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 12 Feb 2025 14:20:07 +0000 Subject: [PATCH 24/33] infra::Persistence --- crates/autopilot/src/database/competition.rs | 24 ----------------- .../competition/participation_guard/db.rs | 12 ++++----- .../competition/participation_guard/mod.rs | 5 ++-- crates/autopilot/src/infra/persistence/mod.rs | 27 +++++++++++++++++++ crates/autopilot/src/run.rs | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/crates/autopilot/src/database/competition.rs b/crates/autopilot/src/database/competition.rs index 682ac81d1d..81cc5e63ef 100644 --- a/crates/autopilot/src/database/competition.rs +++ b/crates/autopilot/src/database/competition.rs @@ -139,28 +139,4 @@ impl super::Postgres { Ok(()) } - - /// Finds solvers that won `last_auctions_count` consecutive auctions but - /// never settled any of them. The current block is used to prevent - /// selecting auctions with deadline after the current block since they - /// still can be settled. - pub async fn find_non_settling_solvers( - &self, - last_auctions_count: u32, - current_block: u64, - ) -> anyhow::Result> { - let mut ex = self.pool.acquire().await.context("acquire")?; - let _timer = super::Metrics::get() - .database_queries - .with_label_values(&["find_non_settling_solvers"]) - .start_timer(); - - database::solver_competition::find_non_settling_solvers( - &mut ex, - last_auctions_count, - current_block, - ) - .await - .context("solver_competition::find_non_settling_solvers") - } } diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index f181265bf5..ef8280e3bc 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -1,6 +1,5 @@ use { crate::{ - database::Postgres, domain::{eth, Metrics}, infra, }, @@ -18,7 +17,7 @@ use { pub(super) struct Validator(Arc); struct Inner { - db: Postgres, + persistence: infra::Persistence, banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, @@ -27,7 +26,7 @@ struct Inner { impl Validator { pub fn new( - db: Postgres, + persistence: infra::Persistence, current_block: CurrentBlockWatcher, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, @@ -35,7 +34,7 @@ impl Validator { drivers_by_address: HashMap>, ) -> Self { let self_ = Self(Arc::new(Inner { - db, + persistence, banned_solvers: Default::default(), ttl, last_auctions_count, @@ -60,7 +59,7 @@ impl Validator { let current_block = current_block.borrow().number; match self_ .0 - .db + .persistence .find_non_settling_solvers(self_.0.last_auctions_count, current_block) .await { @@ -68,8 +67,7 @@ impl Validator { let non_settling_drivers = non_settling_solvers .into_iter() .filter_map(|solver| { - let address = eth::Address(solver.0.into()); - self_.0.drivers_by_address.get(&address).map(|driver| { + self_.0.drivers_by_address.get(&solver).map(|driver| { Metrics::get() .non_settling_solver .with_label_values(&[&driver.name]); diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 925cf9e52e..aae595091d 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -4,7 +4,6 @@ mod onchain; use { crate::{ arguments::DbBasedSolverParticipationGuardConfig, - database::Postgres, domain::eth, infra::{self, Ethereum}, }, @@ -24,7 +23,7 @@ struct Inner { impl SolverParticipationGuard { pub fn new( eth: Ethereum, - db: Postgres, + persistence: infra::Persistence, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, drivers_by_address: HashMap>, @@ -34,7 +33,7 @@ impl SolverParticipationGuard { if db_based_validator_config.enabled { let current_block = eth.current_block().clone(); let database_solver_participation_validator = db::Validator::new( - db, + persistence, current_block, settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index 5a46e35a7e..813ef4fb4c 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -797,6 +797,33 @@ impl Persistence { ex.commit().await?; Ok(()) } + + /// Finds solvers that won `last_auctions_count` consecutive auctions but + /// never settled any of them. The current block is used to prevent + /// selecting auctions with deadline after the current block since they + /// still can be settled. + pub async fn find_non_settling_solvers( + &self, + last_auctions_count: u32, + current_block: u64, + ) -> anyhow::Result> { + let mut ex = self.postgres.pool.acquire().await.context("acquire")?; + let _timer = Metrics::get() + .database_queries + .with_label_values(&["find_non_settling_solvers"]) + .start_timer(); + + Ok(database::solver_competition::find_non_settling_solvers( + &mut ex, + last_auctions_count, + current_block, + ) + .await + .context("failed to fetch non-settling solvers")? + .into_iter() + .map(|solver| eth::Address(solver.0.into())) + .collect()) + } } #[derive(prometheus_metric_storage::MetricStorage)] diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 497fd19af8..1320dfcbe3 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -576,7 +576,7 @@ pub async fn run(args: Arguments) { let solver_participation_guard = SolverParticipationGuard::new( eth.clone(), - db.clone(), + persistence.clone(), competition_updates_receiver, args.db_based_solver_participation_guard, drivers From cba693a38622f2d6a5fc42f50c9c23c855351e17 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 12 Feb 2025 14:21:22 +0000 Subject: [PATCH 25/33] Naming --- .../src/domain/competition/participation_guard/db.rs | 8 ++++---- .../src/domain/competition/participation_guard/mod.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index ef8280e3bc..dc00916646 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -28,7 +28,7 @@ impl Validator { pub fn new( persistence: infra::Persistence, current_block: CurrentBlockWatcher, - settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, drivers_by_address: HashMap>, @@ -41,7 +41,7 @@ impl Validator { drivers_by_address, })); - self_.start_maintenance(settlement_updates_receiver, current_block); + self_.start_maintenance(competition_updates_receiver, current_block); self_ } @@ -50,12 +50,12 @@ impl Validator { /// avoid redundant DB queries. fn start_maintenance( &self, - mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + mut competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, current_block: CurrentBlockWatcher, ) { let self_ = self.clone(); tokio::spawn(async move { - while settlement_updates_receiver.recv().await.is_some() { + while competition_updates_receiver.recv().await.is_some() { let current_block = current_block.borrow().number; match self_ .0 diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index aae595091d..75e89d0679 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -24,7 +24,7 @@ impl SolverParticipationGuard { pub fn new( eth: Ethereum, persistence: infra::Persistence, - settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, drivers_by_address: HashMap>, ) -> Self { @@ -35,7 +35,7 @@ impl SolverParticipationGuard { let database_solver_participation_validator = db::Validator::new( persistence, current_block, - settlement_updates_receiver, + competition_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, drivers_by_address, From c3c9433f7bb38c61633ee2437f7877da6fd684d6 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 14 Feb 2025 09:41:00 +0000 Subject: [PATCH 26/33] Comment --- .../src/domain/competition/participation_guard/db.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index dc00916646..c843c2fb7e 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -46,8 +46,9 @@ impl Validator { self_ } - /// Update the internal cache only once the settlement table is updated to - /// avoid redundant DB queries. + /// Update the internal cache only once the competition auctions table is + /// updated to avoid redundant DB queries on each block or any other + /// timeout. fn start_maintenance( &self, mut competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, From 4f6cd1d0433e56dbf9d44511db5b9fd2d5892a81 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 11:58:20 +0000 Subject: [PATCH 27/33] Comments --- crates/autopilot/src/arguments.rs | 4 +++- .../src/domain/competition/participation_guard/db.rs | 11 ++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index b581c504d0..4d2b9f4a46 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -262,7 +262,9 @@ pub struct DbBasedSolverParticipationGuardConfig { )] pub enabled: bool, - /// The time-to-live for the solver participation blacklist cache. + /// Sets the duration for which the solver remains blacklisted. + /// Technically, the time-to-live for the solver participation blacklist + /// cache. #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)] pub solver_blacklist_cache_ttl: Duration, diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index c843c2fb7e..e0f254a9fe 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -88,8 +88,8 @@ impl Validator { let now = Instant::now(); non_settling_drivers .into_iter() - // Check if solver accepted this feature. This should be removed once a CIP is - // approved. + // Check if solver accepted this feature. This should be removed once the CIP + // making this mandatory has been approved. .filter_map(|driver| { driver.accepts_unsettled_blocking.then_some(driver.submission_address) }) @@ -102,6 +102,7 @@ impl Validator { } } } + tracing::error!("competition_updates_receiver closed"); }); } } @@ -110,11 +111,7 @@ impl Validator { impl super::Validator for Validator { async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { if let Some(entry) = self.0.banned_solvers.get(solver) { - if Instant::now().duration_since(*entry.value()) < self.0.ttl { - return Ok(false); - } else { - self.0.banned_solvers.remove(solver); - } + return Ok(entry.elapsed() >= self.0.ttl); } Ok(true) From bdd33d0b381844fa79f948de2487357c1ba26e17 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 12:03:17 +0000 Subject: [PATCH 28/33] Simplify the code --- .../competition/participation_guard/db.rs | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index e0f254a9fe..2304302715 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -58,51 +58,38 @@ impl Validator { tokio::spawn(async move { while competition_updates_receiver.recv().await.is_some() { let current_block = current_block.borrow().number; - match self_ + let non_settling_solvers = match self_ .0 .persistence .find_non_settling_solvers(self_.0.last_auctions_count, current_block) .await { - Ok(non_settling_solvers) => { - let non_settling_drivers = non_settling_solvers - .into_iter() - .filter_map(|solver| { - self_.0.drivers_by_address.get(&solver).map(|driver| { - Metrics::get() - .non_settling_solver - .with_label_values(&[&driver.name]); - - driver.clone() - }) - }) - .collect::>(); - - let non_settling_solver_names = non_settling_drivers - .iter() - .map(|driver| driver.name.clone()) - .collect::>(); + Ok(non_settling_solvers) => non_settling_solvers, + Err(err) => { + tracing::warn!(?err, "error while searching for non-settling solvers"); + continue; + } + }; - tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers"); + tracing::debug!(solvers = ?non_settling_solvers, "found non-settling solvers"); - let now = Instant::now(); - non_settling_drivers - .into_iter() - // Check if solver accepted this feature. This should be removed once the CIP - // making this mandatory has been approved. - .filter_map(|driver| { - driver.accepts_unsettled_blocking.then_some(driver.submission_address) - }) - .for_each(|solver| { - self_.0.banned_solvers.insert(solver, now); - }); - } - Err(err) => { - tracing::warn!(?err, "error while searching for non-settling solvers") + let now = Instant::now(); + for solver in non_settling_solvers { + let Some(driver) = self_.0.drivers_by_address.get(&solver) else { + continue; + }; + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + // Check if solver accepted this feature. This should be removed once the CIP + // making this mandatory has been approved. + if driver.accepts_unsettled_blocking { + tracing::debug!(?solver, "disabling solver temporarily"); + self_.0.banned_solvers.insert(solver, now); } } } - tracing::error!("competition_updates_receiver closed"); + tracing::error!("stream of settlement updates terminated unexpectedly"); }); } } From fd0fc273c8bd223b6088f08c228049abcc32a339 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 12:15:54 +0000 Subject: [PATCH 29/33] Nits --- .../domain/competition/participation_guard/mod.rs | 9 ++++++--- crates/autopilot/src/run.rs | 5 +---- crates/database/src/solver_competition.rs | 12 ++++++++++++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 75e89d0679..00a2759f21 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -7,7 +7,7 @@ use { domain::eth, infra::{self, Ethereum}, }, - std::{collections::HashMap, sync::Arc}, + std::sync::Arc, }; /// This struct checks whether a solver can participate in the competition by @@ -26,7 +26,7 @@ impl SolverParticipationGuard { persistence: infra::Persistence, competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, - drivers_by_address: HashMap>, + drivers: impl IntoIterator>, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -38,7 +38,10 @@ impl SolverParticipationGuard { competition_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, - drivers_by_address, + drivers + .into_iter() + .map(|driver| (driver.submission_address, driver.clone())) + .collect(), ); validators.push(Box::new(database_solver_participation_validator)); } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 1320dfcbe3..24faf50c72 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -579,10 +579,7 @@ pub async fn run(args: Arguments) { persistence.clone(), competition_updates_receiver, args.db_based_solver_participation_guard, - drivers - .iter() - .map(|driver| (driver.submission_address, driver.clone())) - .collect(), + drivers.iter().cloned(), ); let run = RunLoop::new( diff --git a/crates/database/src/solver_competition.rs b/crates/database/src/solver_competition.rs index 9fc22fa561..f778c4315a 100644 --- a/crates/database/src/solver_competition.rs +++ b/crates/database/src/solver_competition.rs @@ -97,6 +97,18 @@ GROUP BY sc.id sqlx::query_as(QUERY).bind(tx_hash).fetch_optional(ex).await } +/// Identifies solvers that have consistently failed to settle solutions in +/// recent N auctions. +/// +/// 1. Retrieves `last_auctions_count` most recent auctions already ended +/// auctions by filtering them by their deadlines. +/// 2. Identifies solvers who won these auctions but did not submit a successful +/// settlement. +/// 3. Counts how often each solver appears in these unsuccessful cases. +/// 4. Determines the total number of auctions considered. +/// 5. Flags solvers who failed to settle in all of these auctions. +/// 6. Returns a list of solvers that have consistently failed to settle +/// solutions. pub async fn find_non_settling_solvers( ex: &mut PgConnection, last_auctions_count: u32, From e5250a56a54a9d41b989b71e028b887768613575 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 14:58:12 +0000 Subject: [PATCH 30/33] Solver names in the log --- .../src/domain/competition/participation_guard/db.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 2304302715..49f61d79ac 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -71,23 +71,25 @@ impl Validator { } }; - tracing::debug!(solvers = ?non_settling_solvers, "found non-settling solvers"); - + let mut non_settling_solver_names: Vec<&str> = Vec::new(); let now = Instant::now(); for solver in non_settling_solvers { let Some(driver) = self_.0.drivers_by_address.get(&solver) else { continue; }; + non_settling_solver_names.push(driver.name.as_ref()); Metrics::get() .non_settling_solver .with_label_values(&[&driver.name]); // Check if solver accepted this feature. This should be removed once the CIP // making this mandatory has been approved. if driver.accepts_unsettled_blocking { - tracing::debug!(?solver, "disabling solver temporarily"); + tracing::debug!(solver = ?driver.name, "disabling solver temporarily"); self_.0.banned_solvers.insert(solver, now); } } + + tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers"); } tracing::error!("stream of settlement updates terminated unexpectedly"); }); From 051bd50822c0e02f9a3f6b30d15c75ca3d8d67b6 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 17:37:21 +0000 Subject: [PATCH 31/33] Naming --- crates/autopilot/src/arguments.rs | 25 ++++++++----------- .../competition/participation_guard/db.rs | 2 +- crates/autopilot/src/infra/solvers/mod.rs | 6 ++--- crates/autopilot/src/run.rs | 4 +-- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 4d2b9f4a46..63a23b6a36 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -416,7 +416,7 @@ pub struct Solver { pub url: Url, pub submission_account: Account, pub fairness_threshold: Option, - pub accepts_unsettled_blocking: bool, + pub requested_timeout_on_problems: bool, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -466,7 +466,7 @@ impl FromStr for Solver { }; let mut fairness_threshold: Option = Default::default(); - let mut accepts_unsettled_blocking = false; + let mut requested_timeout_on_problems = false; if let Some(value) = parts.get(3) { match U256::from_dec_str(value) { @@ -474,17 +474,14 @@ impl FromStr for Solver { fairness_threshold = Some(parsed_fairness_threshold); } Err(_) => { - accepts_unsettled_blocking = value - .parse() - .context("failed to parse solver's third arg param")? + requested_timeout_on_problems = + value.to_lowercase() == "requested_timeout_on_problems"; } } }; if let Some(value) = parts.get(4) { - accepts_unsettled_blocking = value - .parse() - .context("failed to parse `accepts_unsettled_blocking` flag")?; + requested_timeout_on_problems = value.to_lowercase() == "requested_timeout_on_problems"; } Ok(Self { @@ -492,7 +489,7 @@ impl FromStr for Solver { url, fairness_threshold, submission_account, - accepts_unsettled_blocking, + requested_timeout_on_problems, }) } } @@ -689,7 +686,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, - accepts_unsettled_blocking: false, + requested_timeout_on_problems: false, submission_account: Account::Address(H160::from_slice(&hex!( "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), @@ -705,7 +702,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, - accepts_unsettled_blocking: false, + requested_timeout_on_problems: false, submission_account: Account::Kms( Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(), ), @@ -724,7 +721,7 @@ mod test { "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), fairness_threshold: Some(U256::exp10(18)), - accepts_unsettled_blocking: false, + requested_timeout_on_problems: false, }; assert_eq!(driver, expected); } @@ -741,7 +738,7 @@ mod test { "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), fairness_threshold: None, - accepts_unsettled_blocking: true, + requested_timeout_on_problems: true, }; assert_eq!(driver, expected); } @@ -757,7 +754,7 @@ mod test { "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), fairness_threshold: Some(U256::exp10(18)), - accepts_unsettled_blocking: true, + requested_timeout_on_problems: true, }; assert_eq!(driver, expected); } diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 49f61d79ac..d84be45405 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -83,7 +83,7 @@ impl Validator { .with_label_values(&[&driver.name]); // Check if solver accepted this feature. This should be removed once the CIP // making this mandatory has been approved. - if driver.accepts_unsettled_blocking { + if driver.requested_timeout_on_problems { tracing::debug!(solver = ?driver.name, "disabling solver temporarily"); self_.0.banned_solvers.insert(solver, now); } diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index aae623ebaa..0d8a28d026 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -21,7 +21,7 @@ pub struct Driver { // another driver solved with surplus exceeding this driver's surplus by `threshold` pub fairness_threshold: Option, pub submission_address: eth::Address, - pub accepts_unsettled_blocking: bool, + pub requested_timeout_on_problems: bool, client: Client, } @@ -39,7 +39,7 @@ impl Driver { name: String, fairness_threshold: Option, submission_account: Account, - accepts_unsettled_blocking: bool, + requested_timeout_on_problems: bool, ) -> Result { let submission_address = match submission_account { Account::Kms(key_id) => { @@ -72,7 +72,7 @@ impl Driver { .build() .map_err(Error::FailedToBuildClient)?, submission_address: submission_address.into(), - accepts_unsettled_blocking, + requested_timeout_on_problems, }) } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 24faf50c72..9d9b2b6887 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -561,7 +561,7 @@ pub async fn run(args: Arguments) { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, - driver.accepts_unsettled_blocking, + driver.requested_timeout_on_problems, ) .await .map(Arc::new) @@ -614,7 +614,7 @@ async fn shadow_mode(args: Arguments) -> ! { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, - driver.accepts_unsettled_blocking, + driver.requested_timeout_on_problems, ) .await .map(Arc::new) From 4bb86404e058ff23799ba856b1d85e1a95fa3959 Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 17 Feb 2025 17:41:21 +0000 Subject: [PATCH 32/33] Fixes unit tests --- crates/autopilot/src/arguments.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 63a23b6a36..44715b3090 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -729,7 +729,7 @@ mod test { #[test] fn parse_driver_with_accepts_unsettled_blocking_flag() { let argument = - "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|true"; + "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|requested_timeout_on_problems"; let driver = Solver::from_str(argument).unwrap(); let expected = Solver { name: "name1".into(), @@ -745,7 +745,7 @@ mod test { #[test] fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() { - let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|true"; + let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|requested_timeout_on_problems"; let driver = Solver::from_str(argument).unwrap(); let expected = Solver { name: "name1".into(), From de31f1ebe8b7142302df3e08628d74751429155b Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 18 Feb 2025 10:58:04 +0000 Subject: [PATCH 33/33] Nit --- .../competition/participation_guard/db.rs | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index d84be45405..7ef9fddb67 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -71,23 +71,26 @@ impl Validator { } }; - let mut non_settling_solver_names: Vec<&str> = Vec::new(); let now = Instant::now(); - for solver in non_settling_solvers { - let Some(driver) = self_.0.drivers_by_address.get(&solver) else { - continue; - }; - non_settling_solver_names.push(driver.name.as_ref()); - Metrics::get() - .non_settling_solver - .with_label_values(&[&driver.name]); - // Check if solver accepted this feature. This should be removed once the CIP - // making this mandatory has been approved. - if driver.requested_timeout_on_problems { - tracing::debug!(solver = ?driver.name, "disabling solver temporarily"); - self_.0.banned_solvers.insert(solver, now); - } - } + let non_settling_solver_names: Vec<&str> = non_settling_solvers + .iter() + .filter_map(|solver| self_.0.drivers_by_address.get(solver)) + .map(|driver| { + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + // Check if solver accepted this feature. This should be removed once the + // CIP making this mandatory has been approved. + if driver.requested_timeout_on_problems { + tracing::debug!(solver = ?driver.name, "disabling solver temporarily"); + self_ + .0 + .banned_solvers + .insert(driver.submission_address, now); + } + driver.name.as_ref() + }) + .collect(); tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers"); }