From 7f4c2289bdf8f10c0589d112ba792086e40346e8 Mon Sep 17 00:00:00 2001 From: zhengpeng <847850277@qq.com> Date: Thu, 28 Nov 2024 11:15:58 +0800 Subject: [PATCH 1/4] [ISSUES#]2721 health_check_provider. --- crates/torii/core/src/engine.rs | 3 ++- crates/torii/core/src/utils.rs | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 86730665f3..5285d71f45 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -45,6 +45,7 @@ use crate::processors::{ }; use crate::sql::{Cursors, Sql}; use crate::types::{Contract, ContractType}; +use crate::utils::health_check_provider; type EventProcessorMap

= HashMap>>>; @@ -241,6 +242,7 @@ impl Engine

{ } pub async fn start(&mut self) -> Result<()> { + health_check_provider(self.provider.clone()).await; let mut backoff_delay = Duration::from_secs(1); let max_backoff_delay = Duration::from_secs(60); @@ -300,7 +302,6 @@ impl Engine

{ // TODO: since we now process blocks in chunks we can parallelize the fetching of data pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block = self.provider.block_hash_and_number().await?; - let from = cursors.head.unwrap_or(0); let total_remaining_blocks = latest_block.block_number - from; let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size); diff --git a/crates/torii/core/src/utils.rs b/crates/torii/core/src/utils.rs index 516e739e8a..665168cf64 100644 --- a/crates/torii/core/src/utils.rs +++ b/crates/torii/core/src/utils.rs @@ -1,11 +1,14 @@ +use std::sync::Arc; use std::time::Duration; use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; +use starknet::core::types::{BlockId, BlockTag}; +use starknet::providers::Provider; use tokio_util::bytes::Bytes; -use tracing::info; +use tracing::{error, info}; use crate::constants::{ IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, @@ -20,7 +23,6 @@ pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { pub fn utc_dt_string_from_timestamp(timestamp: u64) -> String { must_utc_datetime_from_timestamp(timestamp).to_rfc3339() } - pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result { let client = IpfsClient::from_str(IPFS_CLIENT_URL)? .with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD); @@ -46,6 +48,18 @@ pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result( + provider: Arc

, +) { + let latest_block = provider.get_block_with_tx_hashes(BlockId::Tag(BlockTag::Latest)).await; + if let Ok(latest_block) = latest_block { + info!("Provider health_check latest_block is: {:?}", latest_block); + } else { + error!("Provider: {:?} is unhealthy. please check you config!", provider); + } +} + // tests #[cfg(test)] mod tests { From d8a489dea00e40b23d0d13f4bfd5d51de7109f45 Mon Sep 17 00:00:00 2001 From: zhengpeng <847850277@qq.com> Date: Thu, 28 Nov 2024 11:22:28 +0800 Subject: [PATCH 2/4] [ISSUES#]2721 health_check_provider. --- crates/torii/core/src/engine.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 5285d71f45..f88707726f 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -243,6 +243,7 @@ impl Engine

{ pub async fn start(&mut self) -> Result<()> { health_check_provider(self.provider.clone()).await; + let mut backoff_delay = Duration::from_secs(1); let max_backoff_delay = Duration::from_secs(60); @@ -302,6 +303,7 @@ impl Engine

{ // TODO: since we now process blocks in chunks we can parallelize the fetching of data pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result { let latest_block = self.provider.block_hash_and_number().await?; + let from = cursors.head.unwrap_or(0); let total_remaining_blocks = latest_block.block_number - from; let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size); From 14e9f34eb5bfc48bc57ee873cf6d453481232309 Mon Sep 17 00:00:00 2001 From: zhengpeng <847850277@qq.com> Date: Fri, 29 Nov 2024 11:28:01 +0800 Subject: [PATCH 3/4] [ISSUES#]2721 health_check_provider. --- crates/torii/core/src/engine.rs | 5 ++++- crates/torii/core/src/utils.rs | 24 ++++++++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index f88707726f..2d1299ebda 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -242,7 +242,10 @@ impl Engine

{ } pub async fn start(&mut self) -> Result<()> { - health_check_provider(self.provider.clone()).await; + if let Err(e) = health_check_provider(self.provider.clone()).await { + error!(target: LOG_TARGET,"Provider health check failed during engine start"); + return Err(e); + } let mut backoff_delay = Duration::from_secs(1); let max_backoff_delay = Duration::from_secs(60); diff --git a/crates/torii/core/src/utils.rs b/crates/torii/core/src/utils.rs index 665168cf64..ace7efd5d6 100644 --- a/crates/torii/core/src/utils.rs +++ b/crates/torii/core/src/utils.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Duration; use anyhow::Result; @@ -8,10 +7,10 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; use starknet::core::types::{BlockId, BlockTag}; use starknet::providers::Provider; use tokio_util::bytes::Bytes; -use tracing::{error, info}; +use tracing::info; use crate::constants::{ - IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, + IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, LOG_TARGET, }; pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { @@ -50,13 +49,18 @@ pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result( - provider: Arc

, -) { - let latest_block = provider.get_block_with_tx_hashes(BlockId::Tag(BlockTag::Latest)).await; - if let Ok(latest_block) = latest_block { - info!("Provider health_check latest_block is: {:?}", latest_block); - } else { - error!("Provider: {:?} is unhealthy. please check you config!", provider); + provider: P, +) -> Result<(), anyhow::Error> { + match provider.get_block_with_tx_hashes(BlockId::Tag(BlockTag::Latest)).await { + Ok(latest_block) => { + info!(target: LOG_TARGET, "health_check latest_block is: {:?}.", latest_block); + Ok(()) + } + Err(_) => { + let error_info = + format!("Unhealthy provider {:?}, please check your configuration.", provider); + Err(anyhow::anyhow!(error_info)) + } } } From 1c0680067d1ea192f8c867f866450fc4b6832e4b Mon Sep 17 00:00:00 2001 From: glihm Date: Fri, 29 Nov 2024 16:05:28 -0600 Subject: [PATCH 4/4] fix: change info to trace and refacto structured logging --- crates/torii/core/src/utils.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/torii/core/src/utils.rs b/crates/torii/core/src/utils.rs index ace7efd5d6..27f68f62d8 100644 --- a/crates/torii/core/src/utils.rs +++ b/crates/torii/core/src/utils.rs @@ -7,10 +7,10 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; use starknet::core::types::{BlockId, BlockTag}; use starknet::providers::Provider; use tokio_util::bytes::Bytes; -use tracing::info; +use tracing::{info, trace}; use crate::constants::{ - IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, LOG_TARGET, + IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, }; pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { @@ -52,8 +52,11 @@ pub async fn health_check_provider Result<(), anyhow::Error> { match provider.get_block_with_tx_hashes(BlockId::Tag(BlockTag::Latest)).await { - Ok(latest_block) => { - info!(target: LOG_TARGET, "health_check latest_block is: {:?}.", latest_block); + Ok(block) => { + trace!( + latest_block = ?block, + "Provider health check." + ); Ok(()) } Err(_) => {