From 7921d00ced55e21e408dd017fa96ce3c1dfbb1f2 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Mon, 24 Apr 2023 21:21:02 +0300 Subject: [PATCH] WIP: Add support for domain local await --- README.md | 62 ++++++++++++++++++++++++++++++++++++++ dune-project | 2 ++ eio.opam | 2 ++ lib_eio/core/dla.ml | 24 +++++++++++++++ lib_eio/core/dune | 2 +- lib_eio/core/eio__core.ml | 2 ++ lib_eio/core/eio__core.mli | 3 ++ lib_eio/mock/backend.ml | 6 +++- lib_eio_linux/sched.ml | 20 +++++++----- lib_eio_posix/sched.ml | 8 +++-- 10 files changed, 119 insertions(+), 12 deletions(-) create mode 100644 lib_eio/core/dla.ml diff --git a/README.md b/README.md index 0ee20eb8d..6f3eccea0 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Eio replaces existing concurrency libraries such as Lwt * [Lwt](#lwt) * [Unix and System Threads](#unix-and-system-threads) * [Domainslib](#domainslib) + * [kcas](#kcas) * [Best Practices](#best-practices) * [Switches](#switches-1) * [Casting](#casting) @@ -1616,6 +1617,66 @@ while most Eio functions can only be used from Eio domains. The bridge function `run_in_pool` makes use of the fact that `Domainslib.Task.async` is able to run from an Eio domain, and `Eio.Promise.resolve` is able to run from a Domainslib one. +### kcas + +Eio provides the domain local await mechanism that the [kcas][] library uses to +provide blocking support for the lock-free software transactional memory (STM) +implementation that it provides. This means that one can use all the composable +lock-free data structures and primitives for communication and synchronization +implemented using **kcas** to communicate and synchronize between Eio fibers, +raw domains, and any other schedulers that provide the domain local await +mechanism. + +To demonstrate **kcas** + +```ocaml +# #require "kcas" +# open Kcas +``` + +let's first create a couple of shared memory locations + +```ocaml +# let x = Loc.make 0 +val x : int Loc.t = +# let y = Loc.make 0 +val y : int Loc.t = +``` + +and spawn a domain + +```ocaml +# let foreign_domain = Domain.spawn @@ fun () -> + let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in + Loc.set y 22; + x +val foreign_domain : int Domain.t = +``` + +that first waits for one of the locations to change value and then writes to the +other location. + +Then we run a Eio program + +```ocaml +# let y = Eio_main.run @@ fun _env -> + Loc.set x 20; + Loc.get_as (fun y -> Retry.unless (y <> 0); y) y +val y : int = 22 +``` + +that first writes to the location the other domain is waiting on and then waits +for the other domain to write to the other location. + +Joining with the other domain + +```ocaml +# y + Domain.join foreign_domain +- : int = 42 +``` + +we arrive at the answer. + ## Best Practices This section contains some recommendations for designing library APIs for use with Eio. @@ -1776,3 +1837,4 @@ Some background about the effects system can be found in: [Eio.Semaphore]: https://ocaml-multicore.github.io/eio/eio/Eio/Semaphore/index.html [Eio.Condition]: https://ocaml-multicore.github.io/eio/eio/Eio/Condition/index.html [Domainslib]: https://github.com/ocaml-multicore/domainslib +[kcas]: https://github.com/ocaml-multicore/kcas diff --git a/dune-project b/dune-project index 620fdf225..3af6ed896 100644 --- a/dune-project +++ b/dune-project @@ -25,8 +25,10 @@ (psq (>= 0.2.0)) (fmt (>= 0.8.9)) (hmap (>= 0.8.1)) + (domain-local-await (>= 0.1.0)) (crowbar (and (>= 0.2) :with-test)) (mtime (>= 2.0.0)) + (kcas (and (>= 0.3.0) :with-test)) (mdx (and (>= 2.2.0) :with-test)) (alcotest (and (>= 1.4.0) :with-test)) (dscheck (and (>= 0.1.0) :with-test)))) diff --git a/eio.opam b/eio.opam index 5400b05f6..c5fcecd6b 100644 --- a/eio.opam +++ b/eio.opam @@ -18,8 +18,10 @@ depends: [ "psq" {>= "0.2.0"} "fmt" {>= "0.8.9"} "hmap" {>= "0.8.1"} + "domain-local-await" {>= "0.1.0"} "crowbar" {>= "0.2" & with-test} "mtime" {>= "2.0.0"} + "kcas" {>= "0.3.0" & with-test} "mdx" {>= "2.2.0" & with-test} "alcotest" {>= "1.4.0" & with-test} "dscheck" {>= "0.1.0" & with-test} diff --git a/lib_eio/core/dla.ml b/lib_eio/core/dla.ml new file mode 100644 index 000000000..63494656f --- /dev/null +++ b/lib_eio/core/dla.ml @@ -0,0 +1,24 @@ +let prepare_for_await () = + let state = Atomic.make `Init in + let release () = + if Atomic.get state != `Released then + match Atomic.exchange state `Released with + | `Awaiting b -> Broadcast.resume_all b + | _ -> () + and await () = + if Atomic.get state != `Released then + let b = Broadcast.create () in + if Atomic.compare_and_set state `Init (`Awaiting b) then + let resume ctx enqueue = + match Broadcast.suspend b (fun () -> enqueue (Ok ())) with + | None -> () + | Some request -> ( + match Atomic.get state with + | `Awaiting _ -> + Cancel.Fiber_context.set_cancel_fn ctx @@ fun ex -> + if Broadcast.cancel request then enqueue (Error ex) + | _ -> if Broadcast.cancel request then enqueue (Ok ())) + in + Suspend.enter resume + in + Domain_local_await.{ release; await } diff --git a/lib_eio/core/dune b/lib_eio/core/dune index 4063be78f..17cda9c66 100644 --- a/lib_eio/core/dune +++ b/lib_eio/core/dune @@ -1,4 +1,4 @@ (library (name eio__core) (public_name eio.core) - (libraries cstruct hmap lwt-dllist fmt optint)) + (libraries cstruct hmap lwt-dllist fmt optint domain-local-await)) diff --git a/lib_eio/core/eio__core.ml b/lib_eio/core/eio__core.ml index a5d4e4b2c..fc7e072c9 100644 --- a/lib_eio/core/eio__core.ml +++ b/lib_eio/core/eio__core.ml @@ -19,4 +19,6 @@ module Private = struct | Fork = Fiber.Fork | Get_context = Cancel.Get_context end + + module Dla = Dla end diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index adfc10322..1e0a9d0fc 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -806,4 +806,7 @@ module Private : sig (** Backends should use this for {!Eio.Stdenv.debug}. *) end + module Dla : sig + val prepare_for_await : unit -> Domain_local_await.t + end end diff --git a/lib_eio/mock/backend.ml b/lib_eio/mock/backend.ml index e7683daac..557ca1682 100644 --- a/lib_eio/mock/backend.ml +++ b/lib_eio/mock/backend.ml @@ -63,7 +63,11 @@ let run main = in let new_fiber = Fiber_context.make_root () in let result = ref None in - let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main ())) in + let Exit_scheduler = + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> result := Some (main ()))) in match !result with | None -> raise Deadlock_detected | Some x -> x diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml index 68ad739cc..f42810f95 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -465,14 +465,18 @@ let run ~extra_effects st main arg = let result = ref None in let `Exit_scheduler = let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - Switch.run_protected (fun sw -> - Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st); - match main arg with - | x -> result := Some (Ok x) - | exception ex -> - let bt = Printexc.get_raw_backtrace () in - result := Some (Error (ex, bt)) + Kcas.Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + Switch.run_protected (fun sw -> + Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st); + match main arg with + | x -> result := Some (Ok x) + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + result := Some (Error (ex, bt)) + ) ) ) in diff --git a/lib_eio_posix/sched.ml b/lib_eio_posix/sched.ml index 5c15f43c5..954e15409 100644 --- a/lib_eio_posix/sched.ml +++ b/lib_eio_posix/sched.ml @@ -351,8 +351,12 @@ let run ~extra_effects t main x = let result = ref None in let `Exit_scheduler = let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - result := Some (with_op t main x); + Domain_local_await.using + ~prepare_for_await:Eio.Private.Dla.prepare_for_await + ~while_running:(fun () -> + fork ~new_fiber (fun () -> + result := Some (with_op t main x); + ) ) in match !result with