From bce655dbfe6242bc556dbd4451a293fd1ecbea7a Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 5 Dec 2023 13:00:28 -0500 Subject: [PATCH] Track CPU usage for spawned execution We log the request parameters when a program uses more than 60 seconds of CPU time. Hopefully this can help us see if there are any people deliberately abusing the system. In the future, we could inform the user that they may kill the process if it's unintended. --- compiler/base/orchestrator/Cargo.lock | 162 +++++++++++++- compiler/base/orchestrator/Cargo.toml | 7 + compiler/base/orchestrator/src/coordinator.rs | 109 ++++++++- compiler/base/orchestrator/src/message.rs | 8 + compiler/base/orchestrator/src/worker.rs | 207 +++++++++++++++++- ui/Cargo.lock | 41 ++++ ui/src/server_axum/websocket.rs | 12 +- 7 files changed, 529 insertions(+), 17 deletions(-) diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock index d3aa01369..e5233f336 100644 --- a/compiler/base/orchestrator/Cargo.lock +++ b/compiler/base/orchestrator/Cargo.lock @@ -78,6 +78,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "bytes" version = "1.5.0" @@ -111,6 +117,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -218,6 +234,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "indexmap" version = "2.1.0" @@ -246,12 +268,27 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +[[package]] +name = "linux-raw-sys" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" + [[package]] name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "memchr" version = "2.6.4" @@ -275,7 +312,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -321,8 +358,11 @@ dependencies = [ "assertables", "bincode", "futures", + "libc", + "mach2", "modify-cargo-toml", "once_cell", + "procfs", "serde", "serde_json", "snafu", @@ -392,6 +432,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags", + "hex", +] + [[package]] name = "quote" version = "1.0.33" @@ -482,6 +545,19 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.38.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "ryu" version = "1.0.15" @@ -640,7 +716,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -815,7 +891,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -824,13 +909,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" 
+version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -839,42 +939,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" diff --git a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml index f79af0d14..341cda200 100644 --- a/compiler/base/orchestrator/Cargo.toml +++ b/compiler/base/orchestrator/Cargo.toml @@ -20,6 +20,13 @@ tokio-util = { version = "0.7.8", default-features = false, features = ["io", "i toml = { version = "0.8.2", default-features = false, features = ["parse", "display"] } tracing = { version = "0.1.37", default-features = false, features = ["attributes"] } +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { version = "0.16.0", default-features = false } + +[target.'cfg(target_os = 
"macos")'.dependencies] +libc = { version = "0.2.150", default-features = false } +mach2 = { version = "0.4.1", default-features = false } + [dev-dependencies] assert_matches = "1.5.0" assertables = "7.0.1" diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 4f84a4213..adc2e5624 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -29,9 +29,9 @@ use tracing::{instrument, trace, trace_span, warn, Instrument}; use crate::{ bincode_input_closed, message::{ - CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, ExecuteCommandResponse, - JobId, Multiplexed, OneToOneResponse, ReadFileRequest, ReadFileResponse, SerializedError, - WorkerMessage, WriteFileRequest, + CommandStatistics, CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, + ExecuteCommandResponse, JobId, Multiplexed, OneToOneResponse, ReadFileRequest, + ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, }, DropErrorDetailsExt, }; @@ -1164,8 +1164,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = v; + drop(stdin_tx); + drop(status_rx); + let task = async { task.await?.map_err(VersionError::from) }; let o = WithOutput::try_absorb(task, stdout_rx, stderr_rx).await?; Ok(if o.success { Some(o.stdout) } else { None }) @@ -1191,9 +1195,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self.begin_execute(token, request).await?; drop(stdin_tx); + drop(status_rx); + WithOutput::try_absorb(task, stdout_rx, stderr_rx).await } @@ -1225,6 +1232,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await @@ -1250,6 +1258,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1301,12 +1310,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await 
.context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1391,12 +1402,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1471,12 +1484,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1540,12 +1555,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1612,12 +1629,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1652,6 +1671,7 @@ impl Container { let (stdin_tx, mut stdin_rx) = mpsc::channel(8); let (stdout_tx, stdout_rx) = mpsc::channel(8); let (stderr_tx, stderr_rx) = mpsc::channel(8); + let (status_tx, status_rx) = mpsc::channel(8); let (to_worker_tx, mut from_worker_rx) = self .commander @@ -1703,6 +1723,9 @@ impl Container { WorkerMessage::StderrPacket(packet) => { stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */); } + WorkerMessage::CommandStatistics(stats) => { + status_tx.send(stats).await.ok(/* Receiver gone, that's OK */); + } _ => return UnexpectedMessageSnafu.fail(), } }, @@ -1719,6 +1742,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1739,6 +1763,7 @@ pub struct ActiveExecution { pub stdin_tx: mpsc::Sender, pub stdout_rx: 
mpsc::Receiver, pub stderr_rx: mpsc::Receiver, + pub status_rx: mpsc::Receiver, } impl fmt::Debug for ActiveExecution { @@ -1999,6 +2024,7 @@ struct SpawnCargo { stdin_tx: mpsc::Sender, stdout_rx: mpsc::Receiver, stderr_rx: mpsc::Receiver, + status_rx: mpsc::Receiver, } #[derive(Debug, Snafu)] @@ -2544,7 +2570,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok #[cfg(test)] mod tests { use assertables::*; - use futures::{future::try_join_all, Future, FutureExt}; + use futures::{ + future::{join, try_join_all}, + Future, FutureExt, + }; use once_cell::sync::Lazy; use std::{env, sync::Once, time::Duration}; use tempdir::TempDir; @@ -2951,10 +2980,12 @@ mod tests { stdin_tx, stdout_rx, stderr_rx, + status_rx: _status_rx, } = coordinator.begin_execute(token, request).await.unwrap(); stdin_tx.send("this is stdin\n".into()).await.unwrap(); - // Purposefully not dropping stdin_tx early -- a user might forget + // Purposefully not dropping stdin_tx / status_rx early -- + // real users might forget. 
let WithOutput { response, @@ -3001,6 +3032,7 @@ mod tests { stdin_tx, stdout_rx, stderr_rx, + status_rx: _, } = coordinator.begin_execute(token, request).await.unwrap(); for i in 0..3 { @@ -3055,6 +3087,7 @@ mod tests { stdin_tx: _, mut stdout_rx, stderr_rx, + status_rx: _, } = coordinator .begin_execute(token.clone(), request) .await @@ -3086,6 +3119,72 @@ mod tests { Ok(()) } + #[tokio::test] + #[snafu::report] + async fn execute_status() -> Result<()> { + let coordinator = new_coordinator().await; + + let request = ExecuteRequest { + code: r#" + use std::{time::{Instant, Duration}, thread}; + + const MORE_THAN_STATUS_INTERVAL: Duration = Duration::from_millis(1100); + + fn main() { + let start = Instant::now(); + while start.elapsed() < MORE_THAN_STATUS_INTERVAL { + // Busy loop + } + thread::sleep(MORE_THAN_STATUS_INTERVAL); + } + "# + .into(), + ..ARBITRARY_EXECUTE_REQUEST + }; + + let token = CancellationToken::new(); + let ActiveExecution { + task, + stdin_tx: _, + stdout_rx, + stderr_rx, + mut status_rx, + } = coordinator + .begin_execute(token.clone(), request) + .await + .unwrap(); + + let statuses = async { + let mut statuses = Vec::new(); + while let Some(s) = status_rx.recv().await { + statuses.push(s); + } + statuses + }; + + let output = WithOutput::try_absorb(task, stdout_rx, stderr_rx); + + let (statuses, output) = join(statuses, output).with_timeout().await; + + let WithOutput { + response, stderr, .. 
+ } = output.unwrap(); + + assert!(response.success, "{stderr}"); + + let [first, last] = [statuses.first(), statuses.last()].map(|s| s.unwrap().total_time_secs); + + let cpu_time_used = last - first; + assert!( + cpu_time_used > 1.0, + "CPU usage did not increase enough ({first} -> {last})" + ); + + coordinator.shutdown().await?; + + Ok(()) + } + const HELLO_WORLD_CODE: &str = r#"fn main() { println!("Hello World!"); }"#; const ARBITRARY_COMPILE_REQUEST: CompileRequest = CompileRequest { diff --git a/compiler/base/orchestrator/src/message.rs b/compiler/base/orchestrator/src/message.rs index fe728d5b4..d40e71286 100644 --- a/compiler/base/orchestrator/src/message.rs +++ b/compiler/base/orchestrator/src/message.rs @@ -46,6 +46,7 @@ pub enum WorkerMessage { ExecuteCommand(ExecuteCommandResponse), StdoutPacket(String), StderrPacket(String), + CommandStatistics(CommandStatistics), Error(SerializedError), } @@ -73,6 +74,7 @@ impl_narrow_to_broad!( DeleteFile => DeleteFileResponse, ReadFile => ReadFileResponse, ExecuteCommand => ExecuteCommandResponse, + CommandStatistics => CommandStatistics, ); impl_broad_to_narrow_with_error!( @@ -136,6 +138,12 @@ pub struct ExecuteCommandResponse { pub exit_detail: String, } +#[derive(Debug, Serialize, Deserialize)] +pub struct CommandStatistics { + pub total_time_secs: f64, + pub resident_set_size_bytes: u64, +} + #[derive(Debug, Serialize, Deserialize)] pub struct SerializedError(pub String); diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs index 9931dccd6..a6d531b04 100644 --- a/compiler/base/orchestrator/src/worker.rs +++ b/compiler/base/orchestrator/src/worker.rs @@ -447,6 +447,12 @@ impl ProcessState { } }; + let statistics_task = tokio::task::spawn_blocking({ + let child_id = child.id(); + let worker_msg_tx = worker_msg_tx.clone(); + move || stream_command_statistics(child_id, worker_msg_tx) + }); + let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, 
stderr); self.kill_tokens.insert(job_id, token.clone()); @@ -455,7 +461,17 @@ impl ProcessState { let stdin_shutdown_tx = self.stdin_shutdown_tx.clone(); async move { worker_msg_tx - .send(process_end(token, child, task_set, stdin_shutdown_tx, job_id).await) + .send( + process_end( + token, + child, + task_set, + statistics_task, + stdin_shutdown_tx, + job_id, + ) + .await, + ) .await .context(UnableToSendExecuteCommandResponseSnafu) } @@ -590,6 +606,7 @@ async fn process_end( token: CancellationToken, mut child: Child, mut task_set: JoinSet>, + statistics_task: tokio::task::JoinHandle>, stdin_shutdown_tx: mpsc::Sender, job_id: JobId, ) -> Result { @@ -613,6 +630,11 @@ async fn process_end( .context(StdioTaskFailedSnafu)?; } + statistics_task + .await + .context(StatisticsTaskPanickedSnafu)? + .context(StatisticsTaskFailedSnafu)?; + let success = status.success(); let exit_detail = extract_exit_detail(status); @@ -754,6 +776,12 @@ pub enum ProcessError { #[snafu(display("The command's stdio task failed"))] StdioTaskFailed { source: StdioError }, + #[snafu(display("The command's statistics task panicked"))] + StatisticsTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The command's statistics task failed"))] + StatisticsTaskFailed { source: CommandStatisticsError }, + #[snafu(display("Failed to send the command started response to the coordinator"))] UnableToSendExecuteCommandStartedResponse { source: MultiplexingSenderError }, @@ -767,6 +795,183 @@ pub enum ProcessError { ProcessTaskPanicked { source: tokio::task::JoinError }, } +#[cfg(target_os = "macos")] +mod stats { + use libc; + use mach2::mach_time::{mach_timebase_info, mach_timebase_info_data_t}; + use snafu::prelude::*; + use std::mem::MaybeUninit; + + use crate::message::CommandStatistics; + + pub struct Process { + pid: i32, + timebase: mach_timebase_info_data_t, + } + + impl Process { + pub fn new(pid: i32) -> Result { + let timebase = timebase()?; + Ok(Self { pid, timebase }) + } + + 
pub fn stats(&self) -> Option { + let usage = proc_pid_rusage(self.pid).ok()?; + + let total_time_secs = self.ticks_to_seconds(usage.ri_user_time + usage.ri_system_time); + let resident_set_size_bytes = usage.ri_resident_size; + + Some(CommandStatistics { + total_time_secs, + resident_set_size_bytes, + }) + } + + fn ticks_to_seconds(&self, v: u64) -> f64 { + let nanos = v as f64 / self.timebase.denom as f64 * self.timebase.numer as f64; + nanos / 1_000_000_000.0 + } + } + + fn timebase() -> Result { + let mut timebase = Default::default(); + + // SAFETY: We've initialized the data structure + let retval = unsafe { mach_timebase_info(&mut timebase) }; + + if retval != mach2::kern_return::KERN_SUCCESS { + Snafu.fail() + } else { + Ok(timebase) + } + } + + fn proc_pid_rusage(pid: i32) -> std::io::Result { + // SAFETY: We only access the usage information after checking + // the function call succeeded. + unsafe { + let mut ri = MaybeUninit::::uninit(); + + let retval = libc::proc_pid_rusage(pid, libc::RUSAGE_INFO_V4, ri.as_mut_ptr().cast()); + + if retval == 0 { + Ok(ri.assume_init()) + } else { + Err(std::io::Error::last_os_error()) + } + } + } + + #[derive(Debug, Snafu)] + #[snafu(display("Unable to get the timebase conversion"))] + pub struct Error; +} + +#[cfg(target_os = "linux")] +mod stats { + use procfs::process::Process as ProcfsProcess; + use snafu::prelude::*; + + use crate::message::CommandStatistics; + + pub struct Process { + process: ProcfsProcess, + ticks_per_second: u64, + page_size: u64, + } + + impl Process { + pub fn new(pid: i32) -> Result { + let process = ProcfsProcess::new(pid).context(Snafu)?; + + let ticks_per_second = procfs::ticks_per_second(); + let page_size = procfs::page_size(); + + Ok(Self { + process, + ticks_per_second, + page_size, + }) + } + + pub fn stats(&self) -> Option { + let stat = self.process.stat().ok()?; + + let total_time_secs = self.ticks_to_seconds(stat.utime + stat.stime); + let resident_set_size_bytes = 
self.pages_to_bytes(stat.rss); + + Some(CommandStatistics { + total_time_secs, + resident_set_size_bytes, + }) + } + + fn ticks_to_seconds(&self, v: u64) -> f64 { + v as f64 / self.ticks_per_second as f64 + } + + fn pages_to_bytes(&self, v: u64) -> u64 { + v * self.page_size + } + } + + #[derive(Debug, Snafu)] + #[snafu(display("Could not get information for the process"))] + pub struct Error { + source: procfs::ProcError, + } +} + +fn stream_command_statistics( + child_id: Option, + worker_msg_tx: MultiplexingSender, +) -> Result<(), CommandStatisticsError> { + use command_statistics_error::*; + use stats::*; + use std::time::Duration; + + const STATISTIC_INTERVAL: Duration = Duration::from_secs(1); + + let process_id = child_id.context(ChildIdMissingSnafu)?; + + let process_id = process_id + .try_into() + .context(ProcessIdOutOfRangeSnafu { process_id })?; + + let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?; + + while let Some(stats) = process.stats() { + let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats)); + if sent.is_err() { + // No one listening anymore + break; + } + + std::thread::sleep(STATISTIC_INTERVAL); + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CommandStatisticsError { + #[snafu(display("The child did not have a process ID"))] + ChildIdMissing, + + #[snafu(display("The process ID {process_id} could not be converted"))] + ProcessIdOutOfRange { + source: std::num::TryFromIntError, + process_id: u32, + }, + + #[snafu(display("The process ID {process_id} is not valid"))] + InvalidProcess { + source: stats::Error, + process_id: i32, + }, +} + fn stream_stdio( coordinator_tx: MultiplexingSender, mut stdin_rx: mpsc::Receiver, diff --git a/ui/Cargo.lock b/ui/Cargo.lock index 68bf17db0..25e5ab977 100644 --- a/ui/Cargo.lock +++ b/ui/Cargo.lock @@ -549,6 +549,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.11" @@ -836,6 +842,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1017,7 +1032,10 @@ dependencies = [ "asm-cleanup", "bincode", "futures", + "libc", + "mach2", "modify-cargo-toml", + "procfs", "serde", "serde_json", "snafu", @@ -1136,6 +1154,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.4.1", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.4.1", + "hex", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index 43c31d93f..4675e4865 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -589,8 +589,9 @@ async fn handle_execute_inner( stdin_tx, mut stdout_rx, mut stderr_rx, + mut status_rx, } = coordinator - .begin_execute(token.clone(), req) + .begin_execute(token.clone(), req.clone()) .await 
.context(BeginSnafu)?; @@ -613,6 +614,8 @@ async fn handle_execute_inner( .await }; + let mut reported = false; + let status = loop { tokio::select! { status = &mut task => break status, @@ -644,6 +647,13 @@ async fn handle_execute_inner( let sent = send_stderr(stderr).await; abandon_if_closed!(sent); }, + + Some(status) = status_rx.recv() => { + if !reported && status.total_time_secs > 60.0 { + error!("Request consumed more than 60s of CPU time: {req:?}"); + reported = true; + } + } } };