From d281cb41abc87881338c24f05670da555ca5af17 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Dec 2024 10:53:46 -0500 Subject: [PATCH] flatten rpc server building logic --- crates/katana/node/src/exit.rs | 2 +- crates/katana/node/src/lib.rs | 81 +++++++++++++++------------------- 2 files changed, 37 insertions(+), 46 deletions(-) diff --git a/crates/katana/node/src/exit.rs b/crates/katana/node/src/exit.rs index de1e42c9f3..f2c5e80311 100644 --- a/crates/katana/node/src/exit.rs +++ b/crates/katana/node/src/exit.rs @@ -18,7 +18,7 @@ impl<'a> NodeStoppedFuture<'a> { pub(crate) fn new(handle: &'a LaunchedNode) -> Self { let fut = Box::pin(async { handle.node.task_manager.wait_for_shutdown().await; - handle.stop().await?; + handle.rpc.handle.clone().stopped().await; Ok(()) }); Self { fut } diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index f876bcfb05..7b23f05744 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -11,7 +11,7 @@ use std::future::IntoFuture; use std::sync::Arc; use anyhow::Result; -use config::rpc::{ApiKind, RpcConfig}; +use config::rpc::ApiKind; use config::Config; use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_metrics::{Report, Server as MetricsServer}; @@ -28,7 +28,7 @@ use katana_core::env::BlockContextGenerator; use katana_core::service::block_producer::BlockProducer; use katana_db::mdbx::DbEnv; use katana_executor::implementation::blockifier::BlockifierFactory; -use katana_executor::{ExecutionFlags, ExecutorFactory}; +use katana_executor::ExecutionFlags; use katana_pool::ordering::FiFo; use katana_pool::TxPool; use katana_primitives::block::GasPrices; @@ -83,18 +83,18 @@ impl LaunchedNode { pub struct Node { pub pool: TxPool, pub db: Option, + pub rpc_server: RpcServer, pub task_manager: TaskManager, pub backend: Arc>, pub block_producer: BlockProducer, pub config: Arc, - forked_client: Option, } impl Node { /// Start the node. /// /// This method will start all the node process, running them until the node is stopped. - pub async fn launch(mut self) -> Result { + pub async fn launch(self) -> Result { let chain = self.backend.chain_spec.id; info!(%chain, "Starting node."); @@ -135,16 +135,18 @@ impl Node { .name("Sequencing") .spawn(sequencing.into_future()); - let node_components = (pool, backend, block_producer, self.forked_client.take()); - let rpc = spawn(node_components, self.config.rpc.clone()).await?; + // --- start the rpc server + + let rpc_handle = self.rpc_server.start(self.config.rpc.socket_addr()).await?; // --- start the gas oracle worker task + if let Some(ref url) = self.config.l1_provider_url { self.backend.gas_oracle.run_worker(self.task_manager.task_spawner()); info!(%url, "Gas Price Oracle started."); }; - Ok(LaunchedNode { node: self, rpc }) + Ok(LaunchedNode { node: self, rpc: rpc_handle }) } } @@ -240,36 +242,18 @@ pub async fn build(mut config: Config) -> Result { let validator = block_producer.validator(); let pool = TxPool::new(validator.clone(), FiFo::new()); - let node = Node { - db, - pool, - backend, - forked_client, - block_producer, - config: Arc::new(config), - task_manager: TaskManager::current(), - }; - - Ok(node) -} - -// Moved from `katana_rpc` crate -pub async fn spawn( - node_components: (TxPool, Arc>, BlockProducer, Option), - config: RpcConfig, -) -> Result { - let (pool, backend, block_producer, forked_client) = node_components; + // --- build rpc server - let mut modules = RpcModule::new(()); + let mut rpc_modules = RpcModule::new(()); let cors = Cors::new() - .allow_origins(config.cors_origins.clone()) - // Allow `POST` when accessing the resource - .allow_methods([Method::POST, Method::GET]) - .allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); + .allow_origins(config.rpc.cors_origins.clone()) + // Allow `POST` when accessing the resource + .allow_methods([Method::POST, Method::GET]) + .allow_headers([hyper::header::CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); - if config.apis.contains(&ApiKind::Starknet) { - let cfg = StarknetApiConfig { max_event_page_size: config.max_event_page_size }; + if config.rpc.apis.contains(&ApiKind::Starknet) { + let cfg = StarknetApiConfig { max_event_page_size: config.rpc.max_event_page_size }; let api = if let Some(client) = forked_client { StarknetApi::new_forked( @@ -283,28 +267,35 @@ pub async fn spawn( StarknetApi::new(backend.clone(), pool.clone(), Some(block_producer.clone()), cfg) }; - modules.merge(StarknetApiServer::into_rpc(api.clone()))?; - modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?; - modules.merge(StarknetTraceApiServer::into_rpc(api))?; + rpc_modules.merge(StarknetApiServer::into_rpc(api.clone()))?; + rpc_modules.merge(StarknetWriteApiServer::into_rpc(api.clone()))?; + rpc_modules.merge(StarknetTraceApiServer::into_rpc(api))?; } - if config.apis.contains(&ApiKind::Dev) { + if config.rpc.apis.contains(&ApiKind::Dev) { let api = DevApi::new(backend.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - if config.apis.contains(&ApiKind::Torii) { + if config.rpc.apis.contains(&ApiKind::Torii) { let api = ToriiApi::new(backend.clone(), pool.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - if config.apis.contains(&ApiKind::Saya) { + if config.rpc.apis.contains(&ApiKind::Saya) { let api = SayaApi::new(backend.clone(), block_producer.clone()); - modules.merge(api.into_rpc())?; + rpc_modules.merge(api.into_rpc())?; } - let server = RpcServer::new().metrics().health_check().cors(cors).module(modules); - let handle = server.start(config.socket_addr()).await?; + let rpc_server = RpcServer::new().metrics().health_check().cors(cors).module(rpc_modules); - Ok(handle) + Ok(Node { + db, + pool, + backend, + rpc_server, + block_producer, + config: Arc::new(config), + task_manager: TaskManager::current(), + }) }