Skip to content

Commit

Permalink
fix(torii-indexer): incoherent block ordering with multiple contracts (
Browse files Browse the repository at this point in the history
…#3081)

* fix(torii-indexer): incoherent block ordering with multiple contracts

* chore
  • Loading branch information
Larkooo authored Mar 6, 2025
1 parent a2797e2 commit f971dc1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub(crate) const LOG_TARGET: &str = "torii::engine";
pub(crate) const LOG_TARGET: &str = "torii_indexer::engine";
45 changes: 25 additions & 20 deletions crates/torii/indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl FetchDataResult {
pub struct FetchRangeResult {
// (block_number, transaction_hash) -> events
// NOTE: LinkedList might contains blocks in different order
pub transactions: LinkedHashMap<(u64, Felt), Vec<EmittedEvent>>,
pub transactions: BTreeMap<u64, LinkedHashMap<Felt, Vec<EmittedEvent>>>,
pub blocks: BTreeMap<u64, u64>,
pub latest_block_number: u64,
}
Expand Down Expand Up @@ -312,6 +312,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
self.db.flush().await?;
self.db.apply_cache_diff(block_id).await?;
self.db.execute().await?;
debug!(target: LOG_TARGET, block_number = ?block_id, "Flushed and applied cache diff.");
}
},
Err(e) => {
Expand Down Expand Up @@ -444,7 +445,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// Flatten events pages and events according to the pending block cursor
// to array of (block_number, transaction_hash)
let mut transactions = LinkedHashMap::new();
let mut transactions = BTreeMap::new();

let mut block_set = HashSet::new();
for event in events {
Expand All @@ -456,7 +457,9 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
block_set.insert(block_number);

transactions
.entry((block_number, event.transaction_hash))
.entry(block_number)
.or_insert(LinkedHashMap::new())
.entry(event.transaction_hash)
.or_insert(vec![])
.push(event);
}
Expand Down Expand Up @@ -575,24 +578,26 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// Process all transactions
let mut processed_blocks = HashSet::new();
let mut cursor_map = HashMap::new();
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
let transaction = if self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Some(self.provider.get_transaction_by_hash(transaction_hash).await?)
} else {
None
};
for (block_number, transactions) in data.transactions {
for (transaction_hash, events) in transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
let transaction = if self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Some(self.provider.get_transaction_by_hash(transaction_hash).await?)
} else {
None
};

self.process_transaction_with_events(
transaction_hash,
events.as_slice(),
block_number,
data.blocks[&block_number],
transaction,
&mut cursor_map,
)
.await?;
self.process_transaction_with_events(
transaction_hash,
events.as_slice(),
block_number,
data.blocks[&block_number],
transaction,
&mut cursor_map,
)
.await?;
}

// Process block
if !processed_blocks.contains(&block_number) {
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::processors::EventProcessorConfig;

pub const TASK_ID_SEQUENTIAL: TaskId = 0;

const LOG_TARGET: &str = "torii::indexer::task_manager";
const LOG_TARGET: &str = "torii_indexer::task_manager";

pub type TaskId = u64;
pub type TaskPriority = usize;
Expand Down

0 comments on commit f971dc1

Please sign in to comment.