Skip to content

Commit

Permalink
refine <index|invalidate>_block_<start|end> flow
Browse files Browse the repository at this point in the history
  • Loading branch information
canonbrother committed Oct 24, 2024
1 parent 112baba commit 8d2faaa
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 146 deletions.
14 changes: 12 additions & 2 deletions lib/ain-ocean/src/indexer/loan_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rust_decimal::{prelude::Zero, Decimal};
use rust_decimal_macros::dec;

use crate::{
indexer::{Context, Index, Result},
indexer::{Context, Index, IndexBlockEnd, Result},
model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated},
network::Network,
storage::{RepositoryOps, SortOrder},
Expand All @@ -32,6 +32,16 @@ impl Index for SetLoanToken {
}
}

impl IndexBlockEnd for SetLoanToken {
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
index_active_price(services, block)
}

fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()> {
invalidate_active_price(services, block)
}
}

fn is_aggregate_valid(aggregate: &OraclePriceAggregated, block: &BlockContext) -> bool {
if (aggregate.block.time - block.time).abs() >= 3600 {
return false;
Expand Down Expand Up @@ -161,7 +171,7 @@ pub fn perform_active_price_tick(
ticker_id: (Token, Currency),
block: &BlockContext,
) -> Result<()> {
let id = (ticker_id.0.clone(), ticker_id.1.clone(), u32::MAX);
let id = (ticker_id.0, ticker_id.1, u32::MAX);

let prev = services
.oracle_price_aggregated
Expand Down
229 changes: 87 additions & 142 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use ain_dftx::{deserialize, is_skipped_tx, DfTx, Stack};
use defichain_rpc::json::blockchain::{Block, Transaction, Vin, VinStandard, Vout};
use helper::check_if_evm_tx;
use log::trace;
pub use poolswap::{PoolSwapAggregatedInterval, AGGREGATED_INTERVALS};
pub use poolswap::PoolSwapAggregatedInterval;

use crate::{
error::{Error, IndexAction},
hex_encoder::as_sha256,
index_transaction, invalidate_transaction,
model::{
Block as BlockMapper, BlockContext, PoolSwapAggregated, PoolSwapAggregatedAggregated,
Block as BlockMapper, BlockContext,
ScriptActivity, ScriptActivityScript, ScriptActivityType, ScriptActivityTypeHex,
ScriptActivityVin, ScriptActivityVout, ScriptAggregation, ScriptAggregationAmount,
ScriptAggregationScript, ScriptAggregationStatistic, ScriptUnspent, ScriptUnspentScript,
Expand All @@ -38,11 +38,21 @@ use crate::{
pub trait Index {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()>;

// TODO: allow dead_code at the moment
#[allow(dead_code)]
fn invalidate(&self, services: &Arc<Services>, ctx: &Context) -> Result<()>;
}

pub trait IndexBlockStart: Index {
fn index_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;

fn invalidate_block_start(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
}

pub trait IndexBlockEnd: Index {
fn index_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;

fn invalidate_block_end(self, services: &Arc<Services>, block: &BlockContext) -> Result<()>;
}

#[derive(Debug)]
pub struct Context {
block: BlockContext,
Expand All @@ -55,92 +65,6 @@ fn log_elapsed<S: AsRef<str> + std::fmt::Display>(previous: Instant, msg: S) {
trace!("{} in {} ms", msg, now.duration_since(previous).as_millis());
}

fn get_bucket(block: &Block<Transaction>, interval: i64) -> i64 {
block.mediantime - (block.mediantime % interval)
}

fn index_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let repository = &services.pool_swap_aggregated;

let prevs = repository
.by_key
.list(
Some((pool_pair.id, interval, i64::MAX)),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == pool_pair.id && k.1 == interval,
_ => true,
})
.next()
.transpose()?;

let Some((_, prev_id)) = prevs else {
break;
};

let prev = repository.by_id.get(&prev_id)?;

let Some(prev) = prev else {
break;
};

let bucket = get_bucket(block, i64::from(interval));

if prev.bucket >= bucket {
break;
}

let aggregated = PoolSwapAggregated {
bucket,
aggregated: PoolSwapAggregatedAggregated {
amounts: Default::default(),
},
block: BlockContext {
hash: block.hash,
height: block.height,
time: block.time,
median_time: block.mediantime,
},
};

let pool_swap_aggregated_key = (pool_pair.id, interval, bucket);
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);

repository
.by_key
.put(&pool_swap_aggregated_key, &pool_swap_aggregated_id)?;
repository
.by_id
.put(&pool_swap_aggregated_id, &aggregated)?;
}
}

Ok(())
}

fn invalidate_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
for pool_pair in &pool_pairs {
let pool_swap_aggregated_id = (pool_pair.id, interval, block.hash);
services
.pool_swap_aggregated
.by_id
.delete(&pool_swap_aggregated_id)?;
}
}

Ok(())
}

fn get_vin_standard(vin: &Vin) -> Option<VinStandard> {
match vin {
Vin::Coinbase(_vin) => None,
Expand Down Expand Up @@ -599,16 +523,6 @@ fn invalidate_script_activity_vout(
Ok(())
}

fn index_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
loan_token::index_active_price(services, block)?;
Ok(())
}

fn invalidate_block_end(services: &Arc<Services>, block: &BlockContext) -> Result<()> {
loan_token::invalidate_active_price(services, block)?;
Ok(())
}

pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
trace!("[index_block] Indexing block...");
let start = Instant::now();
Expand All @@ -621,13 +535,13 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
median_time: block.mediantime,
};

index_block_start(services, &block)?;
let mut dftxs = Vec::new();

for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() {
if is_skipped_tx(&tx.txid) {
continue;
}
let start = Instant::now();

let ctx = Context {
block: block_ctx.clone(),
tx,
Expand All @@ -653,23 +567,42 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
match deserialize::<Stack>(raw_tx) {
Err(bitcoin::consensus::encode::Error::ParseFailed("Invalid marker")) => (),
Err(e) => return Err(e.into()),
Ok(Stack { dftx, .. }) => {
match dftx {
DfTx::CreateMasternode(data) => data.index(services, &ctx)?,
DfTx::UpdateMasternode(data) => data.index(services, &ctx)?,
DfTx::ResignMasternode(data) => data.index(services, &ctx)?,
DfTx::AppointOracle(data) => data.index(services, &ctx)?,
DfTx::RemoveOracle(data) => data.index(services, &ctx)?,
DfTx::UpdateOracle(data) => data.index(services, &ctx)?,
DfTx::SetOracleData(data) => data.index(services, &ctx)?,
DfTx::PoolSwap(data) => data.index(services, &ctx)?,
DfTx::SetLoanToken(data) => data.index(services, &ctx)?,
DfTx::CompositeSwap(data) => data.index(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?,
_ => (),
}
log_elapsed(start, "Indexed dftx");
}
Ok(Stack { dftx, ..}) => dftxs.push((dftx, ctx)),
}
}

// index_block_start
for (dftx, _) in &dftxs {
if let DfTx::PoolSwap(data) = dftx.clone() {
data.index_block_start(services, &block_ctx)?
}
}

// index_dftx
for (dftx, ctx) in &dftxs {
let start = Instant::now();

match dftx.clone() {
DfTx::CreateMasternode(data) => data.index(services, ctx)?,
DfTx::UpdateMasternode(data) => data.index(services, ctx)?,
DfTx::ResignMasternode(data) => data.index(services, ctx)?,
DfTx::AppointOracle(data) => data.index(services, ctx)?,
DfTx::RemoveOracle(data) => data.index(services, ctx)?,
DfTx::UpdateOracle(data) => data.index(services, ctx)?,
DfTx::SetOracleData(data) => data.index(services, ctx)?,
DfTx::PoolSwap(data) => data.index(services, ctx)?,
DfTx::SetLoanToken(data) => data.index(services, ctx)?,
DfTx::CompositeSwap(data) => data.index(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, ctx)?,
_ => (),
}
log_elapsed(start, "Indexed dftx");
}

// index_block_end
for (dftx, _) in dftxs {
if let DfTx::SetLoanToken(data) = dftx.clone() {
data.index_block_end(services, &block_ctx)?
}
}

Expand All @@ -693,8 +626,6 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
weight: block.weight,
};

index_block_end(services, &block_ctx)?;

// services.block.raw.put(&ctx.hash, &encoded_block)?; TODO
services.block.by_id.put(&block_ctx.hash, &block_mapper)?;
services
Expand All @@ -715,14 +646,12 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
median_time: block.mediantime,
};

invalidate_block_end(services, &block_ctx)?;
let mut dftxs = Vec::new();

// invalidate_dftx
for (tx_idx, tx) in block.tx.clone().into_iter().enumerate() {
if is_skipped_tx(&tx.txid) {
continue;
}
let start = Instant::now();
let ctx = Context {
block: block_ctx.clone(),
tx,
Expand Down Expand Up @@ -750,27 +679,43 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
println!("Discarding invalid marker");
}
Err(e) => return Err(e.into()),
Ok(Stack { dftx, .. }) => {
match dftx {
DfTx::CreateMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::UpdateMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::ResignMasternode(data) => data.invalidate(services, &ctx)?,
DfTx::AppointOracle(data) => data.invalidate(services, &ctx)?,
DfTx::RemoveOracle(data) => data.invalidate(services, &ctx)?, // check
DfTx::UpdateOracle(data) => data.invalidate(services, &ctx)?, // check
DfTx::SetOracleData(data) => data.invalidate(services, &ctx)?,
DfTx::PoolSwap(data) => data.invalidate(services, &ctx)?, // check
DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?,
_ => (),
}
log_elapsed(start, "Invalidate dftx");
}
Ok(Stack { dftx, .. }) => dftxs.push((dftx, ctx)),
}
}

// invalidate_block_end
for (dftx, _) in &dftxs {
if let DfTx::SetLoanToken(data) = dftx.clone() {
data.invalidate_block_end(services, &block_ctx)?
}
}

// invalidate_dftx
for (dftx, ctx) in &dftxs {
let start = Instant::now();
match dftx {
DfTx::CreateMasternode(data) => data.invalidate(services, ctx)?,
DfTx::UpdateMasternode(data) => data.invalidate(services, ctx)?,
DfTx::ResignMasternode(data) => data.invalidate(services, ctx)?,
DfTx::AppointOracle(data) => data.invalidate(services, ctx)?,
DfTx::RemoveOracle(data) => data.invalidate(services, ctx)?,
DfTx::UpdateOracle(data) => data.invalidate(services, ctx)?,
DfTx::SetOracleData(data) => data.invalidate(services, ctx)?,
DfTx::PoolSwap(data) => data.invalidate(services, ctx)?,
DfTx::SetLoanToken(data) => data.invalidate(services, ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, ctx)?,
_ => (),
}
log_elapsed(start, "Invalidate dftx");
}

invalidate_block_start(services, &block)?;
// invalidate_block_start
for (dftx, _) in &dftxs {
if let DfTx::PoolSwap(data) = dftx.clone() {
data.invalidate_block_start(services, &block_ctx)?
}
}

// invalidate_block
services.block.by_height.delete(&block.height)?;
Expand Down
Loading

0 comments on commit 8d2faaa

Please sign in to comment.