diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 58eaf6e4e3..a69d64782b 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -12,6 +12,7 @@ use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; use torii_grpc::client::{ EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming, + TokenUpdateStreaming, }; use torii_grpc::proto::world::{ RetrieveControllersResponse, RetrieveEntitiesResponse, RetrieveEventsResponse, @@ -114,6 +115,16 @@ impl Client { Ok(tokens.into_iter().map(TryInto::try_into).collect::, _>>()?) } + /// A direct stream to grpc subscribe tokens + pub async fn on_token_updated( + &self, + contract_addresses: Vec, + ) -> Result { + let mut grpc_client = self.inner.write().await; + let stream = grpc_client.subscribe_tokens(contract_addresses).await?; + Ok(stream) + } + /// Retrieves token balances for account addresses and contract addresses. pub async fn token_balances( &self, diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index a2a7fd27ab..ae7ba8e2eb 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -43,6 +43,9 @@ service World { // Subscribe to token updates. rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse); + // Update token subscription + rpc UpdateTokensSubscription (UpdateTokenSubscriptionRequest) returns (google.protobuf.Empty); + // Retrieve entities rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse); @@ -101,7 +104,18 @@ message RetrieveTokensResponse { // A response containing token updates message SubscribeTokensResponse { - types.Token token = 1; + // The subscription ID + uint64 subscription_id = 1; + // The token + types.Token token = 2; +} + +// A request to update a token subscription +message UpdateTokenSubscriptionRequest { + // The subscription ID + uint64 subscription_id = 1; + // The list of contract addresses to subscribe to + repeated bytes contract_addresses = 2; } // A request to retrieve token balances diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index ce15feb1d9..cdf87370ee 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -3,6 +3,7 @@ use std::num::ParseIntError; #[cfg(not(target_arch = "wasm32"))] use std::time::Duration; +use crypto_bigint::U256; use futures_util::stream::MapOk; use futures_util::{Stream, StreamExt, TryStreamExt}; use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate}; @@ -18,12 +19,13 @@ use crate::proto::world::{ SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, SubscribeTokenBalancesResponse, - UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest, - UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, + SubscribeTokensResponse, UpdateEntitiesSubscriptionRequest, + UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest, + UpdateTokenSubscriptionRequest, WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; use crate::types::{ - EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance, + EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, Token, TokenBalance, }; #[derive(Debug, thiserror::Error)] @@ -128,6 +130,55 @@ impl WorldClient { .map(|res| res.into_inner()) } + pub async fn subscribe_tokens( + &mut self, + contract_addresses: Vec, + ) -> Result { + let request = RetrieveTokensRequest { + contract_addresses: contract_addresses + .into_iter() + .map(|c| c.to_bytes_be().to_vec()) + .collect(), + }; + let stream = self + .inner + .subscribe_tokens(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner())?; + Ok(TokenUpdateStreaming(stream.map_ok(Box::new(|res| { + ( + res.subscription_id, + match res.token { + Some(token) => token.try_into().expect("must able to serialize"), + None => Token { + id: "".to_string(), + contract_address: Felt::ZERO, + name: "".to_string(), + symbol: "".to_string(), + decimals: 0, + metadata: "".to_string(), + }, + }, + ) + })))) + } + + pub async fn update_tokens_subscription( + &mut self, + subscription_id: u64, + contract_addresses: Vec, + ) -> Result<(), Error> { + let contract_addresses = + contract_addresses.into_iter().map(|c| c.to_bytes_be().to_vec()).collect(); + let request = UpdateTokenSubscriptionRequest { subscription_id, contract_addresses }; + self.inner + .update_tokens_subscription(request) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner()) + } + pub async fn retrieve_token_balances( &mut self, account_addresses: Vec, @@ -339,7 +390,18 @@ impl WorldClient { .map_err(Error::Grpc) .map(|res| res.into_inner())?; Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| { - (res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize")) + ( + res.subscription_id, + match res.balance { + Some(balance) => balance.try_into().expect("must able to serialize"), + None => TokenBalance { + balance: U256::ZERO, + account_address: Felt::ZERO, + contract_address: Felt::ZERO, + token_id: "".to_string(), + }, + }, + ) })))) } @@ -369,6 +431,24 @@ impl WorldClient { } } +type TokenMappedStream = MapOk< + tonic::Streaming, + Box (SubscriptionId, Token) + Send>, +>; + +#[derive(Debug)] +pub struct TokenUpdateStreaming(TokenMappedStream); + +impl Stream for TokenUpdateStreaming { + type Item = ::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_next_unpin(cx) + } +} + type TokenBalanceMappedStream = MapOk< tonic::Streaming, Box (SubscriptionId, TokenBalance) + Send>, diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 731e73ed8d..ed4a011103 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -63,7 +63,8 @@ use crate::proto::world::{ SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse, SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest, - UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse, + UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest, WorldMetadataRequest, + WorldMetadataResponse, }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -1406,6 +1407,20 @@ impl proto::world::world_server::World for DojoWorld { Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream)) } + async fn update_tokens_subscription( + &self, + request: Request, + ) -> ServiceResult<()> { + let UpdateTokenSubscriptionRequest { subscription_id, contract_addresses } = + request.into_inner(); + let contract_addresses = contract_addresses + .iter() + .map(|address| Felt::from_bytes_be_slice(address)) + .collect::>(); + self.token_manager.update_subscriber(subscription_id, contract_addresses).await; + Ok(Response::new(())) + } + async fn retrieve_token_balances( &self, request: Request, diff --git a/crates/torii/grpc/src/server/subscriptions/token.rs b/crates/torii/grpc/src/server/subscriptions/token.rs index d223a21b3c..37e2ecc868 100644 --- a/crates/torii/grpc/src/server/subscriptions/token.rs +++ b/crates/torii/grpc/src/server/subscriptions/token.rs @@ -14,13 +14,13 @@ use tokio::sync::mpsc::{ use tokio::sync::RwLock; use torii_sqlite::error::{Error, ParseError}; use torii_sqlite::simple_broker::SimpleBroker; -use torii_sqlite::types::Token; +use torii_sqlite::types::OptimisticToken; use tracing::{error, trace}; use crate::proto; use crate::proto::world::SubscribeTokensResponse; -pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance"; +pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::token"; #[derive(Debug)] pub struct TokenSubscriber { @@ -45,7 +45,7 @@ impl TokenManager { let (sender, receiver) = channel(1); // Send initial empty response - let _ = sender.send(Ok(SubscribeTokensResponse { token: None })).await; + let _ = sender.send(Ok(SubscribeTokensResponse { subscription_id, token: None })).await; self.subscribers.write().await.insert( subscription_id, @@ -85,33 +85,38 @@ impl TokenManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - simple_broker: Pin + Send>>, - balance_sender: UnboundedSender, + simple_broker: Pin + Send>>, + token_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { - let (balance_sender, balance_receiver) = unbounded_channel(); - let service = - Self { simple_broker: Box::pin(SimpleBroker::::subscribe()), balance_sender }; + let (token_sender, token_receiver) = unbounded_channel(); + let service = Self { + simple_broker: Box::pin(SimpleBroker::::subscribe()), + token_sender, + }; - tokio::spawn(Self::publish_updates(subs_manager, balance_receiver)); + tokio::spawn(Self::publish_updates(subs_manager, token_receiver)); service } async fn publish_updates( subs: Arc, - mut balance_receiver: UnboundedReceiver, + mut token_receiver: UnboundedReceiver, ) { - while let Some(balance) = balance_receiver.recv().await { - if let Err(e) = Self::process_balance_update(&subs, &balance).await { - error!(target = LOG_TARGET, error = %e, "Processing balance update."); + while let Some(token) = token_receiver.recv().await { + if let Err(e) = Self::process_token_update(&subs, &token).await { + error!(target = LOG_TARGET, error = %e, "Processing token update."); } } } - async fn process_balance_update(subs: &Arc, token: &Token) -> Result<(), Error> { + async fn process_token_update( + subs: &Arc, + token: &OptimisticToken, + ) -> Result<(), Error> { let mut closed_stream = Vec::new(); for (idx, sub) in subs.subscribers.read().await.iter() { @@ -126,6 +131,7 @@ impl Service { } let resp = SubscribeTokensResponse { + subscription_id: *idx, token: Some(proto::types::Token { token_id: token.id.clone(), contract_address: token.contract_address.clone(), @@ -142,7 +148,7 @@ impl Service { } for id in closed_stream { - trace!(target = LOG_TARGET, id = %id, "Closing balance stream."); + trace!(target = LOG_TARGET, id = %id, "Closing token stream."); subs.remove_subscriber(id).await } @@ -156,9 +162,9 @@ impl Future for Service { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) { - if let Err(e) = this.balance_sender.send(balance) { - error!(target = LOG_TARGET, error = %e, "Sending balance update to processor."); + while let Poll::Ready(Some(token)) = this.simple_broker.poll_next_unpin(cx) { + if let Err(e) = this.token_sender.send(token) { + error!(target = LOG_TARGET, error = %e, "Sending token update to processor."); } } diff --git a/crates/torii/grpc/src/server/subscriptions/token_balance.rs b/crates/torii/grpc/src/server/subscriptions/token_balance.rs index e883d6fb0b..2e08cc2375 100644 --- a/crates/torii/grpc/src/server/subscriptions/token_balance.rs +++ b/crates/torii/grpc/src/server/subscriptions/token_balance.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{ use tokio::sync::RwLock; use torii_sqlite::error::{Error, ParseError}; use torii_sqlite::simple_broker::SimpleBroker; -use torii_sqlite::types::TokenBalance; +use torii_sqlite::types::OptimisticTokenBalance; use tracing::{error, trace}; use crate::proto; @@ -98,15 +98,15 @@ impl TokenBalanceManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - simple_broker: Pin + Send>>, - balance_sender: UnboundedSender, + simple_broker: Pin + Send>>, + balance_sender: UnboundedSender, } impl Service { pub fn new(subs_manager: Arc) -> Self { let (balance_sender, balance_receiver) = unbounded_channel(); let service = Self { - simple_broker: Box::pin(SimpleBroker::::subscribe()), + simple_broker: Box::pin(SimpleBroker::::subscribe()), balance_sender, }; @@ -117,7 +117,7 @@ impl Service { async fn publish_updates( subs: Arc, - mut balance_receiver: UnboundedReceiver, + mut balance_receiver: UnboundedReceiver, ) { while let Some(balance) = balance_receiver.recv().await { if let Err(e) = Self::process_balance_update(&subs, &balance).await { @@ -128,7 +128,7 @@ impl Service { async fn process_balance_update( subs: &Arc, - balance: &TokenBalance, + balance: &OptimisticTokenBalance, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index da999276d6..eaf2684501 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -38,6 +38,7 @@ impl TryFrom for Controller { #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct Token { + pub id: String, pub contract_address: Felt, pub name: String, pub symbol: String, @@ -49,6 +50,7 @@ impl TryFrom for Token { type Error = SchemaError; fn try_from(value: proto::types::Token) -> Result { Ok(Self { + id: value.token_id, contract_address: Felt::from_str(&value.contract_address)?, name: value.name, symbol: value.symbol, @@ -70,7 +72,8 @@ impl TryFrom for TokenBalance { type Error = SchemaError; fn try_from(value: proto::types::TokenBalance) -> Result { Ok(Self { - balance: U256::from_be_hex(&value.balance), + // Remove the "0x" prefix from the balance to be compatible with U256::from_be_hex. + balance: U256::from_be_hex(value.balance.trim_start_matches("0x")), account_address: Felt::from_str(&value.account_address)?, contract_address: Felt::from_str(&value.contract_address)?, token_id: value.token_id, diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index d724f6b504..5037826484 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -16,7 +16,7 @@ use super::{ApplyBalanceDiffQuery, BrokerMessage, Executor}; use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; use crate::executor::LOG_TARGET; use crate::simple_broker::SimpleBroker; -use crate::types::{ContractType, Token, TokenBalance}; +use crate::types::{ContractType, OptimisticToken, OptimisticTokenBalance, Token, TokenBalance}; use crate::utils::{ felt_to_sql_string, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256, u256_to_sql_string, I256, @@ -196,7 +196,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .await?; debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff"); - SimpleBroker::publish(token_balance); + SimpleBroker::publish(unsafe { + std::mem::transmute::(token_balance.clone()) + }); + self.publish_queue.push(BrokerMessage::TokenBalanceUpdated(token_balance)); Ok(()) } @@ -407,6 +410,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .with_context(|| format!("Failed to execute721Token query: {:?}", result))?; if let Some(token) = token { + SimpleBroker::publish(unsafe { + std::mem::transmute::(token.clone()) + }); self.publish_queue.push(BrokerMessage::TokenRegistered(token)); } diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 2b1883589d..606f1de9d3 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -24,7 +24,7 @@ use crate::simple_broker::SimpleBroker; use crate::types::{ ContractCursor, ContractType, Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity, - OptimisticEventMessage, Token, + OptimisticEventMessage, Token, TokenBalance, }; use crate::utils::{felt_to_sql_string, I256}; @@ -50,6 +50,7 @@ pub enum BrokerMessage { EventMessageUpdated(EventMessageUpdated), EventEmitted(EventEmitted), TokenRegistered(Token), + TokenBalanceUpdated(TokenBalance), } #[derive(Debug, Clone)] @@ -512,19 +513,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { entity_updated.deleted = true; } - let optimistic_entity = OptimisticEntity { - id: entity_updated.id.clone(), - keys: entity_updated.keys.clone(), - event_id: entity_updated.event_id.clone(), - executed_at: entity_updated.executed_at, - created_at: entity_updated.created_at, - updated_at: entity_updated.updated_at, - updated_model: entity_updated.updated_model.clone(), - deleted: entity_updated.deleted, - }; - SimpleBroker::publish(optimistic_entity); - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.publish_queue.push(broker_message); + SimpleBroker::publish(unsafe { + std::mem::transmute::(entity_updated.clone()) + }); + self.publish_queue.push(BrokerMessage::EntityUpdated(entity_updated)); } QueryType::RegisterModel => { let row = query.fetch_one(&mut **tx).await.with_context(|| { @@ -588,20 +580,12 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { event_message.updated_model = Some(em_query.ty); event_message.historical = em_query.is_historical; - let optimistic_event_message = OptimisticEventMessage { - id: event_message.id.clone(), - keys: event_message.keys.clone(), - event_id: event_message.event_id.clone(), - executed_at: event_message.executed_at, - created_at: event_message.created_at, - updated_at: event_message.updated_at, - updated_model: event_message.updated_model.clone(), - historical: event_message.historical, - }; - SimpleBroker::publish(optimistic_event_message); - - let broker_message = BrokerMessage::EventMessageUpdated(event_message); - self.publish_queue.push(broker_message); + SimpleBroker::publish(unsafe { + std::mem::transmute::( + event_message.clone(), + ) + }); + self.publish_queue.push(BrokerMessage::EventMessageUpdated(event_message)); } QueryType::StoreEvent => { let row = query.fetch_one(&mut **tx).await.with_context(|| { @@ -708,7 +692,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { QueryType::RegisterErc20Token(register_erc20_token) => { let query = sqlx::query_as::<_, Token>( "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, \ - ?, ?, ?, ?)", + ?, ?, ?, ?) RETURNING *", ) .bind(®ister_erc20_token.token_id) .bind(felt_to_sql_string(®ister_erc20_token.contract_address)) @@ -845,5 +829,6 @@ fn send_broker_message(message: BrokerMessage) { BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), BrokerMessage::TokenRegistered(token) => SimpleBroker::publish(token), + BrokerMessage::TokenBalanceUpdated(token_balance) => SimpleBroker::publish(token_balance), } } diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index ab76bbd9e5..b9214b6a75 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -124,6 +124,17 @@ pub struct Event { pub created_at: DateTime, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticToken { + pub id: String, + pub contract_address: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub metadata: String, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Token { @@ -135,6 +146,16 @@ pub struct Token { pub metadata: String, } +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OptimisticTokenBalance { + pub id: String, + pub balance: String, + pub account_address: String, + pub contract_address: String, + pub token_id: String, +} + #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct TokenBalance {