Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change close semantics for fixed-length sources #239

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 15 additions & 32 deletions src/core/lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -170,38 +170,6 @@ let enqueue' e last =
let enqueue e s =
enqueue' e s.last

let of_list l =
let l = ref l in
from_direct
(fun () ->
match !l with
| [] -> None
| x :: l' -> l := l'; Some x)

let of_array a =
let len = Array.length a and i = ref 0 in
from_direct
(fun () ->
if !i = len then
None
else begin
let c = Array.unsafe_get a !i in
incr i;
Some c
end)

let of_string s =
let len = String.length s and i = ref 0 in
from_direct
(fun () ->
if !i = len then
None
else begin
let c = String.unsafe_get s !i in
incr i;
Some c
end)

let create_with_reference () =
(* Create the source for notifications of new elements. *)
let source, wakener_cell =
Expand Down Expand Up @@ -243,6 +211,21 @@ let create () =
let source, push, _ = create_with_reference () in
(source, push)

let of_iter iter i =
let stream, push = create () in
iter (fun x -> push (Some x)) i;
push None;
stream

let of_list l =
of_iter List.iter l

let of_array a =
of_iter Array.iter a

let of_string s =
of_iter String.iter s

(* Add the pending element to the queue and notify the blocked pushed.

Precondition: info.pushb_pending = Some _
Expand Down
60 changes: 23 additions & 37 deletions tests/core/test_lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -294,62 +294,48 @@ let suite = suite "lwt_stream" [

test "is_closed"
(fun () ->
let st = Lwt_stream.of_list [1; 2] in
let b1 = not (Lwt_stream.is_closed st) in
ignore (Lwt_stream.peek st);
let b2 = not (Lwt_stream.is_closed st) in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b3 = not (Lwt_stream.is_closed st) in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b4 = Lwt_stream.is_closed st in
Lwt.return (b1 && b2 && b3 && b4));
let b1 = Lwt_stream.(is_closed (of_list [])) in
let b2 = Lwt_stream.(is_closed (of_list [1;2;3])) in
let b3 = Lwt_stream.(is_closed (of_array [||])) in
let b4 = Lwt_stream.(is_closed (of_array [|1;2;3;|])) in
let b5 = Lwt_stream.(is_closed (of_string "")) in
let b6 = Lwt_stream.(is_closed (of_string "123")) in
let b7 = Lwt_stream.(is_closed (from_direct (fun () -> Some 1))) in
let b8 = Lwt_stream.(is_closed (from_direct (fun () -> None))) in
return (b1 && b2 && b3 && b4 && b5 && b6 && not b7 && not b8));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing this no longer tests, is that streams for which is_closed is false can be turned into streams for which it is true, by reading them.


test "closed"
(fun () ->
let st = Lwt_stream.of_list [1; 2] in
let st = Lwt_stream.from_direct (
let value = ref (Some 1) in
fun () -> let r = !value in value := None; r)
in
let b = ref false in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be removed. I think is_closed should be true inside threads waiting on closed, and it is conceivable that some future implementation might get that wrong.

let is_closed_in_notification = ref false in
Lwt.async (fun () ->
Lwt_stream.closed st >|= fun () ->
b := true;
is_closed_in_notification := Lwt_stream.is_closed st);
Lwt.async (fun () -> Lwt_stream.closed st >|= fun () -> b := true);
ignore (Lwt_stream.peek st);
let b1 = !b = false in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b2 = !b = false in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b3 = !b = true in
Lwt.return (b1 && b2 && b3 && !is_closed_in_notification));
let b2 = !b = true in
let b3 = Lwt_stream.is_closed st in
return (b1 && b2 && b3));

test "on_termination"
(fun () ->
let st = Lwt_stream.of_list [1; 2] in
let st = Lwt_stream.from_direct (
let value = ref (Some 1) in
fun () -> let r = !value in value := None; r)
in
let b = ref false in
Lwt_stream.on_termination st (fun () -> b := true);
ignore (Lwt_stream.peek st);
let b1 = !b = false in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b2 = !b = false in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b3 = !b = true in
let b2 = !b = true in
let b3 = Lwt_stream.is_closed st in
Lwt.return (b1 && b2 && b3));

test "on_termination when closed"
(fun () ->
let st = Lwt_stream.of_list [] in
let b = ref false in
let b1 = not (Lwt_stream.is_closed st) in
ignore (Lwt_stream.junk st);
let b2 = Lwt_stream.is_closed st in
Lwt_stream.on_termination st (fun () -> b := true);
Lwt.return (b1 && b2 && !b));

test "choose_exhausted"
(fun () ->
Lwt_stream.(to_list (choose [of_list []])) >|= fun _ -> true);
Expand Down