Skip to content

Commit

Permalink
Use wait3 to accurately time spawned processes
Browse files Browse the repository at this point in the history
We fallback to the old method on win32.

Signed-off-by: Rudi Grinberg <[email protected]>
  • Loading branch information
rgrinberg committed Apr 23, 2021
1 parent 2d9e266 commit 5f73721
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 37 deletions.
2 changes: 1 addition & 1 deletion otherlibs/stdune-unstable/dune
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
dune_filesystem_stubs)
(foreign_stubs
(language c)
(names fcntl_stubs)))
(names fcntl_stubs wait3_stubs)))
15 changes: 15 additions & 0 deletions otherlibs/stdune-unstable/proc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,18 @@ let restore_cwd_and_execve prog argv ~env =
Stdlib.do_at_exit ();
Unix.execve prog argv env
)

type resource_usage =
{ utime : float
; stime : float
}

external stub_wait3 :
Unix.wait_flag list -> int * Unix.process_status * resource_usage
= "dune_wait3"

let wait3 flags =
if Sys.win32 then
Code_error.raise "wait3 not available on windows" []
else
stub_wait3 flags
7 changes: 7 additions & 0 deletions otherlibs/stdune-unstable/proc.mli
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
val restore_cwd_and_execve : string -> string list -> env:Env.t -> _

type resource_usage =
{ utime : float
; stime : float
}

val wait3 : Unix.wait_flag list -> int * Unix.process_status * resource_usage
73 changes: 73 additions & 0 deletions otherlibs/stdune-unstable/wait3_stubs.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include <caml/mlvalues.h>

#ifdef _WIN32
#include <caml/fail.h>

void dune_wait3(value flags) {
caml_failwith("wait3: not supported on windows");
}

#else

#include <caml/alloc.h>
#include <caml/memory.h>
#include <caml/signals.h>
#include <caml/unixsupport.h>

#include <sys/resource.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>

#define TAG_WEXITED 0
#define TAG_WSIGNALED 1
#define TAG_WSTOPPED 2

CAMLextern int caml_convert_signal_number(int);
CAMLextern int caml_rev_convert_signal_number(int);

static value alloc_process_status(int status) {
value st;

if (WIFEXITED(status)) {
st = caml_alloc_small(1, TAG_WEXITED);
Field(st, 0) = Val_int(WEXITSTATUS(status));
} else if (WIFSTOPPED(status)) {
st = caml_alloc_small(1, TAG_WSTOPPED);
Field(st, 0) = Val_int(caml_rev_convert_signal_number(WSTOPSIG(status)));
} else {
st = caml_alloc_small(1, TAG_WSIGNALED);
Field(st, 0) = Val_int(caml_rev_convert_signal_number(WTERMSIG(status)));
}
return st;
}

static int wait_flag_table[] = {WNOHANG, WUNTRACED};

value dune_wait3(value flags) {
CAMLparam1(flags);
CAMLlocal2(times, res);

int pid, status, cv_flags;
cv_flags = caml_convert_flag_list(flags, wait_flag_table);

struct rusage ru;

caml_enter_blocking_section();
pid = wait3(&status, cv_flags, &ru);
caml_leave_blocking_section();
if (pid == -1)
uerror("wait3", Nothing);

times = caml_alloc_small(2 * Double_wosize, Double_array_tag);
Store_double_field(times, 0, ru.ru_utime.tv_sec + ru.ru_utime.tv_usec / 1e6);
Store_double_field(times, 1, ru.ru_stime.tv_sec + ru.ru_stime.tv_usec / 1e6);

res = caml_alloc_tuple(3);
Store_field(res, 0, Val_int(pid));
Store_field(res, 1, alloc_process_status(status));
Store_field(res, 2, times);
CAMLreturn(res);
}

#endif
37 changes: 30 additions & 7 deletions src/dune_engine/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,22 @@ let report_process_end stats common ~id =
let event = Event.async (Int id) End common in
Stats.emit stats event

let report_process_event stats ~prog ~args
(resource_usage : Proc.resource_usage) =
let common =
let name = Filename.basename prog in
let ts = Timestamp.now () in
Event.common_fields ~cat:[ "process" ] ~name ~ts ()
in
let args =
[ ("process_args", `List (List.map args ~f:(fun arg -> `String arg))) ]
in
let dur =
Chrome_trace.Event.Timestamp.of_float_seconds resource_usage.stime
in
let event = Event.complete ~args ~dur common in
Stats.emit stats event

let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr)
?(stdin_from = Io.null In) ?(env = Env.initial) ~purpose fail_mode prog args
=
Expand Down Expand Up @@ -579,8 +595,12 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr)
env |> Dtemp.add_to_env |> Scheduler.Config.add_to_env config
in
let event_common =
Option.map config.stats
~f:(report_process_start ~id ~prog:prog_str ~args)
match config.stats with
| Some stats when Sys.win32 ->
(* We track process times manually on windows. Elsewhere, we can use
[wait3] *)
Some (report_process_start stats ~id ~prog:prog_str ~args)
| _ -> None
in
let env = Env.to_unix env |> Spawn.Env.of_list in
let pid =
Expand All @@ -595,11 +615,14 @@ let run_internal ?dir ?(stdout_to = Io.stdout) ?(stderr_to = Io.stderr)
in
Io.release stdout_to;
Io.release stderr_to;
let+ exit_status = Scheduler.wait_for_process pid in
(match (event_common, config.stats) with
| Some common, Some stats -> report_process_end stats common ~id
| None, None -> ()
| _, _ -> assert false);
let+ exit_status, resource_usage = Scheduler.wait_for_process pid in
(match (event_common, config.stats, resource_usage) with
| Some common, Some stats, None -> report_process_end stats common ~id
| None, Some stats, Some resource_usage ->
report_process_event stats ~prog:prog_str ~args resource_usage
| None, None, _ -> ()
| None, Some _, None -> assert false
| Some _, _, _ -> assert false);
Option.iter response_file ~f:Path.unlink;
let exit_status' : Exit_status.t =
match exit_status with
Expand Down
53 changes: 36 additions & 17 deletions src/dune_engine/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ end

type job =
{ pid : Pid.t
; ivar : Unix.process_status Fiber.Ivar.t
; ivar : (Unix.process_status * Proc.resource_usage option) Fiber.Ivar.t
}

module Signal = struct
Expand Down Expand Up @@ -113,7 +113,7 @@ end
module Event : sig
type t =
| Files_changed of Path.t list
| Job_completed of job * Unix.process_status
| Job_completed of job * Unix.process_status * Proc.resource_usage option
| Signal of Signal.t
| Rpc of Fiber.fill

Expand Down Expand Up @@ -150,23 +150,27 @@ module Event : sig
(** Send an event to the main thread. *)
val send_files_changed : t -> Path.t list -> unit

val send_job_completed : t -> job -> Unix.process_status -> unit
val send_job_completed :
t -> job -> Unix.process_status -> Proc.resource_usage option -> unit

val send_signal : t -> Signal.t -> unit

val send_sync_task : t -> (unit -> unit) -> unit

val track_resources : t -> bool
end
with type event := t
end = struct
type t =
| Files_changed of Path.t list
| Job_completed of job * Unix.process_status
| Job_completed of job * Unix.process_status * Proc.resource_usage option
| Signal of Signal.t
| Rpc of Fiber.fill

module Queue = struct
type t =
{ jobs_completed : (job * Unix.process_status) Queue.t
{ jobs_completed :
(job * Unix.process_status * Proc.resource_usage option) Queue.t
; sync_tasks : (unit -> unit) Queue.t
; mutable files_changed : Path.t list
; mutable signals : Signal.Set.t
Expand Down Expand Up @@ -256,10 +260,10 @@ end = struct
| None ->
q.pending_rpc <- q.pending_rpc - 1;
Rpc (Queue.pop_exn q.rpc_completed)
| Some (job, status) ->
| Some (job, status, resource_usage) ->
q.pending_jobs <- q.pending_jobs - 1;
assert (q.pending_jobs >= 0);
Job_completed (job, status))
Job_completed (job, status, resource_usage))
| fns -> (
q.files_changed <- [];
let files =
Expand Down Expand Up @@ -294,10 +298,10 @@ end = struct
if not avail then Condition.signal q.cond;
Mutex.unlock q.mutex

let send_job_completed q job status =
let send_job_completed q job status resource_usage =
Mutex.lock q.mutex;
let avail = available q in
Queue.push q.jobs_completed (job, status);
Queue.push q.jobs_completed (job, status, resource_usage);
if not avail then Condition.signal q.cond;
Mutex.unlock q.mutex

Expand All @@ -318,6 +322,8 @@ end = struct
let pending_jobs q = q.pending_jobs

let pending_rpc q = q.pending_rpc

let track_resources q = Option.is_some q.stats
end
end

Expand Down Expand Up @@ -515,7 +521,12 @@ end = struct
module Process_table : sig
val add : t -> job -> unit

val remove : t -> pid:Pid.t -> Unix.process_status -> unit
val remove :
t
-> pid:Pid.t
-> Unix.process_status
-> Proc.resource_usage option
-> unit

val running_count : t -> int

Expand All @@ -529,16 +540,16 @@ end = struct
if t.running_count = 1 then Condition.signal t.something_is_running
| Some (Zombie status) ->
Table.remove t.table job.pid;
Event.Queue.send_job_completed t.events job status
Event.Queue.send_job_completed t.events job status None
| Some (Running _) -> assert false

let remove t ~pid status =
let remove t ~pid status ru =
match Table.find t.table pid with
| None -> Table.set t.table pid (Zombie status)
| Some (Running job) ->
t.running_count <- t.running_count - 1;
Table.remove t.table pid;
Event.Queue.send_job_completed t.events job status
Event.Queue.send_job_completed t.events job status ru
| Some (Zombie _) -> assert false

let iter t ~f =
Expand Down Expand Up @@ -575,7 +586,7 @@ end = struct
| Finished (job, status) ->
(* We need to do the [Unix.waitpid] and remove the process while holding
the lock, otherwise the pid might be reused in between. *)
Process_table.remove t ~pid:job.pid status;
Process_table.remove t ~pid:job.pid status None;
true

let wait_win32 t =
Expand All @@ -587,10 +598,17 @@ end = struct

let wait_unix t =
Mutex.unlock t.mutex;
let pid, status = Unix.wait () in
let pid, status, resource_usage =
if Event.Queue.track_resources t.events then
let pid, status, ru = Proc.wait3 [] in
(pid, status, Some ru)
else
let pid, status = Unix.wait () in
(pid, status, None)
in
Mutex.lock t.mutex;
let pid = Pid.of_int pid in
Process_table.remove t ~pid status
Process_table.remove t ~pid status resource_usage

let wait =
if Sys.win32 then
Expand Down Expand Up @@ -846,7 +864,8 @@ end = struct
else (
t.handler t.config Tick;
match Event.Queue.next t.events with
| Job_completed (job, status) -> Fiber.Fill (job.ivar, status)
| Job_completed (job, status, resource_usage) ->
Fiber.Fill (job.ivar, (status, resource_usage))
| Files_changed changed_files -> (
(* CR-someday amokhov: In addition to tracking files, we also need to
track directory listings. Otherwise, when a new file is added to a
Expand Down
3 changes: 2 additions & 1 deletion src/dune_engine/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ val t : unit -> t Fiber.t
val with_job_slot : (Config.t -> 'a Fiber.t) -> 'a Fiber.t

(** Wait for the following process to terminate *)
val wait_for_process : Pid.t -> Unix.process_status Fiber.t
val wait_for_process :
Pid.t -> (Unix.process_status * Proc.resource_usage option) Fiber.t

(** Wait for dune cache to be disconnected. Drop any other event. *)
val wait_for_dune_cache : unit -> unit
Expand Down
17 changes: 6 additions & 11 deletions test/blackbox-tests/test-cases/trace-file.t/run.t
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@

This captures the commands that are being run:

$ <trace.json grep '"[be]"' | cut -c 2- | sed -E 's/:[0-9]+/:.../g'
{"args":{"process_args":["-config"]},"ph":"b","id":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"ph":"e","id":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-modules","-impl","prog.ml"]},"ph":"b","id":...,"name":"ocamldep.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"ph":"e","id":...,"name":"ocamldep.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-bin-annot","-I",".prog.eobjs/byte","-no-alias-deps","-opaque","-o",".prog.eobjs/byte/prog.cmo","-c","-impl","prog.ml"]},"ph":"b","id":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"ph":"e","id":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-I",".prog.eobjs/byte","-I",".prog.eobjs/native","-intf-suffix",".ml","-no-alias-deps","-opaque","-o",".prog.eobjs/native/prog.cmx","-c","-impl","prog.ml"]},"ph":"b","id":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"ph":"e","id":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-o","prog.exe",".prog.eobjs/native/prog.cmx"]},"ph":"b","id":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"ph":"e","id":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}
$ <trace.json grep '"X"' | cut -c 2- | sed -E 's/:[0-9]+/:.../g'
{"args":{"process_args":["-config"]},"ph":"X","dur":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-modules","-impl","prog.ml"]},"ph":"X","dur":...,"name":"ocamldep.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-bin-annot","-I",".prog.eobjs/byte","-no-alias-deps","-opaque","-o",".prog.eobjs/byte/prog.cmo","-c","-impl","prog.ml"]},"ph":"X","dur":...,"name":"ocamlc.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-I",".prog.eobjs/byte","-I",".prog.eobjs/native","-intf-suffix",".ml","-no-alias-deps","-opaque","-o",".prog.eobjs/native/prog.cmx","-c","-impl","prog.ml"]},"ph":"X","dur":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}
{"args":{"process_args":["-w","@[email protected]@30..39@[email protected]@[email protected]","-strict-sequence","-strict-formats","-short-paths","-keep-locs","-g","-o","prog.exe",".prog.eobjs/native/prog.cmx"]},"ph":"X","dur":...,"name":"ocamlopt.opt","cat":"process","ts":...,"pid":...,"tid":...}

As well as data about the garbage collector:

Expand Down

0 comments on commit 5f73721

Please sign in to comment.