Skip to content

Commit

Permalink
[ISSUES#]2721 health_check_provider.
Browse files Browse the repository at this point in the history
  • Loading branch information
847850277 committed Nov 28, 2024
1 parent ebcc23d commit 7f4c228
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
3 changes: 2 additions & 1 deletion crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::processors::{
};
use crate::sql::{Cursors, Sql};
use crate::types::{Contract, ContractType};
use crate::utils::health_check_provider;

type EventProcessorMap<P> = HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>>;

Expand Down Expand Up @@ -241,6 +242,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

pub async fn start(&mut self) -> Result<()> {
health_check_provider(self.provider.clone()).await;
let mut backoff_delay = Duration::from_secs(1);
let max_backoff_delay = Duration::from_secs(60);

Expand Down Expand Up @@ -300,7 +302,6 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(0);
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
Expand Down
18 changes: 16 additions & 2 deletions crates/torii/core/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri};
use starknet::core::types::{BlockId, BlockTag};
use starknet::providers::Provider;
use tokio_util::bytes::Bytes;
use tracing::info;
use tracing::{error, info};

use crate::constants::{
IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME,
Expand All @@ -20,7 +23,6 @@ pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime<Utc> {
pub fn utc_dt_string_from_timestamp(timestamp: u64) -> String {
must_utc_datetime_from_timestamp(timestamp).to_rfc3339()
}

pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result<Bytes> {
let client = IpfsClient::from_str(IPFS_CLIENT_URL)?
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD);
Expand All @@ -46,6 +48,18 @@ pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result<Bytes
IPFS_CLIENT_MAX_RETRY, cid
)))
}

pub async fn health_check_provider<P: Provider + Sync + std::fmt::Debug + 'static>(
provider: Arc<P>,
) {
let latest_block = provider.get_block_with_tx_hashes(BlockId::Tag(BlockTag::Latest)).await;
if let Ok(latest_block) = latest_block {
info!("Provider health_check latest_block is: {:?}", latest_block);
} else {
error!("Provider: {:?} is unhealthy. please check you config!", provider);
}
}

// tests
#[cfg(test)]
mod tests {
Expand Down

0 comments on commit 7f4c228

Please sign in to comment.