diff --git a/Cargo.lock b/Cargo.lock index 5ee6b90f09..e4665ee611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,7 @@ dependencies = [ "clap", "contracts", "cow-amm", + "dashmap", "database", "derive_more 1.0.0", "ethcontract", diff --git a/crates/autopilot/Cargo.toml b/crates/autopilot/Cargo.toml index 9cdfb90295..7fa92c9ace 100644 --- a/crates/autopilot/Cargo.toml +++ b/crates/autopilot/Cargo.toml @@ -25,6 +25,7 @@ chrono = { workspace = true } clap = { workspace = true } contracts = { path = "../contracts" } cow-amm = { path = "../cow-amm" } +dashmap = { workspace = true } database = { path = "../database" } derive_more = { workspace = true } ethcontract = { workspace = true } diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 8b7bb9df05..44715b3090 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -245,6 +245,32 @@ pub struct Arguments { /// Archive node URL used to index CoW AMM #[clap(long, env)] pub archive_node_url: Option, + + /// Configuration for the solver participation guard. + #[clap(flatten)] + pub db_based_solver_participation_guard: DbBasedSolverParticipationGuardConfig, +} + +#[derive(Debug, clap::Parser)] +pub struct DbBasedSolverParticipationGuardConfig { + /// Enables or disables the solver participation guard + #[clap( + id = "db_enabled", + long = "db-based-solver-participation-guard-enabled", + env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED", + default_value = "true" + )] + pub enabled: bool, + + /// Sets the duration for which the solver remains blacklisted. + /// Technically, the time-to-live for the solver participation blacklist + /// cache. + #[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)] + pub solver_blacklist_cache_ttl: Duration, + + /// The number of last auctions to check solver participation eligibility. + #[clap(long, env, default_value = "3")] + pub solver_last_auctions_participation_count: u32, } impl std::fmt::Display for Arguments { @@ -290,6 +316,7 @@ impl std::fmt::Display for Arguments { max_winners_per_auction, archive_node_url, max_solutions_per_solver, + db_based_solver_participation_guard, } = self; write!(f, "{}", shared)?; @@ -373,6 +400,11 @@ impl std::fmt::Display for Arguments { "max_solutions_per_solver: {:?}", max_solutions_per_solver )?; + writeln!( + f, + "db_based_solver_participation_guard: {:?}", + db_based_solver_participation_guard + )?; Ok(()) } } @@ -384,6 +416,7 @@ pub struct Solver { pub url: Url, pub submission_account: Account, pub fairness_threshold: Option, + pub requested_timeout_on_problems: bool, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -432,18 +465,31 @@ impl FromStr for Solver { Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?) }; - let fairness_threshold = match parts.get(3) { - Some(value) => { - Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?) + let mut fairness_threshold: Option = Default::default(); + let mut requested_timeout_on_problems = false; + + if let Some(value) = parts.get(3) { + match U256::from_dec_str(value) { + Ok(parsed_fairness_threshold) => { + fairness_threshold = Some(parsed_fairness_threshold); + } + Err(_) => { + requested_timeout_on_problems = + value.to_lowercase() == "requested_timeout_on_problems"; + } } - None => None, }; + if let Some(value) = parts.get(4) { + requested_timeout_on_problems = value.to_lowercase() == "requested_timeout_on_problems"; + } + Ok(Self { name: name.to_owned(), url, fairness_threshold, submission_account, + requested_timeout_on_problems, }) } } @@ -640,6 +686,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, + requested_timeout_on_problems: false, submission_account: Account::Address(H160::from_slice(&hex!( "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), @@ -655,6 +702,7 @@ mod test { name: "name1".into(), url: Url::parse("http://localhost:8080").unwrap(), fairness_threshold: None, + requested_timeout_on_problems: false, submission_account: Account::Kms( Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(), ), @@ -673,6 +721,40 @@ mod test { "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" ))), fairness_threshold: Some(U256::exp10(18)), + requested_timeout_on_problems: false, + }; + assert_eq!(driver, expected); + } + + #[test] + fn parse_driver_with_accepts_unsettled_blocking_flag() { + let argument = + "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|requested_timeout_on_problems"; + let driver = Solver::from_str(argument).unwrap(); + let expected = Solver { + name: "name1".into(), + url: Url::parse("http://localhost:8080").unwrap(), + submission_account: Account::Address(H160::from_slice(&hex!( + "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + ))), + fairness_threshold: None, + requested_timeout_on_problems: true, + }; + assert_eq!(driver, expected); + } + + #[test] + fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() { + let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|requested_timeout_on_problems"; + let driver = Solver::from_str(argument).unwrap(); + let expected = Solver { + name: "name1".into(), + url: Url::parse("http://localhost:8080").unwrap(), + submission_account: Account::Address(H160::from_slice(&hex!( + "C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" + ))), + fairness_threshold: Some(U256::exp10(18)), + requested_timeout_on_problems: true, }; assert_eq!(driver, expected); } diff --git a/crates/autopilot/src/domain/competition/mod.rs b/crates/autopilot/src/domain/competition/mod.rs index 52cd69df22..7ef2d6d06f 100644 --- a/crates/autopilot/src/domain/competition/mod.rs +++ b/crates/autopilot/src/domain/competition/mod.rs @@ -6,8 +6,12 @@ use { }; mod participant; +mod participation_guard; -pub use participant::{Participant, Ranked, Unranked}; +pub use { + participant::{Participant, Ranked, Unranked}, + participation_guard::SolverParticipationGuard, +}; type SolutionId = u64; diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs new file mode 100644 index 0000000000..7ef9fddb67 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -0,0 +1,111 @@ +use { + crate::{ + domain::{eth, Metrics}, + infra, + }, + ethrpc::block_stream::CurrentBlockWatcher, + std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, + }, +}; + +/// Checks the DB by searching for solvers that won N last consecutive auctions +/// but never settled any of them. +#[derive(Clone)] +pub(super) struct Validator(Arc); + +struct Inner { + persistence: infra::Persistence, + banned_solvers: dashmap::DashMap, + ttl: Duration, + last_auctions_count: u32, + drivers_by_address: HashMap>, +} + +impl Validator { + pub fn new( + persistence: infra::Persistence, + current_block: CurrentBlockWatcher, + competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + ttl: Duration, + last_auctions_count: u32, + drivers_by_address: HashMap>, + ) -> Self { + let self_ = Self(Arc::new(Inner { + persistence, + banned_solvers: Default::default(), + ttl, + last_auctions_count, + drivers_by_address, + })); + + self_.start_maintenance(competition_updates_receiver, current_block); + + self_ + } + + /// Update the internal cache only once the competition auctions table is + /// updated to avoid redundant DB queries on each block or any other + /// timeout. + fn start_maintenance( + &self, + mut competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + current_block: CurrentBlockWatcher, + ) { + let self_ = self.clone(); + tokio::spawn(async move { + while competition_updates_receiver.recv().await.is_some() { + let current_block = current_block.borrow().number; + let non_settling_solvers = match self_ + .0 + .persistence + .find_non_settling_solvers(self_.0.last_auctions_count, current_block) + .await + { + Ok(non_settling_solvers) => non_settling_solvers, + Err(err) => { + tracing::warn!(?err, "error while searching for non-settling solvers"); + continue; + } + }; + + let now = Instant::now(); + let non_settling_solver_names: Vec<&str> = non_settling_solvers + .iter() + .filter_map(|solver| self_.0.drivers_by_address.get(solver)) + .map(|driver| { + Metrics::get() + .non_settling_solver + .with_label_values(&[&driver.name]); + // Check if solver accepted this feature. This should be removed once the + // CIP making this mandatory has been approved. + if driver.requested_timeout_on_problems { + tracing::debug!(solver = ?driver.name, "disabling solver temporarily"); + self_ + .0 + .banned_solvers + .insert(driver.submission_address, now); + } + driver.name.as_ref() + }) + .collect(); + + tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers"); + } + tracing::error!("stream of settlement updates terminated unexpectedly"); + }); + } +} + +#[async_trait::async_trait] +impl super::Validator for Validator { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + if let Some(entry) = self.0.banned_solvers.get(solver) { + return Ok(entry.elapsed() >= self.0.ttl); + } + + Ok(true) + } +} diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs new file mode 100644 index 0000000000..00a2759f21 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -0,0 +1,74 @@ +mod db; +mod onchain; + +use { + crate::{ + arguments::DbBasedSolverParticipationGuardConfig, + domain::eth, + infra::{self, Ethereum}, + }, + std::sync::Arc, +}; + +/// This struct checks whether a solver can participate in the competition by +/// using different validators. +#[derive(Clone)] +pub struct SolverParticipationGuard(Arc); + +struct Inner { + /// Stores the validators in order they will be called. + validators: Vec>, +} + +impl SolverParticipationGuard { + pub fn new( + eth: Ethereum, + persistence: infra::Persistence, + competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, + db_based_validator_config: DbBasedSolverParticipationGuardConfig, + drivers: impl IntoIterator>, + ) -> Self { + let mut validators: Vec> = Vec::new(); + + if db_based_validator_config.enabled { + let current_block = eth.current_block().clone(); + let database_solver_participation_validator = db::Validator::new( + persistence, + current_block, + competition_updates_receiver, + db_based_validator_config.solver_blacklist_cache_ttl, + db_based_validator_config.solver_last_auctions_participation_count, + drivers + .into_iter() + .map(|driver| (driver.submission_address, driver.clone())) + .collect(), + ); + validators.push(Box::new(database_solver_participation_validator)); + } + + let onchain_solver_participation_validator = onchain::Validator { eth }; + validators.push(Box::new(onchain_solver_participation_validator)); + + Self(Arc::new(Inner { validators })) + } + + /// Checks if a solver can participate in the competition. + /// Sequentially asks internal validators to avoid redundant RPC calls in + /// the following order: + /// 1. DB-based validator: operates fast since it uses in-memory cache. + /// 2. Onchain-based validator: only then calls the Authenticator contract. + pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + for validator in &self.0.validators { + if !validator.is_allowed(solver).await? { + return Ok(false); + } + } + + Ok(true) + } +} + +#[async_trait::async_trait] +trait Validator: Send + Sync { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; +} diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs new file mode 100644 index 0000000000..82d0ef3fb7 --- /dev/null +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -0,0 +1,20 @@ +use crate::{domain::eth, infra::Ethereum}; + +/// Calls Authenticator contract to check if a solver has a sufficient +/// permission. +pub(super) struct Validator { + pub eth: Ethereum, +} + +#[async_trait::async_trait] +impl super::Validator for Validator { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + Ok(self + .eth + .contracts() + .authenticator() + .is_solver(solver.0) + .call() + .await?) + } +} diff --git a/crates/autopilot/src/domain/mod.rs b/crates/autopilot/src/domain/mod.rs index ceaae58e2a..9d8d9b36db 100644 --- a/crates/autopilot/src/domain/mod.rs +++ b/crates/autopilot/src/domain/mod.rs @@ -14,3 +14,18 @@ pub use { fee::ProtocolFees, quote::Quote, }; + +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "domain")] +pub struct Metrics { + /// How many times the solver was marked as non-settling based on the + /// database statistics. + #[metric(labels("solver"))] + pub non_settling_solver: prometheus::IntCounterVec, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(observe::metrics::get_storage_registry()).unwrap() + } +} diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index 5a46e35a7e..813ef4fb4c 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -797,6 +797,33 @@ impl Persistence { ex.commit().await?; Ok(()) } + + /// Finds solvers that won `last_auctions_count` consecutive auctions but + /// never settled any of them. The current block is used to prevent + /// selecting auctions with deadline after the current block since they + /// still can be settled. + pub async fn find_non_settling_solvers( + &self, + last_auctions_count: u32, + current_block: u64, + ) -> anyhow::Result> { + let mut ex = self.postgres.pool.acquire().await.context("acquire")?; + let _timer = Metrics::get() + .database_queries + .with_label_values(&["find_non_settling_solvers"]) + .start_timer(); + + Ok(database::solver_competition::find_non_settling_solvers( + &mut ex, + last_auctions_count, + current_block, + ) + .await + .context("failed to fetch non-settling solvers")? + .into_iter() + .map(|solver| eth::Address(solver.0.into())) + .collect()) + } } #[derive(prometheus_metric_storage::MetricStorage)] diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index b84dc79654..0d8a28d026 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -21,6 +21,7 @@ pub struct Driver { // another driver solved with surplus exceeding this driver's surplus by `threshold` pub fairness_threshold: Option, pub submission_address: eth::Address, + pub requested_timeout_on_problems: bool, client: Client, } @@ -38,6 +39,7 @@ impl Driver { name: String, fairness_threshold: Option, submission_account: Account, + requested_timeout_on_problems: bool, ) -> Result { let submission_address = match submission_account { Account::Kms(key_id) => { @@ -70,6 +72,7 @@ impl Driver { .build() .map_err(Error::FailedToBuildClient)?, submission_address: submission_address.into(), + requested_timeout_on_problems, }) } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 93e3a0a740..9d9b2b6887 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -15,7 +15,7 @@ use { }, Postgres, }, - domain, + domain::{self, competition::SolverParticipationGuard}, event_updater::EventUpdater, infra, maintenance::Maintenance, @@ -365,6 +365,9 @@ pub async fn run(args: Arguments) { None }; + let (competition_updates_sender, competition_updates_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let settlement_observer = @@ -548,6 +551,7 @@ pub async fn run(args: Arguments) { max_winners_per_auction: args.max_winners_per_auction, max_solutions_per_solver: args.max_solutions_per_solver, }; + let drivers_futures = args .drivers .into_iter() @@ -557,6 +561,7 @@ pub async fn run(args: Arguments) { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, + driver.requested_timeout_on_problems, ) .await .map(Arc::new) @@ -564,20 +569,30 @@ pub async fn run(args: Arguments) { }) .collect::>(); - let drivers = futures::future::join_all(drivers_futures) + let drivers: Vec<_> = futures::future::join_all(drivers_futures) .await .into_iter() .collect(); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + persistence.clone(), + competition_updates_receiver, + args.db_based_solver_participation_guard, + drivers.iter().cloned(), + ); + let run = RunLoop::new( run_loop_config, eth, persistence.clone(), drivers, + solver_participation_guard, solvable_orders_cache, trusted_tokens, liveness.clone(), Arc::new(maintenance), + competition_updates_sender, ); run.run_forever().await; } @@ -599,6 +614,7 @@ async fn shadow_mode(args: Arguments) -> ! { driver.name.clone(), driver.fairness_threshold.map(Into::into), driver.submission_account, + driver.requested_timeout_on_problems, ) .await .map(Arc::new) diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index f540baebf0..d85e9ed95a 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -4,7 +4,14 @@ use { domain::{ self, auction::Id, - competition::{self, Solution, SolutionError, TradedOrder, Unranked}, + competition::{ + self, + Solution, + SolutionError, + SolverParticipationGuard, + TradedOrder, + Unranked, + }, eth::{self, TxId}, OrderUid, }, @@ -59,6 +66,7 @@ pub struct RunLoop { eth: infra::Ethereum, persistence: infra::Persistence, drivers: Vec>, + solver_participation_guard: SolverParticipationGuard, solvable_orders_cache: Arc, trusted_tokens: AutoUpdatingTokenList, in_flight_orders: Arc>>, @@ -66,6 +74,7 @@ pub struct RunLoop { /// Maintenance tasks that should run before every runloop to have /// the most recent data available. maintenance: Arc, + competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, } impl RunLoop { @@ -75,21 +84,25 @@ impl RunLoop { eth: infra::Ethereum, persistence: infra::Persistence, drivers: Vec>, + solver_participation_guard: SolverParticipationGuard, solvable_orders_cache: Arc, trusted_tokens: AutoUpdatingTokenList, liveness: Arc, maintenance: Arc, + competition_updates_sender: tokio::sync::mpsc::UnboundedSender<()>, ) -> Self { Self { config, eth, persistence, drivers, + solver_participation_guard, solvable_orders_cache, trusted_tokens, in_flight_orders: Default::default(), liveness, maintenance, + competition_updates_sender, } } @@ -453,7 +466,7 @@ impl RunLoop { competition_table, }; - if let Err(err) = futures::try_join!( + match futures::try_join!( self.persistence .save_auction(auction, block_deadline) .map_err(|e| e.0.context("failed to save auction")), @@ -461,9 +474,18 @@ impl RunLoop { .save_solutions(auction.id, solutions) .map_err(|e| e.0.context("failed to save solutions")), ) { - // Don't error if saving of auction and solution fails, until stable. - // Various edge cases with JIT orders verifiable only in production. - tracing::warn!(?err, "failed to save new competition data"); + Ok(_) => { + // Notify the solver participation guard that the proposed solutions have been + // saved. + if let Err(err) = self.competition_updates_sender.send(()) { + tracing::error!(?err, "failed to notify solver participation guard"); + } + } + Err(err) => { + // Don't error if saving of auction and solution fails, until stable. + // Various edge cases with JIT orders verifiable only in production. + tracing::warn!(?err, "failed to save new competition data"); + } } tracing::trace!(?competition, "saving competition"); @@ -725,23 +747,14 @@ impl RunLoop { request: &solve::Request, ) -> Result>, SolveError> { - let authenticator = self.eth.contracts().authenticator(); - let is_allowed = authenticator - .is_solver(driver.submission_address.into()) - .call() - .await - .map_err(|err| { - tracing::warn!( - driver = driver.name, - ?driver.submission_address, - ?err, - "failed to check if solver is deny listed" - ); + let can_participate = self.solver_participation_guard.can_participate(&driver.submission_address).await.map_err(|err| { + tracing::error!(?err, driver = %driver.name, ?driver.submission_address, "solver participation check failed"); SolveError::SolverDenyListed - })?; + } + )?; - // Do not send the request to the driver if the solver is denied - if !is_allowed { + // Do not send the request to the driver if the solver is deny-listed + if !can_participate { return Err(SolveError::SolverDenyListed); } diff --git a/crates/database/src/solver_competition.rs b/crates/database/src/solver_competition.rs index 01e0183858..f778c4315a 100644 --- a/crates/database/src/solver_competition.rs +++ b/crates/database/src/solver_competition.rs @@ -97,6 +97,69 @@ GROUP BY sc.id sqlx::query_as(QUERY).bind(tx_hash).fetch_optional(ex).await } +/// Identifies solvers that have consistently failed to settle solutions in +/// recent N auctions. +/// +/// 1. Retrieves `last_auctions_count` most recent auctions already ended +/// auctions by filtering them by their deadlines. +/// 2. Identifies solvers who won these auctions but did not submit a successful +/// settlement. +/// 3. Counts how often each solver appears in these unsuccessful cases. +/// 4. Determines the total number of auctions considered. +/// 5. Flags solvers who failed to settle in all of these auctions. +/// 6. Returns a list of solvers that have consistently failed to settle +/// solutions. +pub async fn find_non_settling_solvers( + ex: &mut PgConnection, + last_auctions_count: u32, + current_block: u64, +) -> Result, sqlx::Error> { + const QUERY: &str = r#" +WITH + last_auctions AS ( + SELECT ps.auction_id, ps.solver + FROM ( + SELECT DISTINCT ca.id AS auction_id + FROM competition_auctions ca + WHERE ca.deadline <= $1 + ORDER BY ca.id DESC + LIMIT $2 + ) latest_auctions + JOIN proposed_solutions ps ON ps.auction_id = latest_auctions.auction_id + WHERE ps.is_winner = true + ), + unsuccessful_solvers AS ( + SELECT la.auction_id, la.solver + FROM last_auctions la + LEFT JOIN settlements s + ON la.auction_id = s.auction_id AND la.solver = s.solver + WHERE s.auction_id IS NULL + ), + solver_appearance_count AS ( + SELECT solver, COUNT(DISTINCT auction_id) AS appearance_count + FROM unsuccessful_solvers + GROUP BY solver + ), + auction_count AS ( + SELECT COUNT(DISTINCT auction_id) AS total_auctions + FROM last_auctions + ), + consistent_solvers AS ( + SELECT sa.solver + FROM solver_appearance_count sa, auction_count ac + WHERE sa.appearance_count = ac.total_auctions + ) +SELECT DISTINCT solver +FROM consistent_solvers; + "#; + + sqlx::query_scalar(QUERY) + .bind(sqlx::types::BigDecimal::from(current_block)) + .bind(i64::from(last_auctions_count)) + .fetch_all(ex) + .await +} + #[derive(Clone, Debug, PartialEq, Default)] pub struct Solution { // Unique Id generated by the autopilot to uniquely identify the solution within Auction @@ -317,8 +380,10 @@ mod tests { use { super::*, crate::{ + auction, byte_array::ByteArray, - events::{EventIndex, Settlement}, + events::{self, EventIndex, Settlement}, + settlements, }, sqlx::{Connection, Row}, }; @@ -533,4 +598,171 @@ mod tests { // inserted (2 fetched from "proposed_jit_orders" and 1 from "orders" table) assert!(fetched_solutions[2].orders.len() == 3); } + + #[tokio::test] + #[ignore] + async fn postgres_non_settling_solvers_roundtrip() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + crate::clear_DANGER_(&mut db).await.unwrap(); + + let non_settling_solver = ByteArray([1u8; 20]); + + let mut solution_uid = 0; + let deadline_block = 100u64; + let last_auctions_count = 3i64; + // competition_auctions + // Insert auctions within the deadline + for auction_id in 1..=4 { + let auction = auction::Auction { + id: auction_id, + block: auction_id, + deadline: i64::try_from(deadline_block).unwrap(), + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }; + auction::save(&mut db, auction).await.unwrap(); + } + + // Insert auctions outside the deadline + for auction_id in 5..=6 { + let auction = auction::Auction { + id: auction_id, + block: auction_id, + deadline: i64::try_from(deadline_block).unwrap() + auction_id, + order_uids: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + surplus_capturing_jit_order_owners: Default::default(), + }; + auction::save(&mut db, auction).await.unwrap(); + } + + // proposed_solutions + // Non-settling solver wins `last_auctions_count` auctions within the deadline + for auction_id in 2..=4 { + solution_uid += 1; + let solutions = vec![Solution { + uid: auction_id, + id: solution_uid.into(), + solver: non_settling_solver, + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // Another non-settling solver wins not all the auctions within the deadline + for auction_id in 2..=4 { + solution_uid += 1; + let solutions = vec![Solution { + uid: auction_id, + id: solution_uid.into(), + solver: ByteArray([2u8; 20]), + is_winner: auction_id != 2, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // One more non-settling solver has `last_auctions_count` winning auctions but + // not consecutive + for auction_id in 1..=4 { + // Break the sequence + if auction_id == 2 { + continue; + } + solution_uid += 1; + let solutions = vec![Solution { + uid: auction_id, + id: solution_uid.into(), + solver: ByteArray([3u8; 20]), + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // One more non-settling solver has `last_auctions_count` winning auctions but + // some of them are outside the deadline + for auction_id in 3..=5 { + solution_uid += 1; + let solutions = vec![Solution { + uid: auction_id, + id: solution_uid.into(), + solver: ByteArray([4u8; 20]), + is_winner: true, + score: Default::default(), + orders: Default::default(), + price_tokens: Default::default(), + price_values: Default::default(), + }]; + save_solutions(&mut db, auction_id, &solutions) + .await + .unwrap(); + } + + // Verify only the non-settling solver is returned + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert_eq!(result, vec![non_settling_solver]); + + // Non-settling solver settles one of the auctions + let event = EventIndex { + block_number: 4, + log_index: 0, + }; + let settlement = Settlement { + solver: non_settling_solver, + transaction_hash: ByteArray([0u8; 32]), + }; + events::insert_settlement(&mut db, &event, &settlement) + .await + .unwrap(); + + // The same result until the auction_id is updated in the settlements table + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert_eq!(result, vec![non_settling_solver]); + + settlements::update_settlement_auction(&mut db, 4, 0, 4) + .await + .unwrap(); + + let result = find_non_settling_solvers( + &mut db, + u32::try_from(last_auctions_count).unwrap(), + deadline_block, + ) + .await + .unwrap(); + assert!(result.is_empty()); + } } diff --git a/crates/driver/src/domain/competition/bad_tokens/metrics.rs b/crates/driver/src/domain/competition/bad_tokens/metrics.rs index daaad1ee7c..a0444169b9 100644 --- a/crates/driver/src/domain/competition/bad_tokens/metrics.rs +++ b/crates/driver/src/domain/competition/bad_tokens/metrics.rs @@ -110,7 +110,7 @@ impl Detector { flagged_unsupported_at: None, }); - // token neeeds to be frozen as unsupported for a while + // token needs to be frozen as unsupported for a while if self.quality_based_on_stats(&stats) == Quality::Unsupported && stats .flagged_unsupported_at