From 8d2faaa2b326ed129c66297dd1377cecbdc1d2e8 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 24 Oct 2024 16:51:41 +0800 Subject: [PATCH] refine _block_ flow --- lib/ain-ocean/src/indexer/loan_token.rs | 14 +- lib/ain-ocean/src/indexer/mod.rs | 229 +++++++++--------------- lib/ain-ocean/src/indexer/poolswap.rs | 88 ++++++++- 3 files changed, 185 insertions(+), 146 deletions(-) diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 85e896f7571..bb7fdf89508 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -6,7 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ - indexer::{Context, Index, Result}, + indexer::{Context, Index, IndexBlockEnd, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, storage::{RepositoryOps, SortOrder}, @@ -32,6 +32,16 @@ impl Index for SetLoanToken { } } +impl IndexBlockEnd for SetLoanToken { + fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { + index_active_price(services, block) + } + + fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()> { + invalidate_active_price(services, block) + } +} + fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool { if (aggregate.block.time - block.time).abs() >= 3600 { return false; @@ -161,7 +171,7 @@ pub fn perform_active_price_tick( ticker_id: (Token, Currency), block: &BlockContext, ) -> Result<()> { - let id = (ticker_id.0.clone(), ticker_id.1.clone(), u32::MAX); + let id = (ticker_id.0, ticker_id.1, u32::MAX); let prev = services .oracle_price_aggregated diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index dbc341fd418..19ca0cbb3c4 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -18,14 +18,14 @@ use ain_dftx::{deserialize, is_skipped_tx, DfTx, Stack}; use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout}; use helper::check_if_evm_tx; use log::trace; -pub use poolswap::{PoolSwapAggregatedInterval, AGGREGATED_INTERVALS}; +pub use poolswap::PoolSwapAggregatedInterval; use crate::{ error::{Error, IndexAction}, hex_encoder::as_sha256, index_transaction, invalidate_transaction, model::{ - Block as BlockMapper, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated, + Block as BlockMapper, BlockContext, ScriptActivity, ScriptActivityScript, ScriptActivityType, ScriptActivityTypeHex, ScriptActivityVin, ScriptActivityVout, ScriptAggregation, ScriptAggregationAmount, ScriptAggregationScript, ScriptAggregationStatistic, ScriptUnspent, ScriptUnspentScript, @@ -38,11 +38,21 @@ use crate::{ pub trait Index { fn index(self, services: &Arc, ctx: &Context) -> Result<()>; - // TODO: allow dead_code at the moment - #[allow(dead_code)] fn invalidate(&self, services: &Arc, ctx: &Context) -> Result<()>; } +pub trait IndexBlockStart: Index { + fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; + + fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()>; +} + +pub trait IndexBlockEnd: Index { + fn index_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; + + fn invalidate_block_end(self, services: &Arc, block: &BlockContext) -> Result<()>; +} + #[derive(Debug)] pub struct Context { block: BlockContext, @@ -55,92 +65,6 @@ fn log_elapsed + std::fmt::Display>(previous: Instant, msg: S) { trace!("{} in {} ms", msg, now.duration_since(previous).as_millis()); } -fn get_bucket(block: &Block, interval: i64) -> i64 { - block.mediantime - (block.mediantime % interval) -} - -fn index_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS { - for pool_pair in &pool_pairs { - let repository = &services.pool_swap_aggregated; - - let prevs = repository - .by_key - .list( - Some((pool_pair.id, interval, i64::MAX)), - SortOrder::Descending, - )? - .take_while(|item| match item { - Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, - _ => true, - }) - .next() - .transpose()?; - - let Some((_, prev_id)) = prevs else { - break; - }; - - let prev = repository.by_id.get(&prev_id)?; - - let Some(prev) = prev else { - break; - }; - - let bucket = get_bucket(block, i64::from(interval)); - - if prev.bucket >= bucket { - break; - } - - let aggregated = PoolSwapAggregated { - bucket, - aggregated: PoolSwapAggregatedAggregated { - amounts: Default::default(), - }, - block: BlockContext { - hash: block.hash, - height: block.height, - time: block.time, - median_time: block.mediantime, - }, - }; - - let pool_swap_aggregated_key = (pool_pair.id, interval, bucket); - let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); - - repository - .by_key - .put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?; - repository - .by_id - .put(&pool_swap_aggregated_id, &aggregated)?; - } - } - - Ok(()) -} - -fn invalidate_block_start(services: &Arc, block: &Block) -> Result<()> { - let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); - pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); - - for interval in AGGREGATED_INTERVALS { - for pool_pair in &pool_pairs { - let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); - services - .pool_swap_aggregated - .by_id - .delete(&pool_swap_aggregated_id)?; - } - } - - Ok(()) -} - fn get_vin_standard(vin: &Vin) -> Option { match vin { Vin::Coinbase(_vin) => None, @@ -599,16 +523,6 @@ fn invalidate_script_activity_vout( Ok(()) } -fn index_block_end(services: &Arc, block: &BlockContext) -> Result<()> { - loan_token::index_active_price(services, block)?; - Ok(()) -} - -fn invalidate_block_end(services: &Arc, block: &BlockContext) -> Result<()> { - loan_token::invalidate_active_price(services, block)?; - Ok(()) -} - pub fn index_block(services: &Arc, block: Block) -> Result<()> { trace!("[index_block] Indexing block..."); let start = Instant::now(); @@ -621,13 +535,13 @@ pub fn index_block(services: &Arc, block: Block) -> Resul median_time: block.mediantime, }; - index_block_start(services, &block)?; + let mut dftxs = Vec::new(); for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() { if is_skipped_tx(&tx.txid) { continue; } - let start = Instant::now(); + let ctx = Context { block: block_ctx.clone(), tx, @@ -653,23 +567,42 @@ pub fn index_block(services: &Arc, block: Block) -> Resul match deserialize::(raw_tx) { Err(bitcoin::consensus::encode::Error::ParseFailed("Invalid marker")) => (), Err(e) => return Err(e.into()), - Ok(Stack { dftx, .. }) => { - match dftx { - DfTx::CreateMasternode(data) => data.index(services, &ctx)?, - DfTx::UpdateMasternode(data) => data.index(services, &ctx)?, - DfTx::ResignMasternode(data) => data.index(services, &ctx)?, - DfTx::AppointOracle(data) => data.index(services, &ctx)?, - DfTx::RemoveOracle(data) => data.index(services, &ctx)?, - DfTx::UpdateOracle(data) => data.index(services, &ctx)?, - DfTx::SetOracleData(data) => data.index(services, &ctx)?, - DfTx::PoolSwap(data) => data.index(services, &ctx)?, - DfTx::SetLoanToken(data) => data.index(services, &ctx)?, - DfTx::CompositeSwap(data) => data.index(services, &ctx)?, - DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?, - _ => (), - } - log_elapsed(start, "Indexed dftx"); - } + Ok(Stack { dftx, ..}) => dftxs.push((dftx, ctx)), + } + } + + // index_block_start + for (dftx, _) in &dftxs { + if let DfTx::PoolSwap(data) = dftx.clone() { + data.index_block_start(services, &block_ctx)? + } + } + + // index_dftx + for (dftx, ctx) in &dftxs { + let start = Instant::now(); + + match dftx.clone() { + DfTx::CreateMasternode(data) => data.index(services, ctx)?, + DfTx::UpdateMasternode(data) => data.index(services, ctx)?, + DfTx::ResignMasternode(data) => data.index(services, ctx)?, + DfTx::AppointOracle(data) => data.index(services, ctx)?, + DfTx::RemoveOracle(data) => data.index(services, ctx)?, + DfTx::UpdateOracle(data) => data.index(services, ctx)?, + DfTx::SetOracleData(data) => data.index(services, ctx)?, + DfTx::PoolSwap(data) => data.index(services, ctx)?, + DfTx::SetLoanToken(data) => data.index(services, ctx)?, + DfTx::CompositeSwap(data) => data.index(services, ctx)?, + DfTx::PlaceAuctionBid(data) => data.index(services, ctx)?, + _ => (), + } + log_elapsed(start, "Indexed dftx"); + } + + // index_block_end + for (dftx, _) in dftxs { + if let DfTx::SetLoanToken(data) = dftx.clone() { + data.index_block_end(services, &block_ctx)? } } @@ -693,8 +626,6 @@ pub fn index_block(services: &Arc, block: Block) -> Resul weight: block.weight, }; - index_block_end(services, &block_ctx)?; - // services.block.raw.put(&ctx.hash, &encoded_block)?; TODO services.block.by_id.put(&block_ctx.hash, &block_mapper)?; services @@ -715,14 +646,12 @@ pub fn invalidate_block(services: &Arc, block: Block) -> median_time: block.mediantime, }; - invalidate_block_end(services, &block_ctx)?; + let mut dftxs = Vec::new(); - // invalidate_dftx for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() { if is_skipped_tx(&tx.txid) { continue; } - let start = Instant::now(); let ctx = Context { block: block_ctx.clone(), tx, @@ -750,27 +679,43 @@ pub fn invalidate_block(services: &Arc, block: Block) -> println!("Discarding invalid marker"); } Err(e) => return Err(e.into()), - Ok(Stack { dftx, .. }) => { - match dftx { - DfTx::CreateMasternode(data) => data.invalidate(services, &ctx)?, - DfTx::UpdateMasternode(data) => data.invalidate(services, &ctx)?, - DfTx::ResignMasternode(data) => data.invalidate(services, &ctx)?, - DfTx::AppointOracle(data) => data.invalidate(services, &ctx)?, - DfTx::RemoveOracle(data) => data.invalidate(services, &ctx)?, // check - DfTx::UpdateOracle(data) => data.invalidate(services, &ctx)?, // check - DfTx::SetOracleData(data) => data.invalidate(services, &ctx)?, - DfTx::PoolSwap(data) => data.invalidate(services, &ctx)?, // check - DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?, - DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?, - DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?, - _ => (), - } - log_elapsed(start, "Invalidate dftx"); - } + Ok(Stack { dftx, .. }) => dftxs.push((dftx, ctx)), + } + } + + // invalidate_block_end + for (dftx, _) in &dftxs { + if let DfTx::SetLoanToken(data) = dftx.clone() { + data.invalidate_block_end(services, &block_ctx)? + } + } + + // invalidate_dftx + for (dftx, ctx) in &dftxs { + let start = Instant::now(); + match dftx { + DfTx::CreateMasternode(data) => data.invalidate(services, ctx)?, + DfTx::UpdateMasternode(data) => data.invalidate(services, ctx)?, + DfTx::ResignMasternode(data) => data.invalidate(services, ctx)?, + DfTx::AppointOracle(data) => data.invalidate(services, ctx)?, + DfTx::RemoveOracle(data) => data.invalidate(services, ctx)?, + DfTx::UpdateOracle(data) => data.invalidate(services, ctx)?, + DfTx::SetOracleData(data) => data.invalidate(services, ctx)?, + DfTx::PoolSwap(data) => data.invalidate(services, ctx)?, + DfTx::SetLoanToken(data) => data.invalidate(services, ctx)?, + DfTx::CompositeSwap(data) => data.invalidate(services, ctx)?, + DfTx::PlaceAuctionBid(data) => data.invalidate(services, ctx)?, + _ => (), } + log_elapsed(start, "Invalidate dftx"); } - invalidate_block_start(services, &block)?; + // invalidate_block_start + for (dftx, _) in &dftxs { + if let DfTx::PoolSwap(data) = dftx.clone() { + data.invalidate_block_start(services, &block_ctx)? + } + } // invalidate_block services.block.by_height.delete(&block.height)?; diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index 844e28b3581..3e749855d4a 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -7,11 +7,11 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use snafu::OptionExt; -use super::Context; +use super::{Context, IndexBlockStart}; use crate::{ error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu}, indexer::{tx_result, Index, Result}, - model::{self, PoolSwapResult, TxResult}, + model::{self, BlockContext, PoolSwapResult, TxResult, PoolSwapAggregated, PoolSwapAggregatedAggregated}, storage::{RepositoryOps, SortOrder}, Services, }; @@ -147,6 +147,90 @@ fn invalidate_swap_aggregated( Ok(()) } +impl IndexBlockStart for PoolSwap { + fn index_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); + + for interval in AGGREGATED_INTERVALS { + for pool_pair in &pool_pairs { + let repository = &services.pool_swap_aggregated; + + let prevs = repository + .by_key + .list( + Some((pool_pair.id, interval, i64::MAX)), + SortOrder::Descending, + )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval, + _ => true, + }) + .next() + .transpose()?; + + let Some((_, prev_id)) = prevs else { + break; + }; + + let prev = repository.by_id.get(&prev_id)?; + + let Some(prev) = prev else { + break; + }; + + let bucket = block.median_time - (block.median_time % interval as i64); + + if prev.bucket >= bucket { + break; + } + + let aggregated = PoolSwapAggregated { + bucket, + aggregated: PoolSwapAggregatedAggregated { + amounts: Default::default(), + }, + block: BlockContext { + hash: block.hash, + height: block.height, + time: block.time, + median_time: block.median_time, + }, + }; + + let pool_swap_aggregated_key = (pool_pair.id, interval, bucket); + let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); + + repository + .by_key + .put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?; + repository + .by_id + .put(&pool_swap_aggregated_id, &aggregated)?; + } + } + + Ok(()) + } + + fn invalidate_block_start(self, services: &Arc, block: &BlockContext) -> Result<()> { + let mut pool_pairs = ain_cpp_imports::get_pool_pairs(); + pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height)); + + for interval in AGGREGATED_INTERVALS { + for pool_pair in &pool_pairs { + let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash); + services + .pool_swap_aggregated + .by_id + .delete(&pool_swap_aggregated_id)?; + } + } + + Ok(()) + } +} + impl Index for PoolSwap { fn index(self, services: &Arc, ctx: &Context) -> Result<()> { trace!("[Poolswap] Indexing...");