From e605f06c29b8c0d13563c42eaa6a42007b3c26e0 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 12 Apr 2023 15:02:04 +0100 Subject: [PATCH] Remove eio_luv backend and add skeleton eio_windows replacement --- Makefile | 3 - README.md | 14 +- doc/rationale.md | 2 +- dune-project | 16 +- eio_main.opam | 3 +- eio_luv.opam => eio_windows.opam | 8 +- lib_eio_luv/dune | 4 - lib_eio_luv/eio_luv.ml | 1339 -------------------------- lib_eio_luv/eio_luv.mli | 227 ----- lib_eio_luv/tests/dune | 3 - lib_eio_luv/tests/files.md | 66 -- lib_eio_luv/tests/poll.md | 114 --- lib_eio_luv/tests/process.md | 120 --- lib_eio_windows/dune | 5 + lib_eio_windows/eio_windows.ml | 3 + lib_main/dune | 6 +- lib_main/eio_main.ml | 4 +- lib_main/eio_main.mli | 4 +- lib_main/luv_backend.disabled.ml | 1 - lib_main/luv_backend.enabled.ml | 1 - lib_main/windows_backend.disabled.ml | 1 + lib_main/windows_backend.enabled.ml | 1 + 22 files changed, 31 insertions(+), 1914 deletions(-) rename eio_luv.opam => eio_windows.opam (74%) delete mode 100644 lib_eio_luv/dune delete mode 100644 lib_eio_luv/eio_luv.ml delete mode 100644 lib_eio_luv/eio_luv.mli delete mode 100644 lib_eio_luv/tests/dune delete mode 100644 lib_eio_luv/tests/files.md delete mode 100644 lib_eio_luv/tests/poll.md delete mode 100644 lib_eio_luv/tests/process.md create mode 100644 lib_eio_windows/dune create mode 100644 lib_eio_windows/eio_windows.ml delete mode 100644 lib_main/luv_backend.disabled.ml delete mode 100644 lib_main/luv_backend.enabled.ml create mode 100644 lib_main/windows_backend.disabled.ml create mode 100644 lib_main/windows_backend.enabled.ml diff --git a/Makefile b/Makefile index 598eb4f47..783b9f73f 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,6 @@ bench: dune exec -- ./bench/bench_cancel.exe if ocamlc -config | grep -q '^system: linux'; then dune exec -- ./lib_eio_linux/tests/bench_noop.exe; fi -test_luv: - EIO_BACKEND=luv dune runtest - test_posix: EIO_BACKEND=posix dune runtest diff --git a/README.md b/README.md index 065ff04bc..3fa8d148c 100644 --- a/README.md +++ b/README.md @@ -92,10 +92,9 @@ See [Awesome Multicore OCaml][] for links to work migrating other projects to Ei - [Eio][] provides concurrency primitives (promises, etc.) and a high-level, cross-platform OS API. - [Eio_posix][] provides a cross-platform backend for these APIs for POSIX-type systems. -- [Eio_luv][] provides a cross-platform backend for these APIs using [luv](https://github.com/aantron/luv) (libuv). - [Eio_linux][] provides a Linux io-uring backend for these APIs, plus a low-level API that can be used directly (in non-portable code). -- [Eio_main][] selects an appropriate backend (e.g. `eio_linux`, `eio_posix` or `eio_luv`), depending on your platform. +- [Eio_main][] selects an appropriate backend (e.g. `eio_linux` or `eio_posix`), depending on your platform. ## Getting OCaml 5.0 @@ -701,7 +700,7 @@ For example: let test r = try Eio.Buf_read.line r with - | Eio.Io (Eio.Net.E Connection_reset Eio_luv.Luv_error _, _) -> "Luv connection reset" + | Eio.Io (Eio.Net.E Connection_reset Eio_unix.Unix_error _, _) -> "Unix connection reset" | Eio.Io (Eio.Net.E Connection_reset _, _) -> "Connection reset" | Eio.Io (Eio.Net.E _, _) -> "Some network error" | Eio.Io _ -> "Some I/O error" @@ -757,11 +756,11 @@ it can be annoying to have the full backend-specific error displayed: Switch.run @@ fun sw -> Eio.Net.connect ~sw net (`Tcp (Eio.Net.Ipaddr.V4.loopback, 1234));; Exception: -Eio.Io Net Connection_failure Refused Eio_luv.Luv_error(ECONNREFUSED) (* connection refused *), +Eio.Io Net Connection_failure Refused Unix_error (Connection refused, "connect", ""), connecting to tcp:127.0.0.1:1234 ``` -If we ran this using e.g. the Linux io_uring backend, the `Luv_error` part would change. +If we ran this using another backend, the `Unix_error` part might change. To avoid this problem, you can use `Eio.Exn.Backend.show` to hide the backend-specific part of errors: ```ocaml @@ -887,10 +886,6 @@ A program that operates on the current directory will probably want to use `cwd` whereas a program that accepts a path from the user will probably want to use `fs`, perhaps with `open_dir` to constrain all access to be within that directory. -Note: the `eio_luv` backend doesn't have the `openat`, `mkdirat`, etc., -calls that are necessary to implement these checks without races, -so be careful if symlinks out of the subtree may be created while the program is running. - ## Time The standard environment provides a [clock][Eio.Time] with the usual POSIX time: @@ -1732,7 +1727,6 @@ Some background about the effects system can be found in: [Eio.Promise]: https://ocaml-multicore.github.io/eio/eio/Eio/Promise/index.html [Eio.Stream]: https://ocaml-multicore.github.io/eio/eio/Eio/Stream/index.html [Eio_posix]: https://github.com/ocaml-multicore/eio/blob/main/lib_eio_posix/eio_posix.mli -[Eio_luv]: https://ocaml-multicore.github.io/eio/eio_luv/Eio_luv/index.html [Eio_linux]: https://ocaml-multicore.github.io/eio/eio_linux/Eio_linux/index.html [Eio_main]: https://ocaml-multicore.github.io/eio/eio_main/Eio_main/index.html [Eio.traceln]: https://ocaml-multicore.github.io/eio/eio/Eio/index.html#val-traceln diff --git a/doc/rationale.md b/doc/rationale.md index 1140d1bb0..55e698ae1 100644 --- a/doc/rationale.md +++ b/doc/rationale.md @@ -69,7 +69,7 @@ For example, there are many ways to provide a stream of bytes (from a file, TCP Often this choice is determined by the user at runtime, for example by providing a URL giving the scheme to use. We may even need to choose a completely different Eio backend at runtime. For example `Eio_main.run` will use the io_uring backend if the Linux kernel is new enough, -but fall back to `Eio_luv` if not. +but fall back to `Eio_posix` if not. For these reasons, Eio needs to use dynamic dispatch. A resource whose implementation isn't known until runtime can be represented in many ways, including: diff --git a/dune-project b/dune-project index d5b67fd77..620fdf225 100644 --- a/dune-project +++ b/dune-project @@ -53,23 +53,19 @@ (mdx (and (>= 2.2.0) :with-test)) (fmt (>= 0.8.9)))) (package - (name eio_luv) - (synopsis "Eio implementation using luv (libuv)") - (description "An Eio implementation for most platforms, using luv.") + (name eio_windows) + (synopsis "Eio implementation for Windows") + (description "An Eio implementation using I/O Completion Ports") + (allow_empty) ; Work-around for dune bug #6938 (depends - (eio (= :version)) - (luv (>= 0.5.11)) - (luv_unix (>= 0.5.0)) - (mdx (and (>= 2.2.0) :with-test)) - (fmt (>= 0.8.9)))) + (eio (= :version)))) (package (name eio_main) (synopsis "Effect-based direct-style IO mainloop for OCaml") (description "Selects an appropriate Eio backend for the current platform.") - (depopts eio_luv) (depends (mdx (and (>= 2.2.0) :with-test)) (eio_linux (and (= :version) (= :os "linux"))) (eio_posix (and (= :version) (<> :os "windows"))) - (eio_luv (and (= :version) (or (= :os "windows") :with-test))))) + (eio_windows (and (= :version) (= :os "windows"))))) (using mdx 0.2) diff --git a/eio_main.opam b/eio_main.opam index 2731d5f5c..f3a471cd8 100644 --- a/eio_main.opam +++ b/eio_main.opam @@ -13,10 +13,9 @@ depends: [ "mdx" {>= "2.2.0" & with-test} "eio_linux" {= version & os = "linux"} "eio_posix" {= version & os != "windows"} - "eio_luv" {= version & os = "windows" | with-test} + "eio_windows" {= version & os = "windows"} "odoc" {with-doc} ] -depopts: ["eio_luv"] build: [ ["dune" "subst"] {dev} [ diff --git a/eio_luv.opam b/eio_windows.opam similarity index 74% rename from eio_luv.opam rename to eio_windows.opam index 25307e4d9..bf718229f 100644 --- a/eio_luv.opam +++ b/eio_windows.opam @@ -1,7 +1,7 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" -synopsis: "Eio implementation using luv (libuv)" -description: "An Eio implementation for most platforms, using luv." +synopsis: "Eio implementation for Windows" +description: "An Eio implementation using I/O Completion Ports" maintainer: ["anil@recoil.org"] authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" @@ -11,10 +11,6 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues" depends: [ "dune" {>= "3.7"} "eio" {= version} - "luv" {>= "0.5.11"} - "luv_unix" {>= "0.5.0"} - "mdx" {>= "2.2.0" & with-test} - "fmt" {>= "0.8.9"} "odoc" {with-doc} ] build: [ diff --git a/lib_eio_luv/dune b/lib_eio_luv/dune deleted file mode 100644 index 3dc55d38e..000000000 --- a/lib_eio_luv/dune +++ /dev/null @@ -1,4 +0,0 @@ -(library - (name eio_luv) - (public_name eio_luv) - (libraries eio eio.unix luv luv_unix eio.utils fmt)) diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml deleted file mode 100644 index 61de3dd68..000000000 --- a/lib_eio_luv/eio_luv.ml +++ /dev/null @@ -1,1339 +0,0 @@ -(* - * Copyright (C) 2021 Thomas Leonard - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open Eio.Std - -module Ctf = Eio.Private.Ctf - -module Fiber_context = Eio.Private.Fiber_context -module Lf_queue = Eio_utils.Lf_queue - -type Eio.Exn.Backend.t += - | Luv_error of Luv.Error.t - | Outside_sandbox of string * string - | Absolute_path - -let unclassified_error e = Eio.Exn.create (Eio.Exn.X e) - -let () = - Eio.Exn.Backend.register_pp (fun f -> function - | Luv_error e -> Fmt.pf f "Eio_luv.Luv_error(%s) (* %s *)" (Luv.Error.err_name e) (Luv.Error.strerror e); true - | Outside_sandbox (path, dir) -> Fmt.pf f "Outside_sandbox (%S, %S)" path dir; true - | Absolute_path -> Fmt.pf f "Absolute_path"; true - | _ -> false - ) - -let wrap_error = function - | `ECONNREFUSED as e -> Eio.Net.err (Connection_failure (Refused (Luv_error e))) - | `ECONNRESET | `EPIPE as e -> Eio.Net.err (Connection_reset (Luv_error e)) - | e -> unclassified_error (Luv_error e) - -let wrap_error_fs e = - match e with - | `EEXIST -> Eio.Fs.err (Already_exists (Luv_error e)) - | `ENOENT -> Eio.Fs.err (Not_found (Luv_error e)) - | `EPERM | `EACCES -> Eio.Fs.err (Permission_denied (Luv_error e)) - | e -> wrap_error e - -let or_raise = function - | Ok x -> x - | Error e -> raise (wrap_error e) - -let or_raise_fs = function - | Ok x -> x - | Error e -> raise (wrap_error_fs e) - -(* Luv can't handle buffers with more than 2^32-1 bytes, limit it to - 31bit so we can also make sure 32bit archs don't overflow. - See https://github.com/ocaml-multicore/eio/issues/335 *) -let max_luv_buffer_size = 0x3fffffff (* Max signed int on 32-bit OCaml platforms *) - -(* Return as much of [buf] as luv can handle. This is suitable if a short read/write is acceptable. *) -let cstruct_to_luv_truncate buf = - Cstruct.to_bigarray @@ - if Cstruct.length buf <= max_luv_buffer_size then buf - else Cstruct.sub buf 0 max_luv_buffer_size - -(* Raise if the buffer is too big. Use this for atomic reads and writes. *) -let cstruct_to_luv_exn buf = - if Cstruct.length buf <= max_luv_buffer_size then Cstruct.to_bigarray buf - else Fmt.invalid_arg "Buffer too large for luv (%d > %d)" (Cstruct.length buf) max_luv_buffer_size - -(* For vectors, we can just split long buffers into two. *) -let rec cstructv_to_luv = function - | [] -> [] - | x :: xs when Cstruct.length x <= max_luv_buffer_size -> - Cstruct.to_bigarray x :: cstructv_to_luv xs - | x :: xs -> - let x1, x2 = Cstruct.split x max_luv_buffer_size in - Cstruct.to_bigarray x1 :: cstructv_to_luv (x2 :: xs) - -module Suspended = struct - type 'a t = { - fiber : Eio.Private.Fiber_context.t; - k : ('a, unit) Effect.Deep.continuation; - } - - let tid t = Eio.Private.Fiber_context.tid t.fiber - - let continue t v = - Ctf.note_switch (tid t); - Effect.Deep.continue t.k v - - let discontinue t ex = - Ctf.note_switch (tid t); - Effect.Deep.discontinue t.k ex - - let continue_result t = function - | Ok x -> continue t x - | Error x -> discontinue t x - - let set_cancel_fn t fn = Fiber_context.set_cancel_fn t.fiber fn - let clear_cancel_fn t = Fiber_context.clear_cancel_fn t.fiber -end - -type runnable = - | IO - | Thread of (unit -> unit) - -type fd_event_waiters = { - fd : Unix.file_descr; - handle : Luv.Poll.t; - read : unit Suspended.t Lwt_dllist.t; - write : unit Suspended.t Lwt_dllist.t; -} - -module Fd_map = Map.Make(struct type t = Unix.file_descr let compare = Stdlib.compare end) - -type t = { - loop : Luv.Loop.t; - async : Luv.Async.t; (* Will process [run_q] when prodded. *) - run_q : runnable Lf_queue.t; - mutable fd_map : fd_event_waiters Fd_map.t; (* Used for mapping readable/writable poll handles *) -} - -type _ Effect.t += Await : (Luv.Loop.t -> Eio.Private.Fiber_context.t -> ('a -> unit) -> unit) -> 'a Effect.t - -type _ Effect.t += Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t -type _ Effect.t += Enter_unchecked : (t -> 'a Suspended.t -> unit) -> 'a Effect.t - -let enter fn = Effect.perform (Enter fn) -let enter_unchecked fn = Effect.perform (Enter_unchecked fn) - -let enqueue_thread t k v = - Lf_queue.push t.run_q (Thread (fun () -> Suspended.clear_cancel_fn k; Suspended.continue k v)); - Luv.Async.send t.async |> or_raise - -let enqueue_result_thread t k r = - Lf_queue.push t.run_q (Thread (fun () -> Suspended.clear_cancel_fn k; Suspended.continue_result k r)); - Luv.Async.send t.async |> or_raise - -let enqueue_failed_thread t k ex = - Lf_queue.push t.run_q (Thread (fun () -> Suspended.clear_cancel_fn k; Suspended.discontinue k ex)); - Luv.Async.send t.async |> or_raise - -module Poll : sig - val await_readable : t -> unit Suspended.t -> Unix.file_descr -> unit - val await_writable : t -> unit Suspended.t -> Unix.file_descr -> unit - - val cancel_all : t -> Unix.file_descr -> unit - (** [cancel_all t fd] should be called just before [fd] is closed. - Any waiters will be cancelled. *) -end = struct - (* According to the libuv docs: - - It is not okay to have multiple poll handles for the same file descriptor. - - The user should not close the file descriptor while it is being polled - by an active poll handle. - - As such, we keep track of the mapping between poll handle and FD in the [fd_map]. - This contains two queues of waiters for a given handle; those waiting for readability and - those waiting for writability. - - Whenever the [read] queue is non-empty we enable polling for the READ event, and - whenevent the [write] queue is non-empty we enable polling for WRTIE. When both are - empty we stop polling. *) - - let apply_all q fn = - let rec loop = function - | None -> () - | Some v -> fn v; loop (Lwt_dllist.take_opt_r q) - in - loop (Lwt_dllist.take_opt_r q) - - let enqueue_and_remove t fn k v = - Suspended.clear_cancel_fn k; - fn t k v - - let rec poll_callback t events r = - begin match r with - | Ok (es : Luv.Poll.Event.t list) -> - if List.mem `READABLE es then apply_all events.read (fun k -> enqueue_and_remove t enqueue_thread k ()); - if List.mem `WRITABLE es then apply_all events.write (fun k -> enqueue_and_remove t enqueue_thread k ()); - | Error e -> - let e = unclassified_error (Luv_error e) in - apply_all events.read (fun k -> enqueue_and_remove t enqueue_failed_thread k e); - apply_all events.write (fun k -> enqueue_and_remove t enqueue_failed_thread k e) - end; - update t events - and update t events = - let m = if Lwt_dllist.is_empty events.write then [] else [`WRITABLE] in - let m = if Lwt_dllist.is_empty events.read then m else `READABLE :: m in - if m = [] then ( - Luv.Poll.stop events.handle |> or_raise; - t.fd_map <- Fd_map.remove events.fd t.fd_map; - Luv.Handle.close events.handle (fun () -> ()) - ) else ( - Luv.Poll.start events.handle m (poll_callback t events) - ) - - let cancel_all t fd = - match Fd_map.find_opt fd t.fd_map with - | Some v -> - let ex = Failure "Closed file descriptor whilst polling" in - apply_all v.read (fun k -> enqueue_and_remove t enqueue_failed_thread k ex); - apply_all v.write (fun k -> enqueue_and_remove t enqueue_failed_thread k ex); - update t v - | None -> () - - let get_events t fd = - match Fd_map.find_opt fd t.fd_map with - | Some events -> events - | None -> - let handle = Luv.Poll.init ~loop:t.loop (Obj.magic fd : int) |> or_raise in - let events = { - fd; - handle; - read = Lwt_dllist.create (); - write = Lwt_dllist.create (); - } in - t.fd_map <- Fd_map.add fd events t.fd_map; - events - - let await t (k:unit Suspended.t) events queue = - let was_empty = Lwt_dllist.is_empty queue in - let node = Lwt_dllist.add_l k queue in - (* Set the fiber cancel function, which first removes the continutation - from the list to continue when the FD becomes readable or writeable. - Then it checks if the poll handle can be stopped and the mapping - removed. *) - Fiber_context.set_cancel_fn k.fiber (fun ex -> - Lwt_dllist.remove node; - if Lwt_dllist.is_empty queue then update t events; - enqueue_failed_thread t k ex - ); - if was_empty then update t events - - let await_readable t k fd = - let events = get_events t fd in - await t k events events.read - - let await_writable t k fd = - let events = get_events t fd in - await t k events events.write -end - -(* Can only be called from our domain. *) -let enqueue_at_head t k v = - Lf_queue.push_head t.run_q (Thread (fun () -> Suspended.continue k v)); - Luv.Async.send t.async |> or_raise - -let unix_fstat fd = - let ust = Unix.LargeFile.fstat fd in - let st_kind : Eio.File.Stat.kind = - match ust.st_kind with - | Unix.S_REG -> `Regular_file - | Unix.S_DIR -> `Directory - | Unix.S_CHR -> `Character_special - | Unix.S_BLK -> `Block_device - | Unix.S_LNK -> `Symbolic_link - | Unix.S_FIFO -> `Fifo - | Unix.S_SOCK -> `Socket - in - Eio.File.Stat.{ - dev = ust.st_dev |> Int64.of_int; - ino = ust.st_ino |> Int64.of_int; - kind = st_kind; - perm = ust.st_perm; - nlink = ust.st_nlink |> Int64.of_int; - uid = ust.st_uid |> Int64.of_int; - gid = ust.st_gid |> Int64.of_int; - rdev = ust.st_rdev |> Int64.of_int; - size = ust.st_size |> Optint.Int63.of_int64; - atime = ust.st_atime; - mtime = ust.st_mtime; - ctime = ust.st_ctime; - } - -module Low_level = struct - type 'a or_error = ('a, Luv.Error.t) result - - let or_raise = or_raise - - let get_loop () = - enter_unchecked @@ fun t k -> - Suspended.continue k t.loop - - let await fn = - Effect.perform (Await fn) - - let await_exn fn = - Effect.perform (Await fn) |> or_raise - - let await_with_cancel ~request fn = - enter (fun st k -> - let cancel_reason = ref None in - Suspended.set_cancel_fn k (fun ex -> - cancel_reason := Some ex; - ignore (Luv.Request.cancel request : _ result); - ); - fn st.loop (fun result -> - match result, !cancel_reason with - | Error _, Some cex -> - (* If we got an error and we were cancelling, report the cancellation as the reason *) - enqueue_failed_thread st k cex - | v, _ -> - Suspended.clear_cancel_fn k; - enqueue_thread st k v - ) - ) - - module Handle = struct - type 'a t = { - mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *) - close_unix : bool; - mutable fd : [`Open of 'a Luv.Handle.t | `Closed] - } constraint 'a = [< `Poll | `Stream of [< `Pipe | `TCP | `TTY ] | `UDP ] - - let get op = function - | { fd = `Open fd; _ } -> fd - | { fd = `Closed ; _ } -> invalid_arg (op ^ ": handle used after calling close!") - - let is_open = function - | { fd = `Open _; _ } -> true - | { fd = `Closed; _ } -> false - - let close t = - Ctf.label "close"; - let fd = get "close" t in - t.fd <- `Closed; - Eio.Switch.remove_hook t.release_hook; - if t.close_unix then ( - enter_unchecked @@ fun t k -> - begin match Luv.Handle.fileno fd with - | Ok fd -> Poll.cancel_all t (Luv_unix.Os_fd.Fd.to_unix fd) - | Error `EBADF -> () (* We don't have a Unix FD yet, so we can't be watching it. *) - | Error e -> raise (unclassified_error (Luv_error e)) - end; - Luv.Handle.close fd (enqueue_thread t k) - ) - - let ensure_closed t = - if is_open t then close t - - let to_luv x = get "to_luv" x - - let of_luv_no_hook ~close_unix fd = - { fd = `Open fd; release_hook = Eio.Switch.null_hook; close_unix } - - let of_luv ?(close_unix=true) ~sw fd = - let t = of_luv_no_hook ~close_unix fd in - t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t); - t - - let to_unix_opt op (t:_ t) = - match Luv.Handle.fileno (to_luv t) with - | Error _ -> None - | Ok os_fd -> - let fd = Luv_unix.Os_fd.Fd.to_unix os_fd in - match op with - | `Peek -> Some fd - | `Take -> - t.fd <- `Closed; - Eio.Switch.remove_hook t.release_hook; - Some fd - end - - module File = struct - type t = { - mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *) - mutable fd : [`Open of Luv.File.t | `Closed] - } - - let get op = function - | { fd = `Open fd; _ } -> fd - | { fd = `Closed ; _ } -> invalid_arg (op ^ ": file descriptor used after calling close!") - - let is_open = function - | { fd = `Open _; _ } -> true - | { fd = `Closed; _ } -> false - - let close t = - Ctf.label "close"; - let fd = get "close" t in - t.fd <- `Closed; - Eio.Switch.remove_hook t.release_hook; - enter_unchecked (fun st k -> - let os_fd = Luv.File.get_osfhandle fd |> or_raise in - let unix_fd = Luv_unix.Os_fd.Fd.to_unix os_fd in - Poll.cancel_all st unix_fd; - Luv.File.close ~loop:st.loop fd (enqueue_thread st k) - ) |> or_raise - - let ensure_closed t = - if is_open t then close t - - let to_luv = get "to_luv" - - let of_luv_no_hook fd = - { fd = `Open fd; release_hook = Eio.Switch.null_hook } - - let of_luv ~sw fd = - let t = of_luv_no_hook fd in - t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t); - t - - let fstat fd = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.fstat ~loop ~request (to_luv fd)) - - let open_ ~sw ?mode path flags = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.open_ ~loop ?mode ~request path flags) - |> Result.map (of_luv ~sw) - - let read ?file_offset fd bufs = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request ?file_offset (get "read" fd) bufs) - - let write_single ?file_offset fd bufs = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request ?file_offset (get "write" fd) bufs) - - let rec write fd bufs = - match write_single fd bufs with - | Error _ as e -> e - | Ok sent -> - let rec aux = function - | [] -> Ok () - | x :: xs when Luv.Buffer.size x = 0 -> aux xs - | bufs -> write fd bufs - in - aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent) - - let realpath path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.realpath ~loop ~request path) - - let mkdir ~mode path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.mkdir ~loop ~request ~mode path) - - let unlink path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.unlink ~loop ~request path) - - let rmdir path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.rmdir ~loop ~request path) - - let rename old_path new_path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.rename ~loop ~request old_path ~to_:new_path) - - let opendir path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.opendir ~loop ~request path) - - let closedir path = - let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.closedir ~loop ~request path) - - let with_dir_to_read path fn = - match opendir path with - | Ok dir -> - Fun.protect ~finally:(fun () -> closedir dir |> or_raise) @@ fun () -> fn dir - | Error _ as e -> e - - let readdir path = - let fn dir = - let request = Luv.File.Request.make () in - match await_with_cancel ~request (fun loop -> Luv.File.readdir ~loop ~request dir) with - | Ok dirents -> - let dirs = Array.map (fun v -> v.Luv.File.Dirent.name) dirents |> Array.to_list in - Ok dirs - | Error _ as e -> e - in - with_dir_to_read path fn - - let to_unix op t = - let os_fd = Luv.File.get_osfhandle (get "to_unix" t) |> or_raise in - let fd = Luv_unix.Os_fd.Fd.to_unix os_fd in - match op with - | `Peek -> fd - | `Take -> - t.fd <- `Closed; - Eio.Switch.remove_hook t.release_hook; - fd - end - - module Random = struct - let rec fill buf = - let request = Luv.Random.Request.make () in - match await_with_cancel ~request (fun loop -> Luv.Random.random ~loop ~request buf) with - | Ok x -> x - | Error `EINTR -> fill buf - | Error x -> raise @@ wrap_error x - end - - module Stream = struct - type 'a t = [`Stream of 'a] Handle.t - - let rec read_into (sock:'a t) buf = - let r = enter (fun t k -> - Suspended.set_cancel_fn k (fun ex -> - Luv.Stream.read_stop (Handle.get "read_into:cancel" sock) |> or_raise; - enqueue_failed_thread t k ex - ); - Luv.Stream.read_start (Handle.get "read_start" sock) ~allocate:(fun _ -> buf) (fun r -> - Suspended.clear_cancel_fn k; - Luv.Stream.read_stop (Handle.get "read_stop" sock) |> or_raise; - enqueue_thread t k r - ) - ) in - match r with - | Ok buf' -> - let len = Luv.Buffer.size buf' in - if len > 0 then len - else read_into sock buf (* Luv uses a zero-length read to mean EINTR! *) - | Error `EOF -> raise End_of_file - | Error x -> raise (wrap_error x) - - let rec skip_empty = function - | empty :: xs when Luv.Buffer.size empty = 0 -> skip_empty xs - | xs -> xs - - let rec write t bufs = - let err, n = - (* note: libuv doesn't seem to allow cancelling stream writes *) - enter (fun st k -> - Luv.Stream.write (Handle.get "write_stream" t) bufs @@ fun err n -> - enqueue_thread st k (err, n) - ) - in - match err with - | Error e -> raise (wrap_error e) - | Ok () -> - match Luv.Buffer.drop bufs n |> skip_empty with - | [] -> () - | bufs -> write t bufs - - let to_unix_opt = Handle.to_unix_opt - - let of_unix fd = - Luv_unix.Os_fd.Socket.from_unix fd |> or_raise - - let connect_pipe ~sw path = - let sock = Luv.Pipe.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in - match await (fun _loop _fiber -> Luv.Pipe.connect (Handle.get "connect" sock) path) with - | Ok () -> sock - | Error e -> raise @@ wrap_error_fs e - - let connect_tcp ~sw addr = - let sock = Luv.TCP.init ~loop:(get_loop ()) () |> or_raise in - enter (fun st k -> - Luv.TCP.connect sock addr (fun v -> - Suspended.clear_cancel_fn k; - match v with - | Ok () -> enqueue_thread st k () - | Error e -> - Luv.Handle.close sock ignore; - match Fiber_context.get_error k.fiber with - | Some ex -> enqueue_failed_thread st k ex - | None -> enqueue_failed_thread st k (wrap_error e) - ); - Fiber_context.set_cancel_fn k.fiber (fun _ex -> - match Luv.Handle.fileno sock with - | Error _ -> () - | Ok os_fd -> - let fd = Luv_unix.Os_fd.Fd.to_unix os_fd in - Unix.shutdown fd Unix.SHUTDOWN_ALL; - (* Luv.Handle.close sock ignore *) - ) - ); - Handle.of_luv ~sw sock - end - - module Pipe = struct - type t = [`Stream of [`Pipe]] Handle.t - - let init ?for_handle_passing ~sw () = - Luv.Pipe.init ~loop:(get_loop ()) ?for_handle_passing () - |> or_raise - |> Handle.of_luv ~close_unix:true ~sw - end - - module Process = struct - type t = { - proc : Luv.Process.t; - status : (int * int64) Promise.t; - } - - let pid t = Luv.Process.pid t.proc - - let to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe () = - let parent_pipe = Handle.to_luv parent_pipe in - Luv.Process.to_parent_pipe ?readable_in_child ?writable_in_child ?overlapped ~fd ~parent_pipe () - - let await_exit t = Promise.await t.status - - let has_exited t = Promise.is_resolved t.status - - let send_signal t i = Luv.Process.kill t.proc i |> or_raise - - let spawn ?cwd ?env ?uid ?gid ?(redirect=[]) ~sw cmd args = - let status, set_status = Promise.create () in - let hook = ref None in - let on_exit proc ~exit_status ~term_signal = - Option.iter Switch.remove_hook !hook; - Luv.Handle.close proc (fun () -> ()); - Promise.resolve set_status (term_signal, exit_status) - in - let proc = Luv.Process.spawn ?environment:env ?uid ?gid ~loop:(get_loop ()) ?working_directory:cwd ~redirect ~on_exit cmd args |> or_raise in - if not (Promise.is_resolved status) then ( - let h = Switch.on_release_cancellable sw (fun () -> - Luv.Process.kill proc Luv.Signal.sigkill |> or_raise; - ignore (Promise.await status) - ) in - hook := Some h - ); - { proc; status } - end - - module Poll = Poll - - let sleep_ms delay = - enter @@ fun st k -> - let timer = Luv.Timer.init ~loop:st.loop () |> or_raise in - Suspended.set_cancel_fn k (fun ex -> - Luv.Timer.stop timer |> or_raise; - Luv.Handle.close timer (fun () -> ()); - enqueue_failed_thread st k ex - ); - Luv.Timer.start timer delay (fun () -> - Suspended.clear_cancel_fn k; - Luv.Handle.close timer (fun () -> ()); - enqueue_thread st k () - ) |> or_raise - - let sleep_until due = - let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in - sleep_ms delay - - (* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) - let getaddrinfo ~service node = - let ( let* ) o f = Option.bind o f in - let to_eio_sockaddr_t {Luv.DNS.Addr_info.family; addr; socktype; protocol; _ } = - match family, socktype with - | (`INET | `INET6), - (`STREAM | `DGRAM) -> ( - let* host = Luv.Sockaddr.to_string addr in - let* port = Luv.Sockaddr.port addr in - let ipaddr = Unix.inet_addr_of_string host |> Eio_unix.Ipaddr.of_unix in - match protocol with - | 6 -> Some (`Tcp (ipaddr, port)) - | 17 -> Some (`Udp (ipaddr, port)) - | _ -> None) - | _ -> None - in - let request = Luv.DNS.Addr_info.Request.make () in - await_with_cancel ~request (fun loop -> Luv.DNS.getaddrinfo ~loop ~request ~service ~node ()) - |> or_raise - |> List.filter_map to_eio_sockaddr_t - -end - -open Low_level - -type _ Eio.Generic.ty += FD : Low_level.File.t Eio.Generic.ty - -type has_fd = < fd : Low_level.File.t > -type source = < Eio.Flow.source; Eio.Flow.close; has_fd > -type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > - -let get_fd (t : ) = t#fd - -let get_fd_opt t = Eio.Generic.probe t FD - -let flow fd = object (_ : ) - method fd = fd - method close = Low_level.File.close fd - method unix_fd op = File.to_unix op fd - - method stat = unix_fstat (File.to_unix `Peek fd) - - method probe : type a. a Eio.Generic.ty -> a option = function - | FD -> Some fd - | Eio_unix.Private.Unix_file_descr op -> Some (File.to_unix op fd) - | _ -> None - - method read_into buf = - let buf = cstruct_to_luv_truncate buf in - match File.read fd [buf] |> or_raise |> Unsigned.Size_t.to_int with - | 0 -> raise End_of_file - | got -> got - - method pread ~file_offset bufs = - let bufs = cstructv_to_luv bufs in - let file_offset = Optint.Int63.to_int64 file_offset in - match File.read ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int with - | 0 -> raise End_of_file - | got -> got - - method pwrite ~file_offset bufs = - let bufs = cstructv_to_luv bufs in - let file_offset = Optint.Int63.to_int64 file_offset in - File.write_single ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int - - method read_methods = [] - - method write bufs = - let bufs = cstructv_to_luv bufs in - File.write fd bufs |> or_raise - - method copy src = - let buf = Luv.Buffer.create 4096 in - try - while true do - let got = Eio.Flow.single_read src (Cstruct.of_bigarray buf) in - let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in - File.write fd [sub] |> or_raise - done - with End_of_file -> () -end - -let source fd = (flow fd :> source) -let sink fd = (flow fd :> sink) - -let socket sock = object - inherit Eio.Flow.two_way as super - - method! probe : type a. a Eio.Generic.ty -> a option = function - | Eio_unix.Private.Unix_file_descr op -> Stream.to_unix_opt op sock - | x -> super#probe x - - method unix_fd op = Stream.to_unix_opt op sock |> Option.get - - method read_into buf = - let buf = cstruct_to_luv_truncate buf in - Stream.read_into sock buf - - method! write bufs = - let bufs = cstructv_to_luv bufs in - Stream.write sock bufs - - method copy src = - let buf = Luv.Buffer.create 4096 in - try - while true do - let got = Eio.Flow.single_read src (Cstruct.of_bigarray buf) in - let buf' = Luv.Buffer.sub buf ~offset:0 ~length:got in - Stream.write sock [buf'] - done - with End_of_file -> () - - method close = - Handle.close sock - - method shutdown = function - | `Send -> await_exn (fun _loop _fiber -> Luv.Stream.shutdown (Handle.get "shutdown" sock)) - | `Receive | `All as cmd -> - let fd = Stream.to_unix_opt `Peek sock |> Option.get in - Unix.shutdown fd @@ match cmd with - | `Receive -> Unix.SHUTDOWN_RECEIVE - | `All -> Unix.SHUTDOWN_ALL -end - -class virtual ['a] listening_socket ~backlog sock = object (self) - inherit Eio.Net.listening_socket as super - - method! probe : type a. a Eio.Generic.ty -> a option = function - | Eio_unix.Private.Unix_file_descr op -> Stream.to_unix_opt op sock - | x -> super#probe x - - val ready = Eio.Semaphore.make 0 - - method private virtual make_client : 'a Luv.Stream.t - method private virtual get_client_addr : 'a Stream.t -> Eio.Net.Sockaddr.stream - - method close = Handle.close sock - - method accept ~sw = - Eio.Semaphore.acquire ready; - let client = self#make_client |> Handle.of_luv_no_hook ~close_unix:true in - match Luv.Stream.accept ~server:(Handle.get "accept" sock) ~client:(Handle.get "accept" client) with - | Error e -> - Handle.close client; - raise (wrap_error e) - | Ok () -> - Switch.on_release sw (fun () -> Handle.ensure_closed client); - let flow = (socket client :> ) in - let client_addr = self#get_client_addr client in - flow, client_addr - - initializer - Luv.Stream.listen ~backlog (Handle.get "listen" sock) (fun x -> - or_raise x; - Eio.Semaphore.release ready - ) -end - -(* TODO: implement, or maybe remove from the Eio API. - Luv makes TCP sockets reuse_addr by default, and maybe that's fine everywhere. - Extracting the FD will require https://github.com/aantron/luv/issues/120 *) -let luv_reuse_addr _sock _v = () -let luv_reuse_port _sock _v = () - -let luv_addr_of_eio host port = - let host = Unix.string_of_inet_addr (Eio_unix.Ipaddr.to_unix host) in - match Luv.Sockaddr.ipv6 host port with - | Ok addr -> addr - | Error _ -> Luv.Sockaddr.ipv4 host port |> or_raise - -let luv_ip_addr_to_eio addr = - let host = Luv.Sockaddr.to_string addr |> Option.get in - let port = Luv.Sockaddr.port addr |> Option.get in - (Eio_unix.Ipaddr.of_unix (Unix.inet_addr_of_string host), port) - -module Udp = struct - type 'a t = [`UDP] Handle.t - - (* When the sender address in the callback of [recv_start] is [None], this usually indicates - EAGAIN according to the luv documentation which can be ignored. Libuv calls the callback - in case C programs wish to handle the allocated buffer in some way. *) - let recv (sock:'a t) buf = - let r = enter (fun t k -> - Suspended.set_cancel_fn k (fun ex -> - Luv.UDP.recv_stop (Handle.get "recv_into:cancel" sock) |> or_raise; - enqueue_failed_thread t k ex - ); - Luv.UDP.recv_start (Handle.get "recv_start" sock) ~allocate:(fun _ -> buf) (function - | Ok (_, None, _) -> () - | Ok (buf, Some addr, flags) -> - Suspended.clear_cancel_fn k; - Luv.UDP.recv_stop (Handle.get "recv_stop" sock) |> or_raise; - enqueue_thread t k (Ok (buf, addr, flags)) - | Error _ as err -> - Suspended.clear_cancel_fn k; - Luv.UDP.recv_stop (Handle.get "recv_stop" sock) |> or_raise; - enqueue_thread t k err - ) - ) in - match r with - | Ok (buf', sockaddr, _recv_flags) -> - `Udp (luv_ip_addr_to_eio sockaddr), Luv.Buffer.size buf' - | Error e -> raise (wrap_error e) - - let send t buf = function - | `Udp (host, port) -> - let bufs = cstructv_to_luv [ buf ] in - match await (fun _loop _fiber -> Luv.UDP.send (Handle.get "send" t) bufs (luv_addr_of_eio host port)) with - | Ok () -> () - | Error e -> raise (wrap_error e) -end - -let udp_socket endp = object - inherit Eio.Net.datagram_socket - - method close = Handle.close endp - - method send sockaddr bufs = Udp.send endp bufs sockaddr - method recv buf = - let buf = cstruct_to_luv_exn buf in - Udp.recv endp buf -end - -let listening_ip_socket ~backlog sock = object - inherit [[ `TCP ]] listening_socket ~backlog sock - - method private make_client = Luv.TCP.init ~loop:(get_loop ()) () |> or_raise - - method private get_client_addr c = - `Tcp (Luv.TCP.getpeername (Handle.get "get_client_addr" c) |> or_raise |> luv_ip_addr_to_eio) -end - -let listening_unix_socket ~backlog sock = object - inherit [[ `Pipe ]] listening_socket ~backlog sock - - method private make_client = Luv.Pipe.init ~loop:(get_loop ()) () |> or_raise - method private get_client_addr c = - `Unix (Luv.Pipe.getpeername (Handle.get "get_client_addr" c) |> or_raise) -end - -let socket_domain_of = function - | `UdpV4 -> `INET - | `UdpV6 -> `INET6 - | `Udp (host, _) - | `Tcp (host, _) -> - Eio.Net.Ipaddr.fold host - ~v4:(fun _ -> `INET) - ~v6:(fun _ -> `INET6) - -let net = object - inherit Eio.Net.t - - method listen ~reuse_addr ~reuse_port ~backlog ~sw = function - | `Tcp (host, port) -> - let sock = Luv.TCP.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in - luv_reuse_addr sock reuse_addr; - luv_reuse_port sock reuse_port; - let addr = luv_addr_of_eio host port in - Luv.TCP.bind (Handle.get "bind" sock) addr |> or_raise; - listening_ip_socket ~backlog sock - | `Unix path -> - let sock = Luv.Pipe.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in - luv_reuse_addr sock reuse_addr; - if reuse_addr then ( - match Unix.lstat path with - | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path - | _ -> () - | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () - ); - Luv.Pipe.bind (Handle.get "bind" sock) path |> or_raise; - (* Remove the path when done (except for abstract sockets). *) - if String.length path > 0 && path.[0] <> Char.chr 0 then - Switch.on_release sw (fun () -> Unix.unlink path); - listening_unix_socket ~backlog sock - - method connect ~sw = function - | `Tcp (host, port) -> - let sock = Stream.connect_tcp ~sw (luv_addr_of_eio host port) in - (socket sock :> < Eio.Flow.two_way; Eio.Flow.close >) - | `Unix path -> - let sock = Stream.connect_pipe ~sw path in - (socket sock :> < Eio.Flow.two_way; Eio.Flow.close >) - - method datagram_socket ~reuse_addr ~reuse_port ~sw saddr = - let domain = socket_domain_of saddr in - let sock = Luv.UDP.init ~domain ~loop:(get_loop ()) () |> or_raise in - let dg_sock = Handle.of_luv ~sw sock in - begin match saddr with - | `Udp (host, port) -> - let addr = luv_addr_of_eio host port in - luv_reuse_addr sock reuse_addr; - luv_reuse_port sock reuse_port; - Luv.UDP.bind sock addr |> or_raise - | `UdpV4 | `UdpV6 -> () - end; - udp_socket dg_sock - - method getaddrinfo = Low_level.getaddrinfo - - method getnameinfo = Eio_unix.getnameinfo -end - -let secure_random = - object - inherit Eio.Flow.source - - method read_into buf = - List.fold_left - (fun dim ba -> Random.fill ba; dim + Bigarray.Array1.dim ba) - 0 (cstructv_to_luv [ buf ]) - end - -type stdenv = < - stdin : source; - stdout : sink; - stderr : sink; - net : Eio.Net.t; - domain_mgr : Eio.Domain_manager.t; - clock : Eio.Time.clock; - mono_clock : Eio.Time.Mono.t; - fs : Eio.Fs.dir Eio.Path.t; - cwd : Eio.Fs.dir Eio.Path.t; - secure_random : Eio.Flow.source; - debug : Eio.Debug.t; -> - -let domain_mgr ~run_event_loop = - let run_raw (type a) ~pre_spawn fn = - let domain_k : (unit Domain.t * a Suspended.t) option ref = ref None in - let result = ref None in - enter @@ fun st k -> - let async = Luv.Async.init ~loop:st.loop (fun async -> - (* This is called in the parent domain after returning to the mainloop, - so [domain_k] must be set by then. *) - let domain, k = Option.get !domain_k in - Domain.join domain; - Luv.Handle.close async @@ fun () -> - Suspended.clear_cancel_fn k; - Suspended.continue_result k (Option.get !result) - ) |> or_raise - in - pre_spawn k; - let d = Domain.spawn (fun () -> - result := Some (match fn () with - | v -> Ok v - | exception ex -> Error ex - ); - Luv.Async.send async |> or_raise - ) in - domain_k := Some (d, k) - in - object - inherit Eio.Domain_manager.t - - method run_raw fn = - run_raw ~pre_spawn:ignore fn - - method run fn = - let cancelled, set_cancelled = Promise.create () in - let pre_spawn k = Suspended.set_cancel_fn k (Promise.resolve set_cancelled) in - run_raw ~pre_spawn @@ (fun () -> - let result = ref None in - run_event_loop (fun _ -> - result := Some (fn ~cancelled) - ); - Option.get !result - ) - end - -let clock = object - inherit Eio.Time.clock - - method now = Unix.gettimeofday () - method sleep_until = sleep_until -end - -let mono_clock = object - inherit Eio.Time.Mono.t - - method now = Mtime_clock.now () - - method sleep_until time = - let now = Mtime.to_uint64_ns (Mtime_clock.now ()) in - let time = Mtime.to_uint64_ns time in - if Int64.unsigned_compare now time >= 0 then Fiber.yield () - else ( - let delay_ns = Int64.sub time now |> Int64.to_float in - let delay_ms = delay_ns /. 1e6 |> ceil |> truncate |> max 0 in - Low_level.sleep_ms delay_ms - ) -end - -type _ Eio.Generic.ty += Dir_resolve_new : (string -> string) Eio.Generic.ty -let dir_resolve_new x = Eio.Generic.probe x Dir_resolve_new - -(* Warning: libuv doesn't provide [openat], etc, and so there is probably no way to make this safe. - We make a best-efforts attempt to enforce the sandboxing using realpath and [`NOFOLLOW]. - todo: this needs more testing *) -class dir ~label (dir_path : string) = object (self) - inherit Eio.Fs.dir - - val mutable closed = false - - method! probe : type a. a Eio.Generic.ty -> a option = function - | Dir_resolve_new -> Some self#resolve_new - | _ -> None - - (* Resolve a relative path to an absolute one, with no symlinks. - @raise Eio.Fs.Permission_denied if it's outside of [dir_path]. *) - method private resolve path = - if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path; - if Filename.is_relative path then ( - let dir_path = File.realpath dir_path |> or_raise_fs in - let full = File.realpath (Filename.concat dir_path path) |> or_raise_fs in - let prefix_len = String.length dir_path + 1 in - if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then - full - else if full = dir_path then - full - else - raise @@ Eio.Fs.err (Permission_denied (Outside_sandbox (full, dir_path))) - ) else ( - raise @@ Eio.Fs.err (Permission_denied Absolute_path) - ) - - (* We want to create [path]. Check that the parent is in the sandbox. *) - method private resolve_new path = - let dir, leaf = Filename.dirname path, Filename.basename path in - if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path - else - let dir = self#resolve dir in - Filename.concat dir leaf - - method open_in ~sw path = - let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_fs in - (flow fd :> ) - - method open_out ~sw ~append ~create path = - let mode, flags = - match create with - | `Never -> 0, [] - | `If_missing perm -> perm, [`CREAT] - | `Or_truncate perm -> perm, [`CREAT; `TRUNC] - | `Exclusive perm -> perm, [`CREAT; `EXCL] - in - let flags = if append then `APPEND :: flags else flags in - let flags = `RDWR :: `NOFOLLOW :: flags in - let real_path = - if create = `Never then self#resolve path - else self#resolve_new path - in - let fd = File.open_ ~sw real_path flags ~mode:[`NUMERIC mode] |> or_raise_fs in - (flow fd :> ) - - method open_dir ~sw path = - Switch.check sw; - let label = Filename.basename path in - let d = new dir ~label (self#resolve path) in - Switch.on_release sw (fun () -> d#close); - d - - (* libuv doesn't seem to provide a race-free way to do this. *) - method mkdir ~perm path = - let real_path = self#resolve_new path in - File.mkdir ~mode:[`NUMERIC perm] real_path |> or_raise_fs - - (* libuv doesn't seem to provide a race-free way to do this. *) - method unlink path = - let dir_path = Filename.dirname path in - let leaf = Filename.basename path in - let real_dir_path = self#resolve dir_path in - File.unlink (Filename.concat real_dir_path leaf) |> or_raise_fs - - (* libuv doesn't seem to provide a race-free way to do this. *) - method rmdir path = - let dir_path = Filename.dirname path in - let leaf = Filename.basename path in - let real_dir_path = self#resolve dir_path in - File.rmdir (Filename.concat real_dir_path leaf) |> or_raise_fs - - method read_dir path = - let path = self#resolve path in - File.readdir path |> or_raise_fs - - method rename old_path new_dir new_path = - match dir_resolve_new new_dir with - | None -> invalid_arg "Target is not a luv directory!" - | Some new_resolve_new -> - let old_path = self#resolve old_path in - let new_path = new_resolve_new new_path in - File.rename old_path new_path |> or_raise_fs - - method close = closed <- true - - method pp f = Fmt.string f (String.escaped label) -end - -(* Full access to the filesystem. *) -let fs = object - inherit dir ~label:"fs" "." - - (* No checks *) - method! private resolve path = path -end - -let cwd = object - inherit dir ~label:"cwd" "." -end - -let stdenv ~run_event_loop = - let stdin = lazy (source (File.of_luv_no_hook Luv.File.stdin)) in - let stdout = lazy (sink (File.of_luv_no_hook Luv.File.stdout)) in - let stderr = lazy (sink (File.of_luv_no_hook Luv.File.stderr)) in - object (_ : stdenv) - method stdin = Lazy.force stdin - method stdout = Lazy.force stdout - method stderr = Lazy.force stderr - method net = net - method domain_mgr = domain_mgr ~run_event_loop - method clock = clock - method mono_clock = mono_clock - method fs = (fs :> Eio.Fs.dir), "." - method cwd = (cwd :> Eio.Fs.dir), "." - method secure_random = secure_random - method debug = Eio.Private.Debug.v - end - -let rec wakeup ~async ~io_queued run_q = - match Lf_queue.pop run_q with - | Some (Thread f) -> - if not !io_queued then ( - Lf_queue.push run_q IO; - io_queued := true; - ); - f (); - wakeup ~async ~io_queued run_q - | Some IO -> - (* If threads keep yielding they could prevent pending IO from being processed. - Therefore, we keep an [IO] job on the queue to force us to check from time to time. *) - io_queued := false; - if not (Lf_queue.is_empty run_q) then - Luv.Async.send async |> or_raise - | None -> () - -let rec run2 : type a. (_ -> a) -> a = fun main -> - let loop = Luv.Loop.init () |> or_raise in - let run_q = Lf_queue.create () in - let io_queued = ref false in - let async = Luv.Async.init ~loop (fun async -> - try wakeup ~async ~io_queued run_q - with ex -> - let bt = Printexc.get_raw_backtrace () in - Fmt.epr "Uncaught exception in run loop:@,%a@." Fmt.exn_backtrace (ex, bt); - Luv.Loop.stop loop - ) |> or_raise in - let st = { loop; async; run_q; fd_map = Fd_map.empty } in - let stdenv = stdenv ~run_event_loop:run2 in - let rec fork ~new_fiber:fiber fn = - Ctf.note_switch (Fiber_context.tid fiber); - let open Effect.Deep in - match_with fn () - { retc = (fun () -> Fiber_context.destroy fiber); - exnc = (fun e -> Fiber_context.destroy fiber; raise e); - effc = fun (type a) (e : a Effect.t) -> - match e with - | Await fn -> - Some (fun k -> - let k = { Suspended.k; fiber } in - fn loop fiber (enqueue_thread st k)) - | Eio.Private.Effects.Fork (new_fiber, f) -> - Some (fun k -> - let k = { Suspended.k; fiber } in - enqueue_at_head st k (); - fork ~new_fiber f - ) - | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) - | Enter_unchecked fn -> Some (fun k -> - fn st { Suspended.k; fiber } - ) - | Enter fn -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> fn st { Suspended.k; fiber } - ) - | Eio.Private.Effects.Suspend fn -> - Some (fun k -> - let k = { Suspended.k; fiber } in - fn fiber (enqueue_result_thread st k) - ) - | Eio_unix.Private.Await_readable fd -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> - let k = { Suspended.k; fiber } in - Poll.await_readable st k fd - ) - | Eio_unix.Private.Await_writable fd -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> - let k = { Suspended.k; fiber } in - Poll.await_writable st k fd - ) - | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k mono_clock) - | Eio_unix.Private.Socket_of_fd (sw, close_unix, fd) -> Some (fun k -> - match - let fd = Low_level.Stream.of_unix fd in - let sock = Luv.TCP.init ~loop () |> or_raise in - let handle = Handle.of_luv ~sw ~close_unix sock in - Luv.TCP.open_ sock fd |> or_raise; - (socket handle :> Eio_unix.socket) - with - | sock -> continue k sock - | exception (Eio.Io _ as ex) -> discontinue k ex - ) - | Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k -> - match - if domain <> Unix.PF_UNIX then failwith "Only PF_UNIX sockets are supported by libuv"; - let ty = - match ty with - | Unix.SOCK_DGRAM -> `DGRAM - | Unix.SOCK_STREAM -> `STREAM - | Unix.SOCK_RAW -> `RAW - | Unix.SOCK_SEQPACKET -> failwith "Type SEQPACKET not support by libuv" - in - let a, b = Luv.TCP.socketpair ty protocol |> or_raise in - let wrap x = - let sock = Luv.TCP.init ~loop () |> or_raise in - Luv.TCP.open_ sock x |> or_raise; - let h = Handle.of_luv ~sw ~close_unix:true sock in - (socket h :> Eio_unix.socket) - in - (wrap a, wrap b) - with - | x -> continue k x - | exception (Eio.Io _ as ex) -> discontinue k ex - ) - | Eio_unix.Private.Pipe sw -> Some (fun k -> - let r, w = Luv.Pipe.pipe ~read_flags:[] ~write_flags:[] () |> or_raise in - let r = (flow (File.of_luv ~sw r) :> ) in - let w = (flow (File.of_luv ~sw w) :> ) in - continue k (r, w) - ) - | _ -> None - } - in - let main_status = ref `Running in - let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - begin match main stdenv with - | v -> main_status := `Done v - | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ()) - end; - Luv.Handle.close async @@ fun () -> - Luv.Loop.stop loop - ); - ignore (Luv.Loop.run ~loop () : bool); - Luv.Loop.close loop |> or_raise; - Lf_queue.close st.run_q; - match !main_status with - | `Done v -> v - | `Ex (ex, bt) -> Printexc.raise_with_backtrace ex bt - | `Running -> failwith "Deadlock detected: no events scheduled but main function hasn't returned" - -let start_signal_thread () = - let all = List.init 64 (fun x -> x) in - let omask = Thread.sigmask SIG_SETMASK all in - let inp, outp = Unix.pipe ~cloexec:true () in - let tid = - Thread.create - (fun () -> - Thread.sigmask SIG_SETMASK [] |> ignore; - let bytes = Bytes.create 1 in - let rec loop () = - match Unix.read inp bytes 0 1 with - | exception Unix.Unix_error (Unix.EINTR, _, _) -> loop () - | 0 -> Unix.close inp - | _ -> failwith "signal pipe didn't return EOF or EINTR" - in - loop () - ) () - in - tid, omask, outp - -let stop_signal_thread (tid, omask, outp) = - Unix.close outp; - Thread.join tid; - Unix.sigprocmask SIG_SETMASK omask |> ignore - -let run main = - (* Unix's SIGPIPE makes no sense in a modern application. *) - if Sys.os_type = "Unix" then Sys.(set_signal sigpipe Signal_ignore); - let sigctx = start_signal_thread () in - Fun.protect (fun () -> run2 main) - ~finally:(fun () -> stop_signal_thread sigctx) diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli deleted file mode 100644 index 9de90c30a..000000000 --- a/lib_eio_luv/eio_luv.mli +++ /dev/null @@ -1,227 +0,0 @@ -(** Eio backend using libuv. - - You will normally not use this module directly. - Instead, use {!Eio_main.run} to start an event loop and then use the API in the {!Eio} module. - - However, it is possible to use this module directly if you only want to support libuv. *) - -open Eio.Std - -type Eio.Exn.Backend.t += - | Luv_error of Luv.Error.t - | Outside_sandbox of string * string - | Absolute_path - -module Low_level : sig - type 'a or_error = ('a, Luv.Error.t) result - - val get_loop : unit -> Luv.Loop.t - (** [get_loop ()] returns the current fiber's event loop. - - When using the {!Luv} API directly, you {b must} pass this to any Luv function - that accepts a loop as an optional argument in order to use the resource with Eio. - The wrapper functions in this file all do this for you. *) - - val or_raise : 'a or_error -> 'a - (** [or_raise (Error e)] raises [Eio.Exn.Io] with [e]. *) - - val await_with_cancel : - request:[< `File | `Addr_info | `Name_info | `Random | `Thread_pool ] Luv.Request.t -> - (Luv.Loop.t -> (('a, Luv.Error.t) result -> unit) -> unit) -> ('a, Luv.Error.t) result - (** [await_with_cancel ~request fn] converts a function using a luv-style callback to one using effects. - It sets the fiber's cancel function to cancel [request], and clears it when the operation completes. *) - - (** {1 Time functions} *) - - val sleep_until : float -> unit - (** [sleep_until time] blocks until the current time is [time]. *) - - (** {1 DNS functions} *) - - val getaddrinfo : service:string -> string -> Eio.Net.Sockaddr.t list - (** [getaddrinfo ~service host] returns a list of IP addresses for [host]. [host] is either a domain name or - an ipaddress. *) - - (** {1 Low-level wrappers for Luv functions} *) - - module File : sig - type t - - val is_open : t -> bool - (** [is_open t] is [true] if {!close} hasn't been called yet. *) - - val close : t -> unit - (** [close t] closes [t]. - @raise Invalid_arg if [t] is already closed. *) - - val of_luv : sw:Switch.t -> Luv.File.t -> t - (** [of_luv ~sw fd] wraps [fd] as an open file descriptor. - This is unsafe if [fd] is closed directly (before or after wrapping it). - @param sw The FD is closed when [sw] is released, if not closed manually first. *) - - val to_luv : t -> Luv.File.t - (** [to_luv t] returns the wrapped descriptor. - This allows unsafe access to the FD. - @raise Invalid_arg if [t] is closed. *) - - val fstat : t -> Luv.File.Stat.t or_error - (** [fstat fd] returns the stat of [fd]. *) - - val open_ : - sw:Switch.t -> - ?mode:Luv.File.Mode.t list -> - string -> Luv.File.Open_flag.t list -> t or_error - (** Wraps {!Luv.File.open_} *) - - val read : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error - (** Wraps {!Luv.File.read} *) - - val write_single : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error - (** [write_single t bufs] performs a single write call and returns the number of bytes written, - which may be less than the amount of data provided in [bufs]. *) - - val write : t -> Luv.Buffer.t list -> unit or_error - (** [write t bufs] writes all the data in [bufs] (which may take several calls to {!write_single}). *) - - val realpath : string -> string or_error - (** Wraps {!Luv.File.realpath} *) - - val mkdir : mode:Luv.File.Mode.t list -> string -> unit or_error - (** Wraps {!Luv.File.mkdir} *) - - val rmdir : string -> unit or_error - (** Wraps {!Luv.File.rmdir} *) - - val unlink : string -> unit or_error - (** Wraps {!Luv.File.unlink} *) - - val readdir : string -> string list or_error - (** Wraps {!Luv.File.readdir}. [readdir] opens and closes the directory for reading for the user. *) - end - - module Random : sig - val fill : Luv.Buffer.t -> unit - (** Wraps {!Luv.Random.random} *) - end - - module Handle : sig - type 'a t - constraint 'a = [< `Poll | `Stream of [< `Pipe | `TCP | `TTY ] | `UDP ] - - val is_open : 'a t -> bool - (** [is_open t] is [true] if {!close} hasn't been called yet. *) - - val close : 'a t -> unit - (** [close t] closes [t]. - @raise Invalid_arg if [t] is already closed. *) - - val to_luv : 'a t -> 'a Luv.Handle.t - (** [to_luv t] returns the wrapped handle. - - This allows unsafe access to the handle. - @raise Invalid_arg if [t] is closed. *) - - val of_luv : ?close_unix:bool -> sw:Switch.t -> 'a Luv.Handle.t -> 'a t - (** [of_luv ~sw h] wraps [h] as an open handle. - - You {b must} pass the loop (from {!get_loop}) to any Luv function - that accepts one as an optional argument - in order to use the resource with the correct event loop. - - This function is unsafe if [h] is closed directly (before or after wrapping it). - - @param sw The handle is closed when [sw] is released, if not closed manually first. - @param close_unix if [true] (the default), calling [close] also closes [fd]. *) - end - - module Stream : sig - type 'a t = [`Stream of 'a] Handle.t - - val read_into : [< `Pipe | `TCP | `TTY ] t -> Luv.Buffer.t -> int - (** [read_into handle buf] reads some bytes from [handle] into [buf] returning the number - of bytes read. - @raise End_of_file if there is no more data to read *) - - val write : [ `Stream of [< `Pipe | `TCP | `TTY ] ] Handle.t -> Luv.Buffer.t list -> unit - (** [write handle bufs] writes the contents of [bufs] to [handle]. *) - end - - module Pipe : sig - type t = [`Pipe] Stream.t - (** A pipe *) - - val init : ?for_handle_passing:bool -> sw:Switch.t -> unit -> t - (** Wraps {!Luv.Pipe.init}*) - end - - module Process : sig - type t - (** A process *) - - val pid : t -> int - (** [pid t] returns the process id of [t]. *) - - val to_parent_pipe : - ?readable_in_child:bool -> - ?writable_in_child:bool -> - ?overlapped:bool -> - fd:int -> - parent_pipe:Pipe.t -> - unit -> - Luv.Process.redirection - (** Wraps {!Luv.Process.to_parent_pipe}*) - - val await_exit : t -> int * int64 - (** [await_exit t] waits for the process [t] to finish. - - It returns [(exit_status, term_signal)], see {!Luv.Process.spawn} for - more details on these values. *) - - val has_exited : t -> bool - (** [has_exited t] checks if the process [t] has exited or not. *) - - val send_signal : t -> int -> unit - (** A wrapper for {!Luv.Process.kill}. *) - - val spawn : - ?cwd:string -> - ?env:(string * string) list -> - ?uid:int -> - ?gid:int -> - ?redirect:Luv.Process.redirection list -> - sw:Switch.t -> - string -> - string list -> t - (** Wraps {!Luv.Process.spawn}. - - The process will be stopped when the switch is released if - it has not already exited.*) - end -end - -(** {1 Eio API} *) - -type has_fd = < fd : Low_level.File.t > -type source = < Eio.Flow.source; Eio.Flow.close; has_fd > -type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > - -type stdenv = < - stdin : source; - stdout : sink; - stderr : sink; - net : Eio.Net.t; - domain_mgr : Eio.Domain_manager.t; - clock : Eio.Time.clock; - mono_clock : Eio.Time.Mono.t; - fs : Eio.Fs.dir Eio.Path.t; - cwd : Eio.Fs.dir Eio.Path.t; - secure_random : Eio.Flow.source; - debug : Eio.Debug.t; -> - -val get_fd : -> Low_level.File.t -val get_fd_opt : #Eio.Generic.t -> Low_level.File.t option - -(** {1 Main Loop} *) - -val run : (stdenv -> 'a) -> 'a diff --git a/lib_eio_luv/tests/dune b/lib_eio_luv/tests/dune deleted file mode 100644 index 0e1ed9770..000000000 --- a/lib_eio_luv/tests/dune +++ /dev/null @@ -1,3 +0,0 @@ -(mdx - (package eio_luv) - (deps (package eio_luv))) diff --git a/lib_eio_luv/tests/files.md b/lib_eio_luv/tests/files.md deleted file mode 100644 index 51a60d223..000000000 --- a/lib_eio_luv/tests/files.md +++ /dev/null @@ -1,66 +0,0 @@ -# Set up the test environment - -```ocaml -# #require "eio_luv";; -# open Eio.Std;; -``` - -```ocaml -let rec read_exactly fd buf = - let size = Luv.Buffer.size buf in - if size > 0 then ( - let got = Eio_luv.Low_level.File.read fd [buf] |> Eio_luv.Low_level.or_raise |> Unsigned.Size_t.to_int in - let next = Luv.Buffer.sub buf ~offset:got ~length:(size - got) in - read_exactly fd next - ) - -let () = - Luv.Error.set_on_unhandled_exception @@ fun ex -> - Printf.printf "Unhandled luv exception: %s\n%!" (Printexc.to_string ex) -``` - -# Hello, world - -```ocaml -# Eio_luv.run @@ fun env -> - Eio.Flow.copy_string "Hello, world!\n" (Eio.Stdenv.stdout env);; -Hello, world! -- : unit = () -``` - -# Read a few bytes from /dev/zero - -```ocaml -let main _stdenv = - Switch.run @@ fun sw -> - let fd = Eio_luv.Low_level.File.open_ ~sw "/dev/zero" [] |> Eio_luv.Low_level.or_raise in - let buf = Luv.Buffer.create 4 in - read_exactly fd buf; - traceln "Read %S" (Luv.Buffer.to_string buf); - Eio_luv.Low_level.File.close fd -``` - -```ocaml -# Eio_luv.run main;; -+Read "\000\000\000\000" -- : unit = () -``` - -# Test cancellation - -```ocaml -let main env = - let name = "hang.pipe" in - Unix.mkfifo name 0o700; - Fun.protect ~finally:(fun () -> Unix.unlink name) @@ fun () -> - Switch.run @@ fun sw -> - let fd = Eio_luv.Low_level.File.open_ ~sw name [`NONBLOCK] |> Eio_luv.Low_level.or_raise in - Fiber.both - (fun () -> read_exactly fd (Luv.Buffer.create 1)) - (fun () -> raise Exit);; -``` - -```ocaml -# Eio_luv.run main;; -Exception: Stdlib.Exit. -``` diff --git a/lib_eio_luv/tests/poll.md b/lib_eio_luv/tests/poll.md deleted file mode 100644 index 7a7e23c39..000000000 --- a/lib_eio_luv/tests/poll.md +++ /dev/null @@ -1,114 +0,0 @@ -# Set up the test environment - -```ocaml -# #require "eio_luv";; -# open Eio.Std;; -# open Eio;; -``` - -A helper function to create two sockets and pass their FDs to a function: - -```ocaml -let with_sockets fn = - Eio_luv.run @@ fun _env -> - Switch.run @@ fun sw -> - let src, dst = Eio_unix.socketpair ~sw () in - let src_fd = Option.get @@ Eio_unix.FD.peek_opt src in - let dst_fd = Option.get @@ Eio_unix.FD.peek_opt dst in - fn ~sw ((src, src_fd), (dst, dst_fd)) -``` - -Waiting for the same file descriptor to become writable does not raise `EEXIST`. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (dst, dst_fd)) -> - Eio.Fiber.both - (fun () -> Eio_unix.await_writable src_fd) - (fun () -> Eio_unix.await_writable src_fd);; -- : unit = () -``` - -An example of reading and writing with different file descriptors. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (dst, dst_fd)) -> - let message = "hello" in - let buffer = Buffer.create (String.length message) in - Eio.Fiber.both - (fun () -> - Eio_unix.await_readable src_fd; - Eio.Flow.copy src (Flow.buffer_sink buffer)) - (fun () -> - Eio_unix.await_writable dst_fd; - Eio.Flow.copy_string message dst; - Eio.Flow.close dst - ); - Buffer.contents buffer;; -- : string = "hello" -``` - -Waiting for reading and writing on the same file descriptor. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (dst, dst_fd)) -> - let message = "hello" in - let buffer = Buffer.create (String.length message) in - Eio.Fiber.both - (fun () -> - Eio_unix.await_writable src_fd; - Eio_unix.await_readable src_fd; - Eio.Flow.copy src (Flow.buffer_sink buffer)) - (fun () -> - Eio_unix.await_writable dst_fd; - Eio.Flow.copy_string message dst; - Eio.Flow.close dst - ); - Buffer.contents buffer;; -- : string = "hello" -``` - -Waiting for reading and writing on the same file descriptor, at the same time. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (dst, dst_fd)) -> - Eio.Fiber.both - (fun () -> - Eio_unix.await_readable src_fd) - (fun () -> - Eio_unix.await_writable src_fd; - Eio.Flow.close dst);; -- : unit = () -``` - -Cancelling a fiber removes a fiber but does not stop polling if others are still waiting. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (dst, _dst_fd)) -> - let buffer = Buffer.create 5 in - Fiber.fork ~sw (fun () -> - Eio_unix.await_readable src_fd; - Eio.Flow.copy src (Flow.buffer_sink buffer); - traceln "Still received: %s" (Buffer.contents buffer) - ); - (try - Eio.Fiber.both - (fun () -> Eio_unix.await_readable src_fd) - (fun () -> raise (Failure "Simulate a cancel")) - with - exn -> traceln "%s" (Printexc.to_string exn)); - Flow.copy_string "Hello" dst; - Flow.close dst;; -+Failure("Simulate a cancel") -+Still received: Hello -- : unit = () -``` - -Closing a file descriptor with an actively waiting poll fails the fiber that is waiting. - -```ocaml -# with_sockets @@ fun ~sw ((src, src_fd), (_dst, _dst_fd)) -> - Eio.Fiber.both - (fun () -> Eio_unix.await_readable src_fd) - (fun () -> Eio.Flow.close src);; -Exception: Failure "Closed file descriptor whilst polling". -``` diff --git a/lib_eio_luv/tests/process.md b/lib_eio_luv/tests/process.md deleted file mode 100644 index 64a69ed99..000000000 --- a/lib_eio_luv/tests/process.md +++ /dev/null @@ -1,120 +0,0 @@ -# Set up the test environment - -```ocaml -# #require "eio_luv";; -# open Eio.Std;; -# open Eio;; -# module Process = Eio_luv.Low_level.Process;; -module Process = Eio_luv.Low_level.Process -``` - -A helper function for reading all of the bytes from a handle. - -```ocaml -let read_all handle = - let buf = Luv.Buffer.create 32 in - let acc_buffer = Buffer.create 42 in - let rec read () = - match Eio_luv.Low_level.Stream.read_into handle buf with - | i -> - Buffer.add_string acc_buffer - (Luv.Buffer.to_string (Luv.Buffer.sub buf ~offset:0 ~length:i)); - read () - | exception End_of_file -> Buffer.contents acc_buffer - in read () -``` - -A simple `echo hello` process redirects to stdout. - -```ocaml -# Eio_luv.run @@ fun _env -> - Switch.run @@ fun sw -> - let redirect = Luv.Process.[ - inherit_fd ~fd:stdout ~from_parent_fd:stdout () - ] in - let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "hello" ] in - Process.await_exit t;; -hello -- : int * int64 = (0, 0L) -``` - -Using a pipe to redirect output to a buffer. - -```ocaml -# Eio_luv.run @@ fun _env -> - Switch.run @@ fun sw -> - let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in - let redirect = Eio_luv.Low_level.Process.[ - to_parent_pipe ~fd:Luv.Process.stdout ~parent_pipe () - ] in - let t = Process.spawn ~sw ~redirect "echo" [ "echo"; "Hello,"; "World!" ] in - let result = read_all parent_pipe in - let _ = Process.await_exit t in - result;; -- : string = "Hello, World!\n" -``` - -Writing to stdin of a process works. - -```ocaml -# Eio_luv.run @@ fun _env -> - Switch.run @@ fun sw -> - let parent_pipe = Eio_luv.Low_level.Pipe.init ~sw () in - let bufs = [ Luv.Buffer.from_string "Hello!" ] in - let redirect = Luv.Process.[ - inherit_fd ~fd:stdout ~from_parent_fd:stdout (); - Process.to_parent_pipe ~fd:stdin ~parent_pipe () - ] in - let t = Process.spawn ~sw ~redirect "head" [ "head" ] in - Eio_luv.Low_level.Stream.write parent_pipe bufs; - Eio_luv.Low_level.Handle.close parent_pipe; - Process.await_exit t;; -Hello! -- : int * int64 = (0, 0L) -``` - -Stopping a process works. - -```ocaml -# Eio_luv.run @@ fun _env -> - Switch.run @@ fun sw -> - let redirect = Luv.Process.[ - inherit_fd ~fd:stdout ~from_parent_fd:stdout () - ] in - let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in - Process.send_signal t Luv.Signal.sigkill; - Process.await_exit t;; -- : int * int64 = (9, 0L) -``` - -Forgetting to wait for a process to finish stops the process. - -```ocaml -# Eio_luv.run @@ fun _env -> - let proc = - Switch.run @@ fun sw -> - let redirect = Luv.Process.[ - inherit_fd ~fd:stdout ~from_parent_fd:stdout () - ] in - Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] - in - Process.await_exit proc;; -- : int * int64 = (9, 0L) -``` - -Stopping a process interacts nicely with switches. - -```ocaml -# Eio_luv.run @@ fun _env -> - let proc = - Switch.run @@ fun sw -> - let redirect = Luv.Process.[ - inherit_fd ~fd:stdout ~from_parent_fd:stdout () - ] in - let t = Process.spawn ~sw ~redirect "sleep" [ "sleep"; "10" ] in - Process.send_signal t Luv.Signal.sigkill; - t - in - Process.await_exit proc;; -- : int * int64 = (9, 0L) -``` diff --git a/lib_eio_windows/dune b/lib_eio_windows/dune new file mode 100644 index 000000000..9e2203057 --- /dev/null +++ b/lib_eio_windows/dune @@ -0,0 +1,5 @@ +(library + (name eio_windows) + (public_name eio_windows) + (enabled_if (= %{os_type} "Win32")) + (libraries eio eio.utils fmt)) diff --git a/lib_eio_windows/eio_windows.ml b/lib_eio_windows/eio_windows.ml new file mode 100644 index 000000000..2cee04113 --- /dev/null +++ b/lib_eio_windows/eio_windows.ml @@ -0,0 +1,3 @@ +(* Can base this on the eio_posix directory structure. + See HACKING.md for instructions on creating a new backend. *) +let run _main = failwith "TODO: Windows support." diff --git a/lib_main/dune b/lib_main/dune index a2e412ef0..0cdf6327c 100644 --- a/lib_main/dune +++ b/lib_main/dune @@ -8,7 +8,7 @@ (select posix_backend.ml from (eio_posix -> posix_backend.enabled.ml) ( -> posix_backend.disabled.ml)) - (select luv_backend.ml from - (eio_luv -> luv_backend.enabled.ml) - ( -> luv_backend.disabled.ml)) + (select windows_backend.ml from + (eio_windows -> windows_backend.enabled.ml) + ( -> windows_backend.disabled.ml)) )) diff --git a/lib_main/eio_main.ml b/lib_main/eio_main.ml index 89a1ff09c..2ccc6bad3 100644 --- a/lib_main/eio_main.ml +++ b/lib_main/eio_main.ml @@ -5,11 +5,11 @@ let run fn = match Sys.getenv_opt "EIO_BACKEND" with | Some ("io-uring" | "linux") -> force Linux_backend.run fn | Some "posix" -> force Posix_backend.run fn - | Some "luv" -> force Luv_backend.run fn + | Some "windows" -> force Windows_backend.run fn | None | Some "" -> Linux_backend.run fn ~fallback:(fun _ -> Posix_backend.run fn ~fallback:(fun _ -> - force Luv_backend.run fn + force Windows_backend.run fn ) ) | Some x -> Fmt.failwith "Unknown Eio backend %S (from $EIO_BACKEND)" x diff --git a/lib_main/eio_main.mli b/lib_main/eio_main.mli index 22344460f..ce2acd7b0 100644 --- a/lib_main/eio_main.mli +++ b/lib_main/eio_main.mli @@ -15,8 +15,8 @@ val run : (Eio.Stdenv.t -> 'a) -> 'a and prevent using the library with other Eio libraries. [run] will select an appropriate event loop for the current platform. - On many systems, it will use {!Eio_luv.run}. + On many systems, it will use {!Eio_posix.run}. On recent-enough versions of Linux, it will use {!Eio_linux.run}. You can override this by setting the $EIO_BACKEND environment variable to - either "linux" or "luv". *) + either "linux", "posix" or "windows". *) diff --git a/lib_main/luv_backend.disabled.ml b/lib_main/luv_backend.disabled.ml deleted file mode 100644 index 6987c7457..000000000 --- a/lib_main/luv_backend.disabled.ml +++ /dev/null @@ -1 +0,0 @@ -let run ~fallback _ = fallback (`Msg "The Luv backend was disabled at compile-time") diff --git a/lib_main/luv_backend.enabled.ml b/lib_main/luv_backend.enabled.ml deleted file mode 100644 index 44dc595ee..000000000 --- a/lib_main/luv_backend.enabled.ml +++ /dev/null @@ -1 +0,0 @@ -let run ~fallback:_ fn = Eio_luv.run (fun env -> fn (env :> Eio.Stdenv.t)) diff --git a/lib_main/windows_backend.disabled.ml b/lib_main/windows_backend.disabled.ml new file mode 100644 index 000000000..44bd75d4b --- /dev/null +++ b/lib_main/windows_backend.disabled.ml @@ -0,0 +1 @@ +let run ~fallback _ = fallback (`Msg "The Windows backend was disabled at compile-time") diff --git a/lib_main/windows_backend.enabled.ml b/lib_main/windows_backend.enabled.ml new file mode 100644 index 000000000..e4ba70ec9 --- /dev/null +++ b/lib_main/windows_backend.enabled.ml @@ -0,0 +1 @@ +let run ~fallback:_ fn = Eio_windows.run (fun env -> fn (env :> Eio.Stdenv.t))