diff --git a/zedio/runtime/multi_thread/queue.hpp b/zedio/runtime/multi_thread/queue.hpp index a481e81..961c82c 100644 --- a/zedio/runtime/multi_thread/queue.hpp +++ b/zedio/runtime/multi_thread/queue.hpp @@ -102,8 +102,7 @@ class LocalQueue { [[nodiscard]] auto remaining_slots() -> std::size_t { auto [steal, _] = unpack(head_.load(std::memory_order::acquire)); - std::atomic_ref atoimc_tail{tail_}; - auto tail = atoimc_tail.load(std::memory_order::acquire); + auto tail = tail_.load(std::memory_order::acquire); assert(detail::LOCAL_QUEUE_CAPACITY >= static_cast(tail - steal)); return detail::LOCAL_QUEUE_CAPACITY - static_cast(tail - steal); } @@ -111,8 +110,7 @@ class LocalQueue { [[nodiscard]] auto size() -> uint32_t { auto [_, head] = unpack(head_.load(std::memory_order::acquire)); - std::atomic_ref atoimc_tail{tail_}; - auto tail = atoimc_tail.load(std::memory_order::acquire); + auto tail = tail_.load(std::memory_order::acquire); return tail - head; } @@ -132,7 +130,7 @@ class LocalQueue { assert(0 < len && len <= capacity()); auto [steal, _] = unpack(head_.load(std::memory_order::acquire)); - auto tail = tail_; + auto tail = tail_.load(std::memory_order::relaxed); if (tail - steal > static_cast(capacity() - len)) [[unlikely]] { throw std::runtime_error(std::format("push_back overflow! cur size {}, push size {}", @@ -144,11 +142,9 @@ class LocalQueue { std::size_t idx = tail & MASK; buffer_[idx] = std::move(task); tail += 1; - // tail = wrapping_add(tail, 1); } - std::atomic_ref atomic_tail{tail_}; - atomic_tail.store(tail, std::memory_order::release); + tail_.store(tail, std::memory_order::release); } void push_back_or_overflow(std::coroutine_handle<> task, GlobalQueue &global_queue) { @@ -157,7 +153,7 @@ class LocalQueue { while (true) { auto head = head_.load(std::memory_order::acquire); auto [steal, real] = unpack(head); - tail = tail_; + tail = tail_.load(std::memory_order::relaxed); if (tail - steal < static_cast(detail::LOCAL_QUEUE_CAPACITY)) { // There is capacity for the task break; @@ -175,8 +171,7 @@ class LocalQueue { } std::size_t idx = static_cast(tail) & MASK; buffer_[idx] = std::move(task); - std::atomic_ref atomic_tail{tail_}; - atomic_tail.store(tail + 1, std::memory_order::release); + tail_.store(tail + 1, std::memory_order::release); } [[nodiscard]] @@ -185,7 +180,7 @@ class LocalQueue { std::size_t idx = 0; while (true) { auto [steal, real] = unpack(head); - auto tail = tail_; + auto tail = tail_.load(std::memory_order::relaxed); if (real == tail) { // Queue is empty return std::nullopt; @@ -209,7 +204,7 @@ class LocalQueue { std::optional> result{std::nullopt}; auto [steal, _] = unpack(dst.head_.load(std::memory_order::acquire)); - auto dst_tail = dst.tail_; + auto dst_tail = dst.tail_.load(std::memory_order::relaxed); // less than half of local_queue_capacity just return if (dst_tail - steal > static_cast(detail::LOCAL_QUEUE_CAPACITY / 2)) { @@ -226,8 +221,7 @@ class LocalQueue { std::size_t idx = static_cast(dst_new_tail) & MASK; result.emplace(std::move(dst.buffer_[idx])); if (n > 0) { - std::atomic_ref atomic_dst_tail{dst.tail_}; - atomic_dst_tail.store(dst_new_tail, std::memory_order::release); + dst.tail_.store(dst_new_tail, std::memory_order::release); } return result; } @@ -240,8 +234,7 @@ class LocalQueue { uint32_t n = 0; while (true) { auto [src_steal, src_real] = unpack(prev_packed); - std::atomic_ref atomic_src_tail{tail_}; - auto src_tail = atomic_src_tail.load(std::memory_order::acquire); + auto src_tail = tail_.load(std::memory_order::acquire); if (src_steal != src_real) { // Other thread is stealing @@ -338,8 +331,7 @@ class LocalQueue { private: std::atomic head_{0}; - uint32_t tail_{0}; - // std::atomic tail_{0}; + std::atomic tail_{0}; std::array, detail::LOCAL_QUEUE_CAPACITY> buffer_; };