From f3466c89f2ce44bd0d7305a77e92a5b97f4c0fc6 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Sep 2024 11:56:13 -0400 Subject: [PATCH 1/5] create a super handle for the entire node --- bin/katana/src/cli/node.rs | 9 ++++----- crates/dojo-test-utils/src/sequencer.rs | 19 +++++++++---------- crates/katana/node/src/lib.rs | 25 ++++++++++++++++--------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/bin/katana/src/cli/node.rs b/bin/katana/src/cli/node.rs index 63055b068b..c2c8482132 100644 --- a/bin/katana/src/cli/node.rs +++ b/bin/katana/src/cli/node.rs @@ -226,18 +226,17 @@ impl NodeArgs { let starknet_config = self.starknet_config()?; // build the node and start it - let (rpc_handle, backend) = - katana_node::start(server_config, sequencer_config, starknet_config).await?; + let node = katana_node::start(server_config, sequencer_config, starknet_config).await?; if !self.silent { #[allow(deprecated)] - let genesis = &backend.config.genesis; - print_intro(&self, genesis, rpc_handle.addr); + let genesis = &node.backend.config.genesis; + print_intro(&self, genesis, node.rpc.addr); } // Wait until Ctrl + C is pressed, then shutdown ctrl_c().await?; - rpc_handle.handle.stop()?; + node.rpc.handle.stop()?; Ok(()) } diff --git a/crates/dojo-test-utils/src/sequencer.rs b/crates/dojo-test-utils/src/sequencer.rs index 534794623d..03f3192c20 100644 --- a/crates/dojo-test-utils/src/sequencer.rs +++ b/crates/dojo-test-utils/src/sequencer.rs @@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS; #[allow(deprecated)] pub use katana_core::sequencer::SequencerConfig; use katana_executor::implementation::blockifier::BlockifierFactory; -use katana_node::NodeHandle; +use katana_node::Node; use katana_primitives::chain::ChainId; use katana_rpc::config::ServerConfig; use katana_rpc_api::ApiKind; @@ -29,9 +29,8 @@ pub struct TestAccount { #[allow(missing_debug_implementations)] pub struct TestSequencer { url: Url, - handle: NodeHandle, + handle: Node, account: TestAccount, - backend: Arc>, } impl TestSequencer { @@ -46,19 +45,19 @@ impl TestSequencer { apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii], }; - let (handle, backend) = katana_node::start(server_config, config, starknet_config) + let node = katana_node::start(server_config, config, starknet_config) .await .expect("Failed to build node components"); - let url = Url::parse(&format!("http://{}", handle.addr)).expect("Failed to parse URL"); + let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL"); - let account = backend.config.genesis.accounts().next().unwrap(); + let account = node.backend.config.genesis.accounts().next().unwrap(); let account = TestAccount { private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()), account_address: Felt::from_bytes_be(&account.0.to_bytes_be()), }; - TestSequencer { backend, account, handle, url } + TestSequencer { handle: node, account, url } } pub fn account(&self) -> SingleOwnerAccount, LocalWallet> { @@ -80,7 +79,7 @@ impl TestSequencer { } pub fn backend(&self) -> &Arc> { - &self.backend + &self.handle.backend } pub fn account_at_index( @@ -88,7 +87,7 @@ impl TestSequencer { index: usize, ) -> SingleOwnerAccount, LocalWallet> { #[allow(deprecated)] - let accounts: Vec<_> = self.backend.config.genesis.accounts().collect::<_>(); + let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>(); let account = accounts[index]; let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()); @@ -112,7 +111,7 @@ impl TestSequencer { } pub fn stop(self) -> Result<(), Error> { - self.handle.handle.stop() + self.handle.rpc.handle.stop() } pub fn url(&self) -> Url { diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 4b2f7c5221..98d91c4ca1 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -49,6 +49,14 @@ use starknet::providers::{JsonRpcClient, Provider}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, trace}; +#[allow(missing_debug_implementations)] +pub struct Node { + pub backend: Arc>, + pub block_producer: Arc>, + pub pool: TxPool, + pub rpc: RpcServerHandle, +} + /// Build the core Katana components from the given configurations and start running the node. // TODO: placeholder until we implement a dedicated class that encapsulate building the node // components @@ -63,7 +71,7 @@ pub async fn start( server_config: ServerConfig, sequencer_config: SequencerConfig, mut starknet_config: StarknetConfig, -) -> anyhow::Result<(NodeHandle, Arc>)> { +) -> Result { // --- build executor factory let cfg_env = CfgEnv { @@ -211,17 +219,17 @@ pub async fn start( // --- spawn rpc server - let node_components = (pool, backend.clone(), block_producer, validator); - let rpc_handle = spawn(node_components, server_config).await?; + let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator); + let rpc = spawn(node_components, server_config).await?; - Ok((rpc_handle, backend)) + Ok(Node { backend, block_producer, pool, rpc }) } // Moved from `katana_rpc` crate pub async fn spawn( node_components: (TxPool, Arc>, Arc>, TxValidator), config: ServerConfig, -) -> Result { +) -> Result { let (pool, backend, block_producer, validator) = node_components; let mut methods = RpcModule::new(()); @@ -291,12 +299,11 @@ pub async fn spawn( let addr = server.local_addr()?; let handle = server.start(methods)?; - Ok(NodeHandle { config, handle, addr }) + Ok(RpcServerHandle { handle, addr }) } -#[derive(Debug, Clone)] -pub struct NodeHandle { +#[derive(Debug)] +pub struct RpcServerHandle { pub addr: SocketAddr, - pub config: ServerConfig, pub handle: ServerHandle, } From 60fb57032406e6942a281c3920f91f2f727e71e6 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Sep 2024 12:10:33 -0400 Subject: [PATCH 2/5] rename to RcpServer --- crates/katana/node/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 98d91c4ca1..7c589e3573 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -54,7 +54,7 @@ pub struct Node { pub backend: Arc>, pub block_producer: Arc>, pub pool: TxPool, - pub rpc: RpcServerHandle, + pub rpc: RpcServer, } /// Build the core Katana components from the given configurations and start running the node. @@ -229,7 +229,7 @@ pub async fn start( pub async fn spawn( node_components: (TxPool, Arc>, Arc>, TxValidator), config: ServerConfig, -) -> Result { +) -> Result { let (pool, backend, block_producer, validator) = node_components; let mut methods = RpcModule::new(()); @@ -299,11 +299,11 @@ pub async fn spawn( let addr = server.local_addr()?; let handle = server.start(methods)?; - Ok(RpcServerHandle { handle, addr }) + Ok(RpcServer { handle, addr }) } #[derive(Debug)] -pub struct RpcServerHandle { +pub struct RpcServer { pub addr: SocketAddr, pub handle: ServerHandle, } From 3415580033da696327130877bec298e3e5532f67 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Sep 2024 12:17:20 -0400 Subject: [PATCH 3/5] rename Node to Handle --- crates/dojo-test-utils/src/sequencer.rs | 4 ++-- crates/katana/node/src/lib.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/dojo-test-utils/src/sequencer.rs b/crates/dojo-test-utils/src/sequencer.rs index 03f3192c20..039b26fab1 100644 --- a/crates/dojo-test-utils/src/sequencer.rs +++ b/crates/dojo-test-utils/src/sequencer.rs @@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS; #[allow(deprecated)] pub use katana_core::sequencer::SequencerConfig; use katana_executor::implementation::blockifier::BlockifierFactory; -use katana_node::Node; +use katana_node::Handle; use katana_primitives::chain::ChainId; use katana_rpc::config::ServerConfig; use katana_rpc_api::ApiKind; @@ -29,7 +29,7 @@ pub struct TestAccount { #[allow(missing_debug_implementations)] pub struct TestSequencer { url: Url, - handle: Node, + handle: Handle, account: TestAccount, } diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 7c589e3573..9c62aa0075 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -49,12 +49,13 @@ use starknet::providers::{JsonRpcClient, Provider}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, trace}; +/// A handle to the instantiated Katana node. #[allow(missing_debug_implementations)] -pub struct Node { - pub backend: Arc>, - pub block_producer: Arc>, +pub struct Handle { pub pool: TxPool, pub rpc: RpcServer, + pub backend: Arc>, + pub block_producer: Arc>, } /// Build the core Katana components from the given configurations and start running the node. @@ -71,7 +72,7 @@ pub async fn start( server_config: ServerConfig, sequencer_config: SequencerConfig, mut starknet_config: StarknetConfig, -) -> Result { +) -> Result { // --- build executor factory let cfg_env = CfgEnv { @@ -222,7 +223,7 @@ pub async fn start( let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator); let rpc = spawn(node_components, server_config).await?; - Ok(Node { backend, block_producer, pool, rpc }) + Ok(Handle { backend, block_producer, pool, rpc }) } // Moved from `katana_rpc` crate From c1231dd98d0321c25c19431bd0a80f77476a320b Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Sep 2024 13:59:09 -0400 Subject: [PATCH 4/5] integrate task manager and handle graceful shutdown --- Cargo.lock | 2 +- bin/katana/src/cli/node.rs | 11 +++++++--- crates/katana/node/Cargo.toml | 2 +- crates/katana/node/src/lib.rs | 21 +++++++++++++++--- crates/katana/tasks/src/manager.rs | 34 ++++++++++++++++++------------ 5 files changed, 49 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db32aed35c..04cdd5645b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7898,10 +7898,10 @@ dependencies = [ "katana-provider", "katana-rpc", "katana-rpc-api", + "katana-tasks", "num-traits 0.2.19", "serde_json", "starknet 0.11.0", - "tokio", "tower", "tower-http", "tracing", diff --git a/bin/katana/src/cli/node.rs b/bin/katana/src/cli/node.rs index c2c8482132..2d46261dbf 100644 --- a/bin/katana/src/cli/node.rs +++ b/bin/katana/src/cli/node.rs @@ -234,9 +234,14 @@ impl NodeArgs { print_intro(&self, genesis, node.rpc.addr); } - // Wait until Ctrl + C is pressed, then shutdown - ctrl_c().await?; - node.rpc.handle.stop()?; + // Wait until ctrl-c signal is received or TaskManager signals shutdown + tokio::select! { + _ = ctrl_c() => {}, + _ = node.task_manager.wait_for_shutdown() => {} + } + + info!("Shutting down..."); + node.stop().await?; Ok(()) } diff --git a/crates/katana/node/Cargo.toml b/crates/katana/node/Cargo.toml index 011bd8db62..c74b8e1380 100644 --- a/crates/katana/node/Cargo.toml +++ b/crates/katana/node/Cargo.toml @@ -14,6 +14,7 @@ katana-primitives.workspace = true katana-provider.workspace = true katana-rpc.workspace = true katana-rpc-api.workspace = true +katana-tasks.workspace = true anyhow.workspace = true dojo-metrics.workspace = true @@ -22,7 +23,6 @@ jsonrpsee.workspace = true num-traits.workspace = true serde_json.workspace = true starknet.workspace = true -tokio.workspace = true tower = { workspace = true, features = [ "full" ] } tower-http = { workspace = true, features = [ "full" ] } tracing.workspace = true diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index 9c62aa0075..bf0bdb4272 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -41,6 +41,7 @@ use katana_rpc_api::saya::SayaApiServer; use katana_rpc_api::starknet::{StarknetApiServer, StarknetTraceApiServer, StarknetWriteApiServer}; use katana_rpc_api::torii::ToriiApiServer; use katana_rpc_api::ApiKind; +use katana_tasks::TaskManager; use num_traits::ToPrimitive; use starknet::core::types::{BlockId, BlockStatus, MaybePendingBlockWithTxHashes}; use starknet::core::utils::parse_cairo_short_string; @@ -54,10 +55,21 @@ use tracing::{info, trace}; pub struct Handle { pub pool: TxPool, pub rpc: RpcServer, + pub task_manager: TaskManager, pub backend: Arc>, pub block_producer: Arc>, } +impl Handle { + /// Stops the Katana node. + pub async fn stop(self) -> Result<()> { + // TODO: wait for the rpc server to stop + self.rpc.handle.stop()?; + self.task_manager.shutdown().await; + Ok(()) + } +} + /// Build the core Katana components from the given configurations and start running the node. // TODO: placeholder until we implement a dedicated class that encapsulate building the node // components @@ -209,8 +221,11 @@ pub async fn start( let block_producer = Arc::new(block_producer); - // TODO: avoid dangling task, or at least store the handle to the NodeService - tokio::spawn(NodeService::new( + // Create a TaskManager using the ambient Tokio runtime + let task_manager = TaskManager::current(); + + // Spawn the NodeService as a critical task + task_manager.build_task().critical().name("NodeService").spawn(NodeService::new( pool.clone(), miner, block_producer.clone(), @@ -223,7 +238,7 @@ pub async fn start( let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator); let rpc = spawn(node_components, server_config).await?; - Ok(Handle { backend, block_producer, pool, rpc }) + Ok(Handle { backend, block_producer, pool, rpc, task_manager }) } // Moved from `katana_rpc` crate diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs index 94a7e227ed..e7095786c5 100644 --- a/crates/katana/tasks/src/manager.rs +++ b/crates/katana/tasks/src/manager.rs @@ -42,21 +42,19 @@ impl TaskManager { self.spawn_inner(fut) } - /// Wait until all spawned tasks are completed. - pub async fn wait(&self) { - // need to close the tracker first before waiting - let _ = self.tracker.close(); - self.tracker.wait().await; - // reopen the tracker for spawning future tasks - let _ = self.tracker.reopen(); + /// Wait for the shutdown signal to be received. + pub async fn wait_for_shutdown(&self) { + self.on_cancel.cancelled().await; } - /// Consumes the manager and wait until all tasks are finished, either due to completion or + /// Shutdowns the manger and wait until all tasks are finished, either due to completion or /// cancellation. - pub async fn wait_shutdown(self) { + /// + /// No task can be spawned on the manager after this method is called. + pub async fn shutdown(self) { + self.wait_for_shutdown().await; // need to close the tracker first before waiting let _ = self.tracker.close(); - let _ = self.on_cancel.cancelled().await; self.tracker.wait().await; } @@ -70,6 +68,16 @@ impl TaskManager { TaskBuilder::new(self) } + /// Wait until all spawned tasks are completed. + #[cfg(test)] + async fn wait(&self) { + // need to close the tracker first before waiting + let _ = self.tracker.close(); + self.tracker.wait().await; + // reopen the tracker for spawning future tasks + let _ = self.tracker.reopen(); + } + fn spawn_inner(&self, task: F) -> TaskHandle where F: Future + Send + 'static, @@ -156,20 +164,20 @@ mod tests { manager.build_task().graceful_shutdown().spawn(future::ready(())); // wait until all task spawned to the manager have been completed - manager.wait_shutdown().await; + manager.shutdown().await; } #[tokio::test] async fn critical_task_implicit_graceful_shutdown() { let manager = TaskManager::current(); manager.build_task().critical().spawn(future::ready(())); - manager.wait_shutdown().await; + manager.shutdown().await; } #[tokio::test] async fn critical_task_graceful_shudown_on_panicked() { let manager = TaskManager::current(); manager.build_task().critical().spawn(async { panic!("panicking") }); - manager.wait_shutdown().await; + manager.shutdown().await; } } From 3822754c46bd47d36c07c8fb573162707514b64c Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 10 Sep 2024 14:07:58 -0400 Subject: [PATCH 5/5] simplified --- crates/katana/tasks/src/task.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/katana/tasks/src/task.rs b/crates/katana/tasks/src/task.rs index 47e7a421b8..09d8462779 100644 --- a/crates/katana/tasks/src/task.rs +++ b/crates/katana/tasks/src/task.rs @@ -79,15 +79,15 @@ impl<'a> TaskBuilder<'a> { let Self { manager, instrument, graceful_shutdown, .. } = self; // creates a future that will send a cancellation signal to the manager when the future is - // completed. - let fut = if graceful_shutdown { + // completed, regardless of success or error. + let fut = { let ct = manager.on_cancel.clone(); - Either::Left(fut.map(move |a| { - ct.cancel(); - a - })) - } else { - Either::Right(fut) + fut.map(move |res| { + if graceful_shutdown { + ct.cancel(); + } + res + }) }; let fut = if instrument {