From c235c5aca2bebb3087f615ada4e482e30f483aca Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Thu, 22 Apr 2021 23:55:58 -0700 Subject: [PATCH 1/3] Use wait3 to accurately time spawned processes We fallback to the old method on win32. Signed-off-by: Rudi Grinberg --- otherlibs/stdune-unstable/dune | 2 +- otherlibs/stdune-unstable/proc.ml | 15 ++++ otherlibs/stdune-unstable/proc.mli | 7 ++ otherlibs/stdune-unstable/wait3_stubs.c | 73 +++++++++++++++++++ src/dune_engine/process.ml | 37 ++++++++-- src/dune_engine/scheduler.ml | 53 +++++++++----- src/dune_engine/scheduler.mli | 3 +- .../test-cases/trace-file.t/run.t | 17 ++--- 8 files changed, 170 insertions(+), 37 deletions(-) create mode 100644 otherlibs/stdune-unstable/wait3_stubs.c diff --git a/otherlibs/stdune-unstable/dune b/otherlibs/stdune-unstable/dune index f2dd226752b..71d08d9b397 100644 --- a/otherlibs/stdune-unstable/dune +++ b/otherlibs/stdune-unstable/dune @@ -10,4 +10,4 @@ dune_filesystem_stubs) (foreign_stubs (language c) - (names fcntl_stubs))) + (names fcntl_stubs wait3_stubs))) diff --git a/otherlibs/stdune-unstable/proc.ml b/otherlibs/stdune-unstable/proc.ml index 0278fb89d48..d29028f2baf 100644 --- a/otherlibs/stdune-unstable/proc.ml +++ b/otherlibs/stdune-unstable/proc.ml @@ -22,3 +22,18 @@ let restore_cwd_and_execve prog argv ~env = Stdlib.do_at_exit (); Unix.execve prog argv env ) + +type resource_usage = + { utime : float + ; stime : float + } + +external stub_wait3 : + Unix.wait_flag list -> int * Unix.process_status * resource_usage + = "dune_wait3" + +let wait3 flags = + if Sys.win32 then + Code_error.raise "wait3 not available on windows" [] + else + stub_wait3 flags diff --git a/otherlibs/stdune-unstable/proc.mli b/otherlibs/stdune-unstable/proc.mli index 9519bbaa015..c12cc90f9b2 100644 --- a/otherlibs/stdune-unstable/proc.mli +++ b/otherlibs/stdune-unstable/proc.mli @@ -1 +1,8 @@ val restore_cwd_and_execve : string -> string list -> env:Env.t -> _ + +type resource_usage = + { utime : float + ; stime : float + } + +val wait3 : Unix.wait_flag list -> int * Unix.process_status * resource_usage diff --git a/otherlibs/stdune-unstable/wait3_stubs.c b/otherlibs/stdune-unstable/wait3_stubs.c new file mode 100644 index 00000000000..abe4f89e018 --- /dev/null +++ b/otherlibs/stdune-unstable/wait3_stubs.c @@ -0,0 +1,73 @@ +#include + +#ifdef _WIN32 +#include + +void dune_wait3(value flags) { + caml_failwith("wait3: not supported on windows"); +} + +#else + +#include +#include +#include +#include + +#include +#include +#include +#include + +#define TAG_WEXITED 0 +#define TAG_WSIGNALED 1 +#define TAG_WSTOPPED 2 + +CAMLextern int caml_convert_signal_number(int); +CAMLextern int caml_rev_convert_signal_number(int); + +static value alloc_process_status(int status) { + value st; + + if (WIFEXITED(status)) { + st = caml_alloc_small(1, TAG_WEXITED); + Field(st, 0) = Val_int(WEXITSTATUS(status)); + } else if (WIFSTOPPED(status)) { + st = caml_alloc_small(1, TAG_WSTOPPED); + Field(st, 0) = Val_int(caml_rev_convert_signal_number(WSTOPSIG(status))); + } else { + st = caml_alloc_small(1, TAG_WSIGNALED); + Field(st, 0) = Val_int(caml_rev_convert_signal_number(WTERMSIG(status))); + } + return st; +} + +static int wait_flag_table[] = {WNOHANG, WUNTRACED}; + +value dune_wait3(value flags) { + CAMLparam1(flags); + CAMLlocal2(times, res); + + int pid, status, cv_flags; + cv_flags = caml_convert_flag_list(flags, wait_flag_table); + + struct rusage ru; + + caml_enter_blocking_section(); + pid = wait3(&status, cv_flags, &ru); + caml_leave_blocking_section(); + if (pid == -1) + uerror("wait3", Nothing); + + times = caml_alloc_small(2 * Double_wosize, Double_array_tag); + Store_double_field(times, 0, ru.ru_utime.tv_sec + ru.ru_utime.tv_usec / 1e6); + Store_double_field(times, 1, ru.ru_stime.tv_sec + ru.ru_stime.tv_usec / 1e6); + + res = caml_alloc_tuple(3); + Store_field(res, 0, Val_int(pid)); + Store_field(res, 1, alloc_process_status(status)); + Store_field(res, 2, times); + CAMLreturn(res); +} + +#endif diff --git a/src/dune_engine/process.ml b/src/dune_engine/process.ml index b590af2ace0..5d2f54b5b78 100644 --- a/src/dune_engine/process.ml +++ b/src/dune_engine/process.ml @@ -493,6 +493,22 @@ let report_process_end stats common ~id = let event = Event.async (Int id) End common in Stats.emit stats event +let report_process_event stats ~prog ~args + (resource_usage : Proc.resource_usage) = + let common = + let name = Filename.basename prog in + let ts = Timestamp.now () in + Event.common_fields ~cat:[ "process" ] ~name ~ts () + in + let args = + [ ("process_args", `List (List.map args ~f:(fun arg -> `String arg))) ] + in + let dur = + Chrome_trace.Event.Timestamp.of_float_seconds resource_usage.stime + in + let event = Event.complete ~args ~dur common in + Stats.emit stats event + let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) ?(stdin_from = Io.null In) ?(env = Env.initial) ~purpose fail_mode prog args = @@ -596,8 +612,12 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) env |> Dtemp.add_to_env |> Scheduler.Config.add_to_env config in let event_common = - Option.map config.stats - ~f:(report_process_start ~id ~prog:prog_str ~args) + match config.stats with + | Some stats when Sys.win32 -> + (* We track process times manually on windows. Elsewhere, we can use + [wait3] *) + Some (report_process_start stats ~id ~prog:prog_str ~args) + | _ -> None in let env = Env.to_unix env |> Spawn.Env.of_list in let pid = @@ -612,11 +632,14 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) in Io.release stdout_to; Io.release stderr_to; - let+ exit_status = Scheduler.wait_for_process pid in - (match (event_common, config.stats) with - | Some common, Some stats -> report_process_end stats common ~id - | None, None -> () - | _, _ -> assert false); + let+ exit_status, resource_usage = Scheduler.wait_for_process pid in + (match (event_common, config.stats, resource_usage) with + | Some common, Some stats, None -> report_process_end stats common ~id + | None, Some stats, Some resource_usage -> + report_process_event stats ~prog:prog_str ~args resource_usage + | None, None, _ -> () + | None, Some _, None -> assert false + | Some _, _, _ -> assert false); Option.iter response_file ~f:Path.unlink; let actual_stdout = match stdout_capture with diff --git a/src/dune_engine/scheduler.ml b/src/dune_engine/scheduler.ml index 8b04a1ad043..0b8fcdab670 100644 --- a/src/dune_engine/scheduler.ml +++ b/src/dune_engine/scheduler.ml @@ -52,7 +52,7 @@ end type job = { pid : Pid.t - ; ivar : Unix.process_status Fiber.Ivar.t + ; ivar : (Unix.process_status * Proc.resource_usage option) Fiber.Ivar.t } module Signal = struct @@ -113,7 +113,7 @@ end module Event : sig type t = | Files_changed of Path.t list - | Job_completed of job * Unix.process_status + | Job_completed of job * Unix.process_status * Proc.resource_usage option | Signal of Signal.t | Rpc of Fiber.fill @@ -150,23 +150,27 @@ module Event : sig (** Send an event to the main thread. *) val send_files_changed : t -> Path.t list -> unit - val send_job_completed : t -> job -> Unix.process_status -> unit + val send_job_completed : + t -> job -> Unix.process_status -> Proc.resource_usage option -> unit val send_signal : t -> Signal.t -> unit val send_sync_task : t -> (unit -> unit) -> unit + + val track_resources : t -> bool end with type event := t end = struct type t = | Files_changed of Path.t list - | Job_completed of job * Unix.process_status + | Job_completed of job * Unix.process_status * Proc.resource_usage option | Signal of Signal.t | Rpc of Fiber.fill module Queue = struct type t = - { jobs_completed : (job * Unix.process_status) Queue.t + { jobs_completed : + (job * Unix.process_status * Proc.resource_usage option) Queue.t ; sync_tasks : (unit -> unit) Queue.t ; mutable files_changed : Path.t list ; mutable signals : Signal.Set.t @@ -256,10 +260,10 @@ end = struct | None -> q.pending_rpc <- q.pending_rpc - 1; Rpc (Queue.pop_exn q.rpc_completed) - | Some (job, status) -> + | Some (job, status, resource_usage) -> q.pending_jobs <- q.pending_jobs - 1; assert (q.pending_jobs >= 0); - Job_completed (job, status)) + Job_completed (job, status, resource_usage)) | fns -> ( q.files_changed <- []; let files = @@ -294,10 +298,10 @@ end = struct if not avail then Condition.signal q.cond; Mutex.unlock q.mutex - let send_job_completed q job status = + let send_job_completed q job status resource_usage = Mutex.lock q.mutex; let avail = available q in - Queue.push q.jobs_completed (job, status); + Queue.push q.jobs_completed (job, status, resource_usage); if not avail then Condition.signal q.cond; Mutex.unlock q.mutex @@ -318,6 +322,8 @@ end = struct let pending_jobs q = q.pending_jobs let pending_rpc q = q.pending_rpc + + let track_resources q = Option.is_some q.stats end end @@ -515,7 +521,12 @@ end = struct module Process_table : sig val add : t -> job -> unit - val remove : t -> pid:Pid.t -> Unix.process_status -> unit + val remove : + t + -> pid:Pid.t + -> Unix.process_status + -> Proc.resource_usage option + -> unit val running_count : t -> int @@ -529,16 +540,16 @@ end = struct if t.running_count = 1 then Condition.signal t.something_is_running | Some (Zombie status) -> Table.remove t.table job.pid; - Event.Queue.send_job_completed t.events job status + Event.Queue.send_job_completed t.events job status None | Some (Running _) -> assert false - let remove t ~pid status = + let remove t ~pid status ru = match Table.find t.table pid with | None -> Table.set t.table pid (Zombie status) | Some (Running job) -> t.running_count <- t.running_count - 1; Table.remove t.table pid; - Event.Queue.send_job_completed t.events job status + Event.Queue.send_job_completed t.events job status ru | Some (Zombie _) -> assert false let iter t ~f = @@ -575,7 +586,7 @@ end = struct | Finished (job, status) -> (* We need to do the [Unix.waitpid] and remove the process while holding the lock, otherwise the pid might be reused in between. *) - Process_table.remove t ~pid:job.pid status; + Process_table.remove t ~pid:job.pid status None; true let wait_win32 t = @@ -587,10 +598,17 @@ end = struct let wait_unix t = Mutex.unlock t.mutex; - let pid, status = Unix.wait () in + let pid, status, resource_usage = + if Event.Queue.track_resources t.events then + let pid, status, ru = Proc.wait3 [] in + (pid, status, Some ru) + else + let pid, status = Unix.wait () in + (pid, status, None) + in Mutex.lock t.mutex; let pid = Pid.of_int pid in - Process_table.remove t ~pid status + Process_table.remove t ~pid status resource_usage let wait = if Sys.win32 then @@ -846,7 +864,8 @@ end = struct else ( t.handler t.config Tick; match Event.Queue.next t.events with - | Job_completed (job, status) -> Fiber.Fill (job.ivar, status) + | Job_completed (job, status, resource_usage) -> + Fiber.Fill (job.ivar, (status, resource_usage)) | Files_changed changed_files -> ( (* CR-someday amokhov: In addition to tracking files, we also need to track directory listings. Otherwise, when a new file is added to a diff --git a/src/dune_engine/scheduler.mli b/src/dune_engine/scheduler.mli index 8dc8e56d382..d9baf265b8f 100644 --- a/src/dune_engine/scheduler.mli +++ b/src/dune_engine/scheduler.mli @@ -80,7 +80,8 @@ val t : unit -> t Fiber.t val with_job_slot : (Config.t -> 'a Fiber.t) -> 'a Fiber.t (** Wait for the following process to terminate *) -val wait_for_process : Pid.t -> Unix.process_status Fiber.t +val wait_for_process : + Pid.t -> (Unix.process_status * Proc.resource_usage option) Fiber.t (** Wait for dune cache to be disconnected. Drop any other event. *) val wait_for_dune_cache : unit -> unit diff --git a/test/blackbox-tests/test-cases/trace-file.t/run.t b/test/blackbox-tests/test-cases/trace-file.t/run.t index aebc437f3f8..eb1f1d7954d 100644 --- a/test/blackbox-tests/test-cases/trace-file.t/run.t +++ b/test/blackbox-tests/test-cases/trace-file.t/run.t @@ -2,17 +2,12 @@ This captures the commands that are being run: - $ Date: Sat, 24 Apr 2021 13:55:57 -0700 Subject: [PATCH 2/3] Use fiber to run benchmarks Signed-off-by: Rudi Grinberg --- bench/bench.ml | 78 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/bench/bench.ml b/bench/bench.ml index 6f097e8ec2a..e21ceff10f6 100644 --- a/bench/bench.ml +++ b/bench/bench.ml @@ -1,4 +1,6 @@ open Stdune +module Process = Dune_engine.Process +module Config = Dune_util.Config module Json = struct include Chrome_trace.Json @@ -69,8 +71,6 @@ module Package = struct let make org name = { org; name } let clone t = - let module Process = Dune_engine.Process in - let module Config = Dune_util.Config in let stdout_to = Process.Io.(file Config.dev_null Out) in let stderr_to = Process.Io.(file Config.dev_null Out) in let stdin_from = Process.Io.(null In) in @@ -83,21 +83,10 @@ let duniverse = [ pkg "ocaml-dune" "dune-bench" ] let prepare_workspace () = - let module Scheduler = Dune_engine.Scheduler in - let config = - { Scheduler.Config.concurrency = 10 - ; display = Quiet - ; rpc = None - ; stats = None - } - in - Scheduler.Run.go config - ~on_event:(fun _ _ -> ()) - (fun () -> - Fiber.parallel_iter duniverse ~f:(fun (pkg : Package.t) -> - Fpath.rm_rf pkg.name; - Format.eprintf "cloning %s/%s@." pkg.org pkg.name; - Package.clone pkg)) + Fiber.parallel_iter duniverse ~f:(fun (pkg : Package.t) -> + Fpath.rm_rf pkg.name; + Format.eprintf "cloning %s/%s@." pkg.org pkg.name; + Package.clone pkg) let with_timer f = let start = Unix.time () in @@ -105,28 +94,63 @@ let with_timer f = let stop = Unix.time () in (stop -. start, res) +let dune_build () = + let stdin_from = Process.(Io.null In) in + let stdout_to = Process.(Io.file Config.dev_null Out) in + let stderr_to = Process.(Io.file Config.dev_null Out) in + let start = Unix.time () in + let open Fiber.O in + let+ () = + Process.run Strict (Lazy.force dune) ~stdin_from ~stdout_to ~stderr_to + [ "build"; "@install"; "--root"; "." ] + in + let stop = Unix.time () in + stop -. start + +let run_bench () = + let open Fiber.O in + let* clean = dune_build () in + let+ zero = + let open Fiber.O in + let rec zero acc n = + if n = 0 then + Fiber.return (List.rev acc) + else + let* time = dune_build () in + zero (time :: acc) (pred n) + in + zero [] 5 + in + (clean, zero) + let () = Dune_util.Log.init ~file:No_log_file (); let dir = Temp.create Dir ~prefix:"dune." ~suffix:".bench" in Sys.chdir (Path.to_string dir); - prepare_workspace (); - let clean, _ = - with_timer (fun () -> Sys.command "dune build @install --root . 1>&2") + let module Scheduler = Dune_engine.Scheduler in + let config = + { Scheduler.Config.concurrency = 10 + ; display = Quiet + ; rpc = None + ; stats = None + } in - let zeros = - List.init 5 ~f:(fun _ -> - let time, _ = - with_timer (fun () -> Sys.command "dune build @install --root . 1>&2") - in - `Float time) + let clean, zero = + Scheduler.Run.go config + ~on_event:(fun _ _ -> ()) + (fun () -> + let open Fiber.O in + let* () = prepare_workspace () in + run_bench ()) in + let zero = List.map zero ~f:(fun t -> `Float t) in let size = let stat : Unix.stats = Path.stat_exn (Lazy.force dune) in stat.st_size in let results = [ { Output.name = "clean_build"; metrics = [ ("time", `Float clean) ] } - ; { Output.name = "zero_build"; metrics = [ ("time", `List zeros) ] } + ; { Output.name = "zero_build"; metrics = [ ("time", `List zero) ] } ; { Output.name = "dune_size"; metrics = [ ("size", `Int size) ] } ] in From adbecd592e120832c86a5aa49f8f84dc26755044 Mon Sep 17 00:00:00 2001 From: Jeremie Dimino Date: Tue, 27 Apr 2021 12:29:40 +0100 Subject: [PATCH 3/3] _ Signed-off-by: Jeremie Dimino --- bench/bench.ml | 14 +-- otherlibs/stdune-unstable/proc.ml | 37 ++++++-- otherlibs/stdune-unstable/proc.mli | 32 +++++-- otherlibs/stdune-unstable/wait3_stubs.c | 7 +- src/dune_engine/process.ml | 113 ++++++++++++------------ src/dune_engine/process.mli | 11 +++ src/dune_engine/scheduler.ml | 79 +++++++---------- src/dune_engine/scheduler.mli | 3 +- 8 files changed, 167 insertions(+), 129 deletions(-) diff --git a/bench/bench.ml b/bench/bench.ml index e21ceff10f6..78cfc7b95ea 100644 --- a/bench/bench.ml +++ b/bench/bench.ml @@ -88,24 +88,16 @@ let prepare_workspace () = Format.eprintf "cloning %s/%s@." pkg.org pkg.name; Package.clone pkg) -let with_timer f = - let start = Unix.time () in - let res = f () in - let stop = Unix.time () in - (stop -. start, res) - let dune_build () = let stdin_from = Process.(Io.null In) in let stdout_to = Process.(Io.file Config.dev_null Out) in let stderr_to = Process.(Io.file Config.dev_null Out) in - let start = Unix.time () in let open Fiber.O in - let+ () = - Process.run Strict (Lazy.force dune) ~stdin_from ~stdout_to ~stderr_to + let+ times = + Process.run_with_times (Lazy.force dune) ~stdin_from ~stdout_to ~stderr_to [ "build"; "@install"; "--root"; "." ] in - let stop = Unix.time () in - stop -. start + times.elapsed_time let run_bench () = let open Fiber.O in diff --git a/otherlibs/stdune-unstable/proc.ml b/otherlibs/stdune-unstable/proc.ml index d29028f2baf..6d5c7495b5e 100644 --- a/otherlibs/stdune-unstable/proc.ml +++ b/otherlibs/stdune-unstable/proc.ml @@ -23,17 +23,40 @@ let restore_cwd_and_execve prog argv ~env = Unix.execve prog argv env ) -type resource_usage = - { utime : float - ; stime : float - } +module Resource_usage = struct + type t = + { user_cpu_time : float + ; system_cpu_time : float + } +end + +module Times = struct + type t = + { elapsed_time : float + ; resource_usage : Resource_usage.t option + } +end + +module Process_info = struct + type t = + { pid : Pid.t + ; status : Unix.process_status + ; end_time : float + ; resource_usage : Resource_usage.t option + } +end external stub_wait3 : - Unix.wait_flag list -> int * Unix.process_status * resource_usage + Unix.wait_flag list -> int * Unix.process_status * float * Resource_usage.t = "dune_wait3" -let wait3 flags = +let wait flags = if Sys.win32 then Code_error.raise "wait3 not available on windows" [] else - stub_wait3 flags + let pid, status, end_time, resource_usage = stub_wait3 flags in + { Process_info.pid = Pid.of_int pid + ; status + ; end_time + ; resource_usage = Some resource_usage + } diff --git a/otherlibs/stdune-unstable/proc.mli b/otherlibs/stdune-unstable/proc.mli index c12cc90f9b2..a8e142ade2e 100644 --- a/otherlibs/stdune-unstable/proc.mli +++ b/otherlibs/stdune-unstable/proc.mli @@ -1,8 +1,30 @@ val restore_cwd_and_execve : string -> string list -> env:Env.t -> _ -type resource_usage = - { utime : float - ; stime : float - } +module Resource_usage : sig + type t = + { user_cpu_time : float + (** Same as the "user" time reported by the "time" command *) + ; system_cpu_time : float + (** Same as the "sys" time reported by the "time" command *) + } +end -val wait3 : Unix.wait_flag list -> int * Unix.process_status * resource_usage +module Times : sig + type t = + { elapsed_time : float + (** Same as the "real" time reported by the "time" command *) + ; resource_usage : Resource_usage.t option + } +end + +module Process_info : sig + type t = + { pid : Pid.t + ; status : Unix.process_status + ; end_time : float (** Time at which the process finished. *) + ; resource_usage : Resource_usage.t option + } +end + +(** This function is not implemented on Windows *) +val wait : Unix.wait_flag list -> Process_info.t diff --git a/otherlibs/stdune-unstable/wait3_stubs.c b/otherlibs/stdune-unstable/wait3_stubs.c index abe4f89e018..780360e0610 100644 --- a/otherlibs/stdune-unstable/wait3_stubs.c +++ b/otherlibs/stdune-unstable/wait3_stubs.c @@ -49,12 +49,14 @@ value dune_wait3(value flags) { CAMLlocal2(times, res); int pid, status, cv_flags; + struct timeval tp; cv_flags = caml_convert_flag_list(flags, wait_flag_table); struct rusage ru; caml_enter_blocking_section(); pid = wait3(&status, cv_flags, &ru); + gettimeofday(&tp, NULL); caml_leave_blocking_section(); if (pid == -1) uerror("wait3", Nothing); @@ -63,10 +65,11 @@ value dune_wait3(value flags) { Store_double_field(times, 0, ru.ru_utime.tv_sec + ru.ru_utime.tv_usec / 1e6); Store_double_field(times, 1, ru.ru_stime.tv_sec + ru.ru_stime.tv_usec / 1e6); - res = caml_alloc_tuple(3); + res = caml_alloc_tuple(4); Store_field(res, 0, Val_int(pid)); Store_field(res, 1, alloc_process_status(status)); - Store_field(res, 2, times); + Store_field(res, 2, caml_copy_double(((double) tp.tv_sec + (double) tp.tv_usec / 1e6))); + Store_field(res, 3, times); CAMLreturn(res); } diff --git a/src/dune_engine/process.ml b/src/dune_engine/process.ml index 5d2f54b5b78..da68c5c114f 100644 --- a/src/dune_engine/process.ml +++ b/src/dune_engine/process.ml @@ -475,10 +475,10 @@ module Exit_status = struct :: Option.to_list output) end -let report_process_start stats ~id ~prog ~args = +let report_process_start stats ~id ~prog ~args ~now = let common = let name = Filename.basename prog in - let ts = Timestamp.now () in + let ts = Timestamp.of_float_seconds now in Event.common_fields ~cat:[ "process" ] ~name ~ts () in let args = @@ -486,26 +486,11 @@ let report_process_start stats ~id ~prog ~args = in let event = Event.async (Int id) ~args Start common in Stats.emit stats event; - common + (common, args) -let report_process_end stats common ~id = - let common = Event.set_ts common (Timestamp.now ()) in - let event = Event.async (Int id) End common in - Stats.emit stats event - -let report_process_event stats ~prog ~args - (resource_usage : Proc.resource_usage) = - let common = - let name = Filename.basename prog in - let ts = Timestamp.now () in - Event.common_fields ~cat:[ "process" ] ~name ~ts () - in - let args = - [ ("process_args", `List (List.map args ~f:(fun arg -> `String arg))) ] - in - let dur = - Chrome_trace.Event.Timestamp.of_float_seconds resource_usage.stime - in +let report_process_end stats (common, args) ~now (times : Proc.Times.t) = + let common = Event.set_ts common (Timestamp.of_float_seconds now) in + let dur = Chrome_trace.Event.Timestamp.of_float_seconds times.elapsed_time in let event = Event.complete ~args ~dur common in Stats.emit stats event @@ -602,7 +587,7 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) (stdout, stderr) | _ -> ((`No_capture, stdout_to), (`No_capture, stderr_to)) in - let event_common, pid = + let event_common, started_at, pid = (* Output.fd might create the file with Unix.openfile. We need to make sure to call it before doing the chdir as the path might be relative. *) let stdout = Io.fd stdout_to in @@ -611,35 +596,37 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) let env = env |> Dtemp.add_to_env |> Scheduler.Config.add_to_env config in - let event_common = - match config.stats with - | Some stats when Sys.win32 -> - (* We track process times manually on windows. Elsewhere, we can use - [wait3] *) - Some (report_process_start stats ~id ~prog:prog_str ~args) - | _ -> None - in let env = Env.to_unix env |> Spawn.Env.of_list in - let pid = - Spawn.spawn () ~prog:prog_str ~argv ~env ~stdout ~stderr ~stdin - ~cwd: - (match dir with - | None -> Inherit - | Some dir -> Path (Path.to_string dir)) - |> Pid.of_int + let started_at, pid = + (* jeremiedimino: I think we should do this just before the [execve] + in the stub for [Spawn.spawn] to be as precise as possible *) + let now = Unix.gettimeofday () in + ( now + , Spawn.spawn () ~prog:prog_str ~argv ~env ~stdout ~stderr ~stdin + ~cwd: + (match dir with + | None -> Inherit + | Some dir -> Path (Path.to_string dir)) + |> Pid.of_int ) + in + let event_common = + Option.map config.stats ~f:(fun stats -> + ( stats + , report_process_start stats ~id ~prog:prog_str ~args + ~now:started_at )) in - (event_common, pid) + (event_common, started_at, pid) in Io.release stdout_to; Io.release stderr_to; - let+ exit_status, resource_usage = Scheduler.wait_for_process pid in - (match (event_common, config.stats, resource_usage) with - | Some common, Some stats, None -> report_process_end stats common ~id - | None, Some stats, Some resource_usage -> - report_process_event stats ~prog:prog_str ~args resource_usage - | None, None, _ -> () - | None, Some _, None -> assert false - | Some _, _, _ -> assert false); + let+ process_info = Scheduler.wait_for_process pid in + let times = + { Proc.Times.elapsed_time = process_info.end_time -. started_at + ; resource_usage = process_info.resource_usage + } + in + Option.iter event_common ~f:(fun (stats, common) -> + report_process_end stats common ~now:process_info.end_time times); Option.iter response_file ~f:Path.unlink; let actual_stdout = match stdout_capture with @@ -667,7 +654,7 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) has_unexpected_output stderr_on_success actual_stderr in let exit_status' : Exit_status.t = - match exit_status with + match process_info.status with | WEXITED n when (not has_unexpected_stdout) && (not has_unexpected_stderr) @@ -703,31 +690,43 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr) swallow_on_success_if_requested fn actual_stderr stderr_on_success in let output = stdout ^ stderr in - Log.command ~command_line ~output ~exit_status; - match (display, exit_status', output) with - | (Quiet | Progress), Ok n, "" -> n (* Optimisation for the common case *) - | Verbose, _, _ -> - Exit_status.handle_verbose exit_status' ~id - ~command_line:fancy_command_line ~output - | _ -> - Exit_status.handle_non_verbose exit_status' ~prog:prog_str ~command_line - ~output ~purpose ~display ~has_unexpected_stdout - ~has_unexpected_stderr) + Log.command ~command_line ~output ~exit_status:process_info.status; + let res = + match (display, exit_status', output) with + | (Quiet | Progress), Ok n, "" -> + n (* Optimisation for the common case *) + | Verbose, _, _ -> + Exit_status.handle_verbose exit_status' ~id + ~command_line:fancy_command_line ~output + | _ -> + Exit_status.handle_non_verbose exit_status' ~prog:prog_str + ~command_line ~output ~purpose ~display ~has_unexpected_stdout + ~has_unexpected_stderr + in + (res, times)) let run ?dir ?stdout_to ?stderr_to ?stdin_from ?env ?(purpose = Internal_job) fail_mode prog args = let+ run = run_internal ?dir ?stdout_to ?stderr_to ?stdin_from ?env ~purpose fail_mode prog args + >>| fst in map_result fail_mode run ~f:ignore +let run_with_times ?dir ?stdout_to ?stderr_to ?stdin_from ?env + ?(purpose = Internal_job) prog args = + run_internal ?dir ?stdout_to ?stderr_to ?stdin_from ?env ~purpose Strict prog + args + >>| snd + let run_capture_gen ?dir ?stderr_to ?stdin_from ?env ?(purpose = Internal_job) fail_mode prog args ~f = let fn = Temp.create File ~prefix:"dune" ~suffix:".output" in let+ run = run_internal ?dir ~stdout_to:(Io.file fn Io.Out) ?stderr_to ?stdin_from ?env ~purpose fail_mode prog args + >>| fst in map_result fail_mode run ~f:(fun () -> let x = f fn in diff --git a/src/dune_engine/process.mli b/src/dune_engine/process.mli index d9f9788f19a..3f4996acca0 100644 --- a/src/dune_engine/process.mli +++ b/src/dune_engine/process.mli @@ -69,6 +69,17 @@ val run : -> string list -> 'a Fiber.t +val run_with_times : + ?dir:Path.t + -> ?stdout_to:Io.output Io.t + -> ?stderr_to:Io.output Io.t + -> ?stdin_from:Io.input Io.t + -> ?env:Env.t + -> ?purpose:purpose + -> Path.t + -> string list + -> Proc.Times.t Fiber.t + (** Run a command and capture its output *) val run_capture : ?dir:Path.t diff --git a/src/dune_engine/scheduler.ml b/src/dune_engine/scheduler.ml index 0b8fcdab670..81934ca724d 100644 --- a/src/dune_engine/scheduler.ml +++ b/src/dune_engine/scheduler.ml @@ -52,7 +52,7 @@ end type job = { pid : Pid.t - ; ivar : (Unix.process_status * Proc.resource_usage option) Fiber.Ivar.t + ; ivar : Proc.Process_info.t Fiber.Ivar.t } module Signal = struct @@ -113,7 +113,7 @@ end module Event : sig type t = | Files_changed of Path.t list - | Job_completed of job * Unix.process_status * Proc.resource_usage option + | Job_completed of job * Proc.Process_info.t | Signal of Signal.t | Rpc of Fiber.fill @@ -150,27 +150,23 @@ module Event : sig (** Send an event to the main thread. *) val send_files_changed : t -> Path.t list -> unit - val send_job_completed : - t -> job -> Unix.process_status -> Proc.resource_usage option -> unit + val send_job_completed : t -> job -> Proc.Process_info.t -> unit val send_signal : t -> Signal.t -> unit val send_sync_task : t -> (unit -> unit) -> unit - - val track_resources : t -> bool end with type event := t end = struct type t = | Files_changed of Path.t list - | Job_completed of job * Unix.process_status * Proc.resource_usage option + | Job_completed of job * Proc.Process_info.t | Signal of Signal.t | Rpc of Fiber.fill module Queue = struct type t = - { jobs_completed : - (job * Unix.process_status * Proc.resource_usage option) Queue.t + { jobs_completed : (job * Proc.Process_info.t) Queue.t ; sync_tasks : (unit -> unit) Queue.t ; mutable files_changed : Path.t list ; mutable signals : Signal.Set.t @@ -260,10 +256,10 @@ end = struct | None -> q.pending_rpc <- q.pending_rpc - 1; Rpc (Queue.pop_exn q.rpc_completed) - | Some (job, status, resource_usage) -> + | Some (job, proc_info) -> q.pending_jobs <- q.pending_jobs - 1; assert (q.pending_jobs >= 0); - Job_completed (job, status, resource_usage)) + Job_completed (job, proc_info)) | fns -> ( q.files_changed <- []; let files = @@ -298,10 +294,10 @@ end = struct if not avail then Condition.signal q.cond; Mutex.unlock q.mutex - let send_job_completed q job status resource_usage = + let send_job_completed q job proc_info = Mutex.lock q.mutex; let avail = available q in - Queue.push q.jobs_completed (job, status, resource_usage); + Queue.push q.jobs_completed (job, proc_info); if not avail then Condition.signal q.cond; Mutex.unlock q.mutex @@ -322,8 +318,6 @@ end = struct let pending_jobs q = q.pending_jobs let pending_rpc q = q.pending_rpc - - let track_resources q = Option.is_some q.stats end end @@ -506,7 +500,7 @@ module Process_watcher : sig end = struct type process_state = | Running of job - | Zombie of Unix.process_status + | Zombie of Proc.Process_info.t (* This mutable table is safe: it does not interact with the state we track in the build system. *) @@ -521,12 +515,7 @@ end = struct module Process_table : sig val add : t -> job -> unit - val remove : - t - -> pid:Pid.t - -> Unix.process_status - -> Proc.resource_usage option - -> unit + val remove : t -> Proc.Process_info.t -> unit val running_count : t -> int @@ -538,18 +527,18 @@ end = struct Table.set t.table job.pid (Running job); t.running_count <- t.running_count + 1; if t.running_count = 1 then Condition.signal t.something_is_running - | Some (Zombie status) -> + | Some (Zombie proc_info) -> Table.remove t.table job.pid; - Event.Queue.send_job_completed t.events job status None + Event.Queue.send_job_completed t.events job proc_info | Some (Running _) -> assert false - let remove t ~pid status ru = - match Table.find t.table pid with - | None -> Table.set t.table pid (Zombie status) + let remove t (proc_info : Proc.Process_info.t) = + match Table.find t.table proc_info.pid with + | None -> Table.set t.table proc_info.pid (Zombie proc_info) | Some (Running job) -> t.running_count <- t.running_count - 1; - Table.remove t.table pid; - Event.Queue.send_job_completed t.events job status ru + Table.remove t.table proc_info.pid; + Event.Queue.send_job_completed t.events job proc_info | Some (Zombie _) -> assert false let iter t ~f = @@ -574,19 +563,28 @@ end = struct | Unix.Unix_error _ -> ()); Mutex.unlock t.mutex - exception Finished of job * Unix.process_status + exception Finished of Proc.Process_info.t let wait_nonblocking_win32 t = try Process_table.iter t ~f:(fun job -> let pid, status = Unix.waitpid [ WNOHANG ] (Pid.to_int job.pid) in - if pid <> 0 then raise_notrace (Finished (job, status))); + if pid <> 0 then + let now = Unix.gettimeofday () in + let info : Proc.Process_info.t = + { pid = Pid.of_int pid + ; status + ; end_time = now + ; resource_usage = None + } + in + raise_notrace (Finished info)); false with - | Finished (job, status) -> + | Finished proc_info -> (* We need to do the [Unix.waitpid] and remove the process while holding the lock, otherwise the pid might be reused in between. *) - Process_table.remove t ~pid:job.pid status None; + Process_table.remove t proc_info; true let wait_win32 t = @@ -598,17 +596,9 @@ end = struct let wait_unix t = Mutex.unlock t.mutex; - let pid, status, resource_usage = - if Event.Queue.track_resources t.events then - let pid, status, ru = Proc.wait3 [] in - (pid, status, Some ru) - else - let pid, status = Unix.wait () in - (pid, status, None) - in + let proc_info = Proc.wait [] in Mutex.lock t.mutex; - let pid = Pid.of_int pid in - Process_table.remove t ~pid status resource_usage + Process_table.remove t proc_info let wait = if Sys.win32 then @@ -864,8 +854,7 @@ end = struct else ( t.handler t.config Tick; match Event.Queue.next t.events with - | Job_completed (job, status, resource_usage) -> - Fiber.Fill (job.ivar, (status, resource_usage)) + | Job_completed (job, proc_info) -> Fiber.Fill (job.ivar, proc_info) | Files_changed changed_files -> ( (* CR-someday amokhov: In addition to tracking files, we also need to track directory listings. Otherwise, when a new file is added to a diff --git a/src/dune_engine/scheduler.mli b/src/dune_engine/scheduler.mli index d9baf265b8f..177db66c4e4 100644 --- a/src/dune_engine/scheduler.mli +++ b/src/dune_engine/scheduler.mli @@ -80,8 +80,7 @@ val t : unit -> t Fiber.t val with_job_slot : (Config.t -> 'a Fiber.t) -> 'a Fiber.t (** Wait for the following process to terminate *) -val wait_for_process : - Pid.t -> (Unix.process_status * Proc.resource_usage option) Fiber.t +val wait_for_process : Pid.t -> Proc.Process_info.t Fiber.t (** Wait for dune cache to be disconnected. Drop any other event. *) val wait_for_dune_cache : unit -> unit