Skip to content

Commit

Permalink
Lwt_preemptive: work around Event.sync perf issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
mfp authored and aantron committed Jul 2, 2016
1 parent 6b8ef2d commit 5c89415
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions src/preemptive/lwt_preemptive.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,46 @@ let threads_count = ref 0
| Preemptive threads management |
+-----------------------------------------------------------------+ *)

module CELL :
sig
type 'a t

val make : unit -> 'a t
val get : 'a t -> 'a
val set : 'a t -> 'a -> unit
end =
struct
type 'a t = {
m : Mutex.t;
cv : Condition.t;
mutable cell : 'a option;
}

let make () = { m = Mutex.create (); cv = Condition.create (); cell = None }

let get t =
let rec await_value t =
match t.cell with
| None ->
Condition.wait t.cv t.m;
await_value t
| Some v ->
t.cell <- None;
Mutex.unlock t.m;
v
in
Mutex.lock t.m;
await_value t

let set t v =
Mutex.lock t.m;
t.cell <- Some v;
Mutex.unlock t.m;
Condition.signal t.cv
end

type thread = {
task_channel: (int * (unit -> unit)) Event.channel;
task_cell: (int * (unit -> unit)) CELL.t;
(* Channel used to communicate notification id and tasks to the
worker thread. *)

Expand All @@ -76,7 +114,7 @@ let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create ()

(* Code executed by a worker: *)
let rec worker_loop worker =
let id, task = Event.sync (Event.receive worker.task_channel) in
let id, task = CELL.get worker.task_cell in
task ();
(* If there is too much threads, exit. This can happen if the user
decreased the maximum: *)
Expand All @@ -89,7 +127,7 @@ let rec worker_loop worker =
let make_worker () =
incr threads_count;
let worker = {
task_channel = Event.new_channel ();
task_cell = CELL.make ();
thread = Thread.self ();
reuse = true;
} in
Expand Down Expand Up @@ -171,8 +209,8 @@ let detach f args =
Lwt.finalize
(fun () ->
(* Send the id and the task to the worker: *)
Event.sync (Event.send worker.task_channel (id, task));
waiter)
CELL.set worker.task_cell (id, task);
waiter)
(fun () ->
if worker.reuse then
(* Put back the worker to the pool: *)
Expand Down Expand Up @@ -210,15 +248,15 @@ let job_notification =
ignore (thunk ()))

let run_in_main f =
let channel = Event.new_channel () in
let cell = CELL.make () in
(* Create the job. *)
let job () =
(* Execute [f] and wait for its result. *)
Lwt.try_bind f
(fun ret -> Lwt.return (Value ret))
(fun exn -> Lwt.return (Error exn)) >>= fun result ->
(* Send the result. *)
Event.sync (Event.send channel result);
CELL.set cell result;
Lwt.return_unit
in
(* Add the job to the queue. *)
Expand All @@ -228,6 +266,6 @@ let run_in_main f =
(* Notify the main thread. *)
Lwt_unix.send_notification job_notification;
(* Wait for the result. *)
match Event.sync (Event.receive channel) with
match CELL.get cell with
| Value ret -> ret
| Error exn -> raise exn

0 comments on commit 5c89415

Please sign in to comment.