refine <index|invalidate>_block_<start|end> flow
canonbrother committed Oct 24, 2024
1 parent 5bdf3da commit ed84304
Showing 3 changed files with 192 additions and 150 deletions.
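In short, this commit lifts the once-per-block indexing work out of the free functions in mod.rs and behind two new opt-in traits, IndexBlockStart and IndexBlockEnd, so the relevant DfTx types own their block-level hooks; index_block and invalidate_block now collect a block's DfTx values first and dispatch them in separate passes. Below is a minimal sketch of the new trait surface; Services, BlockContext, Context and the error alias are simplified stand-ins rather than the crate's real definitions, and ExampleTx is a hypothetical implementor used only for illustration.

```rust
use std::sync::Arc;

// Simplified stand-ins for crate types; the real definitions live in ain-ocean.
pub struct Services;
pub struct BlockContext;
pub struct Context;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Per-transaction indexing (already present before this commit).
pub trait Index {
    fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()>;
    fn invalidate(&self, services: &Arc<Services>, ctx: &Context) -> Result<()>;
}

// New opt-in hook for work that runs once per block after the per-transaction
// pass; IndexBlockStart mirrors it on the pre-transaction side.
pub trait IndexBlockEnd: Index {
    fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
    fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
}

// Hypothetical DfTx variant, used only to show the shape of an implementation.
pub struct ExampleTx;

impl Index for ExampleTx {
    fn index(self, _services: &Arc<Services>, _ctx: &Context) -> Result<()> {
        Ok(()) // per-transaction indexing
    }
    fn invalidate(&self, _services: &Arc<Services>, _ctx: &Context) -> Result<()> {
        Ok(()) // per-transaction rollback
    }
}

impl IndexBlockEnd for ExampleTx {
    fn index_block_end(self, _services: &Arc<Services>, _block: &BlockContext) -> Result<()> {
        Ok(()) // once-per-block work, e.g. recomputing derived state
    }
    fn invalidate_block_end(self, _services: &Arc<Services>, _block: &BlockContext) -> Result<()> {
        Ok(()) // undo the once-per-block work
    }
}
```

In the commit itself, SetLoanToken gains the IndexBlockEnd implementation shown in the first file below, and PoolSwap is dispatched through IndexBlockStart in the new passes in mod.rs.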
14 changes: 12 additions & 2 deletions lib/ain-ocean/src/indexer/loan_token.rs
@@ -6,7 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal};
use rust_decimal_macros::dec;

use crate::{
indexer::{Context, Index, Result},
indexer::{Context, Index, IndexBlockEnd, Result},
model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated},
network::Network,
storage::{RepositoryOps, SortOrder},
@@ -32,6 +32,16 @@ impl Index for SetLoanToken {
}
}

impl IndexBlockEnd for SetLoanToken {
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
index_active_price(services, block)
}

fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
invalidate_active_price(services, block)
}
}

fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool {
if (aggregate.block.time - block.time).abs() >= 3600 {
return false;
@@ -161,7 +171,7 @@ pub fn perform_active_price_tick(
ticker_id: (Token, Currency),
block: &BlockContext,
) -> Result<()> {
let id = (ticker_id.0.clone(), ticker_id.1.clone(), u32::MAX);
let id = (ticker_id.0, ticker_id.1, u32::MAX);

let prev = services
.oracle_price_aggregated
237 changes: 91 additions & 146 deletions lib/ain-ocean/src/indexer/mod.rs
@@ -18,18 +18,18 @@ use ain_dftx::{deserialize, is_skipped_tx, DfTx, Stack};
use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout};
use helper::check_if_evm_tx;
use log::trace;
pub use poolswap::{PoolSwapAggregatedInterval, AGGREGATED_INTERVALS};
pub use poolswap::PoolSwapAggregatedInterval;

use crate::{
error::{Error, IndexAction},
hex_encoder::as_sha256,
index_transaction, invalidate_transaction,
model::{
Block as BlockMapper, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated,
ScriptActivity, ScriptActivityScript, ScriptActivityType, ScriptActivityTypeHex,
ScriptActivityVin, ScriptActivityVout, ScriptAggregation, ScriptAggregationAmount,
ScriptAggregationScript, ScriptAggregationStatistic, ScriptUnspent, ScriptUnspentScript,
ScriptUnspentVout, TransactionVout, TransactionVoutScript,
Block as BlockMapper, BlockContext, ScriptActivity, ScriptActivityScript,
ScriptActivityType, ScriptActivityTypeHex, ScriptActivityVin, ScriptActivityVout,
ScriptAggregation, ScriptAggregationAmount, ScriptAggregationScript,
ScriptAggregationStatistic, ScriptUnspent, ScriptUnspentScript, ScriptUnspentVout,
TransactionVout, TransactionVoutScript,
},
storage::{RepositoryOps, SortOrder},
Result, Services,
@@ -38,11 +38,21 @@ use crate::{
pub trait Index {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()>;

// TODO: allow dead_code at the moment
#[allow(dead_code)]
fn invalidate(&self, services: &Arc<Services>, ctx: &Context) -> Result<()>;
}

pub trait IndexBlockStart: Index {
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;

fn invalidate_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
}

pub trait IndexBlockEnd: Index {
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;

fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
}

#[derive(Debug)]
pub struct Context {
block: BlockContext,
@@ -55,92 +65,6 @@ fn log_elapsed<S: AsRef<str> + std::fmt::Display>(previous: Instant, msg: S) {
trace!("{} in {} ms", msg, now.duration_since(previous).as_millis());
}

fn get_bucket(block: &Block<Transaction>, interval: i64) -> i64 {
block.mediantime - (block.mediantime % interval)
}

fn index_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let repository = &services.pool_swap_aggregated;

let prevs = repository
.by_key
.list(
Some((pool_pair.id, interval, i64::MAX)),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval,
_ => true,
})
.next()
.transpose()?;

let Some((_, prev_id)) = prevs else {
break;
};

let prev = repository.by_id.get(&prev_id)?;

let Some(prev) = prev else {
break;
};

let bucket = get_bucket(block, i64::from(interval));

if prev.bucket >= bucket {
break;
}

let aggregated = PoolSwapAggregated {
bucket,
aggregated: PoolSwapAggregatedAggregated {
amounts: Default::default(),
},
block: BlockContext {
hash: block.hash,
height: block.height,
time: block.time,
median_time: block.mediantime,
},
};

let pool_swap_aggregated_key = (pool_pair.id, interval, bucket);
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);

repository
.by_key
.put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?;
repository
.by_id
.put(&pool_swap_aggregated_id, &aggregated)?;
}
}

Ok(())
}

fn invalidate_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);
services
.pool_swap_aggregated
.by_id
.delete(&pool_swap_aggregated_id)?;
}
}

Ok(())
}

fn get_vin_standard(vin: &Vin) -> Option<VinStandard> {
match vin {
Vin::Coinbase(_vin) => None,
@@ -599,16 +523,6 @@ fn invalidate_script_activity_vout(
Ok(())
}

fn index_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
loan_token::index_active_price(services, block)?;
Ok(())
}

fn invalidate_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
loan_token::invalidate_active_price(services, block)?;
Ok(())
}

pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
trace!("[index_block] Indexing block...");
let start = Instant::now();
@@ -621,13 +535,13 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
median_time: block.mediantime,
};

index_block_start(services, &block)?;
let mut dftxs = Vec::new();

for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() {
if is_skipped_tx(&tx.txid) {
continue;
}
let start = Instant::now();

let ctx = Context {
block: block_ctx.clone(),
tx,
@@ -653,23 +567,42 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
match deserialize::<Stack>(raw_tx) {
Err(bitcoin::consensus::encode::Error::ParseFailed("Invalid marker")) => (),
Err(e) => return Err(e.into()),
Ok(Stack { dftx, .. }) => {
match dftx {
DfTx::CreateMasternode(data) => data.index(services, &ctx)?,
DfTx::UpdateMasternode(data) => data.index(services, &ctx)?,
DfTx::ResignMasternode(data) => data.index(services, &ctx)?,
DfTx::AppointOracle(data) => data.index(services, &ctx)?,
DfTx::RemoveOracle(data) => data.index(services, &ctx)?,
DfTx::UpdateOracle(data) => data.index(services, &ctx)?,
DfTx::SetOracleData(data) => data.index(services, &ctx)?,
DfTx::PoolSwap(data) => data.index(services, &ctx)?,
DfTx::SetLoanToken(data) => data.index(services, &ctx)?,
DfTx::CompositeSwap(data) => data.index(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?,
_ => (),
}
log_elapsed(start, "Indexed dftx");
}
Ok(Stack { dftx, .. }) => dftxs.push((dftx, ctx)),
}
}

// index_block_start
for (dftx, _) in &dftxs {
if let DfTx::PoolSwap(data) = dftx.clone() {
data.index_block_start(services, &block_ctx)?
}
}

// index_dftx
for (dftx, ctx) in &dftxs {
let start = Instant::now();

match dftx.clone() {
DfTx::CreateMasternode(data) => data.index(services, ctx)?,
DfTx::UpdateMasternode(data) => data.index(services, ctx)?,
DfTx::ResignMasternode(data) => data.index(services, ctx)?,
DfTx::AppointOracle(data) => data.index(services, ctx)?,
DfTx::RemoveOracle(data) => data.index(services, ctx)?,
DfTx::UpdateOracle(data) => data.index(services, ctx)?,
DfTx::SetOracleData(data) => data.index(services, ctx)?,
DfTx::PoolSwap(data) => data.index(services, ctx)?,
DfTx::SetLoanToken(data) => data.index(services, ctx)?,
DfTx::CompositeSwap(data) => data.index(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, ctx)?,
_ => (),
}
log_elapsed(start, "Indexed dftx");
}

// index_block_end
for (dftx, _) in dftxs {
if let DfTx::SetLoanToken(data) = dftx.clone() {
data.index_block_end(services, &block_ctx)?
}
}

@@ -693,8 +626,6 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
weight: block.weight,
};

index_block_end(services, &block_ctx)?;

// services.block.raw.put(&ctx.hash, &encoded_block)?; TODO
services.block.by_id.put(&block_ctx.hash, &block_mapper)?;
services
@@ -715,14 +646,12 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
median_time: block.mediantime,
};

invalidate_block_end(services, &block_ctx)?;
let mut dftxs = Vec::new();

// invalidate_dftx
for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() {
if is_skipped_tx(&tx.txid) {
continue;
}
let start = Instant::now();
let ctx = Context {
block: block_ctx.clone(),
tx,
@@ -750,27 +679,43 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
println!("Discarding invalid marker");
}
Err(e) => return Err(e.into()),
Ok(Stack { dftx, .. }) => {
match dftx {
DfTx::CreateMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::UpdateMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::ResignMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::AppointOracle(data) => data.invalidate(services, &ctx)?,
DfTx::RemoveOracle(data) => data.invalidate(services, &ctx)?, // check
DfTx::UpdateOracle(data) => data.invalidate(services, &ctx)?, // check
DfTx::SetOracleData(data) => data.invalidate(services, &ctx)?,
DfTx::PoolSwap(data) => data.invalidate(services, &ctx)?, // check
DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?,
_ => (),
}
log_elapsed(start, "Invalidate dftx");
}
Ok(Stack { dftx, .. }) => dftxs.push((dftx, ctx)),
}
}

// invalidate_block_end
for (dftx, _) in &dftxs {
if let DfTx::SetLoanToken(data) = dftx.clone() {
data.invalidate_block_end(services, &block_ctx)?
}
}

// invalidate_dftx
for (dftx, ctx) in &dftxs {
let start = Instant::now();
match dftx {
DfTx::CreateMasternode(data) => data.invalidate(services, ctx)?,
DfTx::UpdateMasternode(data) => data.invalidate(services, ctx)?,
DfTx::ResignMasternode(data) => data.invalidate(services, ctx)?,
DfTx::AppointOracle(data) => data.invalidate(services, ctx)?,
DfTx::RemoveOracle(data) => data.invalidate(services, ctx)?,
DfTx::UpdateOracle(data) => data.invalidate(services, ctx)?,
DfTx::SetOracleData(data) => data.invalidate(services, ctx)?,
DfTx::PoolSwap(data) => data.invalidate(services, ctx)?,
DfTx::SetLoanToken(data) => data.invalidate(services, ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, ctx)?,
_ => (),
}
log_elapsed(start, "Invalidate dftx");
}

invalidate_block_start(services, &block)?;
// invalidate_block_start
for (dftx, _) in &dftxs {
if let DfTx::PoolSwap(data) = dftx.clone() {
data.invalidate_block_start(services, &block_ctx)?
}
}

// invalidate_block
services.block.by_height.delete(&block.height)?;
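Taken together, the refactored mod.rs collects every DfTx for the block up front and then runs three passes on each side, with invalidation bracketing the per-transaction pass in the opposite order: block-end hooks first, block-start hooks last. The sketch below is a compressed, self-contained model of that ordering only; the types are stand-ins and the hooks are reduced to stub functions rather than the trait methods above.

```rust
use std::sync::Arc;

// Stand-in types; in the real code these are the ain-ocean definitions.
pub struct Services;
pub struct BlockContext;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

pub enum DfTx {
    PoolSwap,
    SetLoanToken,
    Other,
}

pub fn index_block(services: &Arc<Services>, block: &BlockContext, dftxs: &[DfTx]) -> Result<()> {
    // 1. Block-start hooks: only PoolSwap opts in via IndexBlockStart.
    for dftx in dftxs {
        if matches!(dftx, DfTx::PoolSwap) {
            index_block_start(services, block, dftx)?;
        }
    }
    // 2. Per-transaction indexing for every collected DfTx.
    for dftx in dftxs {
        index_dftx(services, block, dftx)?;
    }
    // 3. Block-end hooks: only SetLoanToken opts in via IndexBlockEnd.
    for dftx in dftxs {
        if matches!(dftx, DfTx::SetLoanToken) {
            index_block_end(services, block, dftx)?;
        }
    }
    Ok(())
}

pub fn invalidate_block(services: &Arc<Services>, block: &BlockContext, dftxs: &[DfTx]) -> Result<()> {
    // Invalidation runs the hooks in the opposite order: block-end first,
    // then per-transaction invalidation, then block-start.
    for dftx in dftxs {
        if matches!(dftx, DfTx::SetLoanToken) {
            invalidate_block_end(services, block, dftx)?;
        }
    }
    for dftx in dftxs {
        invalidate_dftx(services, block, dftx)?;
    }
    for dftx in dftxs {
        if matches!(dftx, DfTx::PoolSwap) {
            invalidate_block_start(services, block, dftx)?;
        }
    }
    Ok(())
}

// Stubs standing in for the trait methods and the per-DfTx dispatch.
fn index_block_start(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
fn index_dftx(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
fn index_block_end(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
fn invalidate_block_end(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
fn invalidate_dftx(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
fn invalidate_block_start(_: &Arc<Services>, _: &BlockContext, _: &DfTx) -> Result<()> { Ok(()) }
```

The PoolSwap implementations of IndexBlockStart, which presumably absorb the pool-swap aggregation logic deleted from mod.rs above, would live in the third changed file, whose diff did not render on this page.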
