diff --git a/src/preemptive/lwt_preemptive.ml b/src/preemptive/lwt_preemptive.ml index 9b9e456933..af5eddfbbc 100644 --- a/src/preemptive/lwt_preemptive.ml +++ b/src/preemptive/lwt_preemptive.ml @@ -55,8 +55,46 @@ let threads_count = ref 0 | Preemptive threads management | +-----------------------------------------------------------------+ *) +module CELL : +sig + type 'a t + + val make : unit -> 'a t + val get : 'a t -> 'a + val set : 'a t -> 'a -> unit +end = +struct + type 'a t = { + m : Mutex.t; + cv : Condition.t; + mutable cell : 'a option; + } + + let make () = { m = Mutex.create (); cv = Condition.create (); cell = None } + + let get t = + let rec await_value t = + match t.cell with + | None -> + Condition.wait t.cv t.m; + await_value t + | Some v -> + t.cell <- None; + Mutex.unlock t.m; + v + in + Mutex.lock t.m; + await_value t + + let set t v = + Mutex.lock t.m; + t.cell <- Some v; + Mutex.unlock t.m; + Condition.signal t.cv +end + type thread = { - task_channel: (int * (unit -> unit)) Event.channel; + task_cell: (int * (unit -> unit)) CELL.t; (* Channel used to communicate notification id and tasks to the worker thread. *) @@ -76,7 +114,7 @@ let waiters : thread Lwt.u Lwt_sequence.t = Lwt_sequence.create () (* Code executed by a worker: *) let rec worker_loop worker = - let id, task = Event.sync (Event.receive worker.task_channel) in + let id, task = CELL.get worker.task_cell in task (); (* If there is too much threads, exit. This can happen if the user decreased the maximum: *) @@ -89,7 +127,7 @@ let rec worker_loop worker = let make_worker () = incr threads_count; let worker = { - task_channel = Event.new_channel (); + task_cell = CELL.make (); thread = Thread.self (); reuse = true; } in @@ -171,8 +209,8 @@ let detach f args = Lwt.finalize (fun () -> (* Send the id and the task to the worker: *) - Event.sync (Event.send worker.task_channel (id, task)); - waiter) + CELL.set worker.task_cell (id, task); + waiter) (fun () -> if worker.reuse then (* Put back the worker to the pool: *) @@ -210,7 +248,7 @@ let job_notification = ignore (thunk ())) let run_in_main f = - let channel = Event.new_channel () in + let cell = CELL.make () in (* Create the job. *) let job () = (* Execute [f] and wait for its result. *) @@ -218,7 +256,7 @@ let run_in_main f = (fun ret -> Lwt.return (Value ret)) (fun exn -> Lwt.return (Error exn)) >>= fun result -> (* Send the result. *) - Event.sync (Event.send channel result); + CELL.set cell result; Lwt.return_unit in (* Add the job to the queue. *) @@ -228,6 +266,6 @@ let run_in_main f = (* Notify the main thread. *) Lwt_unix.send_notification job_notification; (* Wait for the result. *) - match Event.sync (Event.receive channel) with + match CELL.get cell with | Value ret -> ret | Error exn -> raise exn