Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): erc options for max tasks & artifacts path #3061

Merged
merged 5 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions crates/torii/cli/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::path::PathBuf;

use anyhow::Result;
use camino::Utf8PathBuf;
use clap::Parser;
use dojo_utils::parse::parse_url;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -43,16 +42,15 @@
#[arg(long, help = "Configuration file to setup Torii.")]
pub config: Option<PathBuf>,

/// Path to a directory to store ERC artifacts
#[arg(long)]
pub artifacts_path: Option<Utf8PathBuf>,

#[command(flatten)]
pub indexing: IndexingOptions,

#[command(flatten)]
pub events: EventsOptions,

#[command(flatten)]
pub erc: ErcOptions,

#[cfg(feature = "server")]
#[command(flatten)]
pub metrics: MetricsOptions,
Expand Down Expand Up @@ -105,6 +103,10 @@
self.events = config.events.unwrap_or_default();
}

if self.erc == ErcOptions::default() {
self.erc = config.erc.unwrap_or_default();
}

#[cfg(feature = "server")]
{
if self.server == ServerOptions::default() {
Expand Down Expand Up @@ -133,6 +135,7 @@
pub explorer: Option<bool>,
pub indexing: Option<IndexingOptions>,
pub events: Option<EventsOptions>,
pub erc: Option<ErcOptions>,
#[cfg(feature = "server")]
pub metrics: Option<MetricsOptions>,
#[cfg(feature = "server")]
Expand Down Expand Up @@ -163,6 +166,7 @@
if args.indexing == IndexingOptions::default() { None } else { Some(args.indexing) };
config.events =
if args.events == EventsOptions::default() { None } else { Some(args.events) };
config.erc = if args.erc == ErcOptions::default() { None } else { Some(args.erc) };

Check warning on line 169 in crates/torii/cli/src/args.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/args.rs#L169

Added line #L169 was not covered by tests

#[cfg(feature = "server")]
{
Expand Down
36 changes: 33 additions & 3 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::str::FromStr;

use anyhow::Context;
use camino::Utf8PathBuf;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use starknet::core::types::Felt;
Expand All @@ -15,11 +16,12 @@
pub const DEFAULT_BLOCKS_CHUNK_SIZE: u64 = 10240;
pub const DEFAULT_POLLING_INTERVAL: u64 = 500;
pub const DEFAULT_MAX_CONCURRENT_TASKS: usize = 100;

pub const DEFAULT_RELAY_PORT: u16 = 9090;
pub const DEFAULT_RELAY_WEBRTC_PORT: u16 = 9091;
pub const DEFAULT_RELAY_WEBSOCKET_PORT: u16 = 9092;

pub const DEFAULT_ERC_MAX_METADATA_TASKS: usize = 10;

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "Relay options")]
pub struct RelayOptions {
Expand Down Expand Up @@ -118,11 +120,11 @@
#[serde(default = "default_polling_interval")]
pub polling_interval: u64,

/// Max concurrent tasks
/// Maximum number of concurrent tasks used for processing parallelizable events.
#[arg(
long = "indexing.max_concurrent_tasks",
default_value_t = DEFAULT_MAX_CONCURRENT_TASKS,
help = "Max concurrent tasks used to parallelize indexing."
help = "Maximum number of concurrent tasks processing parallelizable events."
)]
#[serde(default = "default_max_concurrent_tasks")]
pub max_concurrent_tasks: usize,
Expand Down Expand Up @@ -346,6 +348,30 @@
}
}

#[derive(Debug, clap::Args, Clone, Serialize, Deserialize, PartialEq)]
#[command(next_help_heading = "ERC options")]
pub struct ErcOptions {
/// The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token
/// metadata.
#[arg(
long = "erc.max_metadata_tasks",
default_value_t = DEFAULT_ERC_MAX_METADATA_TASKS,
help = "The maximum number of concurrent tasks to use for indexing ERC721 and ERC1155 token metadata."
)]
#[serde(default = "default_erc_max_metadata_tasks")]
pub max_metadata_tasks: usize,

Check warning on line 362 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L362

Added line #L362 was not covered by tests

/// Path to a directory to store ERC artifacts
#[arg(long)]
pub artifacts_path: Option<Utf8PathBuf>,
}

impl Default for ErcOptions {
fn default() -> Self {
Self { max_metadata_tasks: DEFAULT_ERC_MAX_METADATA_TASKS, artifacts_path: None }
}
}

// Parses clap cli argument which is expected to be in the format:
// - erc_type:address:start_block
// - address:start_block (erc_type defaults to ERC20)
Expand Down Expand Up @@ -433,3 +459,7 @@
fn default_relay_websocket_port() -> u16 {
DEFAULT_RELAY_WEBSOCKET_PORT
}

fn default_erc_max_metadata_tasks() -> usize {
DEFAULT_ERC_MAX_METADATA_TASKS
}

Check warning on line 465 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L463-L465

Added lines #L463 - L465 were not covered by tests
3 changes: 2 additions & 1 deletion crates/torii/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
pool.clone(),
shutdown_tx.clone(),
provider.clone(),
self.args.indexing.max_concurrent_tasks,
self.args.erc.max_metadata_tasks,

Check warning on line 145 in crates/torii/runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/runner/src/lib.rs#L145

Added line #L145 was not covered by tests
)
.await?;
let executor_handle = tokio::spawn(async move { executor.run().await });
Expand Down Expand Up @@ -208,6 +208,7 @@
let temp_dir = TempDir::new()?;
let artifacts_path = self
.args
.erc

Check warning on line 211 in crates/torii/runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/runner/src/lib.rs#L211

Added line #L211 was not covered by tests
.artifacts_path
.unwrap_or_else(|| Utf8PathBuf::from(temp_dir.path().to_str().unwrap()));

Expand Down
12 changes: 6 additions & 6 deletions crates/torii/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> {
// It is used to make RPC calls to fetch token_uri data for erc721 contracts
provider: Arc<P>,
// Used to limit number of tasks that run in parallel to fetch metadata
semaphore: Arc<Semaphore>,
metadata_semaphore: Arc<Semaphore>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,13 +234,13 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
pool: Pool<Sqlite>,
shutdown_tx: Sender<()>,
provider: Arc<P>,
max_concurrent_tasks: usize,
max_metadata_tasks: usize,
) -> Result<(Self, UnboundedSender<QueryMessage>)> {
let (tx, rx) = unbounded_channel();
let transaction = pool.begin().await?;
let publish_queue = Vec::new();
let shutdown_rx = shutdown_tx.subscribe();
let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));
let metadata_semaphore = Arc::new(Semaphore::new(max_metadata_tasks));

Ok((
Executor {
Expand All @@ -252,7 +252,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
register_tasks: JoinSet::new(),
deferred_query_messages: Vec::new(),
provider,
semaphore,
metadata_semaphore,
},
tx,
))
Expand Down Expand Up @@ -603,7 +603,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff.");
}
QueryType::RegisterNftToken(register_nft_token) => {
let semaphore = self.semaphore.clone();
let metadata_semaphore = self.metadata_semaphore.clone();
let provider = self.provider.clone();

let res = sqlx::query_as::<_, (String, String)>(&format!(
Expand Down Expand Up @@ -675,7 +675,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
};

self.register_tasks.spawn(async move {
let permit = semaphore.acquire().await.unwrap();
let permit = metadata_semaphore.acquire().await.unwrap();

let result = Self::process_register_nft_token_query(
register_nft_token,
Expand Down
Loading