Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-client): token subscription & update subscription via id #3006

Merged
merged 11 commits into from
Feb 12, 2025
11 changes: 11 additions & 0 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{
EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming,
TokenUpdateStreaming,
};
use torii_grpc::proto::world::{
RetrieveControllersResponse, RetrieveEntitiesResponse, RetrieveEventsResponse,
Expand Down Expand Up @@ -114,6 +115,16 @@
Ok(tokens.into_iter().map(TryInto::try_into).collect::<Result<Vec<Token>, _>>()?)
}

/// A direct stream to grpc subscribe tokens
pub async fn on_token_updated(
&self,
contract_addresses: Vec<Felt>,
) -> Result<TokenUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_tokens(contract_addresses).await?;
Ok(stream)
}

Check warning on line 126 in crates/torii/client/src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/client/src/client/mod.rs#L119-L126

Added lines #L119 - L126 were not covered by tests

/// Retrieves token balances for account addresses and contract addresses.
pub async fn token_balances(
&self,
Expand Down
16 changes: 15 additions & 1 deletion crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ service World {
// Subscribe to token updates.
rpc SubscribeTokens (RetrieveTokensRequest) returns (stream SubscribeTokensResponse);

// Update token subscription
rpc UpdateTokensSubscription (UpdateTokenSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand Down Expand Up @@ -101,7 +104,18 @@ message RetrieveTokensResponse {

// A response containing token updates
message SubscribeTokensResponse {
types.Token token = 1;
// The subscription ID
uint64 subscription_id = 1;
// The token
types.Token token = 2;
}

// A request to update a token subscription
message UpdateTokenSubscriptionRequest {
// The subscription ID
uint64 subscription_id = 1;
// The list of contract addresses to subscribe to
repeated bytes contract_addresses = 2;
}

// A request to retrieve token balances
Expand Down
88 changes: 84 additions & 4 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::num::ParseIntError;
use std::time::Duration;

use crypto_bigint::U256;
use futures_util::stream::MapOk;
use futures_util::{Stream, StreamExt, TryStreamExt};
use starknet::core::types::{Felt, FromStrError, StateDiff, StateUpdate};
Expand All @@ -17,12 +18,13 @@
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
SubscribeModelsRequest, SubscribeModelsResponse, SubscribeTokenBalancesResponse,
UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest,
SubscribeTokensResponse, UpdateEntitiesSubscriptionRequest,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
UpdateTokenSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance,
EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, Token, TokenBalance,
};

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -127,6 +129,55 @@
.map(|res| res.into_inner())
}

pub async fn subscribe_tokens(
&mut self,
contract_addresses: Vec<Felt>,
) -> Result<TokenUpdateStreaming, Error> {
let request = RetrieveTokensRequest {
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
};
let stream = self
.inner
.subscribe_tokens(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(TokenUpdateStreaming(stream.map_ok(Box::new(|res| {
(
res.subscription_id,
match res.token {
Some(token) => token.try_into().expect("must able to serialize"),
None => Token {
id: "".to_string(),
contract_address: Felt::ZERO,
name: "".to_string(),
symbol: "".to_string(),
decimals: 0,
metadata: "".to_string(),
},
},

Check warning on line 161 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L133-L161

Added lines #L133 - L161 were not covered by tests
Comment on lines +154 to +162
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider throwing an error instead of returning an empty token.

Returning an empty token when None is received might hide potential issues. Consider throwing an error to make the failure more explicit.

-                    None => Token {
-                        id: "".to_string(),
-                        contract_address: Felt::ZERO,
-                        name: "".to_string(),
-                        symbol: "".to_string(),
-                        decimals: 0,
-                        metadata: "".to_string(),
-                    },
+                    None => return Err(Error::Schema(SchemaError::MissingExpectedData("token".to_string()))),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
None => Token {
id: "".to_string(),
contract_address: Felt::ZERO,
name: "".to_string(),
symbol: "".to_string(),
decimals: 0,
metadata: "".to_string(),
},
},
None => return Err(Error::Schema(SchemaError::MissingExpectedData("token".to_string()))),
},

)
}))))
}

Check warning on line 165 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L164-L165

Added lines #L164 - L165 were not covered by tests
pub async fn update_tokens_subscription(
&mut self,
subscription_id: u64,
contract_addresses: Vec<Felt>,
) -> Result<(), Error> {
let contract_addresses =
contract_addresses.into_iter().map(|c| c.to_bytes_be().to_vec()).collect();
let request = UpdateTokenSubscriptionRequest { subscription_id, contract_addresses };
self.inner
.update_tokens_subscription(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}

Check warning on line 180 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L167-L180

Added lines #L167 - L180 were not covered by tests
pub async fn retrieve_token_balances(
&mut self,
account_addresses: Vec<Felt>,
Expand Down Expand Up @@ -338,7 +389,18 @@
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
(res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
(
res.subscription_id,
match res.balance {
Some(balance) => balance.try_into().expect("must able to serialize"),
None => TokenBalance {
balance: U256::ZERO,
account_address: Felt::ZERO,
contract_address: Felt::ZERO,
token_id: "".to_string(),
},
},

Check warning on line 402 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L392-L402

Added lines #L392 - L402 were not covered by tests
)
Comment on lines +393 to +404
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Apply consistent error handling pattern.

Similar to the token subscription, let's handle the error gracefully here as well.

-                    Some(balance) => balance.try_into().expect("must able to serialize"),
+                    Some(balance) => balance.try_into().map_err(Error::Schema)?,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
(
res.subscription_id,
match res.balance {
Some(balance) => balance.try_into().expect("must able to serialize"),
None => TokenBalance {
balance: U256::ZERO,
account_address: Felt::ZERO,
contract_address: Felt::ZERO,
token_id: "".to_string(),
},
},
)
(
res.subscription_id,
match res.balance {
Some(balance) => balance.try_into().map_err(Error::Schema)?,
None => TokenBalance {
balance: U256::ZERO,
account_address: Felt::ZERO,
contract_address: Felt::ZERO,
token_id: "".to_string(),
},
},
)

}))))
}

Expand Down Expand Up @@ -368,6 +430,24 @@
}
}

type TokenMappedStream = MapOk<
tonic::Streaming<SubscribeTokensResponse>,
Box<dyn Fn(SubscribeTokensResponse) -> (SubscriptionId, Token) + Send>,
>;

#[derive(Debug)]
pub struct TokenUpdateStreaming(TokenMappedStream);

impl Stream for TokenUpdateStreaming {
type Item = <TokenMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

Check warning on line 449 in crates/torii/grpc/src/client.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/client.rs#L444-L449

Added lines #L444 - L449 were not covered by tests

type TokenBalanceMappedStream = MapOk<
tonic::Streaming<SubscribeTokenBalancesResponse>,
Box<dyn Fn(SubscribeTokenBalancesResponse) -> (SubscriptionId, TokenBalance) + Send>,
Expand Down
17 changes: 16 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse,
SubscribeIndexerRequest, SubscribeIndexerResponse, SubscribeTokenBalancesResponse,
SubscribeTokensResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest, WorldMetadataRequest,
WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -1406,6 +1407,20 @@
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokensStream))
}

async fn update_tokens_subscription(
&self,
request: Request<UpdateTokenSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenSubscriptionRequest { subscription_id, contract_addresses } =
request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
self.token_manager.update_subscriber(subscription_id, contract_addresses).await;
Ok(Response::new(()))
}

Check warning on line 1422 in crates/torii/grpc/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/mod.rs#L1413-L1422

Added lines #L1413 - L1422 were not covered by tests

async fn retrieve_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
Expand Down
42 changes: 24 additions & 18 deletions crates/torii/grpc/src/server/subscriptions/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
use tokio::sync::RwLock;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::Token;
use torii_sqlite::types::OptimisticToken;
use tracing::{error, trace};

use crate::proto;
use crate::proto::world::SubscribeTokensResponse;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::balance";
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::token";

#[derive(Debug)]
pub struct TokenSubscriber {
Expand All @@ -45,7 +45,7 @@
let (sender, receiver) = channel(1);

// Send initial empty response
let _ = sender.send(Ok(SubscribeTokensResponse { token: None })).await;
let _ = sender.send(Ok(SubscribeTokensResponse { subscription_id, token: None })).await;

Check warning on line 48 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L48

Added line #L48 was not covered by tests

self.subscribers.write().await.insert(
subscription_id,
Expand Down Expand Up @@ -85,33 +85,38 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = Token> + Send>>,
balance_sender: UnboundedSender<Token>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticToken> + Send>>,
token_sender: UnboundedSender<OptimisticToken>,
}

impl Service {
pub fn new(subs_manager: Arc<TokenManager>) -> Self {
let (balance_sender, balance_receiver) = unbounded_channel();
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Token>::subscribe()), balance_sender };
let (token_sender, token_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticToken>::subscribe()),
token_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, balance_receiver));
tokio::spawn(Self::publish_updates(subs_manager, token_receiver));

service
}

async fn publish_updates(
subs: Arc<TokenManager>,
mut balance_receiver: UnboundedReceiver<Token>,
mut token_receiver: UnboundedReceiver<OptimisticToken>,
) {
while let Some(balance) = balance_receiver.recv().await {
if let Err(e) = Self::process_balance_update(&subs, &balance).await {
error!(target = LOG_TARGET, error = %e, "Processing balance update.");
while let Some(token) = token_receiver.recv().await {
if let Err(e) = Self::process_token_update(&subs, &token).await {
error!(target = LOG_TARGET, error = %e, "Processing token update.");

Check warning on line 111 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L110-L111

Added lines #L110 - L111 were not covered by tests
}
}
}

async fn process_balance_update(subs: &Arc<TokenManager>, token: &Token) -> Result<(), Error> {
async fn process_token_update(
subs: &Arc<TokenManager>,
token: &OptimisticToken,
) -> Result<(), Error> {

Check warning on line 119 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L116-L119

Added lines #L116 - L119 were not covered by tests
let mut closed_stream = Vec::new();

for (idx, sub) in subs.subscribers.read().await.iter() {
Expand All @@ -126,6 +131,7 @@
}

let resp = SubscribeTokensResponse {
subscription_id: *idx,

Check warning on line 134 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L134

Added line #L134 was not covered by tests
token: Some(proto::types::Token {
token_id: token.id.clone(),
contract_address: token.contract_address.clone(),
Expand All @@ -142,7 +148,7 @@
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing balance stream.");
trace!(target = LOG_TARGET, id = %id, "Closing token stream.");

Check warning on line 151 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L151

Added line #L151 was not covered by tests
subs.remove_subscriber(id).await
}

Expand All @@ -156,9 +162,9 @@
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(balance)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.balance_sender.send(balance) {
error!(target = LOG_TARGET, error = %e, "Sending balance update to processor.");
while let Poll::Ready(Some(token)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.token_sender.send(token) {
error!(target = LOG_TARGET, error = %e, "Sending token update to processor.");

Check warning on line 167 in crates/torii/grpc/src/server/subscriptions/token.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token.rs#L166-L167

Added lines #L166 - L167 were not covered by tests
}
}

Expand Down
12 changes: 6 additions & 6 deletions crates/torii/grpc/src/server/subscriptions/token_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use tokio::sync::RwLock;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::TokenBalance;
use torii_sqlite::types::OptimisticTokenBalance;
use tracing::{error, trace};

use crate::proto;
Expand Down Expand Up @@ -98,15 +98,15 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = TokenBalance> + Send>>,
balance_sender: UnboundedSender<TokenBalance>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticTokenBalance> + Send>>,
balance_sender: UnboundedSender<OptimisticTokenBalance>,
}

impl Service {
pub fn new(subs_manager: Arc<TokenBalanceManager>) -> Self {
let (balance_sender, balance_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<TokenBalance>::subscribe()),
simple_broker: Box::pin(SimpleBroker::<OptimisticTokenBalance>::subscribe()),
balance_sender,
};

Expand All @@ -117,7 +117,7 @@

async fn publish_updates(
subs: Arc<TokenBalanceManager>,
mut balance_receiver: UnboundedReceiver<TokenBalance>,
mut balance_receiver: UnboundedReceiver<OptimisticTokenBalance>,
) {
while let Some(balance) = balance_receiver.recv().await {
if let Err(e) = Self::process_balance_update(&subs, &balance).await {
Expand All @@ -128,7 +128,7 @@

async fn process_balance_update(
subs: &Arc<TokenBalanceManager>,
balance: &TokenBalance,
balance: &OptimisticTokenBalance,

Check warning on line 131 in crates/torii/grpc/src/server/subscriptions/token_balance.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/server/subscriptions/token_balance.rs#L131

Added line #L131 was not covered by tests
) -> Result<(), Error> {
let mut closed_stream = Vec::new();

Expand Down
2 changes: 2 additions & 0 deletions crates/torii/grpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct Token {
pub id: String,
pub contract_address: Felt,
pub name: String,
pub symbol: String,
Expand All @@ -49,6 +50,7 @@
type Error = SchemaError;
fn try_from(value: proto::types::Token) -> Result<Self, Self::Error> {
Ok(Self {
id: value.token_id,

Check warning on line 53 in crates/torii/grpc/src/types/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/grpc/src/types/mod.rs#L53

Added line #L53 was not covered by tests
contract_address: Felt::from_str(&value.contract_address)?,
name: value.name,
symbol: value.symbol,
Expand Down
Loading
Loading