From b740348a2392ad4955f7e2e702dc9eb7c9fb43cd Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 27 May 2016 16:17:30 -0400 Subject: [PATCH 1/3] change close semantics for fixed-length sources Previously, the of_list, of_array, and of_string constructors each would produce a stream s for which is_closed s = false This is counter-intuitive as all the elements of the stream s have been generated and should be consumable without blocking. This commit changes the behavior of this constructors such that is_closed s = true immediately upon creation. --- src/core/lwt_stream.ml | 47 +++++++++------------------ tests/core/test_lwt_stream.ml | 60 ++++++++++++++--------------------- 2 files changed, 38 insertions(+), 69 deletions(-) diff --git a/src/core/lwt_stream.ml b/src/core/lwt_stream.ml index cbef35c88d..5890bdcd04 100644 --- a/src/core/lwt_stream.ml +++ b/src/core/lwt_stream.ml @@ -170,38 +170,6 @@ let enqueue' e last = let enqueue e s = enqueue' e s.last -let of_list l = - let l = ref l in - from_direct - (fun () -> - match !l with - | [] -> None - | x :: l' -> l := l'; Some x) - -let of_array a = - let len = Array.length a and i = ref 0 in - from_direct - (fun () -> - if !i = len then - None - else begin - let c = Array.unsafe_get a !i in - incr i; - Some c - end) - -let of_string s = - let len = String.length s and i = ref 0 in - from_direct - (fun () -> - if !i = len then - None - else begin - let c = String.unsafe_get s !i in - incr i; - Some c - end) - let create_with_reference () = (* Create the source for notifications of new elements. *) let source, wakener_cell = @@ -243,6 +211,21 @@ let create () = let source, push, _ = create_with_reference () in (source, push) +let of_iter iter i = + let stream, push = create () in + iter (fun x -> push (Some x)) i; + push None; + stream + +let of_list l = + of_iter List.iter l + +let of_array a = + of_iter Array.iter a + +let of_string s = + of_iter String.iter s + (* Add the pending element to the queue and notify the blocked pushed. Precondition: info.pushb_pending = Some _ diff --git a/tests/core/test_lwt_stream.ml b/tests/core/test_lwt_stream.ml index 8e6ac4701f..877a80a3a8 100644 --- a/tests/core/test_lwt_stream.ml +++ b/tests/core/test_lwt_stream.ml @@ -294,62 +294,48 @@ let suite = suite "lwt_stream" [ test "is_closed" (fun () -> - let st = Lwt_stream.of_list [1; 2] in - let b1 = not (Lwt_stream.is_closed st) in - ignore (Lwt_stream.peek st); - let b2 = not (Lwt_stream.is_closed st) in - ignore (Lwt_stream.junk st); - ignore (Lwt_stream.peek st); - let b3 = not (Lwt_stream.is_closed st) in - ignore (Lwt_stream.junk st); - ignore (Lwt_stream.peek st); - let b4 = Lwt_stream.is_closed st in - Lwt.return (b1 && b2 && b3 && b4)); + let b1 = Lwt_stream.(is_closed (of_list [])) in + let b2 = Lwt_stream.(is_closed (of_list [1;2;3])) in + let b3 = Lwt_stream.(is_closed (of_array [||])) in + let b4 = Lwt_stream.(is_closed (of_array [|1;2;3;|])) in + let b5 = Lwt_stream.(is_closed (of_string "")) in + let b6 = Lwt_stream.(is_closed (of_string "123")) in + let b7 = Lwt_stream.(is_closed (from_direct (fun () -> Some 1))) in + let b8 = Lwt_stream.(is_closed (from_direct (fun () -> None))) in + return (b1 && b2 && b3 && b4 && b5 && b6 && not b7 && not b8)); test "closed" (fun () -> - let st = Lwt_stream.of_list [1; 2] in + let st = Lwt_stream.from_direct ( + let value = ref (Some 1) in + fun () -> let r = !value in value := None; r) + in let b = ref false in - let is_closed_in_notification = ref false in - Lwt.async (fun () -> - Lwt_stream.closed st >|= fun () -> - b := true; - is_closed_in_notification := Lwt_stream.is_closed st); + Lwt.async (fun () -> Lwt_stream.closed st >|= fun () -> b := true); ignore (Lwt_stream.peek st); let b1 = !b = false in ignore (Lwt_stream.junk st); ignore (Lwt_stream.peek st); - let b2 = !b = false in - ignore (Lwt_stream.junk st); - ignore (Lwt_stream.peek st); - let b3 = !b = true in - Lwt.return (b1 && b2 && b3 && !is_closed_in_notification)); + let b2 = !b = true in + let b3 = Lwt_stream.is_closed st in + return (b1 && b2 && b3)); test "on_termination" (fun () -> - let st = Lwt_stream.of_list [1; 2] in + let st = Lwt_stream.from_direct ( + let value = ref (Some 1) in + fun () -> let r = !value in value := None; r) + in let b = ref false in Lwt_stream.on_termination st (fun () -> b := true); ignore (Lwt_stream.peek st); let b1 = !b = false in ignore (Lwt_stream.junk st); ignore (Lwt_stream.peek st); - let b2 = !b = false in - ignore (Lwt_stream.junk st); - ignore (Lwt_stream.peek st); - let b3 = !b = true in + let b2 = !b = true in + let b3 = Lwt_stream.is_closed st in Lwt.return (b1 && b2 && b3)); - test "on_termination when closed" - (fun () -> - let st = Lwt_stream.of_list [] in - let b = ref false in - let b1 = not (Lwt_stream.is_closed st) in - ignore (Lwt_stream.junk st); - let b2 = Lwt_stream.is_closed st in - Lwt_stream.on_termination st (fun () -> b := true); - Lwt.return (b1 && b2 && !b)); - test "choose_exhausted" (fun () -> Lwt_stream.(to_list (choose [of_list []])) >|= fun _ -> true); From 2e41fe96d4dc43eb3b48d80bc44b43d1005e0597 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 17 Jun 2016 12:14:27 -0400 Subject: [PATCH 2/3] add test for on_termination after stream closed Attaching a termination callback to an already closed stream should result in the callback being invoked. --- tests/core/test_lwt_stream.ml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/core/test_lwt_stream.ml b/tests/core/test_lwt_stream.ml index 877a80a3a8..d676d90080 100644 --- a/tests/core/test_lwt_stream.ml +++ b/tests/core/test_lwt_stream.ml @@ -336,6 +336,14 @@ let suite = suite "lwt_stream" [ let b3 = Lwt_stream.is_closed st in Lwt.return (b1 && b2 && b3)); + test "on_termination when closed" + (fun () -> + let st = Lwt_stream.of_list [] in + let b = ref false in + let b1 = Lwt_stream.is_closed st in + Lwt_stream.on_termination st (fun () -> b := true); + Lwt.return (b1 && !b)); + test "choose_exhausted" (fun () -> Lwt_stream.(to_list (choose [of_list []])) >|= fun _ -> true); From dfda6dad54491d5fae61833ca31e5d1971b2bdda Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 17 Jun 2016 12:48:54 -0400 Subject: [PATCH 3/3] update closed test to eliminate superfluous checks The test now consists of checking that the stream is not closed when initially constructed, and that once all its elements are consumed, the closed thread has returned, and the stream in fact has been closed. --- tests/core/test_lwt_stream.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/test_lwt_stream.ml b/tests/core/test_lwt_stream.ml index d676d90080..8e9e615543 100644 --- a/tests/core/test_lwt_stream.ml +++ b/tests/core/test_lwt_stream.ml @@ -311,14 +311,14 @@ let suite = suite "lwt_stream" [ fun () -> let r = !value in value := None; r) in let b = ref false in - Lwt.async (fun () -> Lwt_stream.closed st >|= fun () -> b := true); + Lwt.async (fun () -> + Lwt_stream.closed st >|= fun () -> b := Lwt_stream.is_closed st); ignore (Lwt_stream.peek st); let b1 = !b = false in ignore (Lwt_stream.junk st); ignore (Lwt_stream.peek st); let b2 = !b = true in - let b3 = Lwt_stream.is_closed st in - return (b1 && b2 && b3)); + return (b1 && b2)); test "on_termination" (fun () ->