Skip to content

Commit

Permalink
Document and test Lwt_stream on exceptions
Browse files Browse the repository at this point in the history
Closes #155.
  • Loading branch information
aantron committed Dec 5, 2016
1 parent 4fe6fbf commit e62cda8
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
21 changes: 18 additions & 3 deletions src/core/lwt_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,36 @@ type 'a t
val from : (unit -> 'a option Lwt.t) -> 'a t
(** [from f] creates a stream from the given input function. [f] is
called each time more input is needed, and the stream ends when
[f] returns [None]. *)
[f] returns [None].
If [f], or the thread produced by [f], raises an exception, that exception
is forwarded to the consumer of the stream (for example, a caller of
{!get}). Note that this does not end the stream. A subsequent attempt to
read from the stream will cause another call to [f], which may succeed
with a value. *)

val from_direct : (unit -> 'a option) -> 'a t
(** [from_direct f] does the same as {!from} but with a function
that does not return a thread. It is preferred that this
function be used rather than wrapping [f] into a function which
returns a thread. *)
returns a thread.
The behavior when [f] raises an exception is the same as for {!from},
except that [f] does not produce a thread. *)

exception Closed
(** Exception raised by the push function of a push-stream when
pushing an element after the end of stream ([= None]) has been
pushed. *)

val create : unit -> 'a t * ('a option -> unit)
(** [create ()] returns a new stream and a push function. *)
(** [create ()] returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate
communication channel, or use a
{{:http://caml.inria.fr/pub/docs/manual-ocaml/libref/Pervasives.html#TYPEresult}
[result]} stream. There is no way to push an exception into a
push-stream. *)

val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit)
(** [create_with_reference ()] returns a new stream and a push
Expand Down
81 changes: 78 additions & 3 deletions tests/core/test_lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@
open Lwt
open Test

let expect_exit f =
Lwt.catch
(fun () ->
f () >>= fun _ ->
Lwt.return_false)
(function
| Exit -> Lwt.return_true
| e -> Lwt.fail e)

let suite = suite "lwt_stream" [
test "from"
(fun () ->
Expand Down Expand Up @@ -273,9 +282,6 @@ let suite = suite "lwt_stream" [

test "map_exn"
(fun () ->
(* TODO: This will no longer be a shadowing open once Lwt_stream.error
is removed. *)
let open! Lwt_stream in
let l =
[Result.Ok 1;
Result.Error Exit;
Expand Down Expand Up @@ -364,4 +370,73 @@ let suite = suite "lwt_stream" [
(fun () ->
let open! Lwt_stream in
to_list (choose [of_list []]) >|= fun _ -> true);

test "exception passing: basic, from"
(fun () ->
let stream = Lwt_stream.from (fun () -> Lwt.fail Exit) in
expect_exit (fun () -> Lwt_stream.get stream));

test "exception passing: basic, from_direct"
(fun () ->
let stream = Lwt_stream.from_direct (fun () -> raise Exit) in
expect_exit (fun () -> Lwt_stream.get stream));

test "exception passing: to_list"
(fun () ->
let stream = Lwt_stream.from (fun () -> Lwt.fail Exit) in
expect_exit (fun () -> Lwt_stream.to_list stream));

test "exception passing: mapped"
(fun () ->
let stream = Lwt_stream.from (fun () -> Lwt.fail Exit) in
let stream = Lwt_stream.map (fun v -> v) stream in
expect_exit (fun () -> Lwt_stream.get stream));

test "exception passing: resume, not closed, from"
(fun () ->
let to_feed = ref (Lwt.fail Exit) in
let stream = Lwt_stream.from (fun () -> !to_feed) in

expect_exit (fun () -> Lwt_stream.get stream) >>= fun got_exit ->
let closed_after_exit = Lwt_stream.is_closed stream in

to_feed := Lwt.return (Some 0);
Lwt_stream.get stream >>= fun v ->
let got_zero = (v = Some 0) in

to_feed := Lwt.return_none;
Lwt_stream.get stream >>= fun v ->
let got_none = (v = None) in
let closed_at_end = Lwt_stream.is_closed stream in

Lwt.return
(got_exit &&
not closed_after_exit &&
got_zero &&
got_none &&
closed_at_end));

test "exception passing: resume, not closed, from_direct"
(fun () ->
let to_feed = ref (fun () -> raise Exit) in
let stream = Lwt_stream.from_direct (fun () -> !to_feed ()) in

expect_exit (fun () -> Lwt_stream.get stream) >>= fun got_exit ->
let closed_after_exit = Lwt_stream.is_closed stream in

to_feed := (fun () -> Some 0);
Lwt_stream.get stream >>= fun v ->
let got_zero = (v = Some 0) in

to_feed := (fun () -> None);
Lwt_stream.get stream >>= fun v ->
let got_none = (v = None) in
let closed_at_end = Lwt_stream.is_closed stream in

Lwt.return
(got_exit &&
not closed_after_exit &&
got_zero &&
got_none &&
closed_at_end));
]

0 comments on commit e62cda8

Please sign in to comment.