Skip to content

Commit

Permalink
add take_until
Browse files Browse the repository at this point in the history
Summary: add take_until algorithm

Reviewed By: ericniebler

Differential Revision: D13697233

fbshipit-source-id: b2e060bc88cb652dc196e4915961d811a97b414d
  • Loading branch information
Kirk Shoop authored and facebook-github-bot committed Apr 11, 2019
1 parent 1689b2f commit 5b474b9
Show file tree
Hide file tree
Showing 7 changed files with 735 additions and 31 deletions.
2 changes: 1 addition & 1 deletion folly/experimental/pushmi/benchmarks/PushmiBenchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ BENCHMARK(trampoline_flow_many_sender_1000, n) {
BENCHMARK_SUSPEND {
std::iota(values.begin(), values.end(), 1);
}
auto f = op::flow_from(values, tr) | op::tap([&](int){
auto f = op::flow_from(values, [&]{return tr;}) | op::tap([&](int){
--counter;
});
FOR_EACH_RANGE (i, 0, n) {
Expand Down
1 change: 1 addition & 0 deletions folly/experimental/pushmi/detail/if_constexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#define PUSHMI_PP_IGNORE_SHADOW_BEGIN \
_Pragma("GCC diagnostic push") \
_Pragma("GCC diagnostic ignored \"-Wshadow\"") \
_Pragma("GCC diagnostic ignored \"-Wshadow-local\"") \
/**/
#define PUSHMI_PP_IGNORE_SHADOW_END \
_Pragma("GCC diagnostic pop")
Expand Down
9 changes: 9 additions & 0 deletions folly/experimental/pushmi/executor/functional.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ struct priorityZeroF {
auto operator()(){ return 0; }
};

PUSHMI_TEMPLATE(class Exec)
(requires Strand<Exec>)
struct strandFactory {
Exec ex_;
strandFactory() = default;
explicit strandFactory(Exec ex) : ex_(std::move(ex)) {}
Exec operator()(){ return ex_; }
};

struct passDNF {
PUSHMI_TEMPLATE(class Data)
(requires TimeExecutor<Data>)
Expand Down
6 changes: 6 additions & 0 deletions folly/experimental/pushmi/executor/trampoline.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ inline detail::nester<E> nested_trampoline() {
return {};
}

PUSHMI_INLINE_VAR constexpr auto trampolines =
strandFactory<detail::delegator<std::exception_ptr>>{};

PUSHMI_INLINE_VAR constexpr auto nested_trampolines =
strandFactory<detail::nester<std::exception_ptr>>{};

namespace detail {

PUSHMI_TEMPLATE(class E)
Expand Down
58 changes: 28 additions & 30 deletions folly/experimental/pushmi/o/from.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ template <class Producer>
struct flow_from_up {
using receiver_category = receiver_tag;

explicit flow_from_up(std::shared_ptr<Producer> p_) : p(std::move(p_)) {}
std::shared_ptr<Producer> p;
explicit flow_from_up(std::shared_ptr<Producer> p) : p_(std::move(p)) {}
std::shared_ptr<Producer> p_;

void value(std::ptrdiff_t requested) {
if (requested < 1) {
return;
}
// submit work to exec
::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec),
make_receiver([p = p, requested](auto) {
::folly::pushmi::schedule(p_->exec),
make_receiver([p = p_, requested](auto) {
auto remaining = requested;
// this loop is structured to work when there is
// re-entrancy out.value in the loop may call up.value.
Expand All @@ -146,17 +146,17 @@ struct flow_from_up {

template <class E>
void error(E) noexcept {
p->stop.store(true);
p_->stop.store(true);
::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec),
flow_from_done<Producer>{p});
::folly::pushmi::schedule(p_->exec),
flow_from_done<Producer>{p_});
}

void done() {
p->stop.store(true);
p_->stop.store(true);
::folly::pushmi::submit(
::folly::pushmi::schedule(p->exec),
flow_from_done<Producer>{p});
::folly::pushmi::schedule(p_->exec),
flow_from_done<Producer>{p_});
}
};

Expand Down Expand Up @@ -184,32 +184,30 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
}
};

template <class I, class S, class Exec>
struct task
template <class I, class S, class EF>
struct sender_impl
: flow_sender_tag::with_values<typename std::iterator_traits<I>::value_type>
::no_error
, pipeorigin {
using properties = property_set<is_always_blocking<>>;

I begin_;
S end_;
Exec exec_;

task(I begin, S end, Exec exec)
: begin_(begin), end_(end), exec_(exec) {
}

EF ef_;
sender_impl(I begin, S end, EF ef) : begin_(begin), end_(end), ef_(ef) {}
PUSHMI_TEMPLATE(class Out)
(requires ReceiveValue<
Out&,
typename std::iterator_traits<I>::value_type>) //
void submit(Out out) {
auto exec = ::folly::pushmi::make_strand(ef_);
using Exec = decltype(exec);
using Producer = flow_from_producer<I, S, Out, Exec>;
auto p = std::make_shared<Producer>(
begin_, end_, std::move(out), exec_, false);
begin_, end_, std::move(out), std::move(exec), false);

::folly::pushmi::submit(
::folly::pushmi::schedule(exec_), receiver_impl<Producer>{p});
::folly::pushmi::schedule(p->exec), receiver_impl<Producer>{p});
}
};

Expand All @@ -219,27 +217,27 @@ PUSHMI_INLINE_VAR constexpr struct flow_from_fn {
typename std::iterator_traits<I>::iterator_category,
std::forward_iterator_tag>) //
auto operator()(I begin, S end) const {
return (*this)(begin, end, trampoline());
return (*this)(begin, end, trampolines);
}

PUSHMI_TEMPLATE(class R)
(requires Range<R>) //
auto operator()(R&& range) const {
return (*this)(std::begin(range), std::end(range), trampoline());
return (*this)(std::begin(range), std::end(range), trampolines);
}

PUSHMI_TEMPLATE(class I, class S, class Exec)
PUSHMI_TEMPLATE(class I, class S, class EF)
(requires DerivedFrom<
typename std::iterator_traits<I>::iterator_category,
std::forward_iterator_tag>&& Executor<Exec>) //
auto operator()(I begin, S end, Exec exec) const {
return task<I, S, Exec>{begin, end, exec};
std::forward_iterator_tag>&& StrandFactory<EF>) //
auto operator()(I begin, S end, EF ef) const {
return sender_impl<I, S, EF>{begin, end, ef};
}

PUSHMI_TEMPLATE(class R, class Exec)
(requires Range<R>&& Executor<Exec>) //
auto operator()(R&& range, Exec exec) const {
return (*this)(std::begin(range), std::end(range), exec);
PUSHMI_TEMPLATE(class R, class EF)
(requires Range<R>&& StrandFactory<EF>) //
auto operator()(R&& range, EF ef) const {
return (*this)(std::begin(range), std::end(range), ef);
}
} flow_from{};

Expand Down
Loading

0 comments on commit 5b474b9

Please sign in to comment.