From 6b557ed40bd852c9e8ef000cc8987c7cd9b71dbc Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Tue, 21 Mar 2023 11:22:37 +0000 Subject: [PATCH] eio_linux: split into multiple files --- lib_eio_linux/eio_linux.ml | 1058 +----------------------------------- lib_eio_linux/err.ml | 16 + lib_eio_linux/fd.ml | 69 +++ lib_eio_linux/log.ml | 2 + lib_eio_linux/low_level.ml | 434 +++++++++++++++ lib_eio_linux/sched.ml | 541 ++++++++++++++++++ 6 files changed, 1077 insertions(+), 1043 deletions(-) create mode 100644 lib_eio_linux/err.ml create mode 100644 lib_eio_linux/fd.ml create mode 100644 lib_eio_linux/log.ml create mode 100644 lib_eio_linux/low_level.ml create mode 100644 lib_eio_linux/sched.ml diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 641892a7a..62c369e6f 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -17,9 +17,6 @@ [@@@alert "-unstable"] -let src = Logs.Src.create "eio_linux" ~doc:"Effect-based IO system for Linux/io-uring" -module Log = (val Logs.src_log src : Logs.LOG) - open Eio.Std module Fiber_context = Eio.Private.Fiber_context @@ -29,873 +26,15 @@ module Suspended = Eio_utils.Suspended module Zzz = Eio_utils.Zzz module Lf_queue = Eio_utils.Lf_queue -type amount = Exactly of int | Upto of int - -let system_thread = Ctf.mint_id () - -let unclassified_error e = Eio.Exn.create (Eio.Exn.X e) - -let wrap_error code name arg = - let ex = Eio_unix.Unix_error (code, name, arg) in - match code with - | ECONNREFUSED -> Eio.Net.err (Connection_failure (Refused ex)) - | ECONNRESET | EPIPE -> Eio.Net.err (Connection_reset ex) - | _ -> unclassified_error ex - -let wrap_error_fs code name arg = - let e = Eio_unix.Unix_error (code, name, arg) in - match code with - | Unix.EEXIST -> Eio.Fs.err (Already_exists e) - | Unix.ENOENT -> Eio.Fs.err (Not_found e) - | Unix.EXDEV | EPERM | EACCES -> Eio.Fs.err (Permission_denied e) - | _ -> wrap_error code name arg - -module FD = struct - module Rcfd = Eio_unix.Private.Rcfd - - type t = { - fd : Rcfd.t; - seekable : bool; - close_unix : bool; (* Whether closing this also closes the underlying FD. *) - mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *) - } - - let err_closed op = Invalid_argument (op ^ ": file descriptor used after calling close!") - - let use t f ~if_closed = Rcfd.use t.fd f ~if_closed - - let use_exn op t f = - Rcfd.use t.fd f ~if_closed:(fun () -> raise (err_closed op)) - - let rec use_exn_list op xs k = - match xs with - | [] -> k [] - | x :: xs -> - use_exn op x @@ fun x -> - use_exn_list op xs @@ fun xs -> - k (x :: xs) - - let is_open t = Rcfd.is_open t.fd - - let close t = - Ctf.label "close"; - Switch.remove_hook t.release_hook; - if t.close_unix then ( - if not (Rcfd.close t.fd) then raise (err_closed "close") - ) else ( - match Rcfd.remove t.fd with - | Some _ -> () - | None -> raise (err_closed "close") - ) - - let is_seekable fd = - match Unix.lseek fd 0 Unix.SEEK_CUR with - | (_ : int) -> true - | exception Unix.Unix_error(Unix.ESPIPE, "lseek", "") -> false - - let to_unix op t = - match op with - | `Peek -> Rcfd.peek t.fd - | `Take -> - Switch.remove_hook t.release_hook; - match Rcfd.remove t.fd with - | Some fd -> fd - | None -> raise (err_closed "to_unix") - - let of_unix_no_hook ~seekable ~close_unix fd = - { fd = Rcfd.make fd; seekable; close_unix; release_hook = Switch.null_hook } - - let of_unix ~sw ~seekable ~close_unix fd = - let t = of_unix_no_hook ~seekable ~close_unix fd in - t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t); - t - - let uring_file_offset t = - if t.seekable then Optint.Int63.minus_one else Optint.Int63.zero - - let file_offset t = function - | Some x -> `Pos x - | None when t.seekable -> `Seekable_current - | None -> `Nonseekable_current -end - -let fstat t = - (* todo: use uring *) - try - let ust = FD.use_exn "fstat" t Unix.LargeFile.fstat in - let st_kind : Eio.File.Stat.kind = - match ust.st_kind with - | Unix.S_REG -> `Regular_file - | Unix.S_DIR -> `Directory - | Unix.S_CHR -> `Character_special - | Unix.S_BLK -> `Block_device - | Unix.S_LNK -> `Symbolic_link - | Unix.S_FIFO -> `Fifo - | Unix.S_SOCK -> `Socket - in - Eio.File.Stat.{ - dev = ust.st_dev |> Int64.of_int; - ino = ust.st_ino |> Int64.of_int; - kind = st_kind; - perm = ust.st_perm; - nlink = ust.st_nlink |> Int64.of_int; - uid = ust.st_uid |> Int64.of_int; - gid = ust.st_gid |> Int64.of_int; - rdev = ust.st_rdev |> Int64.of_int; - size = ust.st_size |> Optint.Int63.of_int64; - atime = ust.st_atime; - mtime = ust.st_mtime; - ctime = ust.st_ctime; - } - with Unix.Unix_error (code, name, arg) -> raise @@ wrap_error_fs code name arg +module FD = Fd +module Low_level = Low_level type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty let get_fd_opt t = Eio.Generic.probe t FD -type dir_fd = - | FD of FD.t - | Cwd (* Confined to "." *) - | Fs (* Unconfined "."; also allows absolute paths *) - -type _ Eio.Generic.ty += Dir_fd : dir_fd Eio.Generic.ty +type _ Eio.Generic.ty += Dir_fd : Low_level.dir_fd Eio.Generic.ty let get_dir_fd_opt t = Eio.Generic.probe t Dir_fd -type file_offset = [ - | `Pos of Optint.Int63.t - | `Seekable_current - | `Nonseekable_current -] - -type rw_req = { - op : [`R|`W]; - file_offset : file_offset; (* Read from here + cur_off (unless using current pos) *) - fd : Unix.file_descr; - len : amount; - buf : Uring.Region.chunk; - mutable cur_off : int; - action : int Suspended.t; -} - -(* Type of user-data attached to jobs. *) -type io_job = - | Read : rw_req -> io_job - | Job_no_cancel : int Suspended.t -> io_job - | Cancel_job : io_job - | Job : int Suspended.t -> io_job (* A negative result indicates error, and may report cancellation *) - | Write : rw_req -> io_job - | Job_fn : 'a Suspended.t * (int -> [`Exit_scheduler]) -> io_job - (* When done, remove the cancel_fn from [Suspended.t] and call the callback (unless cancelled). *) - -type runnable = - | IO : runnable - | Thread : 'a Suspended.t * 'a -> runnable - | Failed_thread : 'a Suspended.t * exn -> runnable - -type t = { - uring: io_job Uring.t; - mem: Uring.Region.t option; - io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) - mem_q : Uring.Region.chunk Suspended.t Queue.t; - - (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) - run_q : runnable Lf_queue.t; - - (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. - In that case, [need_wakeup = true] and you must signal using [eventfd]. *) - eventfd : FD.t; - - (* If [false], the main thread will check [run_q] before sleeping again - (possibly because an event has been or will be sent to [eventfd]). - It can therefore be set to [false] in either of these cases: - - By the receiving thread because it will check [run_q] before sleeping, or - - By the sending thread because it will signal the main thread later *) - need_wakeup : bool Atomic.t; - - sleep_q: Zzz.t; -} - -let wake_buffer = - let b = Bytes.create 8 in - Bytes.set_int64_ne b 0 1L; - b - -(* This can be called from any systhread (including ones not running Eio), - and also from signal handlers or GC finalizers. It must not take any locks. *) -let wakeup t = - Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) - FD.use t.eventfd - (fun fd -> - let sent = Unix.single_write fd wake_buffer 0 8 in - assert (sent = 8) - ) - ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) - -(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) -let enqueue_thread st k x = - Lf_queue.push st.run_q (Thread (k, x)); - if Atomic.get st.need_wakeup then wakeup st - -(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) -let enqueue_failed_thread st k ex = - Lf_queue.push st.run_q (Failed_thread (k, ex)); - if Atomic.get st.need_wakeup then wakeup st - -(* Can only be called from our own domain, so no need to check for wakeup. *) -let enqueue_at_head st k x = - Lf_queue.push_head st.run_q (Thread (k, x)) - -type _ Effect.t += Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t -type _ Effect.t += Cancel : io_job Uring.job -> unit Effect.t -let enter fn = Effect.perform (Enter fn) - -let rec enqueue_job t fn = - match fn () with - | Some _ as r -> r - | None -> - if Uring.submit t.uring > 0 then enqueue_job t fn - else None - -(* Cancellations always come from the same domain, so no need to send wake events here. *) -let rec enqueue_cancel job t = - Ctf.label "cancel"; - match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with - | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q - | Some _ -> () - -let cancel job = Effect.perform (Cancel job) - -(* Cancellation - - For operations that can be cancelled we need to set the fiber's cancellation function. - The typical sequence is: - - 1. We submit an operation, getting back a uring job (needed for cancellation). - 2. We set the cancellation function. The function uses the uring job to cancel. - - When the job completes, we clear the cancellation function. The function - must have been set by this point because we don't poll for completions until - the above steps have finished. - - If the context is cancelled while the operation is running, the function will get removed and called, - which will submit a cancellation request to uring. We know the job is still valid at this point because - we clear the cancel function when it completes. - - If the operation completes before Linux processes the cancellation, we get [ENOENT], which we ignore. *) - -(* [with_cancel_hook ~action t fn] calls [fn] to create a job, - then sets the fiber's cancel function to cancel it. - If [action] is already cancelled, it schedules [action] to be discontinued. - @return Whether to retry the operation later, once there is space. *) -let with_cancel_hook ~action t fn = - match Fiber_context.get_error action.Suspended.fiber with - | Some ex -> enqueue_failed_thread t action ex; false - | None -> - match enqueue_job t fn with - | None -> true - | Some job -> - Fiber_context.set_cancel_fn action.fiber (fun _ -> cancel job); - false - -let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) = - let {uring;io_q;_} = st in - let off = Uring.Region.to_offset buf + cur_off in - let len = match len with Exactly l | Upto l -> l in - let len = len - cur_off in - let retry = with_cancel_hook ~action st (fun () -> - let file_offset = - match file_offset with - | `Pos x -> Optint.Int63.add x (Optint.Int63.of_int cur_off) - | `Seekable_current -> Optint.Int63.minus_one - | `Nonseekable_current -> Optint.Int63.zero - in - match op with - |`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read req) - |`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write req) - ) - in - if retry then ( - Ctf.label "await-sqe"; - (* wait until an sqe is available *) - Queue.push (fun st -> submit_rw_req st req) io_q - ) - -(* TODO bind from unixsupport *) -let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false - -let enqueue_read st action (file_offset,fd,buf,len) = - let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action } in - Ctf.label "read"; - submit_rw_req st req - -let rec enqueue_readv args st action = - let (file_offset,fd,bufs) = args in - Ctf.label "readv"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.readv st.uring ~file_offset fd bufs (Job action)) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_readv args st action) st.io_q - -let rec enqueue_writev args st action = - let (file_offset,fd,bufs) = args in - Ctf.label "writev"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.writev st.uring ~file_offset fd bufs (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_writev args st action) st.io_q - -let rec enqueue_poll_add fd poll_mask st action = - Ctf.label "poll_add"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.poll_add st.uring fd poll_mask (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q - -let rec enqueue_poll_add_unix fd poll_mask st action cb = - Ctf.label "poll_add"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb)) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q - -let enqueue_write st action (file_offset,fd,buf,len) = - let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action } in - Ctf.label "write"; - submit_rw_req st req - -let rec enqueue_splice ~src ~dst ~len st action = - Ctf.label "splice"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.splice st.uring (Job action) ~src ~dst ~len - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q - -let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action = - Ctf.label "openat2"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_openat2 args st action) st.io_q - -let rec enqueue_unlink ((dir, fd, path) as args) st action = - Ctf.label "unlinkat"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.unlink st.uring ~dir ~fd path (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_unlink args st action) st.io_q - -let rec enqueue_connect fd addr st action = - Ctf.label "connect"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.connect st.uring fd addr (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q - -let rec enqueue_send_msg fd ~fds ~dst buf st action = - Ctf.label "send_msg"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.send_msg st.uring fd ~fds ?dst buf (Job action) - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q - -let rec enqueue_recv_msg fd msghdr st action = - Ctf.label "recv_msg"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.recv_msg st.uring fd msghdr (Job action); - ) - in - if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q - -let rec enqueue_accept fd client_addr st action = - Ctf.label "accept"; - let retry = with_cancel_hook ~action st (fun () -> - Uring.accept st.uring fd client_addr (Job action) - ) in - if retry then ( - (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q - ) - -let rec enqueue_noop t action = - Ctf.label "noop"; - let job = enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in - if job = None then ( - (* wait until an sqe is available *) - Queue.push (fun t -> enqueue_noop t action) t.io_q - ) - -let submit_pending_io st = - match Queue.take_opt st.io_q with - | None -> () - | Some fn -> - Ctf.label "submit_pending_io"; - fn st - -(* Switch control to the next ready continuation. - If none is ready, wait until we get an event to wake one and then switch. - Returns only if there is nothing to do and no queued operations. *) -let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = - (* This is not a fair scheduler *) - (* Wakeup any paused fibers *) - match Lf_queue.pop run_q with - | None -> assert false (* We should always have an IO job, at least *) - | Some Thread (k, v) -> (* We already have a runnable task *) - Fiber_context.clear_cancel_fn k.fiber; - Suspended.continue k v - | Some Failed_thread (k, ex) -> - Fiber_context.clear_cancel_fn k.fiber; - Suspended.discontinue k ex - | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) - (* This is not a fair scheduler: timers always run before all other IO *) - let now = Mtime_clock.now () in - match Zzz.pop ~now sleep_q with - | `Due k -> - Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - Suspended.continue k () (* A sleeping task is now due *) - | `Wait_until _ | `Nothing as next_due -> - (* Handle any pending events before submitting. This is faster. *) - match Uring.get_cqe_nonblocking uring with - | Some { data = runnable; result } -> - Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - handle_complete st ~runnable result - | None -> - ignore (Uring.submit uring : int); - let timeout = - match next_due with - | `Wait_until time -> - let time = Mtime.to_uint64_ns time in - let now = Mtime.to_uint64_ns now in - let diff_ns = Int64.sub time now |> Int64.to_float in - Some (diff_ns /. 1e9) - | `Nothing -> None - in - if not (Lf_queue.is_empty st.run_q) then ( - Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - schedule st - ) else if timeout = None && Uring.active_ops uring = 0 then ( - (* Nothing further can happen at this point. - If there are no events in progress but also still no memory available, something has gone wrong! *) - assert (Queue.length mem_q = 0); - Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *) - `Exit_scheduler - ) else ( - Atomic.set st.need_wakeup true; - if Lf_queue.is_empty st.run_q then ( - (* At this point we're not going to check [run_q] again before sleeping. - If [need_wakeup] is still [true], this is fine because we don't promise to do that. - If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) - Ctf.(note_hiatus Wait_for_work); - let result = Uring.wait ?timeout uring in - Ctf.note_resume system_thread; - Atomic.set st.need_wakeup false; - Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - match result with - | None -> - (* Woken by a timeout, which is now due, or by a signal. *) - schedule st - | Some { data = runnable; result } -> - handle_complete st ~runnable result - ) else ( - (* Someone added a new job while we were setting [need_wakeup] to [true]. - They might or might not have seen that, so we can't be sure they'll send an event. *) - Atomic.set st.need_wakeup false; - Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) - schedule st - ) - ) -and handle_complete st ~runnable result = - submit_pending_io st; (* If something was waiting for a slot, submit it now. *) - match runnable with - | Read req -> - complete_rw_req st req result - | Write req -> - complete_rw_req st req result - | Job k -> - Fiber_context.clear_cancel_fn k.fiber; - if result >= 0 then Suspended.continue k result - else ( - match Fiber_context.get_error k.fiber with - | None -> Suspended.continue k result - | Some e -> - (* If cancelled, report that instead. *) - Suspended.discontinue k e - ) - | Job_no_cancel k -> - Suspended.continue k result - | Cancel_job -> - begin match result with - | 0 (* Operation cancelled successfully *) - | -2 (* ENOENT - operation completed before cancel took effect *) - | -114 (* EALREADY - operation already in progress *) - -> () - | errno -> - Log.warn (fun f -> f "Cancel returned unexpected error: %s" (Unix.error_message (Uring.error_of_errno errno))) - end; - schedule st - | Job_fn (k, f) -> - Fiber_context.clear_cancel_fn k.fiber; - (* Should we only do this on error, to avoid losing the return value? - We already do that with rw jobs. *) - begin match Fiber_context.get_error k.fiber with - | None -> f result - | Some e -> Suspended.discontinue k e - end -and complete_rw_req st ({len; cur_off; action; _} as req) res = - Fiber_context.clear_cancel_fn action.fiber; - match res, len with - | 0, _ -> Suspended.discontinue action End_of_file - | e, _ when e < 0 -> - begin match Fiber_context.get_error action.fiber with - | Some e -> Suspended.discontinue action e (* If cancelled, report that instead. *) - | None -> - if errno_is_retry e then ( - submit_rw_req st req; - schedule st - ) else ( - Suspended.continue action e - ) - end - | n, Exactly len when n < len - cur_off -> - req.cur_off <- req.cur_off + n; - submit_rw_req st req; - schedule st - | _, Exactly len -> Suspended.continue action len - | n, Upto _ -> Suspended.continue action n - -module Low_level = struct - let alloc_buf_or_wait st k = - match st.mem with - | None -> Suspended.discontinue k (Failure "No fixed buffer available") - | Some mem -> - match Uring.Region.alloc mem with - | buf -> Suspended.continue k buf - | exception Uring.Region.No_space -> - Queue.push k st.mem_q; - schedule st - - let free_buf st buf = - match Queue.take_opt st.mem_q with - | None -> Uring.Region.free buf - | Some k -> enqueue_thread st k buf - - let noop () = - let result = enter enqueue_noop in - if result <> 0 then raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno result, "noop", ""))) - - let sleep_until time = - enter @@ fun t k -> - let job = Zzz.add t.sleep_q time k in - Fiber_context.set_cancel_fn k.fiber (fun ex -> - Zzz.remove t.sleep_q job; - enqueue_failed_thread t k ex - ) - - let read ?file_offset fd buf amount = - let file_offset = FD.file_offset fd file_offset in - FD.use_exn "read" fd @@ fun fd -> - let res = enter (fun t k -> enqueue_read t k (file_offset, fd, buf, amount)) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "read" "" - ) else res - - let read_exactly ?file_offset fd buf len = - ignore (read ?file_offset fd buf (Exactly len) : int) - - let read_upto ?file_offset fd buf len = - read ?file_offset fd buf (Upto len) - - let readv ?file_offset fd bufs = - let file_offset = - match file_offset with - | Some x -> x - | None -> FD.uring_file_offset fd - in - FD.use_exn "readv" fd @@ fun fd -> - let res = enter (enqueue_readv (file_offset, fd, bufs)) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "readv" "" - ) else if res = 0 then ( - raise End_of_file - ) else ( - res - ) - - let writev_single ?file_offset fd bufs = - let file_offset = - match file_offset with - | Some x -> x - | None -> FD.uring_file_offset fd - in - FD.use_exn "writev" fd @@ fun fd -> - let res = enter (enqueue_writev (file_offset, fd, bufs)) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "writev" "" - ) else ( - res - ) - - let rec writev ?file_offset fd bufs = - let bytes_written = writev_single ?file_offset fd bufs in - match Cstruct.shiftv bufs bytes_written with - | [] -> () - | bufs -> - let file_offset = - let module I63 = Optint.Int63 in - match file_offset with - | None -> None - | Some ofs when ofs = I63.minus_one -> Some I63.minus_one - | Some ofs -> Some (I63.add ofs (I63.of_int bytes_written)) - in - writev ?file_offset fd bufs - - let await_readable fd = - FD.use_exn "await_readable" fd @@ fun fd -> - let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in - if res < 0 then ( - raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno res, "await_readable", ""))) - ) - - let await_writable fd = - FD.use_exn "await_writable" fd @@ fun fd -> - let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in - if res < 0 then ( - raise (unclassified_error (Eio_unix.Unix_error (Uring.error_of_errno res, "await_writable", ""))) - ) - - let write ?file_offset fd buf len = - let file_offset = FD.file_offset fd file_offset in - FD.use_exn "write" fd @@ fun fd -> - let res = enter (fun t k -> enqueue_write t k (file_offset, fd, buf, Exactly len)) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "write" "" - ) - - type _ Effect.t += Alloc : Uring.Region.chunk option Effect.t - let alloc_fixed () = Effect.perform Alloc - - type _ Effect.t += Alloc_or_wait : Uring.Region.chunk Effect.t - let alloc_fixed_or_wait () = Effect.perform Alloc_or_wait - - type _ Effect.t += Free : Uring.Region.chunk -> unit Effect.t - let free_fixed buf = Effect.perform (Free buf) - - let splice src ~dst ~len = - FD.use_exn "splice-src" src @@ fun src -> - FD.use_exn "splice-dst" dst @@ fun dst -> - let res = enter (enqueue_splice ~src ~dst ~len) in - if res > 0 then res - else if res = 0 then raise End_of_file - else raise @@ wrap_error (Uring.error_of_errno res) "splice" "" - - let connect fd addr = - FD.use_exn "connect" fd @@ fun fd -> - let res = enter (enqueue_connect fd addr) in - if res < 0 then ( - let ex = - match addr with - | ADDR_UNIX _ -> wrap_error_fs (Uring.error_of_errno res) "connect" "" - | ADDR_INET _ -> wrap_error (Uring.error_of_errno res) "connect" "" - in - raise ex - ) - - let send_msg fd ?(fds=[]) ?dst buf = - FD.use_exn "send_msg" fd @@ fun fd -> - FD.use_exn_list "send_msg" fds @@ fun fds -> - let res = enter (enqueue_send_msg fd ~fds ~dst buf) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "send_msg" "" - ) - - let recv_msg fd buf = - FD.use_exn "recv_msg" fd @@ fun fd -> - let addr = Uring.Sockaddr.create () in - let msghdr = Uring.Msghdr.create ~addr buf in - let res = enter (enqueue_recv_msg fd msghdr) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "recv_msg" "" - ); - addr, res - - let recv_msg_with_fds ~sw ~max_fds fd buf = - FD.use_exn "recv_msg_with_fds" fd @@ fun fd -> - let addr = Uring.Sockaddr.create () in - let msghdr = Uring.Msghdr.create ~n_fds:max_fds ~addr buf in - let res = enter (enqueue_recv_msg fd msghdr) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "recv_msg" "" - ); - let fds = - Uring.Msghdr.get_fds msghdr - |> List.map (fun fd -> FD.of_unix ~sw ~seekable:(FD.is_seekable fd) ~close_unix:true fd) - in - addr, res, fds - - let with_chunk ~fallback fn = - match alloc_fixed () with - | Some chunk -> - Fun.protect ~finally:(fun () -> free_fixed chunk) @@ fun () -> - fn chunk - | None -> - fallback () - - let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = - let use dir = - let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in - if res < 0 then ( - Switch.check sw; (* If cancelled, report that instead. *) - raise @@ wrap_error_fs (Uring.error_of_errno res) "openat2" "" - ); - let fd : Unix.file_descr = Obj.magic res in - let seekable = - match seekable with - | None -> FD.is_seekable fd - | Some x -> x - in - FD.of_unix ~sw ~seekable ~close_unix:true fd - in - match dir with - | None -> use None - | Some dir -> FD.use_exn "openat2" dir (fun x -> use (Some x)) - - let openat ~sw ?seekable ~access ~flags ~perm dir path = - match dir with - | FD dir -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.beneath ~dir path - | Cwd -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.beneath path - | Fs -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.empty path - - let fstat fd = fstat fd - - external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" - - external eio_renameat : Unix.file_descr -> string -> Unix.file_descr -> string -> unit = "caml_eio_renameat" - - external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_getrandom" - - external eio_getdents : Unix.file_descr -> string list = "caml_eio_getdents" - - let getrandom { Cstruct.buffer; off; len } = - let rec loop n = - if n = len then - () - else - loop (n + eio_getrandom buffer (off + n) (len - n)) - in - loop 0 - - (* [with_parent_dir dir path fn] runs [fn parent (basename path)], - where [parent] is a path FD for [path]'s parent, resolved using [Resolve.beneath]. *) - let with_parent_dir op dir path fn = - let dir_path = Filename.dirname path in - let leaf = Filename.basename path in - Switch.run (fun sw -> - let parent = - match dir with - | FD d when dir_path = "." -> d - | _ -> - openat ~sw ~seekable:false dir dir_path - ~access:`R - ~flags:Uring.Open_flags.(cloexec + path + directory) - ~perm:0 - in - FD.use_exn op parent @@ fun parent -> - fn parent leaf - ) - - let mkdir_beneath ~perm dir path = - (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) - with_parent_dir "mkdir" dir path @@ fun parent leaf -> - try eio_mkdirat parent leaf perm - with Unix.Unix_error (code, name, arg) -> raise @@ wrap_error_fs code name arg - - let unlink ~rmdir dir path = - (* [unlink] is really an operation on [path]'s parent. Get a reference to that first: *) - with_parent_dir "unlink" dir path @@ fun parent leaf -> - let res = enter (enqueue_unlink (rmdir, parent, leaf)) in - if res <> 0 then raise @@ wrap_error_fs (Uring.error_of_errno res) "unlinkat" "" - - let rename old_dir old_path new_dir new_path = - with_parent_dir "renameat-old" old_dir old_path @@ fun old_parent old_leaf -> - with_parent_dir "renameat-new" new_dir new_path @@ fun new_parent new_leaf -> - try - eio_renameat - old_parent old_leaf - new_parent new_leaf - with Unix.Unix_error (code, name, arg) -> raise @@ wrap_error_fs code name arg - - let shutdown socket command = - FD.use_exn "shutdown" socket @@ fun fd -> - try Unix.shutdown fd command - with Unix.Unix_error (code, name, arg) -> raise @@ wrap_error code name arg - - let accept ~sw fd = - Ctf.label "accept"; - FD.use_exn "accept" fd @@ fun fd -> - let client_addr = Uring.Sockaddr.create () in - let res = enter (enqueue_accept fd client_addr) in - if res < 0 then ( - raise @@ wrap_error (Uring.error_of_errno res) "accept" "" - ) else ( - let unix : Unix.file_descr = Obj.magic res in - let client = FD.of_unix ~sw ~seekable:false ~close_unix:true unix in - let client_addr = Uring.Sockaddr.get client_addr in - client, client_addr - ) - - let open_dir ~sw dir path = - openat ~sw ~seekable:false dir path - ~access:`R - ~flags:Uring.Open_flags.(cloexec + directory) - ~perm:0 - - let read_dir fd = - FD.use_exn "read_dir" fd @@ fun fd -> - let rec read_all acc fd = - match eio_getdents fd with - | [] -> acc - | files -> - let files = List.filter (function ".." | "." -> false | _ -> true) files in - read_all (files @ acc) fd - in - Eio_unix.run_in_systhread (fun () -> read_all [] fd) - - (* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) - let getaddrinfo ~service node = - let to_eio_sockaddr_t {Unix.ai_family; ai_addr; ai_socktype; ai_protocol; _ } = - match ai_family, ai_socktype, ai_addr with - | (Unix.PF_INET | PF_INET6), - (Unix.SOCK_STREAM | SOCK_DGRAM), - Unix.ADDR_INET (inet_addr,port) -> ( - match ai_protocol with - | 6 -> Some (`Tcp (Eio_unix.Ipaddr.of_unix inet_addr, port)) - | 17 -> Some (`Udp (Eio_unix.Ipaddr.of_unix inet_addr, port)) - | _ -> None) - | _ -> None - in - Eio_unix.run_in_systhread @@ fun () -> - Unix.getaddrinfo node service [] - |> List.filter_map to_eio_sockaddr_t -end - -external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" - type has_fd = < fd : FD.t > type source = < Eio.Flow.source; Eio.Flow.close; has_fd > type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > @@ -998,7 +137,7 @@ let flow fd = method fd = fd method close = FD.close fd - method stat = fstat fd + method stat = Low_level.fstat fd method probe : type a. a Eio.Generic.ty -> a option = function | FD -> Some fd @@ -1089,7 +228,7 @@ let net = object | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path | _ -> () | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () - | exception Unix.Unix_error (code, name, arg) -> raise @@ wrap_error code name arg + | exception Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg ); Unix.SOCK_STREAM, Unix.ADDR_UNIX path | `Tcp (host, port) -> @@ -1166,14 +305,14 @@ let domain_mgr ~run_event_loop = object method run_raw fn = let domain = ref None in - enter (fun t k -> - domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue_thread t k ()))) + Sched.enter (fun t k -> + domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> Sched.enqueue_thread t k ()))) ); Domain.join (Option.get !domain) method run fn = let domain = ref None in - enter (fun t k -> + Sched.enter (fun t k -> let cancelled, set_cancelled = Promise.create () in Fiber_context.set_cancel_fn k.fiber (Promise.resolve set_cancelled); domain := Some (Domain.spawn (fun () -> @@ -1183,7 +322,7 @@ let domain_mgr ~run_event_loop = object run_event_loop (fun () -> result := Some (fn ~cancelled)) (); Option.get !result ) - ~finally:(fun () -> enqueue_thread t k ()))) + ~finally:(fun () -> Sched.enqueue_thread t k ()))) ); Domain.join (Option.get !domain) end @@ -1208,7 +347,7 @@ let clock = object Eio.Time.Mono.sleep mono_clock d end -class dir ~label (fd : dir_fd) = object +class dir ~label (fd : Low_level.dir_fd) = object inherit Eio.Fs.dir method! probe : type a. a Eio.Generic.ty -> a option = function @@ -1246,7 +385,7 @@ class dir ~label (fd : dir_fd) = object ~perm:0 in let label = Filename.basename path in - (new dir ~label (FD fd) :> ) + (new dir ~label (Low_level.FD fd) :> ) method mkdir ~perm path = Low_level.mkdir_beneath ~perm fd path @@ -1297,178 +436,11 @@ let stdenv ~run_event_loop = method debug = Eio.Private.Debug.v end -let monitor_event_fd t = - let buf = Cstruct.create 8 in - while true do - let got = Low_level.readv t.eventfd [buf] in - assert (got = 8); - (* We just go back to sleep now, but this will cause the scheduler to look - at the run queue again and notice any new items. *) - done; - assert false - -let no_fallback (`Msg msg) = failwith msg - -type config = { - queue_depth : int; - n_blocks : int; - block_size : int; - polling_timeout : int option; -} - -let config ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout () = - let n_blocks = Option.value n_blocks ~default:queue_depth in - { - queue_depth; - n_blocks; - block_size; - polling_timeout; - } - -let with_sched ?(fallback=no_fallback) config fn = - let { queue_depth; n_blocks; block_size; polling_timeout } = config in - match Uring.create ~queue_depth ?polling_timeout () with - | exception Unix.Unix_error(Unix.ENOSYS, _, _) -> fallback (`Msg "io_uring is not available on this system") - | uring -> - let probe = Uring.get_probe uring in - if not (Uring.op_supported probe Uring.Op.shutdown) then - fallback (`Msg "Linux >= 5.11 is required for io_uring support") - else ( - match - let mem = - let fixed_buf_len = block_size * n_blocks in - let buf = Bigarray.(Array1.create char c_layout fixed_buf_len) in - match Uring.set_fixed_buffer uring buf with - | Ok () -> - Some (Uring.Region.init ~block_size buf n_blocks) - | Error `ENOMEM -> - Log.warn (fun f -> f "Failed to allocate %d byte fixed buffer" fixed_buf_len); - None - in - let run_q = Lf_queue.create () in - Lf_queue.push run_q IO; - let sleep_q = Zzz.create () in - let io_q = Queue.create () in - let mem_q = Queue.create () in - let eventfd = FD.of_unix_no_hook ~seekable:false ~close_unix:true (eio_eventfd 0) in - fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } - with - | x -> Uring.exit uring; x - | exception ex -> - let bt = Printexc.get_raw_backtrace () in - begin - try Uring.exit uring - with ex2 -> - let bt2 = Printexc.get_raw_backtrace () in - raise (Eio.Exn.Multiple [(ex2, bt2); (ex, bt)]) - end; - Printexc.raise_with_backtrace ex bt - ) - -type exit = [`Exit_scheduler] - -let run_sched ~extra_effects st main arg = - let rec fork ~new_fiber:fiber fn = - let open Effect.Deep in - Ctf.note_switch (Fiber_context.tid fiber); - match_with fn () - { retc = (fun () -> Fiber_context.destroy fiber; schedule st); - exnc = (fun ex -> - Fiber_context.destroy fiber; - Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) - ); - effc = fun (type a) (e : a Effect.t) -> - match e with - | Enter fn -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> - let k = { Suspended.k; fiber } in - fn st k; - schedule st - ) - | Cancel job -> Some (fun k -> - enqueue_cancel job st; - continue k () - ) - | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) - | Eio.Private.Effects.Suspend f -> Some (fun k -> - let k = { Suspended.k; fiber } in - f fiber (function - | Ok v -> enqueue_thread st k v - | Error ex -> enqueue_failed_thread st k ex - ); - schedule st - ) - | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k -> - let k = { Suspended.k; fiber } in - enqueue_at_head st k (); - fork ~new_fiber f - ) - | Eio_unix.Private.Await_readable fd -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> - let k = { Suspended.k; fiber } in - enqueue_poll_add_unix fd Uring.Poll_mask.(pollin + pollerr) st k (fun res -> - if res >= 0 then Suspended.continue k () - else Suspended.discontinue k (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) - ); - schedule st - ) - | Eio_unix.Private.Await_writable fd -> Some (fun k -> - match Fiber_context.get_error fiber with - | Some e -> discontinue k e - | None -> - let k = { Suspended.k; fiber } in - enqueue_poll_add_unix fd Uring.Poll_mask.(pollout + pollerr) st k (fun res -> - if res >= 0 then Suspended.continue k () - else Suspended.discontinue k (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) - ); - schedule st - ) - | Low_level.Alloc -> Some (fun k -> - match st.mem with - | None -> continue k None - | Some mem -> - match Uring.Region.alloc mem with - | buf -> continue k (Some buf) - | exception Uring.Region.No_space -> continue k None - ) - | Low_level.Alloc_or_wait -> Some (fun k -> - let k = { Suspended.k; fiber } in - Low_level.alloc_buf_or_wait st k - ) - | Low_level.Free buf -> Some (fun k -> - Low_level.free_buf st buf; - continue k () - ) - | e -> extra_effects.effc e - } - in - let result = ref None in - let `Exit_scheduler = - let new_fiber = Fiber_context.make_root () in - fork ~new_fiber (fun () -> - Switch.run_protected (fun sw -> - Switch.on_release sw (fun () -> - FD.close st.eventfd - ); - result := Some ( - Fiber.first - (fun () -> main arg) - (fun () -> monitor_event_fd st) - ) - ) - ) - in - Option.get !result - let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a = - with_sched ?fallback config @@ fun st -> + Sched.with_sched ?fallback config @@ fun st -> let open Effect.Deep in let extra_effects : _ effect_handler = { - effc = fun (type a) (e : a Effect.t) : ((a, exit) continuation -> exit) option -> + effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option -> match e with | Eio_unix.Private.Get_monotonic_clock -> Some (fun k -> continue k mono_clock) | Eio_unix.Private.Socket_of_fd (sw, close_unix, fd) -> Some (fun k -> @@ -1492,10 +464,10 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a = ) | _ -> None } in - run_sched ~extra_effects st main arg + Sched.run ~extra_effects st main arg let run ?queue_depth ?n_blocks ?block_size ?polling_timeout ?fallback main = - let config = config ?queue_depth ?n_blocks ?block_size ?polling_timeout () in + let config = Sched.config ?queue_depth ?n_blocks ?block_size ?polling_timeout () in let stdenv = stdenv ~run_event_loop:(run_event_loop ?fallback:None config) in (* SIGPIPE makes no sense in a modern application. *) Sys.(set_signal sigpipe Signal_ignore); diff --git a/lib_eio_linux/err.ml b/lib_eio_linux/err.ml new file mode 100644 index 000000000..6099cc5a5 --- /dev/null +++ b/lib_eio_linux/err.ml @@ -0,0 +1,16 @@ +let unclassified e = Eio.Exn.create (Eio.Exn.X e) + +let wrap code name arg = + let ex = Eio_unix.Unix_error (code, name, arg) in + match code with + | ECONNREFUSED -> Eio.Net.err (Connection_failure (Refused ex)) + | ECONNRESET | EPIPE -> Eio.Net.err (Connection_reset ex) + | _ -> unclassified ex + +let wrap_fs code name arg = + let e = Eio_unix.Unix_error (code, name, arg) in + match code with + | Unix.EEXIST -> Eio.Fs.err (Already_exists e) + | Unix.ENOENT -> Eio.Fs.err (Not_found e) + | Unix.EXDEV | EPERM | EACCES -> Eio.Fs.err (Permission_denied e) + | _ -> wrap code name arg diff --git a/lib_eio_linux/fd.ml b/lib_eio_linux/fd.ml new file mode 100644 index 000000000..8c37820c3 --- /dev/null +++ b/lib_eio_linux/fd.ml @@ -0,0 +1,69 @@ +open Eio.Std + +module Rcfd = Eio_unix.Private.Rcfd +module Ctf = Eio.Private.Ctf + +type t = { + fd : Rcfd.t; + seekable : bool; + close_unix : bool; (* Whether closing this also closes the underlying FD. *) + mutable release_hook : Eio.Switch.hook; (* Use this on close to remove switch's [on_release] hook. *) +} + +let err_closed op = Invalid_argument (op ^ ": file descriptor used after calling close!") + +let use t f ~if_closed = Rcfd.use t.fd f ~if_closed + +let use_exn op t f = + Rcfd.use t.fd f ~if_closed:(fun () -> raise (err_closed op)) + +let rec use_exn_list op xs k = + match xs with + | [] -> k [] + | x :: xs -> + use_exn op x @@ fun x -> + use_exn_list op xs @@ fun xs -> + k (x :: xs) + +let is_open t = Rcfd.is_open t.fd + +let close t = + Ctf.label "close"; + Switch.remove_hook t.release_hook; + if t.close_unix then ( + if not (Rcfd.close t.fd) then raise (err_closed "close") + ) else ( + match Rcfd.remove t.fd with + | Some _ -> () + | None -> raise (err_closed "close") + ) + +let is_seekable fd = + match Unix.lseek fd 0 Unix.SEEK_CUR with + | (_ : int) -> true + | exception Unix.Unix_error(Unix.ESPIPE, "lseek", "") -> false + +let to_unix op t = + match op with + | `Peek -> Rcfd.peek t.fd + | `Take -> + Switch.remove_hook t.release_hook; + match Rcfd.remove t.fd with + | Some fd -> fd + | None -> raise (err_closed "to_unix") + +let of_unix_no_hook ~seekable ~close_unix fd = + { fd = Rcfd.make fd; seekable; close_unix; release_hook = Switch.null_hook } + +let of_unix ~sw ~seekable ~close_unix fd = + let t = of_unix_no_hook ~seekable ~close_unix fd in + t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t); + t + +let uring_file_offset t = + if t.seekable then Optint.Int63.minus_one else Optint.Int63.zero + +let file_offset t = function + | Some x -> `Pos x + | None when t.seekable -> `Seekable_current + | None -> `Nonseekable_current diff --git a/lib_eio_linux/log.ml b/lib_eio_linux/log.ml new file mode 100644 index 000000000..8a4e83c88 --- /dev/null +++ b/lib_eio_linux/log.ml @@ -0,0 +1,2 @@ +let src = Logs.Src.create "eio_linux" ~doc:"Effect-based IO system for Linux/io-uring" +include (val Logs.src_log src : Logs.LOG) diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml new file mode 100644 index 000000000..a61af2751 --- /dev/null +++ b/lib_eio_linux/low_level.ml @@ -0,0 +1,434 @@ +[@@@alert "-unstable"] + +open Eio.Std + +module Ctf = Eio.Private.Ctf + +type dir_fd = + | FD of Fd.t + | Cwd (* Confined to "." *) + | Fs (* Unconfined "."; also allows absolute paths *) + +let enqueue_read st action (file_offset,fd,buf,len) = + let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in + Ctf.label "read"; + Sched.submit_rw_req st req + +let rec enqueue_writev args st action = + let (file_offset,fd,bufs) = args in + Ctf.label "writev"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.writev st.uring ~file_offset fd bufs (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_writev args st action) st.io_q + +let enqueue_write st action (file_offset,fd,buf,len) = + let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in + Ctf.label "write"; + Sched.submit_rw_req st req + +let rec enqueue_splice ~src ~dst ~len st action = + Ctf.label "splice"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.splice st.uring (Job action) ~src ~dst ~len + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q + +let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action = + Ctf.label "openat2"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_openat2 args st action) st.io_q + +let rec enqueue_unlink ((dir, fd, path) as args) st action = + Ctf.label "unlinkat"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.unlink st.uring ~dir ~fd path (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_unlink args st action) st.io_q + +let rec enqueue_connect fd addr st action = + Ctf.label "connect"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.connect st.uring fd addr (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q + +let rec enqueue_send_msg fd ~fds ~dst buf st action = + Ctf.label "send_msg"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.send_msg st.uring fd ~fds ?dst buf (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q + +let rec enqueue_recv_msg fd msghdr st action = + Ctf.label "recv_msg"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.recv_msg st.uring fd msghdr (Job action); + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q + +let rec enqueue_accept fd client_addr st action = + Ctf.label "accept"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.accept st.uring fd client_addr (Job action) + ) in + if retry then ( + (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q + ) + +let rec enqueue_noop t action = + Ctf.label "noop"; + let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in + if job = None then ( + (* wait until an sqe is available *) + Queue.push (fun t -> enqueue_noop t action) t.io_q + ) + +let noop () = + let result = Sched.enter enqueue_noop in + if result <> 0 then raise (Err.unclassified (Eio_unix.Unix_error (Uring.error_of_errno result, "noop", ""))) + +let sleep_until time = + Sched.enter @@ fun t k -> + let job = Eio_utils.Zzz.add t.sleep_q time k in + Eio.Private.Fiber_context.set_cancel_fn k.fiber (fun ex -> + Eio_utils.Zzz.remove t.sleep_q job; + Sched.enqueue_failed_thread t k ex + ) + +let read ?file_offset fd buf amount = + let file_offset = Fd.file_offset fd file_offset in + Fd.use_exn "read" fd @@ fun fd -> + let res = Sched.enter (fun t k -> enqueue_read t k (file_offset, fd, buf, amount)) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "read" "" + ) else res + +let read_exactly ?file_offset fd buf len = + ignore (read ?file_offset fd buf (Exactly len) : int) + +let read_upto ?file_offset fd buf len = + read ?file_offset fd buf (Upto len) + +let rec enqueue_readv args st action = + let (file_offset,fd,bufs) = args in + Ctf.label "readv"; + let retry = Sched.with_cancel_hook ~action st (fun () -> + Uring.readv st.uring ~file_offset fd bufs (Job action)) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_readv args st action) st.io_q + +let readv ?file_offset fd bufs = + let file_offset = + match file_offset with + | Some x -> x + | None -> Fd.uring_file_offset fd + in + Fd.use_exn "readv" fd @@ fun fd -> + let res = Sched.enter (enqueue_readv (file_offset, fd, bufs)) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "readv" "" + ) else if res = 0 then ( + raise End_of_file + ) else ( + res + ) + +let writev_single ?file_offset fd bufs = + let file_offset = + match file_offset with + | Some x -> x + | None -> Fd.uring_file_offset fd + in + Fd.use_exn "writev" fd @@ fun fd -> + let res = Sched.enter (enqueue_writev (file_offset, fd, bufs)) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "writev" "" + ) else ( + res + ) + +let rec writev ?file_offset fd bufs = + let bytes_written = writev_single ?file_offset fd bufs in + match Cstruct.shiftv bufs bytes_written with + | [] -> () + | bufs -> + let file_offset = + let module I63 = Optint.Int63 in + match file_offset with + | None -> None + | Some ofs when ofs = I63.minus_one -> Some I63.minus_one + | Some ofs -> Some (I63.add ofs (I63.of_int bytes_written)) + in + writev ?file_offset fd bufs + +let await_readable fd = + Fd.use_exn "await_readable" fd @@ fun fd -> + let res = Sched.enter (Sched.enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in + if res < 0 then ( + raise (Err.unclassified (Eio_unix.Unix_error (Uring.error_of_errno res, "await_readable", ""))) + ) + +let await_writable fd = + Fd.use_exn "await_writable" fd @@ fun fd -> + let res = Sched.enter (Sched.enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in + if res < 0 then ( + raise (Err.unclassified (Eio_unix.Unix_error (Uring.error_of_errno res, "await_writable", ""))) + ) + +let write ?file_offset fd buf len = + let file_offset = Fd.file_offset fd file_offset in + Fd.use_exn "write" fd @@ fun fd -> + let res = Sched.enter (fun t k -> enqueue_write t k (file_offset, fd, buf, Exactly len)) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "write" "" + ) + +let alloc_fixed () = Effect.perform Sched.Alloc + +let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait + +let free_fixed buf = Effect.perform (Sched.Free buf) + +let splice src ~dst ~len = + Fd.use_exn "splice-src" src @@ fun src -> + Fd.use_exn "splice-dst" dst @@ fun dst -> + let res = Sched.enter (enqueue_splice ~src ~dst ~len) in + if res > 0 then res + else if res = 0 then raise End_of_file + else raise @@ Err.wrap (Uring.error_of_errno res) "splice" "" + +let connect fd addr = + Fd.use_exn "connect" fd @@ fun fd -> + let res = Sched.enter (enqueue_connect fd addr) in + if res < 0 then ( + let ex = + match addr with + | ADDR_UNIX _ -> Err.wrap_fs (Uring.error_of_errno res) "connect" "" + | ADDR_INET _ -> Err.wrap (Uring.error_of_errno res) "connect" "" + in + raise ex + ) + +let send_msg fd ?(fds=[]) ?dst buf = + Fd.use_exn "send_msg" fd @@ fun fd -> + Fd.use_exn_list "send_msg" fds @@ fun fds -> + let res = Sched.enter (enqueue_send_msg fd ~fds ~dst buf) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "send_msg" "" + ) + +let recv_msg fd buf = + Fd.use_exn "recv_msg" fd @@ fun fd -> + let addr = Uring.Sockaddr.create () in + let msghdr = Uring.Msghdr.create ~addr buf in + let res = Sched.enter (enqueue_recv_msg fd msghdr) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "recv_msg" "" + ); + addr, res + +let recv_msg_with_fds ~sw ~max_fds fd buf = + Fd.use_exn "recv_msg_with_fds" fd @@ fun fd -> + let addr = Uring.Sockaddr.create () in + let msghdr = Uring.Msghdr.create ~n_fds:max_fds ~addr buf in + let res = Sched.enter (enqueue_recv_msg fd msghdr) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "recv_msg" "" + ); + let fds = + Uring.Msghdr.get_fds msghdr + |> List.map (fun fd -> Fd.of_unix ~sw ~seekable:(Fd.is_seekable fd) ~close_unix:true fd) + in + addr, res, fds + +let with_chunk ~fallback fn = + match alloc_fixed () with + | Some chunk -> + Fun.protect ~finally:(fun () -> free_fixed chunk) @@ fun () -> + fn chunk + | None -> + fallback () + +let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = + let use dir = + let res = Sched.enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in + if res < 0 then ( + Switch.check sw; (* If cancelled, report that instead. *) + raise @@ Err.wrap_fs (Uring.error_of_errno res) "openat2" "" + ); + let fd : Unix.file_descr = Obj.magic res in + let seekable = + match seekable with + | None -> Fd.is_seekable fd + | Some x -> x + in + Fd.of_unix ~sw ~seekable ~close_unix:true fd + in + match dir with + | None -> use None + | Some dir -> Fd.use_exn "openat2" dir (fun x -> use (Some x)) + +let openat ~sw ?seekable ~access ~flags ~perm dir path = + match dir with + | FD dir -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.beneath ~dir path + | Cwd -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.beneath path + | Fs -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.empty path + +let fstat t = + (* todo: use uring *) + try + let ust = Fd.use_exn "fstat" t Unix.LargeFile.fstat in + let st_kind : Eio.File.Stat.kind = + match ust.st_kind with + | Unix.S_REG -> `Regular_file + | Unix.S_DIR -> `Directory + | Unix.S_CHR -> `Character_special + | Unix.S_BLK -> `Block_device + | Unix.S_LNK -> `Symbolic_link + | Unix.S_FIFO -> `Fifo + | Unix.S_SOCK -> `Socket + in + Eio.File.Stat.{ + dev = ust.st_dev |> Int64.of_int; + ino = ust.st_ino |> Int64.of_int; + kind = st_kind; + perm = ust.st_perm; + nlink = ust.st_nlink |> Int64.of_int; + uid = ust.st_uid |> Int64.of_int; + gid = ust.st_gid |> Int64.of_int; + rdev = ust.st_rdev |> Int64.of_int; + size = ust.st_size |> Optint.Int63.of_int64; + atime = ust.st_atime; + mtime = ust.st_mtime; + ctime = ust.st_ctime; + } + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg + +external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" + +external eio_renameat : Unix.file_descr -> string -> Unix.file_descr -> string -> unit = "caml_eio_renameat" + +external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_getrandom" + +external eio_getdents : Unix.file_descr -> string list = "caml_eio_getdents" + +let getrandom { Cstruct.buffer; off; len } = + let rec loop n = + if n = len then + () + else + loop (n + eio_getrandom buffer (off + n) (len - n)) + in + loop 0 + +(* [with_parent_dir dir path fn] runs [fn parent (basename path)], + where [parent] is a path FD for [path]'s parent, resolved using [Resolve.beneath]. *) +let with_parent_dir op dir path fn = + let dir_path = Filename.dirname path in + let leaf = Filename.basename path in + Switch.run (fun sw -> + let parent = + match dir with + | FD d when dir_path = "." -> d + | _ -> + openat ~sw ~seekable:false dir dir_path + ~access:`R + ~flags:Uring.Open_flags.(cloexec + path + directory) + ~perm:0 + in + Fd.use_exn op parent @@ fun parent -> + fn parent leaf + ) + +let mkdir_beneath ~perm dir path = + (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) + with_parent_dir "mkdir" dir path @@ fun parent leaf -> + try eio_mkdirat parent leaf perm + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg + +let unlink ~rmdir dir path = + (* [unlink] is really an operation on [path]'s parent. Get a reference to that first: *) + with_parent_dir "unlink" dir path @@ fun parent leaf -> + let res = Sched.enter (enqueue_unlink (rmdir, parent, leaf)) in + if res <> 0 then raise @@ Err.wrap_fs (Uring.error_of_errno res) "unlinkat" "" + +let rename old_dir old_path new_dir new_path = + with_parent_dir "renameat-old" old_dir old_path @@ fun old_parent old_leaf -> + with_parent_dir "renameat-new" new_dir new_path @@ fun new_parent new_leaf -> + try + eio_renameat + old_parent old_leaf + new_parent new_leaf + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap_fs code name arg + +let shutdown socket command = + Fd.use_exn "shutdown" socket @@ fun fd -> + try Unix.shutdown fd command + with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg + +let accept ~sw fd = + Ctf.label "accept"; + Fd.use_exn "accept" fd @@ fun fd -> + let client_addr = Uring.Sockaddr.create () in + let res = Sched.enter (enqueue_accept fd client_addr) in + if res < 0 then ( + raise @@ Err.wrap (Uring.error_of_errno res) "accept" "" + ) else ( + let unix : Unix.file_descr = Obj.magic res in + let client = Fd.of_unix ~sw ~seekable:false ~close_unix:true unix in + let client_addr = Uring.Sockaddr.get client_addr in + client, client_addr + ) + +let open_dir ~sw dir path = + openat ~sw ~seekable:false dir path + ~access:`R + ~flags:Uring.Open_flags.(cloexec + directory) + ~perm:0 + +let read_dir fd = + Fd.use_exn "read_dir" fd @@ fun fd -> + let rec read_all acc fd = + match eio_getdents fd with + | [] -> acc + | files -> + let files = List.filter (function ".." | "." -> false | _ -> true) files in + read_all (files @ acc) fd + in + Eio_unix.run_in_systhread (fun () -> read_all [] fd) + +(* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) +let getaddrinfo ~service node = + let to_eio_sockaddr_t {Unix.ai_family; ai_addr; ai_socktype; ai_protocol; _ } = + match ai_family, ai_socktype, ai_addr with + | (Unix.PF_INET | PF_INET6), + (Unix.SOCK_STREAM | SOCK_DGRAM), + Unix.ADDR_INET (inet_addr,port) -> ( + match ai_protocol with + | 6 -> Some (`Tcp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | 17 -> Some (`Udp (Eio_unix.Ipaddr.of_unix inet_addr, port)) + | _ -> None) + | _ -> None + in + Eio_unix.run_in_systhread @@ fun () -> + Unix.getaddrinfo node service [] + |> List.filter_map to_eio_sockaddr_t diff --git a/lib_eio_linux/sched.ml b/lib_eio_linux/sched.ml new file mode 100644 index 000000000..551c80755 --- /dev/null +++ b/lib_eio_linux/sched.ml @@ -0,0 +1,541 @@ +[@@@alert "-unstable"] + +open Eio.Std + +module Fiber_context = Eio.Private.Fiber_context +module Ctf = Eio.Private.Ctf + +module Suspended = Eio_utils.Suspended +module Zzz = Eio_utils.Zzz +module Lf_queue = Eio_utils.Lf_queue + +let system_thread = Ctf.mint_id () + +type exit = [`Exit_scheduler] + +type file_offset = [ + | `Pos of Optint.Int63.t + | `Seekable_current + | `Nonseekable_current +] + +type amount = Exactly of int | Upto of int + +type rw_req = { + op : [`R|`W]; + file_offset : file_offset; (* Read from here + cur_off (unless using current pos) *) + fd : Unix.file_descr; + len : amount; + buf : Uring.Region.chunk; + mutable cur_off : int; + action : int Suspended.t; +} + +(* Type of user-data attached to jobs. *) +type io_job = + | Read : rw_req -> io_job + | Job_no_cancel : int Suspended.t -> io_job + | Cancel_job : io_job + | Job : int Suspended.t -> io_job (* A negative result indicates error, and may report cancellation *) + | Write : rw_req -> io_job + | Job_fn : 'a Suspended.t * (int -> [`Exit_scheduler]) -> io_job + (* When done, remove the cancel_fn from [Suspended.t] and call the callback (unless cancelled). *) + +type runnable = + | IO : runnable + | Thread : 'a Suspended.t * 'a -> runnable + | Failed_thread : 'a Suspended.t * exn -> runnable + +type t = { + uring: io_job Uring.t; + mem: Uring.Region.t option; + io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) + mem_q : Uring.Region.chunk Suspended.t Queue.t; + + (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) + run_q : runnable Lf_queue.t; + + (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event. + In that case, [need_wakeup = true] and you must signal using [eventfd]. *) + eventfd : Fd.t; + + (* If [false], the main thread will check [run_q] before sleeping again + (possibly because an event has been or will be sent to [eventfd]). + It can therefore be set to [false] in either of these cases: + - By the receiving thread because it will check [run_q] before sleeping, or + - By the sending thread because it will signal the main thread later *) + need_wakeup : bool Atomic.t; + + sleep_q: Zzz.t; +} + +type _ Effect.t += + | Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t + | Cancel : io_job Uring.job -> unit Effect.t + | Alloc : Uring.Region.chunk option Effect.t + | Alloc_or_wait : Uring.Region.chunk Effect.t + | Free : Uring.Region.chunk -> unit Effect.t + +let wake_buffer = + let b = Bytes.create 8 in + Bytes.set_int64_ne b 0 1L; + b + +(* This can be called from any systhread (including ones not running Eio), + and also from signal handlers or GC finalizers. It must not take any locks. *) +let wakeup t = + Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *) + Fd.use t.eventfd + (fun fd -> + let sent = Unix.single_write fd wake_buffer 0 8 in + assert (sent = 8) + ) + ~if_closed:ignore (* Domain has shut down (presumably after handling the event) *) + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_thread st k x = + Lf_queue.push st.run_q (Thread (k, x)); + if Atomic.get st.need_wakeup then wakeup st + +(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *) +let enqueue_failed_thread st k ex = + Lf_queue.push st.run_q (Failed_thread (k, ex)); + if Atomic.get st.need_wakeup then wakeup st + +(* Can only be called from our own domain, so no need to check for wakeup. *) +let enqueue_at_head st k x = + Lf_queue.push_head st.run_q (Thread (k, x)) + +let enter fn = Effect.perform (Enter fn) + +let rec enqueue_job t fn = + match fn () with + | Some _ as r -> r + | None -> + if Uring.submit t.uring > 0 then enqueue_job t fn + else None + +(* Cancellations always come from the same domain, so no need to send wake events here. *) +let rec enqueue_cancel job t = + Ctf.label "cancel"; + match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with + | None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q + | Some _ -> () + +let cancel job = Effect.perform (Cancel job) + +(* Cancellation + + For operations that can be cancelled we need to set the fiber's cancellation function. + The typical sequence is: + + 1. We submit an operation, getting back a uring job (needed for cancellation). + 2. We set the cancellation function. The function uses the uring job to cancel. + + When the job completes, we clear the cancellation function. The function + must have been set by this point because we don't poll for completions until + the above steps have finished. + + If the context is cancelled while the operation is running, the function will get removed and called, + which will submit a cancellation request to uring. We know the job is still valid at this point because + we clear the cancel function when it completes. + + If the operation completes before Linux processes the cancellation, we get [ENOENT], which we ignore. *) + +(* [with_cancel_hook ~action t fn] calls [fn] to create a job, + then sets the fiber's cancel function to cancel it. + If [action] is already cancelled, it schedules [action] to be discontinued. + @return Whether to retry the operation later, once there is space. *) +let with_cancel_hook ~action t fn = + match Fiber_context.get_error action.Suspended.fiber with + | Some ex -> enqueue_failed_thread t action ex; false + | None -> + match enqueue_job t fn with + | None -> true + | Some job -> + Fiber_context.set_cancel_fn action.fiber (fun _ -> cancel job); + false + +let submit_pending_io st = + match Queue.take_opt st.io_q with + | None -> () + | Some fn -> + Ctf.label "submit_pending_io"; + fn st + +let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) = + let {uring;io_q;_} = st in + let off = Uring.Region.to_offset buf + cur_off in + let len = match len with Exactly l | Upto l -> l in + let len = len - cur_off in + let retry = with_cancel_hook ~action st (fun () -> + let file_offset = + match file_offset with + | `Pos x -> Optint.Int63.add x (Optint.Int63.of_int cur_off) + | `Seekable_current -> Optint.Int63.minus_one + | `Nonseekable_current -> Optint.Int63.zero + in + match op with + |`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read req) + |`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write req) + ) + in + if retry then ( + Ctf.label "await-sqe"; + (* wait until an sqe is available *) + Queue.push (fun st -> submit_rw_req st req) io_q + ) + +(* TODO bind from unixsupport *) +let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false + +(* Switch control to the next ready continuation. + If none is ready, wait until we get an event to wake one and then switch. + Returns only if there is nothing to do and no queued operations. *) +let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = + (* This is not a fair scheduler *) + (* Wakeup any paused fibers *) + match Lf_queue.pop run_q with + | None -> assert false (* We should always have an IO job, at least *) + | Some Thread (k, v) -> (* We already have a runnable task *) + Fiber_context.clear_cancel_fn k.fiber; + Suspended.continue k v + | Some Failed_thread (k, ex) -> + Fiber_context.clear_cancel_fn k.fiber; + Suspended.discontinue k ex + | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) + (* This is not a fair scheduler: timers always run before all other IO *) + let now = Mtime_clock.now () in + match Zzz.pop ~now sleep_q with + | `Due k -> + Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) + Suspended.continue k () (* A sleeping task is now due *) + | `Wait_until _ | `Nothing as next_due -> + (* Handle any pending events before submitting. This is faster. *) + match Uring.get_cqe_nonblocking uring with + | Some { data = runnable; result } -> + Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) + handle_complete st ~runnable result + | None -> + ignore (Uring.submit uring : int); + let timeout = + match next_due with + | `Wait_until time -> + let time = Mtime.to_uint64_ns time in + let now = Mtime.to_uint64_ns now in + let diff_ns = Int64.sub time now |> Int64.to_float in + Some (diff_ns /. 1e9) + | `Nothing -> None + in + if not (Lf_queue.is_empty st.run_q) then ( + Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) + schedule st + ) else if timeout = None && Uring.active_ops uring = 0 then ( + (* Nothing further can happen at this point. + If there are no events in progress but also still no memory available, something has gone wrong! *) + assert (Queue.length mem_q = 0); + Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *) + `Exit_scheduler + ) else ( + Atomic.set st.need_wakeup true; + if Lf_queue.is_empty st.run_q then ( + (* At this point we're not going to check [run_q] again before sleeping. + If [need_wakeup] is still [true], this is fine because we don't promise to do that. + If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *) + Ctf.(note_hiatus Wait_for_work); + let result = Uring.wait ?timeout uring in + Ctf.note_resume system_thread; + Atomic.set st.need_wakeup false; + Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) + match result with + | None -> + (* Woken by a timeout, which is now due, or by a signal. *) + schedule st + | Some { data = runnable; result } -> + handle_complete st ~runnable result + ) else ( + (* Someone added a new job while we were setting [need_wakeup] to [true]. + They might or might not have seen that, so we can't be sure they'll send an event. *) + Atomic.set st.need_wakeup false; + Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) + schedule st + ) + ) +and handle_complete st ~runnable result = + submit_pending_io st; (* If something was waiting for a slot, submit it now. *) + match runnable with + | Read req -> + complete_rw_req st req result + | Write req -> + complete_rw_req st req result + | Job k -> + Fiber_context.clear_cancel_fn k.fiber; + if result >= 0 then Suspended.continue k result + else ( + match Fiber_context.get_error k.fiber with + | None -> Suspended.continue k result + | Some e -> + (* If cancelled, report that instead. *) + Suspended.discontinue k e + ) + | Job_no_cancel k -> + Suspended.continue k result + | Cancel_job -> + begin match result with + | 0 (* Operation cancelled successfully *) + | -2 (* ENOENT - operation completed before cancel took effect *) + | -114 (* EALREADY - operation already in progress *) + -> () + | errno -> + Log.warn (fun f -> f "Cancel returned unexpected error: %s" (Unix.error_message (Uring.error_of_errno errno))) + end; + schedule st + | Job_fn (k, f) -> + Fiber_context.clear_cancel_fn k.fiber; + (* Should we only do this on error, to avoid losing the return value? + We already do that with rw jobs. *) + begin match Fiber_context.get_error k.fiber with + | None -> f result + | Some e -> Suspended.discontinue k e + end +and complete_rw_req st ({len; cur_off; action; _} as req) res = + Fiber_context.clear_cancel_fn action.fiber; + match res, len with + | 0, _ -> Suspended.discontinue action End_of_file + | e, _ when e < 0 -> + begin match Fiber_context.get_error action.fiber with + | Some e -> Suspended.discontinue action e (* If cancelled, report that instead. *) + | None -> + if errno_is_retry e then ( + submit_rw_req st req; + schedule st + ) else ( + Suspended.continue action e + ) + end + | n, Exactly len when n < len - cur_off -> + req.cur_off <- req.cur_off + n; + submit_rw_req st req; + schedule st + | _, Exactly len -> Suspended.continue action len + | n, Upto _ -> Suspended.continue action n + +let alloc_buf_or_wait st k = + match st.mem with + | None -> Suspended.discontinue k (Failure "No fixed buffer available") + | Some mem -> + match Uring.Region.alloc mem with + | buf -> Suspended.continue k buf + | exception Uring.Region.No_space -> + Queue.push k st.mem_q; + schedule st + +let free_buf st buf = + match Queue.take_opt st.mem_q with + | None -> Uring.Region.free buf + | Some k -> enqueue_thread st k buf + +let rec enqueue_poll_add fd poll_mask st action = + Ctf.label "poll_add"; + let retry = with_cancel_hook ~action st (fun () -> + Uring.poll_add st.uring fd poll_mask (Job action) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q + +let rec enqueue_poll_add_unix fd poll_mask st action cb = + Ctf.label "poll_add"; + let retry = with_cancel_hook ~action st (fun () -> + Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb)) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_poll_add_unix fd poll_mask st action cb) st.io_q + +let rec enqueue_readv args st action = + let (file_offset,fd,bufs) = args in + Ctf.label "readv"; + let retry = with_cancel_hook ~action st (fun () -> + Uring.readv st.uring ~file_offset fd bufs (Job action)) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_readv args st action) st.io_q + +let read_eventfd fd buf = + let res = enter (enqueue_readv (Optint.Int63.zero, fd, [buf])) in + if res < 0 then ( + raise @@ Unix.Unix_error (Uring.error_of_errno res, "readv", "") + ) else if res = 0 then ( + raise End_of_file + ) else ( + res + ) + +let monitor_event_fd t = + let buf = Cstruct.create 8 in + Fd.use_exn "monitor_event_fd" t.eventfd @@ fun fd -> + while true do + let got = read_eventfd fd buf in + assert (got = 8); + (* We just go back to sleep now, but this will cause the scheduler to look + at the run queue again and notice any new items. *) + done; + assert false + +let run ~extra_effects st main arg = + let rec fork ~new_fiber:fiber fn = + let open Effect.Deep in + Ctf.note_switch (Fiber_context.tid fiber); + match_with fn () + { retc = (fun () -> Fiber_context.destroy fiber; schedule st); + exnc = (fun ex -> + Fiber_context.destroy fiber; + Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ()) + ); + effc = fun (type a) (e : a Effect.t) -> + match e with + | Enter fn -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fiber } in + fn st k; + schedule st + ) + | Cancel job -> Some (fun k -> + enqueue_cancel job st; + continue k () + ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber) + | Eio.Private.Effects.Suspend f -> Some (fun k -> + let k = { Suspended.k; fiber } in + f fiber (function + | Ok v -> enqueue_thread st k v + | Error ex -> enqueue_failed_thread st k ex + ); + schedule st + ) + | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k -> + let k = { Suspended.k; fiber } in + enqueue_at_head st k (); + fork ~new_fiber f + ) + | Eio_unix.Private.Await_readable fd -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fiber } in + enqueue_poll_add_unix fd Uring.Poll_mask.(pollin + pollerr) st k (fun res -> + if res >= 0 then Suspended.continue k () + else Suspended.discontinue k (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) + ); + schedule st + ) + | Eio_unix.Private.Await_writable fd -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fiber } in + enqueue_poll_add_unix fd Uring.Poll_mask.(pollout + pollerr) st k (fun res -> + if res >= 0 then Suspended.continue k () + else Suspended.discontinue k (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) + ); + schedule st + ) + | Alloc -> Some (fun k -> + match st.mem with + | None -> continue k None + | Some mem -> + match Uring.Region.alloc mem with + | buf -> continue k (Some buf) + | exception Uring.Region.No_space -> continue k None + ) + | Alloc_or_wait -> Some (fun k -> + let k = { Suspended.k; fiber } in + alloc_buf_or_wait st k + ) + | Free buf -> Some (fun k -> + free_buf st buf; + continue k () + ) + | e -> extra_effects.effc e + } + in + let result = ref None in + let `Exit_scheduler = + let new_fiber = Fiber_context.make_root () in + fork ~new_fiber (fun () -> + Switch.run_protected (fun sw -> + Switch.on_release sw (fun () -> + Fd.close st.eventfd + ); + result := Some ( + Fiber.first + (fun () -> main arg) + (fun () -> monitor_event_fd st) + ) + ) + ) + in + Option.get !result + +type config = { + queue_depth : int; + n_blocks : int; + block_size : int; + polling_timeout : int option; +} + +let config ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout () = + let n_blocks = Option.value n_blocks ~default:queue_depth in + { + queue_depth; + n_blocks; + block_size; + polling_timeout; + } + +external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" + +let no_fallback (`Msg msg) = failwith msg + +let with_sched ?(fallback=no_fallback) config fn = + let { queue_depth; n_blocks; block_size; polling_timeout } = config in + match Uring.create ~queue_depth ?polling_timeout () with + | exception Unix.Unix_error(Unix.ENOSYS, _, _) -> fallback (`Msg "io_uring is not available on this system") + | uring -> + let probe = Uring.get_probe uring in + if not (Uring.op_supported probe Uring.Op.shutdown) then + fallback (`Msg "Linux >= 5.11 is required for io_uring support") + else ( + match + let mem = + let fixed_buf_len = block_size * n_blocks in + let buf = Bigarray.(Array1.create char c_layout fixed_buf_len) in + match Uring.set_fixed_buffer uring buf with + | Ok () -> + Some (Uring.Region.init ~block_size buf n_blocks) + | Error `ENOMEM -> + Log.warn (fun f -> f "Failed to allocate %d byte fixed buffer" fixed_buf_len); + None + in + let run_q = Lf_queue.create () in + Lf_queue.push run_q IO; + let sleep_q = Zzz.create () in + let io_q = Queue.create () in + let mem_q = Queue.create () in + let eventfd = Fd.of_unix_no_hook ~seekable:false ~close_unix:true (eio_eventfd 0) in + fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q } + with + | x -> Uring.exit uring; x + | exception ex -> + let bt = Printexc.get_raw_backtrace () in + begin + try Uring.exit uring + with ex2 -> + let bt2 = Printexc.get_raw_backtrace () in + raise (Eio.Exn.Multiple [(ex2, bt2); (ex, bt)]) + end; + Printexc.raise_with_backtrace ex bt + )