From 21b1c79568284a36369c088d51d02aea95e97cab Mon Sep 17 00:00:00 2001 From: Larko <59736843+Larkooo@users.noreply.github.com> Date: Thu, 27 Feb 2025 01:39:24 +0800 Subject: [PATCH] feat(torii): erc options for max tasks & artifacts path (#3061) * feat(torii): erc options for max tasks & artifacts path * fmt * fix naming * fmt * scope erc metadat tasks semaphore --- crates/torii/cli/src/args.rs | 14 ++++++---- crates/torii/cli/src/options.rs | 36 ++++++++++++++++++++++--- crates/torii/runner/src/lib.rs | 3 ++- crates/torii/sqlite/src/executor/mod.rs | 12 ++++----- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/crates/torii/cli/src/args.rs b/crates/torii/cli/src/args.rs index 1d451cc0a4..87d92df01f 100644 --- a/crates/torii/cli/src/args.rs +++ b/crates/torii/cli/src/args.rs @@ -1,7 +1,6 @@ use std::path::PathBuf; use anyhow::Result; -use camino::Utf8PathBuf; use clap::Parser; use dojo_utils::parse::parse_url; use serde::{Deserialize, Serialize}; @@ -43,16 +42,15 @@ pub struct ToriiArgs { #[arg(long, help = "Configuration file to setup Torii.")] pub config: Option, - /// Path to a directory to store ERC artifacts - #[arg(long)] - pub artifacts_path: Option, - #[command(flatten)] pub indexing: IndexingOptions, #[command(flatten)] pub events: EventsOptions, + #[command(flatten)] + pub erc: ErcOptions, + #[cfg(feature = "server")] #[command(flatten)] pub metrics: MetricsOptions, @@ -105,6 +103,10 @@ impl ToriiArgs { self.events = config.events.unwrap_or_default(); } + if self.erc == ErcOptions::default() { + self.erc = config.erc.unwrap_or_default(); + } + #[cfg(feature = "server")] { if self.server == ServerOptions::default() { @@ -133,6 +135,7 @@ pub struct ToriiArgsConfig { pub explorer: Option, pub indexing: Option, pub events: Option, + pub erc: Option, #[cfg(feature = "server")] pub metrics: Option, #[cfg(feature = "server")] @@ -163,6 +166,7 @@ impl TryFrom for ToriiArgsConfig { if args.indexing == IndexingOptions::default() { None } else { Some(args.indexing) }; config.events = if args.events == EventsOptions::default() { None } else { Some(args.events) }; + config.erc = if args.erc == ErcOptions::default() { None } else { Some(args.erc) }; #[cfg(feature = "server")] { diff --git a/crates/torii/cli/src/options.rs b/crates/torii/cli/src/options.rs index 22db9cd4dd..152086ec63 100644 --- a/crates/torii/cli/src/options.rs +++ b/crates/torii/cli/src/options.rs @@ -2,6 +2,7 @@ use std::net::{IpAddr, Ipv4Addr}; use std::str::FromStr; use anyhow::Context; +use camino::Utf8PathBuf; use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; @@ -15,11 +16,12 @@ pub const DEFAULT_EVENTS_CHUNK_SIZE: u64 = 1024; pub const DEFAULT_BLOCKS_CHUNK_SIZE: u64 = 10240; pub const DEFAULT_POLLING_INTERVAL: u64 = 500; pub const DEFAULT_MAX_CONCURRENT_TASKS: usize = 100; - pub const DEFAULT_RELAY_PORT: u16 = 9090; pub const DEFAULT_RELAY_WEBRTC_PORT: u16 = 9091; pub const DEFAULT_RELAY_WEBSOCKET_PORT: u16 = 9092; +pub const DEFAULT_ERC_MAX_METADATA_TASKS: usize = 10; + #[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)] #[command(next_help_heading = "Relay options")] pub struct RelayOptions { @@ -118,11 +120,11 @@ pub struct IndexingOptions { #[serde(default = "default_polling_interval")] pub polling_interval: u64, - /// Max concurrent tasks + /// Maximum number of concurrent tasks used for processing parallelizable events. #[arg( long = "indexing.max_concurrent_tasks", default_value_t = DEFAULT_MAX_CONCURRENT_TASKS, - help = "Max concurrent tasks used to parallelize indexing." + help = "Maximum number of concurrent tasks processing parallelizable events." )] #[serde(default = "default_max_concurrent_tasks")] pub max_concurrent_tasks: usize, @@ -346,6 +348,30 @@ impl Default for MetricsOptions { } } +#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)] +#[command(next_help_heading = "ERC options")] +pub struct ErcOptions { + /// The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token + /// metadata. + #[arg( + long = "erc.max_metadata_tasks", + default_value_t = DEFAULT_ERC_MAX_METADATA_TASKS, + help = "The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token metadata." + )] + #[serde(default = "default_erc_max_metadata_tasks")] + pub max_metadata_tasks: usize, + + /// Path to a directory to store ERC artifacts + #[arg(long)] + pub artifacts_path: Option, +} + +impl Default for ErcOptions { + fn default() -> Self { + Self { max_metadata_tasks: DEFAULT_ERC_MAX_METADATA_TASKS, artifacts_path: None } + } +} + // Parses clap cli argument which is expected to be in the format: // - erc_type:address:start_block // - address:start_block (erc_type defaults to ERC20) @@ -433,3 +459,7 @@ fn default_relay_webrtc_port() -> u16 { fn default_relay_websocket_port() -> u16 { DEFAULT_RELAY_WEBSOCKET_PORT } + +fn default_erc_max_metadata_tasks() -> usize { + DEFAULT_ERC_MAX_METADATA_TASKS +} diff --git a/crates/torii/runner/src/lib.rs b/crates/torii/runner/src/lib.rs index ccc74d6863..3faa6ec409 100644 --- a/crates/torii/runner/src/lib.rs +++ b/crates/torii/runner/src/lib.rs @@ -142,7 +142,7 @@ impl Runner { pool.clone(), shutdown_tx.clone(), provider.clone(), - self.args.indexing.max_concurrent_tasks, + self.args.erc.max_metadata_tasks, ) .await?; let executor_handle = tokio::spawn(async move { executor.run().await }); @@ -208,6 +208,7 @@ impl Runner { let temp_dir = TempDir::new()?; let artifacts_path = self .args + .erc .artifacts_path .unwrap_or_else(|| Utf8PathBuf::from(temp_dir.path().to_str().unwrap())); diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 29d9983bdf..88773dd076 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -143,7 +143,7 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> { // It is used to make RPC calls to fetch token_uri data for erc721 contracts provider: Arc

, // Used to limit number of tasks that run in parallel to fetch metadata - semaphore: Arc, + metadata_semaphore: Arc, } #[derive(Debug)] @@ -234,13 +234,13 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { pool: Pool, shutdown_tx: Sender<()>, provider: Arc

, - max_concurrent_tasks: usize, + max_metadata_tasks: usize, ) -> Result<(Self, UnboundedSender)> { let (tx, rx) = unbounded_channel(); let transaction = pool.begin().await?; let publish_queue = Vec::new(); let shutdown_rx = shutdown_tx.subscribe(); - let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks)); + let metadata_semaphore = Arc::new(Semaphore::new(max_metadata_tasks)); Ok(( Executor { @@ -252,7 +252,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { register_tasks: JoinSet::new(), deferred_query_messages: Vec::new(), provider, - semaphore, + metadata_semaphore, }, tx, )) @@ -603,7 +603,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff."); } QueryType::RegisterNftToken(register_nft_token) => { - let semaphore = self.semaphore.clone(); + let metadata_semaphore = self.metadata_semaphore.clone(); let provider = self.provider.clone(); let res = sqlx::query_as::<_, (String, String)>(&format!( @@ -675,7 +675,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { }; self.register_tasks.spawn(async move { - let permit = semaphore.acquire().await.unwrap(); + let permit = metadata_semaphore.acquire().await.unwrap(); let result = Self::process_register_nft_token_query( register_nft_token,