Skip to content

Commit

Permalink
Support all sources and sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
patricoferris committed Apr 1, 2023
1 parent 12dcd75 commit d553598
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 32 deletions.
13 changes: 11 additions & 2 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ let status proc = proc#status
let signal proc = proc#signal

class virtual mgr = object
method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
method virtual spawn : 'a 'b 'c.
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
stdin:(#Flow.source as 'a) ->
stdout:(#Flow.sink as 'b) ->
stderr:(#Flow.sink as 'c) ->
string ->
string list ->
t
end

let spawn ~sw t ?cwd ~stdin ~stdout ~stderr cmd args = t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args
let spawn ~sw (t:#mgr) ?cwd ~(stdin:#Flow.source) ~(stdout:#Flow.sink) ~(stderr:#Flow.sink) cmd args =
t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args
12 changes: 6 additions & 6 deletions lib_eio/process.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ val signal : #t -> int -> unit
(** [signal t i] sends the signal [i] to process [t]. *)

class virtual mgr : object
method virtual spawn :
sw:Switch.t ->
method virtual spawn : 'a 'b 'c.
sw:Switch.t ->
?cwd:Fs.dir Path.t ->
stdin:Flow.source ->
stdout:Flow.sink ->
stderr:Flow.sink ->
stdin:(#Flow.source as 'a) ->
stdout:(#Flow.sink as 'b) ->
stderr:(#Flow.sink as 'c) ->
string ->
string list ->
t
end
(** A process manager capable of spawning new processes. *)

val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:#Flow.source -> stdout:#Flow.sink -> stderr:#Flow.sink -> string -> string list -> t
(** [spawn ~sw mgr ?cwd ~stdin ~stdout ~stderr cmd args] creates a new subprocess that is connected to the
switch [sw]. A process will be stopped when the switch is released.
Expand Down
47 changes: 35 additions & 12 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -301,27 +301,50 @@ let process proc : Eio.Process.t = object
method signal i = Process.signal proc i
end

let pipe_or_fd flow =
match Eio.Generic.probe flow FD with
| None -> assert false
| Some fd -> FD.to_rcfd fd
let read_of_fd ~sw t =
match get_fd_opt t with
| None ->
let r, w = Low_level.pipe ~sw in
Some (flow w), r
| Some fd -> None, fd

let write_of_fd ~sw t =
match get_fd_opt t with
| None ->
let r, w = Low_level.pipe ~sw in
Some (flow r), w
| Some fd -> None, fd

let process_mgr = object
method spawn ~sw ?cwd ~stdin ~stdout ~stderr prog args =
inherit Eio.Process.mgr

method spawn ~sw ?cwd ~(stdin : #Eio.Flow.source) ~(stdout : #Eio.Flow.sink) ~(stderr : #Eio.Flow.sink) prog args =
let chdir = Option.to_list cwd |> List.map (fun (_, s) -> Process.Fork_action.chdir s) in
let stdin = pipe_or_fd stdin in
let stdout = pipe_or_fd stdout in
let stderr = pipe_or_fd stderr in
let stdin_w, stdin_fd = read_of_fd ~sw stdin in
let stdout_r, stdout_fd = write_of_fd ~sw stdout in
let stderr_r, stderr_fd = write_of_fd ~sw stderr in
let actions = Process.Fork_action.[
Eio_unix.Private.Fork_action.inherit_fds [
0, stdin, `Blocking;
1, stdout, `Blocking;
2, stderr, `Blocking;
0, Fd.to_rcfd stdin_fd, `Blocking;
1, Fd.to_rcfd stdout_fd, `Blocking;
2, Fd.to_rcfd stderr_fd, `Blocking;
];
execve prog ~argv:(Array.of_list args) ~env:[||]
] in
let actions = chdir @ actions in
process (Process.spawn ~sw actions)
let proc = process (Process.spawn ~sw actions) in
Option.iter (fun stdin_w ->
Eio.Fiber.fork ~sw (fun () ->
Eio.Flow.copy stdin stdin_w;
Eio.Flow.close stdin_w
)) stdin_w;
Option.iter (fun stdout_r ->
Fd.close stdout_fd;
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stdout_r stdout)) stdout_r;
Option.iter (fun stderr_r ->
Fd.close stderr_fd;
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stderr_r stdout)) stderr_r;
proc
end

type stdenv = <
Expand Down
45 changes: 34 additions & 11 deletions lib_eio_posix/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,48 @@ let process proc : Eio.Process.t = object
method signal i = Process.signal proc i
end

let pipe_or_fd flow =
let read_of_fd ~sw flow =
match Fd.get_fd_opt flow with
| None -> assert false
| Some fd -> Fd.to_rcfd fd
| None ->
let r, w = pipe ~sw in
Some (Flow.of_fd w), r
| Some fd -> None, fd

let write_of_fd ~sw t =
match Fd.get_fd_opt t with
| None ->
let r, w = pipe ~sw in
Some (Flow.of_fd r), w
| Some fd -> None, fd

let v = object
method spawn ~sw ?cwd ~stdin ~stdout ~stderr prog args =
inherit Eio.Process.mgr

method spawn ~sw ?cwd ~(stdin : #Eio.Flow.source) ~(stdout : #Eio.Flow.sink) ~(stderr : #Eio.Flow.sink) prog args =
let chdir = Option.to_list cwd |> List.map (fun (_, s) -> Process.Fork_action.chdir s) in
let stdin = pipe_or_fd stdin in
let stdout = pipe_or_fd stdout in
let stderr = pipe_or_fd stderr in
let stdin_w, stdin_fd = read_of_fd ~sw stdin in
let stdout_r, stdout_fd = write_of_fd ~sw stdout in
let stderr_r, stderr_fd = write_of_fd ~sw stderr in
let actions = Process.Fork_action.[
Eio_unix.Private.Fork_action.inherit_fds [
0, stdin, `Blocking;
1, stdout, `Blocking;
2, stderr, `Blocking;
0, Fd.to_rcfd stdin_fd, `Blocking;
1, Fd.to_rcfd stdout_fd, `Blocking;
2, Fd.to_rcfd stderr_fd, `Blocking;
];
execve prog ~argv:(Array.of_list args) ~env:[||]
] in
let actions = chdir @ actions in
process (Process.spawn ~sw actions)
let proc = process (Process.spawn ~sw actions) in
Option.iter (fun stdin_w ->
Eio.Fiber.fork ~sw (fun () ->
Eio.Flow.copy stdin stdin_w;
Eio.Flow.close stdin_w
)) stdin_w;
Option.iter (fun stdout_r ->
Fd.close stdout_fd;
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stdout_r stdout)) stdout_r;
Option.iter (fun stderr_r ->
Fd.close stderr_fd;
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stderr_r stdout)) stderr_r;
proc
end
19 changes: 18 additions & 1 deletion tests/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ val with_pipe_from_child :
Flow.copy r (Flow.buffer_sink buff);
Buffer.contents buff;;
val pread :
< process_mgr : #Process.mgr; stderr : Flow.sink; stdin : Flow.source; .. > ->
< process_mgr : #Process.mgr; stderr : #Flow.sink; stdin : #Flow.source;
.. > ->
string = <fun>
# run @@ fun _spawn env ->
pread env;;
Expand Down Expand Up @@ -141,3 +142,19 @@ hello world
- : Process.status * Process.status * Process.status =
(Eio.Process.Exited 0, Eio.Process.Exited 0, Eio.Process.Exited 0)
```

Using sources and sinks that are not backed by file descriptors.

```ocaml
# run @@ fun _spawn env ->
let proc = env#process_mgr in
let buf = Buffer.create 16 in
let dst = Flow.buffer_sink buf in
Eio.Switch.run @@ fun sw ->
let p =
Eio.Process.spawn proc ~sw ~stdin:env#stdin ~stdout:dst ~stderr:env#stderr "/usr/bin/echo" [ "echo"; "Hello, world" ]
in
let _ : Process.status = Process.status p in
Buffer.contents buf
- : string = "Hello, world\n"
```

0 comments on commit d553598

Please sign in to comment.