From e1f02b75524308fa11c93eb60d37f45b06ee2070 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 09:42:29 +0800 Subject: [PATCH 01/10] chore(torii-client): expose token id --- crates/torii/grpc/src/types/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index da999276d6..71831c5a87 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -38,6 +38,7 @@ impl TryFrom for Controller { #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct Token { + pub id: String, pub contract_address: Felt, pub name: String, pub symbol: String, @@ -49,6 +50,7 @@ impl TryFrom for Token { type Error = SchemaError; fn try_from(value: proto::types::Token) -> Result { Ok(Self { + id: value.token_id, contract_address: Felt::from_str(&value.contract_address)?, name: value.name, symbol: value.symbol, From c5b5ba4843184a5c97b20f5b5db4c61fd685696d Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 10:03:55 +0800 Subject: [PATCH 02/10] add token sub to grpc client --- crates/torii/grpc/src/client.rs | 40 ++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 722c1664b3..0ee95d65eb 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -2,6 +2,7 @@ use std::num::ParseIntError; use std::time::Duration; +use crypto_bigint::U256; use futures_util::stream::MapOk; use futures_util::{Stream, StreamExt, TryStreamExt}; use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate}; @@ -18,11 +19,11 @@ use crate::proto::world::{ SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, SubscribeTokenBalancesResponse, UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest, - UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, + UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, SubscribeTokensResponse }; use crate::types::schema::{Entity, SchemaError}; use crate::types::{ - EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance, + EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance, Token }; #[derive(Debug, thiserror::Error)] @@ -127,6 +128,18 @@ impl WorldClient { .map(|res| res.into_inner()) } + pub async fn subscribe_tokens( + &mut self, + contract_addresses: Vec, + ) -> Result { + let request = RetrieveTokensRequest { contract_addresses: contract_addresses.into_iter().map(|c| c.to_bytes_be().to_vec()).collect() }; + let stream = self.inner.subscribe_tokens(request).await.map_err(Error::Grpc).map(|res| res.into_inner())?; + Ok(TokenUpdateStreaming(stream.map_ok(Box::new(|res| match res.token { + Some(token) => token.try_into().expect("must able to serialize"), + None => Token { id: "".to_string(), contract_address: Felt::ZERO, name: "".to_string(), symbol: "".to_string(), decimals: 0, metadata: "".to_string() }, + })))) + } + pub async fn retrieve_token_balances( &mut self, account_addresses: Vec, @@ -338,7 +351,10 @@ impl WorldClient { .map_err(Error::Grpc) .map(|res| res.into_inner())?; Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| { - (res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize")) + (res.subscription_id, match res.balance { + Some(balance) => balance.try_into().expect("must able to serialize"), + None => TokenBalance { balance: U256::ZERO, account_address: Felt::ZERO, contract_address: Felt::ZERO, token_id: "".to_string() }, + }) })))) } @@ -368,6 +384,24 @@ impl WorldClient { } } +type TokenMappedStream = MapOk< + tonic::Streaming, + Box Token + Send>, +>; + +#[derive(Debug)] +pub struct TokenUpdateStreaming(TokenMappedStream); + +impl Stream for TokenUpdateStreaming { + type Item = ::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_next_unpin(cx) + } +} + type TokenBalanceMappedStream = MapOk< tonic::Streaming, Box (SubscriptionId, TokenBalance) + Send>, From 2e4a23a7b1acd0742ac9a103526211b17df6118e Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 10:04:48 +0800 Subject: [PATCH 03/10] add to torii client --- crates/torii/client/src/client/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 58eaf6e4e3..d3df91ec9e 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -114,6 +114,13 @@ impl Client { Ok(tokens.into_iter().map(TryInto::try_into).collect::, _>>()?) } + /// A direct stream to grpc subscribe tokens + pub async fn on_token_updated(&self, contract_addresses: Vec) -> Result { + let mut grpc_client = self.inner.write().await; + let stream = grpc_client.subscribe_tokens(contract_addresses).await?; + Ok(stream) + } + /// Retrieves token balances for account addresses and contract addresses. pub async fn token_balances( &self, From 20039736e7dfd1e3faade735bed3795826fb6c66 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 15:43:38 +0800 Subject: [PATCH 04/10] update tokens subscription --- crates/torii/grpc/proto/world.proto | 16 ++++- crates/torii/grpc/src/client.rs | 72 +++++++++++++++---- crates/torii/grpc/src/server/mod.rs | 23 ++++-- .../grpc/src/server/subscriptions/token.rs | 31 ++++---- 4 files changed, 106 insertions(+), 36 deletions(-) diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index a2a7fd27ab..ae7ba8e2eb 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -43,6 +43,9 @@ service World { // Subscribe to token updates. rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse); + // Update token subscription + rpc UpdateTokensSubscription (UpdateTokenSubscriptionRequest) returns (google.protobuf.Empty); + // Retrieve entities rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse); @@ -101,7 +104,18 @@ message RetrieveTokensResponse { // A response containing token updates message SubscribeTokensResponse { - types.Token token = 1; + // The subscription ID + uint64 subscription_id = 1; + // The token + types.Token token = 2; +} + +// A request to update a token subscription +message UpdateTokenSubscriptionRequest { + // The subscription ID + uint64 subscription_id = 1; + // The list of contract addresses to subscribe to + repeated bytes contract_addresses = 2; } // A request to retrieve token balances diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 0ee95d65eb..6563b964bd 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -18,12 +18,13 @@ use crate::proto::world::{ SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, SubscribeTokenBalancesResponse, - UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest, - UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, SubscribeTokensResponse + SubscribeTokensResponse, UpdateEntitiesSubscriptionRequest, + UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, + UpdateTokenSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; use crate::types::{ - EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance, Token + EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, Token, TokenBalance, }; #[derive(Debug, thiserror::Error)] @@ -132,14 +133,51 @@ impl WorldClient { &mut self, contract_addresses: Vec, ) -> Result { - let request = RetrieveTokensRequest { contract_addresses: contract_addresses.into_iter().map(|c| c.to_bytes_be().to_vec()).collect() }; - let stream = self.inner.subscribe_tokens(request).await.map_err(Error::Grpc).map(|res| res.into_inner())?; - Ok(TokenUpdateStreaming(stream.map_ok(Box::new(|res| match res.token { - Some(token) => token.try_into().expect("must able to serialize"), - None => Token { id: "".to_string(), contract_address: Felt::ZERO, name: "".to_string(), symbol: "".to_string(), decimals: 0, metadata: "".to_string() }, + let request = RetrieveTokensRequest { + contract_addresses: contract_addresses + .into_iter() + .map(|c| c.to_bytes_be().to_vec()) + .collect(), + }; + let stream = self + .inner + .subscribe_tokens(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner())?; + Ok(TokenUpdateStreaming(stream.map_ok(Box::new(|res| { + ( + res.subscription_id, + match res.token { + Some(token) => token.try_into().expect("must able to serialize"), + None => Token { + id: "".to_string(), + contract_address: Felt::ZERO, + name: "".to_string(), + symbol: "".to_string(), + decimals: 0, + metadata: "".to_string(), + }, + }, + ) })))) } + pub async fn update_tokens_subscription( + &mut self, + subscription_id: u64, + contract_addresses: Vec, + ) -> Result<(), Error> { + let contract_addresses = + contract_addresses.into_iter().map(|c| c.to_bytes_be().to_vec()).collect(); + let request = UpdateTokenSubscriptionRequest { subscription_id, contract_addresses }; + self.inner + .update_tokens_subscription(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner()) + } + pub async fn retrieve_token_balances( &mut self, account_addresses: Vec, @@ -351,10 +389,18 @@ impl WorldClient { .map_err(Error::Grpc) .map(|res| res.into_inner())?; Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| { - (res.subscription_id, match res.balance { - Some(balance) => balance.try_into().expect("must able to serialize"), - None => TokenBalance { balance: U256::ZERO, account_address: Felt::ZERO, contract_address: Felt::ZERO, token_id: "".to_string() }, - }) + ( + res.subscription_id, + match res.balance { + Some(balance) => balance.try_into().expect("must able to serialize"), + None => TokenBalance { + balance: U256::ZERO, + account_address: Felt::ZERO, + contract_address: Felt::ZERO, + token_id: "".to_string(), + }, + }, + ) })))) } @@ -386,7 +432,7 @@ impl WorldClient { type TokenMappedStream = MapOk< tonic::Streaming, - Box Token + Send>, + Box (SubscriptionId, Token) + Send>, >; #[derive(Debug)] diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 731e73ed8d..ece449267a 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -57,13 +57,7 @@ use crate::proto::types::member_value::ValueType; use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ - RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesStreamingResponse, - RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, - RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, - SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, - SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, - SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, - UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse, + RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -1406,6 +1400,21 @@ impl proto::world::world_server::World for DojoWorld { Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream)) } + async fn update_tokens_subscription( + &self, + request: Request, + ) -> ServiceResult<()> { + let UpdateTokenSubscriptionRequest { subscription_id, contract_addresses } = request.into_inner(); + let contract_addresses = contract_addresses + .iter() + .map(|address| Felt::from_bytes_be_slice(address)) + .collect::>(); + self.token_manager + .update_subscriber(subscription_id, contract_addresses) + .await; + Ok(Response::new(())) + } + async fn retrieve_token_balances( &self, request: Request, diff --git a/crates/torii/grpc/src/server/subscriptions/token.rs b/crates/torii/grpc/src/server/subscriptions/token.rs index d223a21b3c..005e6f136b 100644 --- a/crates/torii/grpc/src/server/subscriptions/token.rs +++ b/crates/torii/grpc/src/server/subscriptions/token.rs @@ -20,7 +20,7 @@ use tracing::{error, trace}; use crate::proto; use crate::proto::world::SubscribeTokensResponse; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance"; +pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::token"; #[derive(Debug)] pub struct TokenSubscriber { @@ -45,7 +45,7 @@ impl TokenManager { let (sender, receiver) = channel(1); // Send initial empty response - let _ = sender.send(Ok(SubscribeTokensResponse { token: None })).await; + let _ = sender.send(Ok(SubscribeTokensResponse { subscription_id, token: None })).await; self.subscribers.write().await.insert( subscription_id, @@ -86,32 +86,32 @@ impl TokenManager { #[allow(missing_debug_implementations)] pub struct Service { simple_broker: Pin + Send>>, - balance_sender: UnboundedSender, + token_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - let (balance_sender, balance_receiver) = unbounded_channel(); + let (token_sender, token_receiver) = unbounded_channel(); let service = - Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), balance_sender }; + Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), token_sender }; - tokio::spawn(Self::publish_updates(subs_manager, balance_receiver)); + tokio::spawn(Self::publish_updates(subs_manager, token_receiver)); service } async fn publish_updates( subs: Arc, - mut balance_receiver: UnboundedReceiver, + mut token_receiver: UnboundedReceiver, ) { - while let Some(balance) = balance_receiver.recv().await { - if let Err(e) = Self::process_balance_update(&subs, &balance).await { - error!(target = LOG_TARGET, error = %e, "Processing balance update."); + while let Some(token) = token_receiver.recv().await { + if let Err(e) = Self::process_token_update(&subs, &token).await { + error!(target = LOG_TARGET, error = %e, "Processing token update."); } } } - async fn process_balance_update(subs: &Arc, token: &Token) -> Result<(), Error> { + async fn process_token_update(subs: &Arc, token: &Token) -> Result<(), Error> { let mut closed_stream = Vec::new(); for (idx, sub) in subs.subscribers.read().await.iter() { @@ -126,6 +126,7 @@ impl Service { } let resp = SubscribeTokensResponse { + subscription_id: *idx, token: Some(proto::types::Token { token_id: token.id.clone(), contract_address: token.contract_address.clone(), @@ -142,7 +143,7 @@ impl Service { } for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); + trace!(target = LOG_TARGET, id = %id, "Closing token stream."); subs.remove_subscriber(id).await } @@ -156,9 +157,9 @@ impl Future for Service { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.balance_sender.send(balance) { - error!(target = LOG_TARGET, error = %e, "Sending balance update to processor."); + while let Poll::Ready(Some(token)) = this.simple_broker.poll_next_unpin(cx) { + if let Err(e) = this.token_sender.send(token) { + error!(target = LOG_TARGET, error = %e, "Sending token update to processor."); } } From 1bab5dccb658cd677dee965086a41e09487ec527 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 15:55:00 +0800 Subject: [PATCH 05/10] fmt --- crates/torii/client/src/client/mod.rs | 5 ++- crates/torii/grpc/src/server/mod.rs | 16 +++++--- .../grpc/src/server/subscriptions/token.rs | 19 +++++---- .../src/server/subscriptions/token_balance.rs | 12 +++--- crates/torii/sqlite/src/executor/erc.rs | 7 +++- crates/torii/sqlite/src/executor/mod.rs | 41 ++++++------------- crates/torii/sqlite/src/types.rs | 21 ++++++++++ 7 files changed, 72 insertions(+), 49 deletions(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index d3df91ec9e..56765c28fb 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -115,7 +115,10 @@ impl Client { } /// A direct stream to grpc subscribe tokens - pub async fn on_token_updated(&self, contract_addresses: Vec) -> Result { + pub async fn on_token_updated( + &self, + contract_addresses: Vec, + ) -> Result { let mut grpc_client = self.inner.write().await; let stream = grpc_client.subscribe_tokens(contract_addresses).await?; Ok(stream) diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index ece449267a..ed4a011103 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -57,7 +57,14 @@ use crate::proto::types::member_value::ValueType; use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ - RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse + RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesStreamingResponse, + RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse, + RetrieveTokensRequest, RetrieveTokensResponse, SubscribeEntitiesRequest, + SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, + SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, + SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, + UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest, WorldMetadataRequest, + WorldMetadataResponse, }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -1404,14 +1411,13 @@ impl proto::world::world_server::World for DojoWorld { &self, request: Request, ) -> ServiceResult<()> { - let UpdateTokenSubscriptionRequest { subscription_id, contract_addresses } = request.into_inner(); + let UpdateTokenSubscriptionRequest { subscription_id, contract_addresses } = + request.into_inner(); let contract_addresses = contract_addresses .iter() .map(|address| Felt::from_bytes_be_slice(address)) .collect::>(); - self.token_manager - .update_subscriber(subscription_id, contract_addresses) - .await; + self.token_manager.update_subscriber(subscription_id, contract_addresses).await; Ok(Response::new(())) } diff --git a/crates/torii/grpc/src/server/subscriptions/token.rs b/crates/torii/grpc/src/server/subscriptions/token.rs index 005e6f136b..37e2ecc868 100644 --- a/crates/torii/grpc/src/server/subscriptions/token.rs +++ b/crates/torii/grpc/src/server/subscriptions/token.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{ use tokio::sync::RwLock; use torii_sqlite::error::{Error, ParseError}; use torii_sqlite::simple_broker::SimpleBroker; -use torii_sqlite::types::Token; +use torii_sqlite::types::OptimisticToken; use tracing::{error, trace}; use crate::proto; @@ -85,15 +85,17 @@ impl TokenManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - simple_broker: Pin + Send>>, - token_sender: UnboundedSender, + simple_broker: Pin + Send>>, + token_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { let (token_sender, token_receiver) = unbounded_channel(); - let service = - Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), token_sender }; + let service = Self { + simple_broker: Box::pin(SimpleBroker::::subscribe()), + token_sender, + }; tokio::spawn(Self::publish_updates(subs_manager, token_receiver)); @@ -102,7 +104,7 @@ impl Service { async fn publish_updates( subs: Arc, - mut token_receiver: UnboundedReceiver, + mut token_receiver: UnboundedReceiver, ) { while let Some(token) = token_receiver.recv().await { if let Err(e) = Self::process_token_update(&subs, &token).await { @@ -111,7 +113,10 @@ impl Service { } } - async fn process_token_update(subs: &Arc, token: &Token) -> Result<(), Error> { + async fn process_token_update( + subs: &Arc, + token: &OptimisticToken, + ) -> Result<(), Error> { let mut closed_stream = Vec::new(); for (idx, sub) in subs.subscribers.read().await.iter() { diff --git a/crates/torii/grpc/src/server/subscriptions/token_balance.rs b/crates/torii/grpc/src/server/subscriptions/token_balance.rs index e883d6fb0b..2e08cc2375 100644 --- a/crates/torii/grpc/src/server/subscriptions/token_balance.rs +++ b/crates/torii/grpc/src/server/subscriptions/token_balance.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{ use tokio::sync::RwLock; use torii_sqlite::error::{Error, ParseError}; use torii_sqlite::simple_broker::SimpleBroker; -use torii_sqlite::types::TokenBalance; +use torii_sqlite::types::OptimisticTokenBalance; use tracing::{error, trace}; use crate::proto; @@ -98,15 +98,15 @@ impl TokenBalanceManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - simple_broker: Pin + Send>>, - balance_sender: UnboundedSender, + simple_broker: Pin + Send>>, + balance_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { let (balance_sender, balance_receiver) = unbounded_channel(); let service = Self { - simple_broker: Box::pin(SimpleBroker::::subscribe()), + simple_broker: Box::pin(SimpleBroker::::subscribe()), balance_sender, }; @@ -117,7 +117,7 @@ impl Service { async fn publish_updates( subs: Arc, - mut balance_receiver: UnboundedReceiver, + mut balance_receiver: UnboundedReceiver, ) { while let Some(balance) = balance_receiver.recv().await { if let Err(e) = Self::process_balance_update(&subs, &balance).await { @@ -128,7 +128,7 @@ impl Service { async fn process_balance_update( subs: &Arc, - balance: &TokenBalance, + balance: &OptimisticTokenBalance, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 4359206f8c..105bbdede8 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -16,7 +16,7 @@ use super::{ApplyBalanceDiffQuery, BrokerMessage, Executor}; use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; use crate::executor::LOG_TARGET; use crate::simple_broker::SimpleBroker; -use crate::types::{ContractType, Token, TokenBalance}; +use crate::types::{ContractType, OptimisticTokenBalance, Token, TokenBalance}; use crate::utils::{ felt_to_sql_string, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256, u256_to_sql_string, I256, @@ -176,7 +176,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .await?; debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff"); - SimpleBroker::publish(token_balance); + SimpleBroker::publish(unsafe { + std::mem::transmute::(token_balance.clone()) + }); + self.publish_queue.push(BrokerMessage::TokenBalanceUpdated(token_balance)); Ok(()) } diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index c067feee55..2440454f6f 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -23,7 +23,7 @@ use crate::simple_broker::SimpleBroker; use crate::types::{ ContractCursor, ContractType, Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity, - OptimisticEventMessage, Token, + OptimisticEventMessage, Token, TokenBalance, }; use crate::utils::{felt_to_sql_string, I256}; @@ -49,6 +49,7 @@ pub enum BrokerMessage { EventMessageUpdated(EventMessageUpdated), EventEmitted(EventEmitted), TokenRegistered(Token), + TokenBalanceUpdated(TokenBalance), } #[derive(Debug, Clone)] @@ -511,19 +512,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { entity_updated.deleted = true; } - let optimistic_entity = OptimisticEntity { - id: entity_updated.id.clone(), - keys: entity_updated.keys.clone(), - event_id: entity_updated.event_id.clone(), - executed_at: entity_updated.executed_at, - created_at: entity_updated.created_at, - updated_at: entity_updated.updated_at, - updated_model: entity_updated.updated_model.clone(), - deleted: entity_updated.deleted, - }; - SimpleBroker::publish(optimistic_entity); - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.publish_queue.push(broker_message); + SimpleBroker::publish(unsafe { + std::mem::transmute::(entity_updated.clone()) + }); + self.publish_queue.push(BrokerMessage::EntityUpdated(entity_updated)); } QueryType::RegisterModel => { let row = query.fetch_one(&mut **tx).await.with_context(|| { @@ -587,20 +579,12 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { event_message.updated_model = Some(em_query.ty); event_message.historical = em_query.is_historical; - let optimistic_event_message = OptimisticEventMessage { - id: event_message.id.clone(), - keys: event_message.keys.clone(), - event_id: event_message.event_id.clone(), - executed_at: event_message.executed_at, - created_at: event_message.created_at, - updated_at: event_message.updated_at, - updated_model: event_message.updated_model.clone(), - historical: event_message.historical, - }; - SimpleBroker::publish(optimistic_event_message); - - let broker_message = BrokerMessage::EventMessageUpdated(event_message); - self.publish_queue.push(broker_message); + SimpleBroker::publish(unsafe { + std::mem::transmute::( + event_message.clone(), + ) + }); + self.publish_queue.push(BrokerMessage::EventMessageUpdated(event_message)); } QueryType::StoreEvent => { let row = query.fetch_one(&mut **tx).await.with_context(|| { @@ -840,5 +824,6 @@ fn send_broker_message(message: BrokerMessage) { BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), BrokerMessage::TokenRegistered(token) => SimpleBroker::publish(token), + BrokerMessage::TokenBalanceUpdated(token_balance) => SimpleBroker::publish(token_balance), } } diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index f01829427d..759eb3ed13 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -124,6 +124,17 @@ pub struct Event { pub created_at: DateTime, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticToken { + pub id: String, + pub contract_address: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub metadata: String, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Token { @@ -135,6 +146,16 @@ pub struct Token { pub metadata: String, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticTokenBalance { + pub id: String, + pub balance: String, + pub account_address: String, + pub contract_address: String, + pub token_id: String, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct TokenBalance { From 7e9eb78ebe9485e326313f41b13c9e48a7eef0a0 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 11 Feb 2025 16:00:00 +0800 Subject: [PATCH 06/10] optimistic sub grpc --- crates/torii/sqlite/src/executor/erc.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 105bbdede8..62183e9363 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -16,7 +16,7 @@ use super::{ApplyBalanceDiffQuery, BrokerMessage, Executor}; use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; use crate::executor::LOG_TARGET; use crate::simple_broker::SimpleBroker; -use crate::types::{ContractType, OptimisticTokenBalance, Token, TokenBalance}; +use crate::types::{ContractType, OptimisticToken, OptimisticTokenBalance, Token, TokenBalance}; use crate::utils::{ felt_to_sql_string, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256, u256_to_sql_string, I256, @@ -360,6 +360,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .with_context(|| format!("Failed to execute721Token query: {:?}", result))?; if let Some(token) = token { + SimpleBroker::publish(unsafe { + std::mem::transmute::(token.clone()) + }); self.publish_queue.push(BrokerMessage::TokenRegistered(token)); } From 3ccb4239aeaa30d2ec2ff79d175b773532535576 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 12 Feb 2025 22:05:08 +0800 Subject: [PATCH 07/10] fix client build --- crates/torii/client/src/client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 56765c28fb..daf9ddbe29 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -11,7 +11,7 @@ use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; use torii_grpc::client::{ - EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming, + EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming, TokenUpdateStreaming, }; use torii_grpc::proto::world::{ RetrieveControllersResponse, RetrieveEntitiesResponse, RetrieveEventsResponse, From 70d7fc296e5b60675b6e9a9db26571a7602eb4c3 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 12 Feb 2025 22:05:20 +0800 Subject: [PATCH 08/10] fmt --- crates/torii/client/src/client/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index daf9ddbe29..a69d64782b 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -11,7 +11,8 @@ use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; use torii_grpc::client::{ - EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming, TokenUpdateStreaming, + EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming, + TokenUpdateStreaming, }; use torii_grpc::proto::world::{ RetrieveControllersResponse, RetrieveEntitiesResponse, RetrieveEventsResponse, From d645aa59c03275658974c09bf5f9ba814082593e Mon Sep 17 00:00:00 2001 From: glihm Date: Wed, 12 Feb 2025 11:26:14 -0600 Subject: [PATCH 09/10] ensure correct convertion from hex string to U256 --- crates/torii/grpc/src/types/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index 71831c5a87..eaf2684501 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -72,7 +72,8 @@ impl TryFrom for TokenBalance { type Error = SchemaError; fn try_from(value: proto::types::TokenBalance) -> Result { Ok(Self { - balance: U256::from_be_hex(&value.balance), + // Remove the "0x" prefix from the balance to be compatible with U256::from_be_hex. + balance: U256::from_be_hex(value.balance.trim_start_matches("0x")), account_address: Felt::from_str(&value.account_address)?, contract_address: Felt::from_str(&value.contract_address)?, token_id: value.token_id, From 657510e10cb889bce5e2e21a1236910043900450 Mon Sep 17 00:00:00 2001 From: glihm Date: Wed, 12 Feb 2025 11:33:22 -0600 Subject: [PATCH 10/10] fix registration of ERC20 by returning the inserted row --- crates/torii/sqlite/src/executor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 2dc57cc884..606f1de9d3 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -692,7 +692,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { QueryType::RegisterErc20Token(register_erc20_token) => { let query = sqlx::query_as::<_, Token>( "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, \ - ?, ?, ?, ?)", + ?, ?, ?, ?) RETURNING *", ) .bind(®ister_erc20_token.token_id) .bind(felt_to_sql_string(®ister_erc20_token.contract_address))