Skip to content

Commit

Permalink
Lessen overhead of calling non-coroutine functions with Task<T> retur…
Browse files Browse the repository at this point in the history
…n type.

Corral currently provides `noop()` for creating a degenerate `Task<void>` that is not backed by a coroutine, which is useful when a function that is not a coroutine has to return `Task<void>` (say, to conform to an abstract interface). Sadly, there's no equivalent for synchronous functions returning a non-void value:

    struct AsyncStream {
        virtual Task<void> close() = 0;
        virtual Task<size_t> read(std::span<char> buf) = 0;
    };

    struct MemStream: AsyncStream {
        // Fast to call, no coroutine frame allocated
        Task<void> close() override { return noop(); }

        // Even though there aren't any suspension points here,
        // still allocate a coroutine frame and pay the full price
        Task<size_t> read(std::span<char> buf) override {
            memcpy(...);
            co_return buf.size();
        }
    };

This diff addresses that by introducing a new `Task<T> just(T)` function, which wraps a value into a Task that yields the value when `co_await`ed, allowing the above snippet to be rewritten as

   Task<size_t> read(std::span<char> buf) override {
        memcpy(...);
        return corral::just(buf.size());
   }
  • Loading branch information
oremanj authored and dprokoptsev committed Jun 28, 2024
1 parent 1f18406 commit 2ef41e4
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 77 deletions.
5 changes: 0 additions & 5 deletions corral/Nursery.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,6 @@ inline Handle Nursery::addTask(Task<Ret> task, TaskParent<Ret>* parent) {
CORRAL_ASSERT(executor_ && "Nursery is closed to new arrivals");

detail::Promise<Ret>* promise = task.release();
if constexpr (std::is_void_v<Ret>) {
if (promise == detail::noopPromise()) [[unlikely]] {
return std::noop_coroutine();
}
}
CORRAL_ASSERT(promise);
CORRAL_TRACE("pr %p handed to nursery %p (%zu tasks total)", promise, this,
taskCount_ + 1);
Expand Down
12 changes: 6 additions & 6 deletions corral/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ namespace detail {
/// Returns the Task<T> object that wraps this promise; the task is
/// constructed directly from `*this`.
template <class T> Task<T> Promise<T>::get_return_object() {
return Task<T>(*this);
}

struct NoopLambda {
Task<void> operator()() const { return Task<void>(*detail::noopPromise()); }
};

} // namespace detail

/// A no-op task. Always await_ready(), and co_await'ing on it is a no-op
Expand All @@ -86,7 +81,12 @@ struct NoopLambda {
///
/// saving on coroutine frame allocation (compared to `{ co_return; }`).
inline Task<void> noop() {
return detail::NoopLambda{}();
return Task<void>(detail::StubPromise<void>::instance());
}

/// Wraps `value` in a task that is not backed by a coroutine and that
/// yields the value as soon as it is co_await'ed.
///
/// `T` may be given explicitly (for example `just<int&>(i)`), in which
/// case the value is forwarded with that exact type.
template <class T> Task<T> just(T value) {
    // The stub promise is allocated here and handed to the returned task.
    auto* stub = new detail::StubPromise<T>(std::forward<T>(value));
    return Task<T>(*stub);
}

} // namespace corral
143 changes: 107 additions & 36 deletions corral/detail/Promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,12 @@ class BasePromise : private TaskFrame, public IntrusiveListItem<BasePromise> {
void cancel() {
CORRAL_TRACE("pr %p cancellation requested", this);

if (cancelState_ == CancelState::Requested) {
// Do nothing

} else if (state_ == State::Ready || state_ == State::Running) {
if (!hasAwaitee()) {
// Mark pending cancellation; coroutine will be cancelled
// at its next suspension point (for running coroutines) or when
// executed by executor (for ready coroutines)
// executed by executor (for ready coroutines). This is a no-op
// if cancel() was already called.
cancelState_ = CancelState::Requested;

} else {
// Coroutine currently suspended, so intercept the flow at
// its resume point, and forward cancellation request to the
Expand All @@ -195,19 +192,47 @@ class BasePromise : private TaskFrame, public IntrusiveListItem<BasePromise> {
/// Destroys the promise and any locals within the coroutine frame.
/// Only safe to call on not-yet-started tasks or those
/// already completed (i.e., whose parent has resumed).
void destroy() { realHandle().destroy(); }
void destroy() {
if (hasCoroutine()) {
realHandle().destroy();
} else {
// Call the `TaskFrame::destroyFn` filled in by makeStub().
// This is the only place where that's actually a function
// pointer; normally we use it as a parent-task link.
proxyHandle().destroy();
}
}

/// Describes this promise to the task tree collector `c`.
///
/// A stub promise (one without a coroutine) is reported as a "<noop>"
/// node. Otherwise the task's `pc` is recorded, followed by either a
/// footnote naming the Ready/Running state or the introspection output
/// of the awaitee the task is suspended on.
void await_introspect(TaskTreeCollector& c) const noexcept {
    if (!hasCoroutine()) {
        c.node("<noop>");
        return;
    }

    c.taskPC(pc);

    if (state_ == State::Running) {
        c.footnote("<ON CPU>");
        return;
    }
    if (state_ == State::Ready) {
        c.footnote("<SCHEDULED>");
        return;
    }

    // Neither scheduled nor running: the task must be waiting on something.
    CORRAL_ASSERT(hasAwaitee());
    awaitee_.introspect(c);
}

/// Delivers the result of a stub promise without going through the
/// executor.
///
/// Returns false and does nothing when the promise has a coroutine.
/// For a stub, stores `parent` in `parent_`, runs the start callback
/// stashed in `resumeFn` by makeStub(), and returns true.
bool checkImmediateResult(BaseTaskParent* parent) noexcept {
    if (hasCoroutine()) {
        return false;
    }

    parent_ = parent;
    // Run the callback that makeStub() stored in the frame.
    CoroutineFrame::resumeFn(this);
    // Swap in a do-nothing callback so a repeated call cannot deliver
    // the result a second time.
    CoroutineFrame::resumeFn = +[](CoroutineFrame*) {};
    return true;
}

protected:
BasePromise() {
CORRAL_TRACE("pr %p created", this);
Expand All @@ -223,15 +248,42 @@ class BasePromise : private TaskFrame, public IntrusiveListItem<BasePromise> {
/// notified (through parent->continuation().resume()) upon coroutine
/// completion.
Handle start(BaseTaskParent* parent, Handle caller) {
if (checkImmediateResult(parent)) {
return parent->continuation(this);
}
parent_ = parent;
CORRAL_TRACE("pr %p started", this);
onResume<&BasePromise::doResume>();
parent_ = parent;
linkTo(caller);
return proxyHandle();
}
/// Returns the pointer stored in `parent_`, which is assigned in start()
/// and checkImmediateResult().
// NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
BaseTaskParent* parent() const noexcept { return parent_; }

/// Cause this promise to not resume a coroutine when it is started.
/// Instead, it will invoke the given callback and then resume its parent.
/// This can be used to create promises that are not associated with a
/// coroutine; see just() and noop(). Must be called before start().
///
/// `Derived` is the concrete promise class and `onStart` the member
/// function of it to run when the task is started. When
/// `deleteThisOnDestroy` is true, the destroy callback deletes the
/// promise as a `Derived`; otherwise the destroy callback does nothing.
template <class Derived, void (Derived::*onStart)()>
void makeStub(bool deleteThisOnDestroy) {
    CORRAL_ASSERT(state_ == State::Ready && parent_ == nullptr);
    state_ = State::Stub;
    pc = 0;

    // Since stub promises never use their inline CoroutineFrame,
    // we can reuse them to store callbacks for start and destroy
    CoroutineFrame::resumeFn = +[](CoroutineFrame* self) {
        (static_cast<Derived*>(self)->*onStart)();
    };
    if (deleteThisOnDestroy) {
        CoroutineFrame::destroyFn = +[](CoroutineFrame* self) {
            delete static_cast<Derived*>(self);
        };
    } else {
        // Parameter left unnamed: nothing to do, and naming it would
        // trigger an unused-parameter warning.
        CoroutineFrame::destroyFn = +[](CoroutineFrame*) {};
    }
}

private /*methods*/:
/// Returns a handle which, when resume()d, will immediately execute
/// the next step of the task.
Expand All @@ -252,7 +304,10 @@ class BasePromise : private TaskFrame, public IntrusiveListItem<BasePromise> {
Handle proxyHandle() noexcept { return CoroutineFrame::toHandle(); }

bool hasAwaitee() const noexcept {
return state_ != State::Ready && state_ != State::Running;
return state_ > State::Stub;
}
/// True unless this promise is a stub, i.e. unless
/// `state_ == State::Stub` (the state set by makeStub()).
bool hasCoroutine() const noexcept {
return state_ != State::Stub;
}

template <void (BasePromise::*trampolineFn)()> void onResume() {
Expand Down Expand Up @@ -367,16 +422,23 @@ class BasePromise : private TaskFrame, public IntrusiveListItem<BasePromise> {
Executor* executor_ = nullptr;
BaseTaskParent* parent_ = nullptr;

// These constants live in lower bits, so they can coexist with
// an (aligned) pointer in a union, and one can tell them
// from a pointer value.
enum class State : size_t { Ready = 1, Running = 3 };
// These enums live in a union with Awaitee, so their values must
// be distinguishable from the possible object representations of an
// Awaitee. Awaitee consists of two non-null pointers. The first
// (Awaitee::object_, aliased with State) is not aligned, but we can
// reasonably assume that 0x1 and 0x2 are not valid addresses.
// The second (Awaitee::functions_, aliased with CancelState) is aligned
// to a word size.
enum class State : size_t { Ready = 0, Running = 1, Stub = 2 };
enum class CancelState : size_t { None = 0, Requested = 1 };

/// state_ == State::Ready for tasks scheduled for execution
/// (i.e. whose proxyHandle() resume()d);
/// state_ == State::Running for tasks being executed at the moment
/// (i.e. whose realHandle() resume()d);
/// Possible values of state_:
/// - State::Stub for promises associated with no coroutine,
/// implementing just(T) or noop()
/// - State::Ready for tasks scheduled for execution
/// (i.e., whose proxyHandle() resume()d)
/// - State::Running for tasks being executed at the moment
/// (i.e., whose realHandle() resume()d);
/// otherwise the task is suspended on an awaitable, and awaitee_
/// is populated accordingly.
///
Expand Down Expand Up @@ -508,27 +570,36 @@ template <> class ReturnValueMixin<void> {
}
};

/// A tag value for a promise object which should do nothing and
/// immediately wake up its parent if co_await'ed. Note that the
/// object only exists to provide a unique and valid address. It does
/// not represent any task in a valid state. and calling *any* of its
/// member functions is UB.
inline Promise<void>* noopPromise() {
static Promise<void> p;
return &p;
}
/// Promise type for a task that has no coroutine behind it: it stores a
/// value of type T and, when the task is started, passes that value to
/// return_value(). just() builds on this class, and noop() on its
/// `void` specialization.
template <class T>
class StubPromise : public Promise<T> {
    /// The value to deliver once the task is started.
    T value_;

    /// Start callback registered through makeStub().
    void onStart() { this->return_value(std::forward<T>(value_)); }

  public:
    explicit StubPromise(T value) : value_(std::forward<T>(value)) {
        // Ask destroy() to delete this object.
        constexpr bool deleteThisOnDestroy = true;
        this->template makeStub<StubPromise, &StubPromise::onStart>(
                deleteThisOnDestroy);
    }
};
/// Specialization for tasks that produce no value. There is a single
/// shared object, reachable through instance(); its start callback
/// calls return_void().
template <> class StubPromise<void> : public Promise<void> {
    StubPromise() {
        // The shared object is a function-local static, so destroy()
        // must not delete it.
        constexpr bool deleteThisOnDestroy = false;
        this->template makeStub<StubPromise, &StubPromise::onStart>(
                deleteThisOnDestroy);
    }

    /// Start callback registered through makeStub().
    void onStart() { this->return_void(); }

  public:
    /// Returns the shared stub promise, constructing it on first use.
    static StubPromise& instance() {
        static StubPromise singleton;
        return singleton;
    }
};

struct DestroyPromise {
template <class T> void operator()(Promise<T>* p) const {
if constexpr (std::is_same_v<T, void>) {
if (p == detail::noopPromise()) {
return;
}
}
if (p) {
p->destroy();
}
}
template <class T> void operator()(Promise<T>* p) const { p->destroy(); }
};

template <class T>
Expand Down
32 changes: 2 additions & 30 deletions corral/detail/task_awaitables.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,49 +40,27 @@ template <class T, class Self> class TaskAwaitableBase {
explicit TaskAwaitableBase(Promise<T>* promise) : promise_(promise) {}

void await_set_executor(Executor* ex) noexcept {
if constexpr (std::is_same_v<T, void>) {
if (promise_ == detail::noopPromise()) [[unlikely]] {
return;
}
}
promise_->setExecutor(ex);
}

bool await_early_cancel() noexcept {
if constexpr (std::is_same_v<T, void>) {
if (promise_ == detail::noopPromise()) [[unlikely]] {
return true;
}
}
promise_->cancel();
return false;
}

bool await_ready() const noexcept {
if constexpr (std::is_same_v<T, void>) {
if (promise_ == detail::noopPromise()) [[unlikely]] {
return true;
}
}
return false;
return promise_->checkImmediateResult(
const_cast<Self*>(static_cast<const Self*>(this)));
}

Handle await_suspend(Handle h) {
CORRAL_TRACE(" ...pr %p", promise_);
if constexpr (std::is_same_v<T, void>) {
if (promise_ == detail::noopPromise()) [[unlikely]] {
return h;
}
}
CORRAL_ASSERT(promise_);
continuation_ = h;
return promise_->start(static_cast<Self*>(this), h);
}

bool await_cancel(Handle) noexcept {
if constexpr (std::is_same_v<T, void>) {
CORRAL_ASSERT(promise_ != detail::noopPromise());
}
if (promise_) {
promise_->cancel();
} else {
Expand All @@ -97,12 +75,6 @@ template <class T, class Self> class TaskAwaitableBase {
c.node("<completed task>");
return;
}
if constexpr (std::is_same_v<T, void>) {
if (promise_ == detail::noopPromise()) [[unlikely]] {
c.node("<noop>");
return;
}
}
promise_->await_introspect(c);
}

Expand Down
27 changes: 27 additions & 0 deletions test/basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,33 @@ CORRAL_TEST_CASE("value") {
// Exercises noop() in three settings: awaited directly, as both
// children of anyOf(), and started inside a nursery.
CORRAL_TEST_CASE("noop") {
co_await noop();
co_await anyOf(noop(), noop());
CORRAL_WITH_NURSERY(n) {
// Pass noop itself to the nursery as the task to start.
n.start(noop);
co_return join;
};
}

// Exercises just() with values, references and a move-only type.
CORRAL_TEST_CASE("just") {
// A non-coroutine lambda returning Task<int> built with just().
auto mkAsync = [](int n) { return [n]() -> Task<int> { return just(n); }; };
auto x = co_await mkAsync(42)();
CATCH_CHECK(x == 42);

// Two just()-based tasks combined through allOf().
auto [y, z] = co_await allOf(mkAsync(1)(), mkAsync(2)());
CATCH_CHECK(y == 1);
CATCH_CHECK(z == 2);

// With an explicit reference type the awaited result refers to the
// original object; only the address of `i` is inspected.
int i;
int& ri = co_await just<int&>(i);
CATCH_CHECK(&ri == &i);

// A move-only value is moved into the task and back out of it.
auto p = std::make_unique<int>(42);
auto q = co_await just(std::move(p));
CATCH_CHECK(p == nullptr);
CATCH_CHECK(*q == 42);

// A reference to a move-only object leaves that object intact.
auto& rq = co_await just<std::unique_ptr<int>&>(q);
CATCH_CHECK(*q == 42);
CATCH_CHECK(*rq == 42);
}

CORRAL_TEST_CASE("executor") {
Expand Down

0 comments on commit 2ef41e4

Please sign in to comment.