From e1f12c790d5662b1f56a9e2bfb0c2fd0a46f2b6a Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Fri, 6 Sep 2024 16:13:44 +0530 Subject: [PATCH] refactor(torii): queries for them to not block commit-id:6c5aebce --- crates/torii/core/src/query_queue.rs | 73 ++++++++++++++++++++++++--- crates/torii/core/src/sql.rs | 72 ++++++++++++++------------ crates/torii/libp2p/src/server/mod.rs | 2 + 3 files changed, 107 insertions(+), 40 deletions(-) diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index d42fdb94b3..44bdee806d 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -1,5 +1,8 @@ +use anyhow::Result; +use dojo_types::schema::Ty; use std::collections::VecDeque; +use anyhow::Context; use sqlx::{Executor, Pool, Sqlite}; use starknet::core::types::Felt; @@ -33,11 +36,22 @@ pub struct QueryQueue { // publishes that are related to queries in the queue, they should be sent // after the queries are executed pub publish_queue: VecDeque, + pub publish_queries: VecDeque<(String, Vec, QueryType)>, +} + +#[derive(Debug, Clone)] +pub enum QueryType { + SetEntity(Ty), } impl QueryQueue { pub fn new(pool: Pool) -> Self { - QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() } + QueryQueue { + pool, + queue: VecDeque::new(), + publish_queue: VecDeque::new(), + publish_queries: VecDeque::new(), + } } pub fn enqueue>(&mut self, statement: S, arguments: Vec) { @@ -52,7 +66,16 @@ impl QueryQueue { self.publish_queue.push_back(value); } - pub async fn execute_all(&mut self) -> sqlx::Result { + pub fn push_publish_query( + &mut self, + statement: String, + arguments: Vec, + query_type: QueryType, + ) { + self.publish_queries.push_back((statement, arguments, query_type)); + } + + pub async fn execute_all(&mut self) -> Result { let mut total_affected = 0_u64; let mut tx = self.pool.begin().await?; @@ -69,20 +92,54 @@ impl QueryQueue { } } - total_affected += tx.execute(query).await?.rows_affected(); + total_affected += tx + .execute(query) + .await + .with_context(|| format!("Failed to execute query: {}", statement))? + .rows_affected(); } tx.commit().await?; while let Some(message) = self.publish_queue.pop_front() { - match message { - BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), - BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), - BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), - BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), + send_broker_message(message); + } + + while let Some((statement, arguments, query_type)) = self.publish_queries.pop_front() { + let mut query = sqlx::query_as(&statement); + for arg in &arguments { + query = match arg { + Argument::Null => query.bind(None::), + Argument::Int(integer) => query.bind(integer), + Argument::Bool(bool) => query.bind(bool), + Argument::String(string) => query.bind(string), + Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), + } } + + let broker_message = match query_type { + QueryType::SetEntity(entity) => { + let mut result: EntityUpdated = query + .fetch_one(&self.pool) + .await + .with_context(|| format!("Failed to fetch entity: {}", statement))?; + result.updated_model = Some(entity); + result.deleted = false; + BrokerMessage::EntityUpdated(result) + } + }; + send_broker_message(broker_message); } Ok(total_affected) } } + +fn send_broker_message(message: BrokerMessage) { + match message { + BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), + BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), + BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), + BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), + } +} diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 7587dbebe2..c5b791a55b 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many; use tracing::debug; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, QueryQueue}; +use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType}; use crate::types::{ Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, @@ -193,29 +193,28 @@ impl Sql { let entity_id = format!("{:#x}", poseidon_hash_many(&keys)); let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name)); - self.query_queue.enqueue( - "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ - model_id) DO NOTHING", - vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - ); - let keys_str = felts_sql_string(&keys); + let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \ ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ - event_id=EXCLUDED.event_id RETURNING *"; - // if timeout doesn't work - // fetch to get entity - // if not available, insert into queue - let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities) - .bind(&entity_id) - .bind(&keys_str) - .bind(event_id) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .fetch_one(&self.pool) - .await?; + event_id=EXCLUDED.event_id"; + + self.query_queue.enqueue( + insert_entities, + vec![ + Argument::String(entity_id.clone()), + Argument::String(keys_str.clone()), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + ); - entity_updated.updated_model = Some(entity.clone()); + self.query_queue.enqueue( + "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ + model_id) DO NOTHING", + vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], + ); let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -227,7 +226,12 @@ impl Sql { &vec![], ); - self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated)); + let query_entities_for_publish = "SELECT * FROM entities WHERE id = ?"; + self.query_queue.push_publish_query( + query_entities_for_publish.to_string(), + vec![Argument::String(entity_id.clone())], + QueryType::SetEntity(entity.clone()), + ); Ok(()) } @@ -316,18 +320,18 @@ impl Sql { ); self.execute().await?; - let mut update_entity = sqlx::query_as::<_, EntityUpdated>( - "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \ - = ? RETURNING *", - ) - .bind(utc_dt_string_from_timestamp(block_timestamp)) - .bind(event_id) - .bind(entity_id) - .fetch_one(&self.pool) - .await?; + let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \ + event_id=? WHERE id = ? RETURNING *"; - update_entity.updated_model = Some(wrapped_ty); - self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity)); + self.query_queue.push_publish_query( + update_query.to_string(), + vec![ + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + Argument::String(event_id.to_string()), + Argument::String(entity_id.clone()), + ], + QueryType::SetEntity(wrapped_ty), + ); Ok(()) } @@ -774,7 +778,11 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { return; diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index 5b54bb9f18..409ce35800 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -283,6 +283,7 @@ impl Relay

{ error = %e, "Setting message." ); + self.db.execute().await.unwrap(); continue; } else { info!( @@ -291,6 +292,7 @@ impl Relay

{ peer_id = %peer_id, "Message set." ); + self.db.execute().await.unwrap(); continue; } }