From 22fa99e57ff74a827f0120b7ea00cb03ea7f1774 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 20 Aug 2024 10:47:55 -0400 Subject: [PATCH 1/4] task manager --- Cargo.lock | 31 +++-- Cargo.toml | 2 +- crates/katana/tasks/Cargo.toml | 2 + crates/katana/tasks/src/lib.rs | 7 ++ crates/katana/tasks/src/manager.rs | 175 +++++++++++++++++++++++++++++ crates/katana/tasks/src/task.rs | 161 ++++++++++++++++++++++++++ 6 files changed, 369 insertions(+), 9 deletions(-) create mode 100644 crates/katana/tasks/src/manager.rs create mode 100644 crates/katana/tasks/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 8b7167d592..e8eeb1e9bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8095,6 +8095,8 @@ dependencies = [ "rayon", "thiserror", "tokio", + "tokio-util", + "tracing", ] [[package]] @@ -9121,6 +9123,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "mirai-annotations" version = "1.12.0" @@ -9473,7 +9487,7 @@ dependencies = [ "kqueue", "libc", "log", - "mio", + "mio 0.8.11", "walkdir", "windows-sys 0.48.0", ] @@ -14129,21 +14143,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2 0.5.7", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -14158,9 +14171,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -14254,6 +14267,8 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index aeb8b4612c..1a4fc8c31a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,7 @@ strum_macros = "0.25" tempfile = "3.9.0" test-log = "0.2.11" thiserror = "1.0.32" -tokio = { version = "1.32.0", features = [ "full" ] } +tokio = { version = "1.39.2", features = [ "full" ] } toml = "0.8" tower = "0.4.13" tower-http = "0.4.4" diff --git a/crates/katana/tasks/Cargo.toml b/crates/katana/tasks/Cargo.toml index fd03a40729..16bd316ce3 100644 --- a/crates/katana/tasks/Cargo.toml +++ b/crates/katana/tasks/Cargo.toml @@ -10,3 +10,5 @@ futures.workspace = true rayon.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util = { version = "0.7.11", features = [ "rt" ] } +tracing.workspace = true diff --git a/crates/katana/tasks/src/lib.rs b/crates/katana/tasks/src/lib.rs index a51cee743f..c329156ffc 100644 --- a/crates/katana/tasks/src/lib.rs +++ b/crates/katana/tasks/src/lib.rs @@ -1,3 +1,8 @@ +#![cfg_attr(not(test), warn(unused_crate_dependencies))] + +mod manager; +mod task; + use std::any::Any; use std::future::Future; use std::panic::{self, AssertUnwindSafe}; @@ -6,7 +11,9 @@ use std::sync::Arc; use std::task::Poll; use futures::channel::oneshot; +pub use manager::*; use rayon::ThreadPoolBuilder; +pub use task::*; use tokio::runtime::Handle; use tokio::task::JoinHandle; diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs new file mode 100644 index 0000000000..71c7abec1b --- /dev/null +++ b/crates/katana/tasks/src/manager.rs @@ -0,0 +1,175 @@ +use std::future::Future; + +use tokio::runtime::Handle; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; + +use crate::task::{TaskBuilder, TaskResult}; + +pub type TaskHandle = JoinHandle>; + +/// Usage for this task manager is mainly to spawn tasks that can be cancelled, and captures +/// panicked tasks (which in the context of the task manager - a critical task) for graceful +/// shutdown. +#[derive(Debug)] +pub struct TaskManager { + /// A handle to the Tokio runtime. + handle: Handle, + /// Keep track of currently running tasks. + tracker: TaskTracker, + /// Used to cancel all running tasks. + /// + /// This is passed to all the tasks spawned by the manager. + pub(crate) on_cancel: CancellationToken, +} + +impl TaskManager { + /// Create a new [`TaskManager`] from the given Tokio runtime handle. + pub fn new(handle: Handle) -> Self { + Self { handle, tracker: TaskTracker::new(), on_cancel: CancellationToken::new() } + } + + pub fn current() -> Self { + Self::new(Handle::current()) + } + + pub fn spawn(&self, fut: F) -> TaskHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawn_inner(fut) + } + + /// Wait until all spawned tasks are completed. + pub async fn wait(&self) { + // need to close the tracker first before waiting + let _ = self.tracker.close(); + self.tracker.wait().await; + // reopen the tracker for spawning future tasks + let _ = self.tracker.reopen(); + } + + /// Consumes the manager and wait until all tasks are finished, either due to completion or + /// cancellation. + pub async fn wait_shutdown(self) { + // need to close the tracker first before waiting + let _ = self.tracker.close(); + let _ = self.on_cancel.cancelled().await; + self.tracker.wait().await; + } + + /// Return the handle to the Tokio runtime that the manager is associated with. + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Returns a new [`TaskBuilder`] for building a task to be spawned on this manager. + pub fn build_task(&self) -> TaskBuilder<'_> { + TaskBuilder::new(self) + } + + fn spawn_inner(&self, task: F) -> TaskHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let task = self.make_cancellable(task); + let task = self.tracker.track_future(task); + self.handle.spawn(task) + } + + fn make_cancellable(&self, fut: F) -> impl Future> + where + F: Future, + { + let ct = self.on_cancel.clone(); + async move { + tokio::select! { + _ = ct.cancelled() => { + TaskResult::Cancelled + }, + res = fut => { + TaskResult::Completed(res) + }, + } + } + } +} + +impl Drop for TaskManager { + fn drop(&mut self) { + self.on_cancel.cancel(); + } +} + +#[cfg(test)] +mod tests { + use futures::future; + use tokio::time::{self, Duration}; + + use super::*; + + #[tokio::test] + async fn normal_tasks() { + let manager = TaskManager::current(); + + manager.spawn(time::sleep(Duration::from_secs(1))); + manager.spawn(time::sleep(Duration::from_secs(1))); + manager.spawn(time::sleep(Duration::from_secs(1))); + + // 3 tasks should be spawned on the manager + assert_eq!(manager.tracker.len(), 3); + + // wait until all task spawned to the manager have been completed + manager.wait().await; + + assert!( + !manager.on_cancel.is_cancelled(), + "cancellation signal shouldn't be sent on normal task completion" + ) + } + + #[tokio::test] + async fn task_with_graceful_shutdown() { + let manager = TaskManager::current(); + + // mock long running normal task and a task with graceful shutdown + manager.build_task().spawn(async { + loop { + time::sleep(Duration::from_secs(1)).await + } + }); + + manager.build_task().spawn(async { + loop { + time::sleep(Duration::from_secs(1)).await + } + }); + + // assert that 2 tasks should've been spawned + assert_eq!(manager.tracker.len(), 2); + + // Spawn a task with graceful shuwdown that finish immediately. + // The long running task should be cancelled due to the graceful shutdown. + manager.build_task().graceful_shutdown().spawn(future::ready(())); + + // wait until all task spawned to the manager have been completed + manager.wait_shutdown().await; + } + + #[tokio::test] + async fn critical_task_implicit_graceful_shutdown() { + let manager = TaskManager::current(); + manager.build_task().critical().spawn(future::ready(())); + manager.wait_shutdown().await; + } + + #[tokio::test] + async fn critical_task_graceful_shudown_on_panicked() { + let manager = TaskManager::current(); + manager.build_task().critical().spawn(async { panic!("panicking") }); + manager.wait_shutdown().await; + } +} diff --git a/crates/katana/tasks/src/task.rs b/crates/katana/tasks/src/task.rs new file mode 100644 index 0000000000..c9d20b1355 --- /dev/null +++ b/crates/katana/tasks/src/task.rs @@ -0,0 +1,161 @@ +use std::any::Any; +use std::future::Future; +use std::panic::AssertUnwindSafe; + +use futures::future::Either; +use futures::{FutureExt, TryFutureExt}; +use thiserror::Error; +use tokio_metrics::TaskMonitor; +use tracing::error; + +use crate::manager::{TaskHandle, TaskManager}; + +/// A task result that can be either completed or cancelled. +#[derive(Debug, Copy, Clone)] +pub enum TaskResult { + /// The task completed successfully with the given result. + Completed(T), + /// The task was cancelled. + Cancelled, +} + +impl TaskResult { + /// Returns true if the task was cancelled. + pub fn is_cancelled(&self) -> bool { + matches!(self, TaskResult::Cancelled) + } +} + +/// A builder for building tasks to be spawned on the associated task manager. +/// +/// Can only be created using [`TaskManager::build_task`]. +#[derive(Debug)] +pub struct TaskBuilder<'a> { + /// The task manager that the task will be spawned on. + manager: &'a TaskManager, + /// The name of the task. + name: Option, + /// Indicates whether the task should be instrumented. + instrument: bool, + /// Notifies the task manager to perform a graceful shutdown when the task is finished due to + /// ompletion or cancellation. + graceful_shutdown: bool, +} + +impl<'a> TaskBuilder<'a> { + /// Creates a new task builder associated with the given task manager. + pub(crate) fn new(manager: &'a TaskManager) -> Self { + Self { manager, name: None, instrument: false, graceful_shutdown: false } + } + + pub fn critical(self) -> CriticalTaskBuilder<'a> { + CriticalTaskBuilder { builder: self.graceful_shutdown() } + } + + /// Sets the name of the task. + pub fn name(mut self, name: &str) -> Self { + self.name = Some(name.to_string()); + self + } + + /// Instruments the task for collecting metrics. Is a no-op for now. + pub fn instrument(mut self) -> Self { + self.instrument = true; + self + } + + /// Notifies the task manager to perform a graceful shutdown when the task is finished due to + /// completion or cancellation. + pub fn graceful_shutdown(mut self) -> Self { + self.graceful_shutdown = true; + self + } + + /// Spawns the given future based on the configured builder. + pub fn spawn(self, fut: F) -> TaskHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let Self { manager, instrument, graceful_shutdown, .. } = self; + + // creates a future that will send a cancellation signal to the manager when the future is + // completed. + let fut = if graceful_shutdown { + let ct = manager.on_cancel.clone(); + Either::Left(fut.map(move |a| { + ct.cancel(); + a + })) + } else { + Either::Right(fut) + }; + + let fut = if instrument { + // TODO: store the TaskMonitor + let monitor = TaskMonitor::new(); + Either::Left(monitor.instrument(fut)) + } else { + Either::Right(fut) + }; + + manager.spawn(fut) + } +} + +/// Builder for building critical tasks. This struct can only be created by calling +/// [`TaskBuilder::critical`] +#[derive(Debug)] +pub struct CriticalTaskBuilder<'a> { + builder: TaskBuilder<'a>, +} + +impl<'a> CriticalTaskBuilder<'a> { + pub fn name(mut self, name: &str) -> Self { + self.builder.name = Some(name.to_string()); + self + } + + /// Instruments the task for collecting metrics. Is a no-op for now. + pub fn instrument(mut self) -> Self { + self.builder.instrument = true; + self + } + + pub fn spawn(self, fut: F) -> TaskHandle<()> + where + F: Future + Send + 'static, + { + let task_name = self.builder.name.clone().unwrap_or("unnamed".to_string()); + let ct = self.builder.manager.on_cancel.clone(); + + let fut = AssertUnwindSafe(fut) + .catch_unwind() + .map_err(move |error| { + ct.cancel(); + let error = PanickedTaskError { error }; + error!(%error, task = task_name, "Critical task failed."); + error + }) + .map(drop); + + self.builder.spawn(fut) + } +} + +/// A simple wrapper type so that we can implement [`std::error::Error`] for `Box`. +#[derive(Debug, Error)] +pub struct PanickedTaskError { + /// The error that caused the panic. It is a boxed `dyn Any` due to the future returned by + /// [`catch_unwind`](futures::future::FutureExt::catch_unwind). + error: Box, +} + +impl std::fmt::Display for PanickedTaskError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.error.downcast_ref::() { + None => Ok(()), + Some(msg) => write!(f, "{msg}"), + } + } +} From 877da39eab550d0e668ff4ff2c9c5cccbe3a6929 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 20 Aug 2024 10:49:11 -0400 Subject: [PATCH 2/4] fix --- Cargo.lock | 13 +++++++++++++ crates/katana/tasks/Cargo.toml | 1 + 2 files changed, 14 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index e8eeb1e9bc..88e4133209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8095,6 +8095,7 @@ dependencies = [ "rayon", "thiserror", "tokio", + "tokio-metrics", "tokio-util", "tracing", ] @@ -14180,6 +14181,18 @@ dependencies = [ "syn 2.0.71", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" diff --git a/crates/katana/tasks/Cargo.toml b/crates/katana/tasks/Cargo.toml index 16bd316ce3..fda68dc089 100644 --- a/crates/katana/tasks/Cargo.toml +++ b/crates/katana/tasks/Cargo.toml @@ -10,5 +10,6 @@ futures.workspace = true rayon.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-metrics = "0.3.1" tokio-util = { version = "0.7.11", features = [ "rt" ] } tracing.workspace = true From e83419d21128752706b2cc92496e43d612b752dd Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 20 Aug 2024 11:44:19 -0400 Subject: [PATCH 3/4] derive clone --- crates/katana/tasks/src/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs index 71c7abec1b..94a7e227ed 100644 --- a/crates/katana/tasks/src/manager.rs +++ b/crates/katana/tasks/src/manager.rs @@ -12,7 +12,7 @@ pub type TaskHandle = JoinHandle>; /// Usage for this task manager is mainly to spawn tasks that can be cancelled, and captures /// panicked tasks (which in the context of the task manager - a critical task) for graceful /// shutdown. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TaskManager { /// A handle to the Tokio runtime. handle: Handle, From dccc7f1f47beb96844810a958a63a8ca97373c40 Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Tue, 20 Aug 2024 12:03:32 -0400 Subject: [PATCH 4/4] fmt --- crates/katana/tasks/src/task.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/katana/tasks/src/task.rs b/crates/katana/tasks/src/task.rs index c9d20b1355..47e7a421b8 100644 --- a/crates/katana/tasks/src/task.rs +++ b/crates/katana/tasks/src/task.rs @@ -64,8 +64,7 @@ impl<'a> TaskBuilder<'a> { self } - /// Notifies the task manager to perform a graceful shutdown when the task is finished due to - /// completion or cancellation. + /// Notifies the task manager to perform a graceful shutdown when the task is finished. pub fn graceful_shutdown(mut self) -> Self { self.graceful_shutdown = true; self