Skip to content

Commit

Permalink
avoid using async/ignore_result in Lwt_process
Browse files Browse the repository at this point in the history
Ignore-this: 619716ea4da7487425ad5a7394b31176

darcs-hash:20120719111344-c41ad-aace2993ecd1de708f02cfc4e49569ea35e2eaea
  • Loading branch information
jeremiedimino committed Jul 19, 2012
1 parent 29e8be9 commit f694d44
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/unix/lwt_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let program_name = Filename.basename Sys.argv.(0)

(* Errors happening in this module are always logged to [stderr]: *)
let log_intern fmt =
Printf.ksprintf (fun msg -> Lwt.async (fun () -> Lwt_io.eprintlf "%s: Lwt_log: %s" program_name msg)) fmt
Printf.eprintf ("%s: Lwt_log: " ^^ fmt ^^ "\n%!") program_name

(* +-----------------------------------------------------------------+
| Log levels |
Expand Down
115 changes: 86 additions & 29 deletions src/unix/lwt_process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,24 @@ object(self)
| None ->
()
| Some dt ->
Lwt.async
(fun () ->
match_lwt Lwt.choose [Lwt_unix.sleep dt >> return false; wait >> return true] with
ignore (
(* Ignore errors since they can be obtained by
self#close. *)
Lwt.try_bind
(fun () ->
Lwt.choose [Lwt_unix.sleep dt >> return false;
wait >> return true])
(function
| true ->
return ()
return_unit
| false ->
(try self#terminate with _ -> ());
join (List.map Lwt_io.close channels))
self#terminate;
Lazy.force close >> Lwt.return_unit)
(fun exn ->
(* The exception is dropped because it can be
obtained with self#close. *)
Lwt.return_unit)
)
end

class process_none ?timeout ?env ?stdin ?stdout ?stderr cmd =
Expand Down Expand Up @@ -399,6 +409,13 @@ let recv_line pr =
finally
Lwt_io.close ic

let send f pr data =
let oc = pr#stdin in
try_lwt
f oc data
finally
Lwt_io.close oc

(* Receiving *)

let pread ?timeout ?env ?stdin ?stderr cmd =
Expand All @@ -415,13 +432,6 @@ let pread_lines ?timeout ?env ?stdin ?stderr cmd =

(* Sending *)

let send f pr data =
let oc = pr#stdin in
try_lwt
f oc data
finally
Lwt_io.close oc

let pwrite ?timeout ?env ?stdout ?stderr cmd text =
send Lwt_io.write (open_process_out ?timeout ?env ?stdout ?stderr cmd) text

Expand All @@ -436,32 +446,79 @@ let pwrite_lines ?timeout ?env ?stdout ?stderr cmd lines =

(* Mapping *)

(* Dump something to a command: *)
let dump f pr data =
let oc = pr#stdin in
Lwt.async
type 'a map_state =
| Init
| Save of 'a option Lwt.t
| Done

(* Monitor the thread [sender] in the stream [st] so write errors are
reported. *)
let monitor sender st =
let sender = sender >|= fun () -> None in
let state = ref Init in
Lwt_stream.from
(fun () ->
try_lwt
f oc data
finally
Lwt_io.close oc)
match !state with
| Init ->
let getter = Lwt.apply Lwt_stream.get st in
let result _ =
match Lwt.state sender with
| Lwt.Sleep ->
(* The sender is still sleeping, behave as the
getter. *)
getter
| Lwt.Return _ ->
(* The sender terminated successfully, we are
done monitoring it. *)
state := Done;
getter
| Lwt.Fail _ ->
(* The sender failed, behave as the sender for
this element and save current getter. *)
state := Save getter;
sender
in
Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
| Save t ->
state := Done;
t
| Done ->
Lwt_stream.get st)

let pmap ?timeout ?env ?stderr cmd text =
let pr = open_process ?timeout ?env ?stderr cmd in
dump Lwt_io.write pr text;
recv pr
(* Start the sender and getter at the same time. *)
let sender = send Lwt_io.write pr text in
let getter = recv pr in
try_lwt
(* Wait for both to terminate, returning the result of the
getter. *)
sender >> getter
with Lwt.Canceled as exn ->
(* Cancel the getter if the sender was canceled. *)
Lwt.cancel getter;
raise_lwt exn

let pmap_chars ?timeout ?env ?stderr cmd chars =
let pr = open_process ?timeout ?env ?stderr cmd in
dump Lwt_io.write_chars pr chars;
recv_chars pr
let sender = send Lwt_io.write_chars pr chars in
monitor sender (recv_chars pr)

let pmap_line ?timeout ?env ?stderr cmd line =
let pr = open_process ?timeout ?env ?stderr cmd in
dump Lwt_io.write_line pr line;
recv_line pr
(* Start the sender and getter at the same time. *)
let sender = send Lwt_io.write_line pr line in
let getter = recv_line pr in
try_lwt
(* Wait for both to terminate, returning the result of the
getter. *)
sender >> getter
with Lwt.Canceled as exn ->
(* Cancel the getter if the sender was canceled. *)
Lwt.cancel getter;
raise_lwt exn

let pmap_lines ?timeout ?env ?stderr cmd lines =
let pr = open_process ?timeout ?env ?stderr cmd in
dump Lwt_io.write_lines pr lines;
recv_lines pr
let sender = send Lwt_io.write_lines pr lines in
monitor sender (recv_lines pr)

0 comments on commit f694d44

Please sign in to comment.