Skip to content

Commit

Permalink
Update: modify timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
8sileus committed Mar 21, 2024
1 parent dab9ff9 commit 6ac4d40
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 65 deletions.
1 change: 1 addition & 0 deletions examples/benchmark.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "zedio/core.hpp"
#include "zedio/log.hpp"
#include "zedio/net.hpp"
#include "zedio/time.hpp"

#include <string_view>

Expand Down
9 changes: 8 additions & 1 deletion examples/simple_echo.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "zedio/core.hpp"
#include "zedio/net.hpp"
#include "zedio/time.hpp"

using namespace zedio;
using namespace zedio::async;
Expand All @@ -8,7 +9,13 @@ using namespace zedio::net;
auto process(TcpStream stream) -> Task<void> {
char buf[1024]{};
while (true) {
auto len = (co_await (stream.read(buf))).value();
auto ret = co_await stream.read(buf);
if (!ret) {
LOG_ERROR("{}", ret.error());
break;
}
auto len = ret.value();
LOG_DEBUG("{}", std::string_view(buf, len));
if (len == 0) {
break;
}
Expand Down
18 changes: 12 additions & 6 deletions zedio/io/base/callback.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#pragma once

// #include "zedio/time/event.hpp"
// C++
#include <chrono>
#include <coroutine>

namespace zedio::runtime::detail {

class Entry;

} // namespace zedio::runtime::detail

namespace zedio::io::detail {

struct Callback {
Expand All @@ -19,11 +25,11 @@ struct Callback {
std::coroutine_handle<> handle_{nullptr};
int result_;
};
// union {
// uint64_t has_timeout_{0};
// std::chrono::steady_clock::time_point deadline_;
// std::set<time::detail::Event>::iterator iter_;
// };

union {
runtime::detail::Entry *entry_{nullptr};
std::chrono::steady_clock::time_point deadline_;
};
};

} // namespace zedio::io::detail
52 changes: 27 additions & 25 deletions zedio/io/base/registrator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "zedio/common/util/noncopyable.hpp"
#include "zedio/io/base/callback.hpp"
#include "zedio/runtime/driver.hpp"
#include "zedio/time/timeout.hpp"

using namespace std::chrono_literals;

Expand All @@ -29,51 +30,52 @@ class IORegistrator {
// Delete copy
IORegistrator(const IORegistrator &other) = delete;
auto operator=(const IORegistrator &other) -> IORegistrator & = delete;
// Delete move
IORegistrator(IORegistrator &&other) = delete;
auto operator=(IORegistrator &&other) -> IORegistrator & = delete;

IORegistrator(IORegistrator &&other) noexcept
: cb_{std::move(other.cb_)}
, sqe_{other.sqe_} {
io_uring_sqe_set_data(sqe_, &this->cb_);
other.sqe_ = nullptr;
}

auto operator=(IORegistrator &&other) noexcept -> IORegistrator & {
cb_ = std::move(other.cb_);
sqe_ = other.sqe_;
io_uring_sqe_set_data(sqe_, &this->cb_);
other.sqe_ = nullptr;
return *this;
}

public:
auto await_ready() const noexcept -> bool {
return sqe_ == nullptr;
}

void await_suspend(std::coroutine_handle<> handle) {
cb_.handle_ = std::move(handle);
auto await_suspend(std::coroutine_handle<> handle) -> bool {
assert(sqe_);

cb_.handle_ = std::move(handle);
runtime::detail::t_ring->submit();
return true;
}

[[REMEMBER_CO_AWAIT]]
auto set_timeout_for(std::chrono::nanoseconds timeout) -> IO & {
if (timeout >= std::chrono::milliseconds{1} && sqe_ != nullptr) [[likely]] {
auto timeout_sqe = runtime::detail::t_ring->get_sqe();
if (timeout_sqe != nullptr) [[likely]] {
sqe_->flags |= IOSQE_IO_LINK;

ts_.tv_sec = timeout.count() / 1000'000'000;
ts_.tv_nsec = timeout.count() % 1000'000'000;

io_uring_prep_link_timeout(timeout_sqe, &ts_, 0);
io_uring_sqe_set_data(timeout_sqe, nullptr);
}
} else {
LOG_ERROR("Set timeout failed");
}
return *static_cast<IO *>(this);
auto set_timeout(std::chrono::steady_clock::time_point deadline) noexcept {
cb_.deadline_ = deadline;
return time::detail::Timeout{std::move(*static_cast<IO *>(this))};
}

[[REMEMBER_CO_AWAIT]]
auto set_timeout_at(std::chrono::steady_clock::time_point deadline) -> IO & {
return set_timeout_for(deadline - std::chrono::steady_clock::now());
auto set_timeout(std::chrono::milliseconds interval) noexcept {
return set_timeout(std::chrono::steady_clock::now() + interval);
}

protected:
Callback cb_{};
io_uring_sqe *sqe_;

private:
struct __kernel_timespec ts_;
// private:
// struct __kernel_timespec ts_;
};

} // namespace zedio::io::detail
5 changes: 4 additions & 1 deletion zedio/runtime/driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ class Driver {
std::size_t cnt = ring_.peek_batch(cqes);
for (auto i = 0uz; i < cnt; i += 1) {
auto cb = reinterpret_cast<io::detail::Callback *>(cqes[i]->user_data);
if (cb != nullptr) [[likely]] {
if (cb != nullptr) {
if (cb->entry_ != nullptr) {
timer_.remove_entry(cb->entry_);
}
local_queue.push_back_or_overflow(cb->get_coro_handle_and_set_result(cqes[i]->res),
global_queue);
}
Expand Down
12 changes: 4 additions & 8 deletions zedio/runtime/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ class LocalQueue {
}
}
}
push_back_finish(std::move(task), tail);
std::size_t idx = tail & MASK;
buffer_[idx] = std::move(task);
std::atomic_ref atomic_tail{tail_};
atomic_tail.store(tail + 1, std::memory_order::release);
}

[[nodiscard]]
Expand Down Expand Up @@ -282,13 +285,6 @@ class LocalQueue {
}
}

void push_back_finish(std::coroutine_handle<> &&task, uint32_t tail) {
std::size_t idx = tail & MASK;
buffer_[idx] = std::move(task);
std::atomic_ref atomic_tail{tail_};
atomic_tail.store(tail + 1, std::memory_order::release);
}

auto push_overflow(std::coroutine_handle<> &task,
uint32_t head,
[[maybe_unused]] uint32_t tail,
Expand Down
31 changes: 23 additions & 8 deletions zedio/runtime/timer/entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <cassert>
// C++
#include <chrono>
#include <concepts>
#include <coroutine>
#include <memory>

Expand All @@ -16,26 +17,40 @@ class Entry {
: expiration_time_{expiration_time}
, handle_{handle} {}

Entry(std::chrono::steady_clock::time_point expiration_time, io::detail::Callback *data)
: expiration_time_{expiration_time}
, data_{data} {}

public:
void execute(runtime::detail::LocalQueue &local_queue,
runtime::detail::GlobalQueue &global_queue) {
assert(handle_ != nullptr);
local_queue.push_back_or_overflow(handle_, global_queue);
if (handle_ != nullptr) {
assert(data_ == nullptr);
local_queue.push_back_or_overflow(handle_, global_queue);
} else {
assert(handle_ == nullptr);
auto sqe = runtime::detail::t_ring->get_sqe();
io_uring_prep_cancel(sqe, data_, 0);
io_uring_sqe_set_data(sqe, nullptr);
data_->entry_ = nullptr;
}
}

public:
template <class T>
requires std::constructible_from<Entry, std::chrono::steady_clock::time_point, T>
[[nodiscard]]
static auto make(std::chrono::steady_clock::time_point expiration_time,
std::coroutine_handle<> handle)
-> std::pair<std::shared_ptr<Entry>, std::weak_ptr<Entry>> {
auto entry = std::make_shared<Entry>(expiration_time, handle);
return std::make_pair(entry, std::weak_ptr<Entry>{entry});
static auto make(std::chrono::steady_clock::time_point expiration_time, T handle)
-> std::pair<std::unique_ptr<Entry>, Entry *> {
auto entry = std::make_unique<Entry>(expiration_time, handle);
return std::make_pair(std::move(entry), entry.get());
}

public:
std::chrono::steady_clock::time_point expiration_time_;
std::coroutine_handle<> handle_{nullptr};
std::shared_ptr<Entry> next_{nullptr};
io::detail::Callback *data_{nullptr};
std::unique_ptr<Entry> next_{nullptr};
};

} // namespace zedio::runtime::detail
19 changes: 11 additions & 8 deletions zedio/runtime/timer/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ class Timer {
auto operator=(Timer &&) -> Timer & = delete;

public:
auto add_entry(std::chrono::steady_clock::time_point expiration_time,
std::coroutine_handle<> handle) -> Result<std::weak_ptr<Entry>> {
template <class HandleType>
requires std::is_invocable_v<decltype(Entry::make<HandleType>),
std::chrono::steady_clock::time_point,
HandleType>
auto add_entry(std::chrono::steady_clock::time_point expiration_time, HandleType handle)
-> Result<Entry *> {
auto now = std::chrono::steady_clock::now();
if (expiration_time <= now) [[unlikely]] {
return std::unexpected{make_zedio_error(Error::PassedTime)};
Expand Down Expand Up @@ -60,16 +64,15 @@ class Timer {
return result;
}

void remove_entry(std::shared_ptr<Entry> &&entry) {
void remove_entry(Entry *entry) {
assert(root_wheel_.index() != 0 && num_entries_ != 0);
std::visit(
[this, &entry]<typename T>(T &wheel) {
[this, entry]<typename T>(T &wheel) {
if constexpr (std::is_same_v<T, std::monostate>) {
std::unreachable();
LOG_ERROR("no entries");
} else {
wheel->remove_entry(std::move(entry),
time_since_start(entry->expiration_time_));
wheel->remove_entry(entry, time_since_start(entry->expiration_time_));
num_entries_ -= 1;
if (num_entries_ == 0) {
root_wheel_ = std::monostate{};
Expand Down Expand Up @@ -147,7 +150,7 @@ class Timer {

template <std::size_t LEVEL>
void level_up_and_add_entry(std::unique_ptr<Wheel<LEVEL>> &&wheel,
std::shared_ptr<Entry> &&entry,
std::unique_ptr<Entry> &&entry,
std::size_t interval) {
if constexpr (LEVEL == MAX_LEVEL + 1) {
assert(false);
Expand All @@ -165,7 +168,7 @@ class Timer {
}

template <std::size_t LEVEL = MAX_LEVEL>
void build_wheel_and_add_entry(std::shared_ptr<Entry> &&entry, std::size_t interval) {
void build_wheel_and_add_entry(std::unique_ptr<Entry> &&entry, std::size_t interval) {
if constexpr (LEVEL > 0uz) {
if (!(Wheel<LEVEL>::MS_PER_SLOT <= interval && interval < Wheel<LEVEL>::MAX_MS)) {
build_wheel_and_add_entry<LEVEL - 1>(std::move(entry), interval);
Expand Down
14 changes: 7 additions & 7 deletions zedio/runtime/timer/wheel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Wheel {
}

public:
void add_entry(std::shared_ptr<Entry> &&entry, std::size_t interval) {
void add_entry(std::unique_ptr<Entry> &&entry, std::size_t interval) {
std::size_t index = get_index(interval);
// LOG_DEBUG("{} {}", index, interval);
if (slots_[index] == nullptr) {
Expand All @@ -46,7 +46,7 @@ class Wheel {
slots_[index]->add_entry(std::move(entry), interval);
};

void remove_entry(std::shared_ptr<Entry> &&entry, std::size_t interval) {
void remove_entry(Entry *entry, std::size_t interval) {
auto index = get_index(interval);

assert(slots_[index] != nullptr);
Expand Down Expand Up @@ -149,24 +149,24 @@ class Wheel<0uz> {
}

public:
void add_entry(std::shared_ptr<Entry> &&entry, std::size_t interval) {
void add_entry(std::unique_ptr<Entry> &&entry, std::size_t interval) {
auto index = get_index(interval);
// LOG_DEBUG("{} {}", index, interval);
entry->next_ = std::move(slots_[index]);
bitmap_ |= 1uz << index;
slots_[index] = std::move(entry);
}

void remove_entry(std::shared_ptr<Entry> &&entry, std::size_t interval) {
void remove_entry(Entry *entry, std::size_t interval) {
auto index = get_index(interval);

auto head = slots_[index].get();
if (head == entry.get()) {
if (head == entry) {
slots_[index] = std::move(head->next_);
} else {
auto cur = head->next_.get();
// Do not need to check cur != nullptr
while (cur != entry.get()) {
while (cur != entry) {
head = cur;
cur = head->next_.get();
}
Expand Down Expand Up @@ -234,7 +234,7 @@ class Wheel<0uz> {

private:
uint64_t bitmap_{0};
std::array<std::shared_ptr<Entry>, SLOT_SIZE> slots_{};
std::array<std::unique_ptr<Entry>, SLOT_SIZE> slots_{};
};

} // namespace zedio::runtime::detail
3 changes: 2 additions & 1 deletion zedio/time.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

#include "zedio/time/interval.hpp"
#include "zedio/time/sleep.hpp"
#include "zedio/time/sleep.hpp"
#include "zedio/time/timeout.hpp"
Loading

0 comments on commit 6ac4d40

Please sign in to comment.