Skip to content

Commit

Permalink
update: remove atomic_ref #21
Browse files Browse the repository at this point in the history
  • Loading branch information
8sileus committed May 14, 2024
1 parent 932b361 commit f3a4970
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions zedio/runtime/multi_thread/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,15 @@ class LocalQueue {
[[nodiscard]]
auto remaining_slots() -> std::size_t {
auto [steal, _] = unpack(head_.load(std::memory_order::acquire));
std::atomic_ref<uint32_t> atoimc_tail{tail_};
auto tail = atoimc_tail.load(std::memory_order::acquire);
auto tail = tail_.load(std::memory_order::acquire);
assert(detail::LOCAL_QUEUE_CAPACITY >= static_cast<std::size_t>(tail - steal));
return detail::LOCAL_QUEUE_CAPACITY - static_cast<std::size_t>(tail - steal);
}

[[nodiscard]]
auto size() -> uint32_t {
auto [_, head] = unpack(head_.load(std::memory_order::acquire));
std::atomic_ref<uint32_t> atoimc_tail{tail_};
auto tail = atoimc_tail.load(std::memory_order::acquire);
auto tail = tail_.load(std::memory_order::acquire);
return tail - head;
}

Expand All @@ -132,7 +130,7 @@ class LocalQueue {
assert(0 < len && len <= capacity());

auto [steal, _] = unpack(head_.load(std::memory_order::acquire));
auto tail = tail_;
auto tail = tail_.load(std::memory_order::relaxed);

if (tail - steal > static_cast<uint32_t>(capacity() - len)) [[unlikely]] {
throw std::runtime_error(std::format("push_back overflow! cur size {}, push size {}",
Expand All @@ -144,11 +142,9 @@ class LocalQueue {
std::size_t idx = tail & MASK;
buffer_[idx] = std::move(task);
tail += 1;
// tail = wrapping_add(tail, 1);
}

std::atomic_ref<uint32_t> atomic_tail{tail_};
atomic_tail.store(tail, std::memory_order::release);
tail_.store(tail, std::memory_order::release);
}

void push_back_or_overflow(std::coroutine_handle<> task, GlobalQueue &global_queue) {
Expand All @@ -157,7 +153,7 @@ class LocalQueue {
while (true) {
auto head = head_.load(std::memory_order::acquire);
auto [steal, real] = unpack(head);
tail = tail_;
tail = tail_.load(std::memory_order::relaxed);
if (tail - steal < static_cast<uint32_t>(detail::LOCAL_QUEUE_CAPACITY)) {
// There is capacity for the task
break;
Expand All @@ -175,8 +171,7 @@ class LocalQueue {
}
std::size_t idx = static_cast<std::size_t>(tail) & MASK;
buffer_[idx] = std::move(task);
std::atomic_ref atomic_tail{tail_};
atomic_tail.store(tail + 1, std::memory_order::release);
tail_.store(tail + 1, std::memory_order::release);
}

[[nodiscard]]
Expand All @@ -185,7 +180,7 @@ class LocalQueue {
std::size_t idx = 0;
while (true) {
auto [steal, real] = unpack(head);
auto tail = tail_;
auto tail = tail_.load(std::memory_order::relaxed);
if (real == tail) {
// Queue is empty
return std::nullopt;
Expand All @@ -209,7 +204,7 @@ class LocalQueue {
std::optional<std::coroutine_handle<>> result{std::nullopt};

auto [steal, _] = unpack(dst.head_.load(std::memory_order::acquire));
auto dst_tail = dst.tail_;
auto dst_tail = dst.tail_.load(std::memory_order::relaxed);

// less than half of local_queue_capacity just return
if (dst_tail - steal > static_cast<uint32_t>(detail::LOCAL_QUEUE_CAPACITY / 2)) {
Expand All @@ -226,8 +221,7 @@ class LocalQueue {
std::size_t idx = static_cast<std::size_t>(dst_new_tail) & MASK;
result.emplace(std::move(dst.buffer_[idx]));
if (n > 0) {
std::atomic_ref atomic_dst_tail{dst.tail_};
atomic_dst_tail.store(dst_new_tail, std::memory_order::release);
dst.tail_.store(dst_new_tail, std::memory_order::release);
}
return result;
}
Expand All @@ -240,8 +234,7 @@ class LocalQueue {
uint32_t n = 0;
while (true) {
auto [src_steal, src_real] = unpack(prev_packed);
std::atomic_ref<uint32_t> atomic_src_tail{tail_};
auto src_tail = atomic_src_tail.load(std::memory_order::acquire);
auto src_tail = tail_.load(std::memory_order::acquire);

if (src_steal != src_real) {
// Other thread is stealing
Expand Down Expand Up @@ -338,8 +331,7 @@ class LocalQueue {

private:
std::atomic<uint64_t> head_{0};
uint32_t tail_{0};
// std::atomic<uint32_t> tail_{0};
std::atomic<uint32_t> tail_{0};
std::array<std::coroutine_handle<>, detail::LOCAL_QUEUE_CAPACITY> buffer_;
};

Expand Down

0 comments on commit f3a4970

Please sign in to comment.