diff --git a/folly/experimental/pushmi/benchmarks/PushmiBenchmarks.cpp b/folly/experimental/pushmi/benchmarks/PushmiBenchmarks.cpp index db0a5b9cb5b..c6217f5ec91 100644 --- a/folly/experimental/pushmi/benchmarks/PushmiBenchmarks.cpp +++ b/folly/experimental/pushmi/benchmarks/PushmiBenchmarks.cpp @@ -570,7 +570,7 @@ BENCHMARK(trampoline_flow_many_sender_1000, n) { BENCHMARK_SUSPEND { std::iota(values.begin(), values.end(), 1); } - auto f = op::flow_from(values, tr) | op::tap([&](int){ + auto f = op::flow_from(values, [&]{return tr;}) | op::tap([&](int){ --counter; }); FOR_EACH_RANGE (i, 0, n) { diff --git a/folly/experimental/pushmi/detail/if_constexpr.h b/folly/experimental/pushmi/detail/if_constexpr.h index 5773ec57a9b..113eddf8376 100644 --- a/folly/experimental/pushmi/detail/if_constexpr.h +++ b/folly/experimental/pushmi/detail/if_constexpr.h @@ -68,6 +68,7 @@ #define PUSHMI_PP_IGNORE_SHADOW_BEGIN \ _Pragma("GCC diagnostic push") \ _Pragma("GCC diagnostic ignored \"-Wshadow\"") \ + _Pragma("GCC diagnostic ignored \"-Wshadow-local\"") \ /**/ #define PUSHMI_PP_IGNORE_SHADOW_END \ _Pragma("GCC diagnostic pop") diff --git a/folly/experimental/pushmi/executor/functional.h b/folly/experimental/pushmi/executor/functional.h index 1032c675e7a..e6f08bce219 100644 --- a/folly/experimental/pushmi/executor/functional.h +++ b/folly/experimental/pushmi/executor/functional.h @@ -32,6 +32,15 @@ struct priorityZeroF { auto operator()(){ return 0; } }; +PUSHMI_TEMPLATE(class Exec) + (requires Strand) +struct strandFactory { + Exec ex_; + strandFactory() = default; + explicit strandFactory(Exec ex) : ex_(std::move(ex)) {} + Exec operator()(){ return ex_; } +}; + struct passDNF { PUSHMI_TEMPLATE(class Data) (requires TimeExecutor) diff --git a/folly/experimental/pushmi/executor/trampoline.h b/folly/experimental/pushmi/executor/trampoline.h index 966f9f35d0e..c00f26ed317 100644 --- a/folly/experimental/pushmi/executor/trampoline.h +++ b/folly/experimental/pushmi/executor/trampoline.h @@ -275,6 +275,12 @@ inline detail::nester nested_trampoline() { return {}; } +PUSHMI_INLINE_VAR constexpr auto trampolines = + strandFactory>{}; + +PUSHMI_INLINE_VAR constexpr auto nested_trampolines = + strandFactory>{}; + namespace detail { PUSHMI_TEMPLATE(class E) diff --git a/folly/experimental/pushmi/o/from.h b/folly/experimental/pushmi/o/from.h index deace800670..8605f25602f 100644 --- a/folly/experimental/pushmi/o/from.h +++ b/folly/experimental/pushmi/o/from.h @@ -117,8 +117,8 @@ template struct flow_from_up { using receiver_category = receiver_tag; - explicit flow_from_up(std::shared_ptr p_) : p(std::move(p_)) {} - std::shared_ptr p; + explicit flow_from_up(std::shared_ptr p) : p_(std::move(p)) {} + std::shared_ptr p_; void value(std::ptrdiff_t requested) { if (requested < 1) { @@ -126,8 +126,8 @@ struct flow_from_up { } // submit work to exec ::folly::pushmi::submit( - ::folly::pushmi::schedule(p->exec), - make_receiver([p = p, requested](auto) { + ::folly::pushmi::schedule(p_->exec), + make_receiver([p = p_, requested](auto) { auto remaining = requested; // this loop is structured to work when there is // re-entrancy out.value in the loop may call up.value. @@ -146,17 +146,17 @@ struct flow_from_up { template void error(E) noexcept { - p->stop.store(true); + p_->stop.store(true); ::folly::pushmi::submit( - ::folly::pushmi::schedule(p->exec), - flow_from_done{p}); + ::folly::pushmi::schedule(p_->exec), + flow_from_done{p_}); } void done() { - p->stop.store(true); + p_->stop.store(true); ::folly::pushmi::submit( - ::folly::pushmi::schedule(p->exec), - flow_from_done{p}); + ::folly::pushmi::schedule(p_->exec), + flow_from_done{p_}); } }; @@ -184,8 +184,8 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { } }; - template - struct task + template + struct sender_impl : flow_sender_tag::with_values::value_type> ::no_error , pipeorigin { @@ -193,23 +193,21 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { I begin_; S end_; - Exec exec_; - - task(I begin, S end, Exec exec) - : begin_(begin), end_(end), exec_(exec) { - } - + EF ef_; + sender_impl(I begin, S end, EF ef) : begin_(begin), end_(end), ef_(ef) {} PUSHMI_TEMPLATE(class Out) (requires ReceiveValue< Out&, typename std::iterator_traits::value_type>) // void submit(Out out) { + auto exec = ::folly::pushmi::make_strand(ef_); + using Exec = decltype(exec); using Producer = flow_from_producer; auto p = std::make_shared( - begin_, end_, std::move(out), exec_, false); + begin_, end_, std::move(out), std::move(exec), false); ::folly::pushmi::submit( - ::folly::pushmi::schedule(exec_), receiver_impl{p}); + ::folly::pushmi::schedule(p->exec), receiver_impl{p}); } }; @@ -219,27 +217,27 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn { typename std::iterator_traits::iterator_category, std::forward_iterator_tag>) // auto operator()(I begin, S end) const { - return (*this)(begin, end, trampoline()); + return (*this)(begin, end, trampolines); } PUSHMI_TEMPLATE(class R) (requires Range) // auto operator()(R&& range) const { - return (*this)(std::begin(range), std::end(range), trampoline()); + return (*this)(std::begin(range), std::end(range), trampolines); } - PUSHMI_TEMPLATE(class I, class S, class Exec) + PUSHMI_TEMPLATE(class I, class S, class EF) (requires DerivedFrom< typename std::iterator_traits::iterator_category, - std::forward_iterator_tag>&& Executor) // - auto operator()(I begin, S end, Exec exec) const { - return task{begin, end, exec}; + std::forward_iterator_tag>&& StrandFactory) // + auto operator()(I begin, S end, EF ef) const { + return sender_impl{begin, end, ef}; } - PUSHMI_TEMPLATE(class R, class Exec) - (requires Range&& Executor) // - auto operator()(R&& range, Exec exec) const { - return (*this)(std::begin(range), std::end(range), exec); + PUSHMI_TEMPLATE(class R, class EF) + (requires Range&& StrandFactory) // + auto operator()(R&& range, EF ef) const { + return (*this)(std::begin(range), std::end(range), ef); } } flow_from{}; diff --git a/folly/experimental/pushmi/o/take_until.h b/folly/experimental/pushmi/o/take_until.h new file mode 100644 index 00000000000..04f25d2ee0d --- /dev/null +++ b/folly/experimental/pushmi/o/take_until.h @@ -0,0 +1,391 @@ +/* + * Copyright 2018-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { +namespace pushmi { + +namespace detail { + +using take_until_request_channel_t = + any_receiver; +template +struct take_until_fn_base { + Exec exec_; + std::atomic done_; + take_until_request_channel_t up_trigger_; + take_until_request_channel_t up_source_; + explicit take_until_fn_base(Exec ex) : exec_(std::move(ex)), done_(false) {} +}; +template +struct take_until_fn_shared : public take_until_fn_base { + take_until_fn_shared(Out out, Exec exec) + : take_until_fn_base(std::move(exec)), out_(std::move(out)) {} + Out out_; +}; + +template +auto make_take_until_fn_shared(Out out, Exec ex) + -> std::shared_ptr> { + return std::make_shared>( + std::move(out), std::move(ex)); +} + +/// The implementation of the take_until algorithms +/// +/// This algorithm will coordinate two FlowSenders source and trigger. +/// When `take_until.submit()` is called, take until will `source.submit()` and +/// `trigger.submit()`. +/// +/// - signals from the source are passed through until a signal from the trigger +/// arrives. +/// - Any signal on the trigger will cancel the source and complete the +/// take_until. +/// - The done and error signals from the source will cancel the trigger and +/// pass the error or done signal onward. +/// +/// This is often used to insert a cancellation point into an expression. The +/// trigger parameter can be a flow_sender that is manually fired (such as a +/// subject or stop_token) or any other valid expression that results in a +/// flow_sender. +/// +struct take_until_fn { + private: + /// The source_receiver is submitted to the source + template + struct source_receiver { + using shared_t = std::shared_ptr>; + using properties = properties_t; + using receiver_category = receiver_category_t; + + shared_t s_; + + template + void value(VN&&... vn) { + set_value(s_->out_, (VN &&) vn...); + } + template + void error(E&& e) noexcept { + set_error(s_->out_, (E &&) e); + } + void done() { + set_done(s_->out_); + } + template + void starting(Up&& up) noexcept { + set_starting(s_->out_, (Up &&) up); + } + }; + /// The trigger_receiver is submitted to the trigger + template + struct trigger_receiver : flow_receiver<> { + using shared_t = std::shared_ptr>; + shared_t s_; + explicit trigger_receiver(shared_t s) : s_(s) {} + }; + /// The request_receiver is sent to the consumer when starting + template + struct request_receiver : receiver<> { + using shared_t = std::shared_ptr>; + shared_t s_; + explicit request_receiver(shared_t s) : s_(s) {} + }; + /// passes source values to consumer + template + struct on_value_impl { + template + struct impl { + V v_; + std::shared_ptr out_; + void operator()(any) { + set_value(out_, std::move(v_)); + } + }; + template + void operator()(Data& data, V&& v) const { + if (data.s_->done_.load()) { + return; + } + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver(impl>{ + (V &&) v, std::shared_ptr{data.s_, &(data.s_->out_)}})); + } + }; + /// passes error to consumer and cancels trigger + template + struct on_error_impl { + template + struct impl { + E e_; + Shared s_; + void operator()(any) { + // cancel source + set_done(s_->up_source_); + // cancel trigger + set_done(s_->up_trigger_); + + // cleanup circular references + s_->up_source_ = take_until_request_channel_t{}; + s_->up_trigger_ = take_until_request_channel_t{}; + + // complete consumer + set_error(s_->out_, std::move(e_)); + } + }; + template + void operator()(Data& data, E e) const noexcept { + if (data.s_->done_.exchange(true)) { + return; + } + + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver( + impl{std::move(e), data.s_})); + } + }; + /// passes done to consumer and cancels trigger + template + struct on_done_impl { + template + struct impl { + Shared s_; + void operator()(any) { + // cancel source + set_done(s_->up_source_); + // cancel trigger + set_done(s_->up_trigger_); + + // cleanup circular references + s_->up_source_ = take_until_request_channel_t{}; + s_->up_trigger_ = take_until_request_channel_t{}; + + // complete consumer + set_done(s_->out_); + } + }; + template + void operator()(Data& data) const { + if (data.s_->done_.exchange(true)) { + return; + } + + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver(impl{data.s_})); + } + }; + /// passes flow requests to source + struct on_requested_impl { + struct impl { + std::ptrdiff_t requested_; + std::shared_ptr up_source_; + void operator()(any) { + // pass requested to source + set_value(up_source_, requested_); + } + }; + template + void operator()(Data& data, V&& v) const { + if (data.s_->done_.load()) { + return; + } + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver( + impl{(V &&) v, + std::shared_ptr{ + data.s_, &(data.s_->up_source_)}})); + } + }; + /// reused for all the trigger signals. cancels the source and sends done to + /// the consumer + template + struct on_trigger_impl { + template + struct impl { + Shared s_; + void operator()(any) { + // cancel source + set_done(s_->up_source_); + // cancel trigger + set_done(s_->up_trigger_); + + // cleanup circular references + s_->up_source_ = take_until_request_channel_t{}; + s_->up_trigger_ = take_until_request_channel_t{}; + + // complete consumer + set_done(s_->out_); + } + }; + + template + void operator()(Data& data, AN&&...) const noexcept { + if (data.s_->done_.exchange(true)) { + return; + } + + // tell consumer that the end is nigh + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver(impl{data.s_})); + } + }; + /// both source and trigger are started, ask the trigger for a value and + /// give the consumer a back channel for flow control. + template + struct on_starting_trigger_impl { + template + struct impl { + Shared s_; + void operator()(any) { + + // set back channel for consumer + set_starting( + s_->out_, + ::folly::pushmi::make_receiver( + request_receiverexec_)>{s_}, + on_requested_impl{}, + on_trigger_impl{}, + on_trigger_impl{})); + + if (!s_->done_.load()) { + // ask for trigger + set_value(s_->up_trigger_, 1); + } + } + }; + template + void operator()(Data& data, Up up) const { + data.s_->up_trigger_ = take_until_request_channel_t{std::move(up)}; + + ::folly::pushmi::submit( + ::folly::pushmi::schedule(data.s_->exec_), + ::folly::pushmi::make_receiver(impl{data.s_})); + } + }; + /// source has been submitted now submit trigger + template + struct on_starting_source_impl { + Trigger t_; + + template + void operator()(Data& data, Up up) { + data.s_->up_source_ = take_until_request_channel_t{std::move(up)}; + + // start trigger + ::folly::pushmi::submit( + t_, + ::folly::pushmi::detail::receiver_from_fn()( + trigger_receiverexec_)>{data.s_}, + on_trigger_impl{}, + on_trigger_impl{}, + on_trigger_impl{}, + on_starting_trigger_impl{})); + } + }; + /// submit creates a strand to use for signal coordination and submits the + /// source + template + struct submit_impl { + StrandF sf_; + Trigger t_; + + PUSHMI_TEMPLATE(class SIn, class Out) + (requires Receiver) // + void + operator()(SIn&& in, Out out) & { + auto exec = ::folly::pushmi::make_strand(sf_); + + // start source + ::folly::pushmi::submit( + (In &&) in, + ::folly::pushmi::detail::receiver_from_fn()( + source_receiver>{ + make_take_until_fn_shared(std::move(out), std::move(exec))}, + on_value_impl{}, + on_error_impl{}, + on_done_impl{}, + on_starting_source_impl{t_})); + } + PUSHMI_TEMPLATE(class SIn, class Out) + (requires Receiver) // + void + operator()(SIn&& in, Out out) && { + auto exec = ::folly::pushmi::make_strand(sf_); + + // start source + ::folly::pushmi::submit( + (In &&) in, + ::folly::pushmi::detail::receiver_from_fn()( + source_receiver>{ + make_take_until_fn_shared(std::move(out), std::move(exec))}, + on_value_impl{}, + on_error_impl{}, + on_done_impl{}, + on_starting_source_impl{std::move(t_)})); + } + }; + + /// adapt binds the source into a new sender + template + struct adapt_impl { + StrandF sf_; + Trigger t_; + + PUSHMI_TEMPLATE(class In) + (requires FlowSender) // + auto + operator()(In&& in) & { + // copy to allow multiple calls to connect to multiple 'in' + return ::folly::pushmi::detail::sender_from( + (In &&) in, submit_impl{sf_, t_}); + } + PUSHMI_TEMPLATE(class In) + (requires FlowSender) // + auto + operator()(In&& in) && { + return ::folly::pushmi::detail::sender_from( + (In &&) in, submit_impl{std::move(sf_), std::move(t_)}); + } + }; + + public: + /// constructs the algorithm by storing the strand factory and Trigger sender + PUSHMI_TEMPLATE(class StrandF, class Trigger) + (requires StrandFactory&& FlowSender) // + auto + operator()(StrandF sf, Trigger t) const { + return adapt_impl{std::move(sf), std::move(t)}; + } +}; + +} // namespace detail + +namespace operators { +PUSHMI_INLINE_VAR constexpr detail::take_until_fn take_until{}; +} // namespace operators + +} // namespace pushmi +} // namespace folly diff --git a/folly/experimental/pushmi/test/o/TakeUntilTest.cpp b/folly/experimental/pushmi/test/o/TakeUntilTest.cpp new file mode 100644 index 00000000000..a4c06f6c8cb --- /dev/null +++ b/folly/experimental/pushmi/test/o/TakeUntilTest.cpp @@ -0,0 +1,299 @@ +/* + * Copyright 2018-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +using namespace std::literals; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace folly::pushmi::aliases; + +#include +#include + +using namespace testing; + +namespace detail { + +struct receiver_counters { + std::atomic values_{0}; + std::atomic errors_{0}; + std::atomic dones_{0}; + std::atomic startings_{0}; + std::atomic finallys_{0}; +}; + +template > +struct ReceiverSignals_ : Base { + ~ReceiverSignals_() {} + ReceiverSignals_(const ReceiverSignals_&) = default; + ReceiverSignals_& operator=(const ReceiverSignals_&) = default; + ReceiverSignals_(ReceiverSignals_&&) = default; + ReceiverSignals_& operator=(ReceiverSignals_&&) = default; + explicit ReceiverSignals_(std::string id) : id_(std::move(id)), counters_(std::make_shared()) {} + std::string id_; + std::shared_ptr counters_; + + void value(mi::detail::any) { + if (mi::FlowReceiver != false) { + EXPECT_THAT(counters_->startings_.load(), Eq(1)) + << "[" << id_ + << "]::value() expected the starting signal to be recorded before the value signal"; + } + EXPECT_THAT(counters_->finallys_.load(), Eq(0)) + << "[" << id_ + << "]::value() expected the value signal to be recorded before the done/error signal"; + ++counters_->values_; + } + void error(mi::detail::any) noexcept { + if (mi::FlowReceiver != false) { + EXPECT_THAT(counters_->startings_.load(), Eq(1)) + << "[" << id_ + << "]::error() expected the starting signal to be recorded before the error signal"; + } + EXPECT_THAT(counters_->finallys_.load(), Eq(0)) + << "[" << id_ + << "]::error() expected only one of done/error signals to be recorded"; + ++counters_->errors_; + ++counters_->finallys_; + } + void done() { + if (mi::FlowReceiver != false) { + EXPECT_THAT(counters_->startings_.load(), Eq(1)) + << "[" << id_ + << "]::done() expected the starting signal to be recorded before the done signal"; + } + EXPECT_THAT(counters_->finallys_.load(), Eq(0)) + << "[" << id_ + << "]::done() expected only one of done/error signals to be recorded"; + ++counters_->dones_; + ++counters_->finallys_; + } + void starting(mi::detail::any) { + EXPECT_THAT(counters_->startings_.load(), Eq(0)) + << "[" << id_ + << "]::starting() expected the starting signal to be recorded once"; + ++counters_->startings_; + } + + void wait() { + while (counters_->finallys_.load() == 0) { + } + } + + void verifyValues(int count) { + EXPECT_THAT(counters_->values_.load(), Eq(count)) + << "[" << id_ + << "]::verifyValues() expected the value signal to be recorded [" + << count << "] times."; + } + void verifyErrors() { + EXPECT_THAT(counters_->errors_.load(), Eq(1)) + << "[" << id_ + << "]::verifyErrors() expected the error signal to be recorded once"; + EXPECT_THAT(counters_->dones_.load(), Eq(0)) + << "[" << id_ + << "]::verifyErrors() expected the dones signal not to be recorded"; + EXPECT_THAT(counters_->finallys_.load(), Eq(1)) + << "[" << id_ + << "]::verifyErrors() expected the finally signal to be recorded once"; + } + void verifyDones() { + EXPECT_THAT(counters_->dones_.load(), Eq(1)) + << "[" << id_ + << "]::verifyDones() expected the dones signal to be recorded once"; + EXPECT_THAT(counters_->errors_.load(), Eq(0)) + << "[" << id_ + << "]::verifyDones() expected the errors signal not to be recorded"; + EXPECT_THAT(counters_->finallys_.load(), Eq(1)) + << "[" << id_ + << "]::verifyDones() expected the finally signal to be recorded once"; + } + void verifyFinal() { + if (mi::FlowReceiver == false) { + EXPECT_THAT(counters_->startings_.load(), Eq(0)) + << "[" << id_ + << "]::verifyFinal() expected the starting signal not to be recorded"; + } else { + EXPECT_THAT(counters_->startings_.load(), Eq(1)) + << "[" << id_ + << "]::verifyFinal() expected the starting signal to be recorded once"; + } + EXPECT_THAT(counters_->finallys_.load(), Eq(1)) + << "[" << id_ + << "]::verifyFinal() expected the finally signal to be recorded once"; + } +}; + +} // namespace detail + +using ReceiverSignals = + detail::ReceiverSignals_>; +using FlowReceiverSignals = + detail::ReceiverSignals_>; + +TEST(EmptySourceEmptyTriggerTrampoline, TakeUntil) { + auto e = op::flow_from(std::array{}, mi::trampolines); + + FlowReceiverSignals source{"source"}; + FlowReceiverSignals trigger{"trigger"}; + ReceiverSignals each{"each"}; + + e | op::tap(source) | + op::take_until(mi::trampolines, e | op::tap(trigger)) | + op::for_each(each); + + source.wait(); + source.verifyValues(0); + source.verifyDones(); + source.verifyFinal(); + + trigger.wait(); + trigger.verifyValues(0); + trigger.verifyDones(); + trigger.verifyFinal(); + + each.wait(); + each.verifyValues(0); + each.verifyDones(); + each.verifyFinal(); +} + +TEST(EmptySourceEmptyTrigger, TakeUntil) { + auto nt = mi::new_thread(); + auto e = op::flow_from(std::array{}, mi::strands(nt)); + + FlowReceiverSignals source{"source"}; + FlowReceiverSignals trigger{"trigger"}; + ReceiverSignals each{"each"}; + + e | op::tap(source) | + op::take_until(mi::strands(nt), e | op::tap(trigger)) | + op::for_each(each); + + source.wait(); + source.verifyValues(0); + source.verifyDones(); + source.verifyFinal(); + + trigger.wait(); + trigger.verifyValues(0); + trigger.verifyDones(); + trigger.verifyFinal(); + + each.wait(); + each.verifyValues(0); + each.verifyDones(); + each.verifyFinal(); +} + +TEST(EmptySourceValueTrigger, TakeUntil) { + auto nt = mi::new_thread(); + auto e = op::flow_from(std::array{}, mi::strands(nt)); + auto v = op::flow_from(std::array{{42}}, mi::strands(nt)); + + FlowReceiverSignals source{"source"}; + FlowReceiverSignals trigger{"trigger"}; + ReceiverSignals each{"each"}; + + e | op::tap(source) | + op::take_until(mi::strands(nt), v | op::tap(trigger)) | + op::for_each(each); + + source.wait(); + source.verifyValues(0); + source.verifyDones(); + source.verifyFinal(); + + trigger.wait(); + trigger.verifyValues(1); + trigger.verifyDones(); + trigger.verifyFinal(); + + each.wait(); + each.verifyValues(0); + each.verifyDones(); + each.verifyFinal(); +} + +TEST(ValueSourceEmptyTrigger, TakeUntil) { + auto nt = mi::new_thread(); + auto e = op::flow_from(std::array{}, mi::strands(nt)); + auto v = op::flow_from(std::array{{42}}, mi::strands(nt)); + + FlowReceiverSignals source{"source"}; + FlowReceiverSignals trigger{"trigger"}; + ReceiverSignals each{"each"}; + + v | op::tap(source) | + op::take_until(mi::strands(nt), e | op::tap(trigger)) | + op::for_each(each); + + source.wait(); + source.verifyDones(); + source.verifyFinal(); + + trigger.wait(); + trigger.verifyValues(0); + trigger.verifyDones(); + trigger.verifyFinal(); + + each.wait(); + each.verifyValues(0); + each.verifyDones(); + each.verifyFinal(); +} + +TEST(ValueSourceValueTrigger, TakeUntil) { + auto nt = mi::new_thread(); + auto v = op::flow_from(std::array{{42}}, mi::strands(nt)); + + FlowReceiverSignals source{"source"}; + FlowReceiverSignals trigger{"trigger"}; + ReceiverSignals each{"each"}; + + v | op::tap(source) | + op::take_until(mi::strands(nt), v | op::tap(trigger)) | + op::for_each(each); + + source.wait(); + source.verifyDones(); + source.verifyFinal(); + + trigger.wait(); + trigger.verifyValues(1); + trigger.verifyDones(); + trigger.verifyFinal(); + + each.wait(); + each.verifyValues(0); + each.verifyDones(); + each.verifyFinal(); +}