From 7310d10a9c3c5a4cffa8fbcc6e0292255304c450 Mon Sep 17 00:00:00 2001 From: Arseniy Alekseyev Date: Thu, 17 Jun 2021 12:40:58 +0100 Subject: [PATCH] Rpc higher priority than build jobs (#4743) * refactor scheduler event sources * actually switch the order Signed-off-by: Arseniy Alekseyev --- src/dune_engine/scheduler.ml | 153 +++++++++++++++++++++++------------ 1 file changed, 103 insertions(+), 50 deletions(-) diff --git a/src/dune_engine/scheduler.ml b/src/dune_engine/scheduler.ml index d53ba1bdb0d..be4afde57e9 100644 --- a/src/dune_engine/scheduler.ml +++ b/src/dune_engine/scheduler.ml @@ -190,6 +190,8 @@ end = struct end module Queue = struct + type event = t + type t = { jobs_completed : (job * Proc.Process_info.t) Queue.t ; mutable invalidation_events : Invalidation_event.t list @@ -265,60 +267,111 @@ end = struct q.yield <- Some ivar; Fiber.Ivar.read ivar + module Event_source : sig + type queue := t + + type t + + val signal : t + + val invalidation : t + + val jobs_completed : t + + val worker_tasks_completed : t + + val yield : t + + val chain : t list -> t + + val run : t -> queue -> event option + end = struct + type queue = t + + type t = queue -> event option + + let run t q = t q + + let signal : t = + fun q -> + Option.map (Signal.Set.choose q.signals) ~f:(fun signal -> + q.signals <- Signal.Set.remove q.signals signal; + Signal signal) + + let invalidation q = + match q.invalidation_events with + | [] -> None + | events -> ( + q.invalidation_events <- []; + let terminated = ref false in + let events = + List.filter_map events ~f:(function + | Filesystem_event Sync -> Some (Sync : build_input_change) + | Invalidation invalidation -> + Some (Invalidation invalidation : build_input_change) + | Filesystem_event Watcher_terminated -> + terminated := true; + None + | Filesystem_event (File_changed path) -> + let abs_path = Path.to_absolute_filename path in + if Table.mem q.ignored_files abs_path then ( + (* only use ignored record once *) + Table.remove q.ignored_files abs_path; + None + ) else + (* CR-soon amokhov: Generate more precise events. *) + Some (Fs_event (Fs_memo.Event.create ~kind:Unknown ~path))) + in + match !terminated with + | true -> Some File_system_watcher_terminated + | false -> + Option.map (Nonempty_list.of_list events) ~f:(fun events -> + Build_inputs_changed events)) + + let jobs_completed q = + Option.map (Queue.pop q.jobs_completed) ~f:(fun (job, proc_info) -> + q.pending_jobs <- q.pending_jobs - 1; + assert (q.pending_jobs >= 0); + Job_completed (job, proc_info)) + + let worker_tasks_completed q = + Option.map (Queue.pop q.worker_tasks_completed) ~f:(fun fill -> + q.pending_worker_tasks <- q.pending_worker_tasks - 1; + Worker_task fill) + + let yield q = + Option.map q.yield ~f:(fun ivar -> + q.yield <- None; + Yield ivar) + + let chain list q = List.find_map list ~f:(fun f -> f q) + end + let next q = Option.iter q.stats ~f:Dune_stats.record_gc_and_fd; Mutex.lock q.mutex; let rec loop () = - match Signal.Set.choose q.signals with - | Some signal -> - q.signals <- Signal.Set.remove q.signals signal; - Signal signal - | None -> ( - match q.invalidation_events with - | [] -> ( - match Queue.pop q.jobs_completed with - | None -> ( - match Queue.pop q.worker_tasks_completed with - | Some fill -> - q.pending_worker_tasks <- q.pending_worker_tasks - 1; - Worker_task fill - | None -> ( - match q.yield with - | Some ivar -> - q.yield <- None; - Yield ivar - | None -> wait ())) - | Some (job, proc_info) -> - q.pending_jobs <- q.pending_jobs - 1; - assert (q.pending_jobs >= 0); - Job_completed (job, proc_info)) - | events -> ( - q.invalidation_events <- []; - let terminated = ref false in - let events = - List.filter_map events ~f:(function - | Filesystem_event Sync -> Some (Sync : build_input_change) - | Invalidation invalidation -> - Some (Invalidation invalidation : build_input_change) - | Filesystem_event Watcher_terminated -> - terminated := true; - None - | Filesystem_event (File_changed path) -> - let abs_path = Path.to_absolute_filename path in - if Table.mem q.ignored_files abs_path then ( - (* only use ignored record once *) - Table.remove q.ignored_files abs_path; - None - ) else - (* CR-soon amokhov: Generate more precise events. *) - Some (Fs_event (Fs_memo.Event.create ~kind:Unknown ~path))) - in - match !terminated with - | true -> File_system_watcher_terminated - | false -> ( - match Nonempty_list.of_list events with - | None -> loop () - | Some events -> Build_inputs_changed events))) + match + Event_source.( + run + (chain + (* Event sources are listed in priority order. Signals are the + highest priority to maximize responsiveness to Ctrl+C. + [worker_tasks_completed] and [invalidation] is used for + reacting to user input, so their latency is also important. + [jobs_completed] and [yield] are where the bulk of the work + is done, so they are the lowest priority to avoid starving + other things. *) + [ signal + ; invalidation + ; worker_tasks_completed + ; jobs_completed + ; yield + ])) + q + with + | None -> wait () + | Some event -> event and wait () = q.got_event <- false; Condition.wait q.cond q.mutex;