Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(katana): graceful shutdown on critical tasks termination #2409

Merged
merged 6 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,14 @@
print_intro(&self, genesis, node.rpc.addr);
}

// Wait until Ctrl + C is pressed, then shutdown
ctrl_c().await?;
node.rpc.handle.stop()?;
// Wait until ctrl-c signal is received or TaskManager signals shutdown
tokio::select! {
_ = ctrl_c() => {},
_ = node.task_manager.wait_for_shutdown() => {}
}

info!("Shutting down...");
node.stop().await?;

Check warning on line 244 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L243-L244

Added lines #L243 - L244 were not covered by tests

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ katana-primitives.workspace = true
katana-provider.workspace = true
katana-rpc.workspace = true
katana-rpc-api.workspace = true
katana-tasks.workspace = true

anyhow.workspace = true
dojo-metrics.workspace = true
Expand All @@ -22,7 +23,6 @@ jsonrpsee.workspace = true
num-traits.workspace = true
serde_json.workspace = true
starknet.workspace = true
tokio.workspace = true
tower = { workspace = true, features = [ "full" ] }
tower-http = { workspace = true, features = [ "full" ] }
tracing.workspace = true
Expand Down
21 changes: 18 additions & 3 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
use katana_rpc_api::starknet::{StarknetApiServer, StarknetTraceApiServer, StarknetWriteApiServer};
use katana_rpc_api::torii::ToriiApiServer;
use katana_rpc_api::ApiKind;
use katana_tasks::TaskManager;
use num_traits::ToPrimitive;
use starknet::core::types::{BlockId, BlockStatus, MaybePendingBlockWithTxHashes};
use starknet::core::utils::parse_cairo_short_string;
Expand All @@ -54,10 +55,21 @@
pub struct Handle {
pub pool: TxPool,
pub rpc: RpcServer,
pub task_manager: TaskManager,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: Arc<BlockProducer<BlockifierFactory>>,
}

impl Handle {
/// Stops the Katana node.
///
/// Consumes the handle, asks the RPC server handle to stop, and then awaits the task manager's
/// shutdown.
///
/// # Errors
///
/// Returns early with the error from `self.rpc.handle.stop()` if that call fails; in that case
/// the task manager's shutdown is not awaited here.
pub async fn stop(self) -> Result<()> {
// TODO: wait for the rpc server to stop
self.rpc.handle.stop()?;
// NOTE(review): `TaskManager::shutdown` appears to await the manager's shutdown signal before
// closing its tracker — confirm this cannot block indefinitely when `stop` is reached without
// any task having signalled shutdown (e.g. after ctrl-c).
self.task_manager.shutdown().await;
Ok(())
}

Check warning on line 70 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L65-L70

Added lines #L65 - L70 were not covered by tests
}

/// Build the core Katana components from the given configurations and start running the node.
// TODO: placeholder until we implement a dedicated class that encapsulates building the node
// components
Expand Down Expand Up @@ -209,8 +221,11 @@

let block_producer = Arc::new(block_producer);

// TODO: avoid dangling task, or at least store the handle to the NodeService
tokio::spawn(NodeService::new(
// Create a TaskManager using the ambient Tokio runtime
let task_manager = TaskManager::current();

// Spawn the NodeService as a critical task
task_manager.build_task().critical().name("NodeService").spawn(NodeService::new(
pool.clone(),
miner,
block_producer.clone(),
Expand All @@ -223,7 +238,7 @@
let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator);
let rpc = spawn(node_components, server_config).await?;

Ok(Handle { backend, block_producer, pool, rpc })
Ok(Handle { backend, block_producer, pool, rpc, task_manager })
}

// Moved from `katana_rpc` crate
Expand Down
34 changes: 21 additions & 13 deletions crates/katana/tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,19 @@ impl TaskManager {
self.spawn_inner(fut)
}

/// Wait until all spawned tasks are completed.
pub async fn wait(&self) {
// need to close the tracker first before waiting
let _ = self.tracker.close();
self.tracker.wait().await;
// reopen the tracker for spawning future tasks
let _ = self.tracker.reopen();
/// Wait for the shutdown signal to be received.
///
/// Completes once `self.on_cancel.cancelled()` resolves. This method only observes the signal:
/// it does not request a shutdown itself and does not wait for spawned tasks to finish.
pub async fn wait_for_shutdown(&self) {
self.on_cancel.cancelled().await;
}

/// Consumes the manager and wait until all tasks are finished, either due to completion or
/// Shuts down the manager and waits until all tasks are finished, either due to completion or
/// cancellation.
pub async fn wait_shutdown(self) {
///
/// No task can be spawned on the manager after this method is called.
pub async fn shutdown(self) {
self.wait_for_shutdown().await;
// need to close the tracker first before waiting
let _ = self.tracker.close();
let _ = self.on_cancel.cancelled().await;
self.tracker.wait().await;
}

Expand All @@ -70,6 +68,16 @@ impl TaskManager {
TaskBuilder::new(self)
}

/// Wait until all spawned tasks are completed.
///
/// Test-only helper (compiled under `#[cfg(test)]`). The tracker is reopened once the wait
/// finishes, so tasks can still be spawned on the manager afterwards.
#[cfg(test)]
async fn wait(&self) {
// need to close the tracker first before waiting; the value returned by `close` is
// deliberately ignored
let _ = self.tracker.close();
self.tracker.wait().await;
// reopen the tracker for spawning future tasks; the value returned by `reopen` is
// deliberately ignored
let _ = self.tracker.reopen();
}

fn spawn_inner<F>(&self, task: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
Expand Down Expand Up @@ -156,20 +164,20 @@ mod tests {
manager.build_task().graceful_shutdown().spawn(future::ready(()));

// wait until all task spawned to the manager have been completed
manager.wait_shutdown().await;
manager.shutdown().await;
}

#[tokio::test]
async fn critical_task_implicit_graceful_shutdown() {
let manager = TaskManager::current();
manager.build_task().critical().spawn(future::ready(()));
manager.wait_shutdown().await;
manager.shutdown().await;
}

#[tokio::test]
async fn critical_task_graceful_shudown_on_panicked() {
let manager = TaskManager::current();
manager.build_task().critical().spawn(async { panic!("panicking") });
manager.wait_shutdown().await;
manager.shutdown().await;
}
}
16 changes: 8 additions & 8 deletions crates/katana/tasks/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ impl<'a> TaskBuilder<'a> {
let Self { manager, instrument, graceful_shutdown, .. } = self;

// creates a future that will send a cancellation signal to the manager when the future is
// completed.
let fut = if graceful_shutdown {
// completed, regardless of success or error.
let fut = {
let ct = manager.on_cancel.clone();
Either::Left(fut.map(move |a| {
ct.cancel();
a
}))
} else {
Either::Right(fut)
fut.map(move |res| {
if graceful_shutdown {
ct.cancel();
}
res
})
};

let fut = if instrument {
Expand Down
Loading