Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lwt_preemptive: work around Event.sync perf issue. #219

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions src/preemptive/lwt_preemptive.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,46 @@ let threads_count = ref 0
| Preemptive threads management |
+-----------------------------------------------------------------+ *)

module CELL :
sig
type 'a t

val make : unit -> 'a t
val get : 'a t -> 'a
val set : 'a t -> 'a -> unit
end =
struct
type 'a t = {
m : Mutex.t;
cv : Condition.t;
mutable cell : 'a option;
}

let make () = { m = Mutex.create (); cv = Condition.create (); cell = None }

let get t =
let rec await_value t =
match t.cell with
| None ->
Condition.wait t.cv t.m;
await_value t
| Some v ->
t.cell <- None;
Mutex.unlock t.m;
v
in
Mutex.lock t.m;
await_value t

let set t v =
Mutex.lock t.m;
t.cell <- Some v;
Mutex.unlock t.m;
Condition.signal t.cv
end

type thread = {
task_channel: (int * (unit -> unit)) Event.channel;
task_cell: (int * (unit -> unit)) CELL.t;
(* Channel used to communicate notification id and tasks to the
worker thread. *)

Expand All @@ -76,7 +114,7 @@ let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create ()

(* Code executed by a worker: *)
let rec worker_loop worker =
let id, task = Event.sync (Event.receive worker.task_channel) in
let id, task = CELL.get worker.task_cell in
task ();
(* If there is too much threads, exit. This can happen if the user
decreased the maximum: *)
Expand All @@ -89,7 +127,7 @@ let rec worker_loop worker =
let make_worker () =
incr threads_count;
let worker = {
task_channel = Event.new_channel ();
task_cell = CELL.make ();
thread = Thread.self ();
reuse = true;
} in
Expand Down Expand Up @@ -171,8 +209,8 @@ let detach f args =
Lwt.finalize
(fun () ->
(* Send the id and the task to the worker: *)
Event.sync (Event.send worker.task_channel (id, task));
waiter)
CELL.set worker.task_cell (id, task);
waiter)
(fun () ->
if worker.reuse then
(* Put back the worker to the pool: *)
Expand Down Expand Up @@ -210,15 +248,15 @@ let job_notification =
ignore (thunk ()))

let run_in_main f =
let channel = Event.new_channel () in
let cell = CELL.make () in
(* Create the job. *)
let job () =
(* Execute [f] and wait for its result. *)
Lwt.try_bind f
(fun ret -> Lwt.return (Value ret))
(fun exn -> Lwt.return (Error exn)) >>= fun result ->
(* Send the result. *)
Event.sync (Event.send channel result);
CELL.set cell result;
Lwt.return_unit
in
(* Add the job to the queue. *)
Expand All @@ -228,6 +266,6 @@ let run_in_main f =
(* Notify the main thread. *)
Lwt_unix.send_notification job_notification;
(* Wait for the result. *)
match Event.sync (Event.receive channel) with
match CELL.get cell with
| Value ret -> ret
| Error exn -> raise exn