From 1f87c4f2b0c12a9c9e803a13b28bcbe4f0d81eed Mon Sep 17 00:00:00 2001 From: Konstantin Morozov <34001730+k-morozov@users.noreply.github.com> Date: Mon, 29 Jul 2024 21:25:32 +0200 Subject: [PATCH] queue spinlock (#59) * add queue spinlock --- bench/bench_pool.cpp | 26 ++++----- bench/readme.md | 14 +++++ src/components/sync/queue_spinlock.h | 73 +++++++++++++++++++++++++ src/fiber/awaiter/mutex_awaiter.h | 16 +++--- src/fiber/sync/async_mutex.cpp | 2 +- src/fiber/sync/async_mutex.h | 3 +- test/components/meson.build | 1 + test/components/test_queue_spinlock.cpp | 58 ++++++++++++++++++++ 8 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 src/components/sync/queue_spinlock.h create mode 100644 test/components/test_queue_spinlock.cpp diff --git a/bench/bench_pool.cpp b/bench/bench_pool.cpp index 952dfce..43a7fe5 100644 --- a/bench/bench_pool.cpp +++ b/bench/bench_pool.cpp @@ -46,19 +46,19 @@ void bench_logic(NExecutors::IExecutor& pool) { }); } -static void IntrusiveThreadPool(benchmark::State& state) { - for (auto _ : state) { - NExecutors::IntrusiveThreadPool pool{CountThreads}; - pool.Start(); - bench_logic(pool); - pool.WaitIdle(); - } -} -BENCHMARK(IntrusiveThreadPool) - ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks)) - ->Repetitions(CountRepetitions) - ->Iterations(CountIteration) - ->Unit(benchmark::kMillisecond); +//static void IntrusiveThreadPool(benchmark::State& state) { +// for (auto _ : state) { +// NExecutors::IntrusiveThreadPool pool{CountThreads}; +// pool.Start(); +// bench_logic(pool); +// pool.WaitIdle(); +// } +//} +//BENCHMARK(IntrusiveThreadPool) +// ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks)) +// ->Repetitions(CountRepetitions) +// ->Iterations(CountIteration) +// ->Unit(benchmark::kMillisecond); static void DistributedPool(benchmark::State& state) { for (auto _ : state) { diff --git a/bench/readme.md b/bench/readme.md index 7f9a719..9c7ad79 100644 --- a/bench/readme.md +++ b/bench/readme.md @@ -28,6 
+28,20 @@ | DistributedPool_task_100000/iterations:10/repeats:5_stddev | 8.96 ms| 0.477 ms| 5 | | DistributedPool_task_100000/iterations:10/repeats:5_cv | 7.05 % | 0.80 % | 5 | +### DistributedPool Task 100,000 (queue spinlock) + +| Benchmark | Time | CPU | Iterations | +|---------------------------------------------------------------|--------|--------|------------| +| DistributedPool_task_100000/iterations:10/repeats:5 | 304 ms | 126 ms | 10 | +| DistributedPool_task_100000/iterations:10/repeats:5 | 263 ms | 107 ms | 10 | +| DistributedPool_task_100000/iterations:10/repeats:5 | 205 ms | 79.7 ms| 10 | +| DistributedPool_task_100000/iterations:10/repeats:5 | 190 ms | 78.7 ms| 10 | +| DistributedPool_task_100000/iterations:10/repeats:5 | 460 ms | 189 ms | 10 | +| DistributedPool_task_100000/iterations:10/repeats:5_mean | 285 ms | 116 ms | 5 | +| DistributedPool_task_100000/iterations:10/repeats:5_median | 263 ms | 107 ms | 5 | +| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 108 ms | 45.5 ms| 5 | +| DistributedPool_task_100000/iterations:10/repeats:5_cv | 37.98 %| 39.18 %| 5 | + ### DistributedPool Task 100,000 (std::mutex) | Benchmark | Time | CPU | Iterations | diff --git a/src/components/sync/queue_spinlock.h b/src/components/sync/queue_spinlock.h new file mode 100644 index 0000000..b10d847 --- /dev/null +++ b/src/components/sync/queue_spinlock.h @@ -0,0 +1,73 @@ +// +// Created by konstantin on 28.07.24. 
+// + +#pragma once + +#include <atomic> + +namespace NSync { + +class QueueSpinLock final { +public: + class Guard final { + public: + explicit Guard(QueueSpinLock& host) : host(host) { host.Acquire(this); } + ~Guard() { + if (is_owner) Release(); + } + + void Release() { + host.Release(this); + is_owner.store(false); + } + + void SetOwner() { is_owner.store(true); } + + void SetNext(Guard* guard) { next.store(guard); } + + bool IsOwner() const { return is_owner.load(); } + + bool HasNext() const { return next.load() != nullptr; } + + void SetNextOwner() { next.load()->SetOwner(); } + + private: + QueueSpinLock& host; + std::atomic<Guard*> next{}; + std::atomic<bool> is_owner{}; + }; + +private: + void Acquire(Guard* guard) { + auto ancestor = tail_.exchange(guard); + if (ancestor == nullptr) { + guard->SetOwner(); + return; + } + + ancestor->SetNext(guard); + while (!guard->IsOwner()) { + } + } + + void Release(Guard* guard) { + if (guard->HasNext()) { + guard->SetNextOwner(); + return; + } + + Guard* old_guard = guard; + while (!tail_.compare_exchange_weak(old_guard, nullptr)) { + if (guard->HasNext()) { + guard->SetNextOwner(); + return; + } + old_guard = guard; + } + } + + std::atomic<Guard*> tail_{}; +}; + +} // namespace NSync diff --git a/src/fiber/awaiter/mutex_awaiter.h b/src/fiber/awaiter/mutex_awaiter.h index 05e9779..73b648e 100644 --- a/src/fiber/awaiter/mutex_awaiter.h +++ b/src/fiber/awaiter/mutex_awaiter.h @@ -8,6 +8,7 @@ #include #include +#include <components/sync/queue_spinlock.h> #include namespace NFibers { @@ -16,25 +17,26 @@ template <class M> class AsyncMutexWaiter : public IAwaiter, public NComponents::Node<AsyncMutexWaiter<M>> { public: - using Guard = std::unique_lock; + using Guard = NSync::QueueSpinLock::Guard; - AsyncMutexWaiter(M* mutex, Guard guard) - : mutex(mutex), guard(std::move(guard)){}; + AsyncMutexWaiter(M* async_mutex, Guard& guard) + : async_mutex(async_mutex), guard(guard){}; void AwaitSuspend(StoppedFiber handle) override { assert(handle.IsValid()); stopped_handle = handle; - mutex->Park(this); - 
guard.release()->unlock(); + async_mutex->Park(this); + + guard.Release(); } void Schedule() { stopped_handle.Schedule(); } private: - M* mutex; + M* async_mutex; StoppedFiber stopped_handle; - Guard guard; + Guard& guard; }; } // namespace NFibers diff --git a/src/fiber/sync/async_mutex.cpp b/src/fiber/sync/async_mutex.cpp index 69d98ff..d72a825 100644 --- a/src/fiber/sync/async_mutex.cpp +++ b/src/fiber/sync/async_mutex.cpp @@ -11,7 +11,7 @@ namespace NFibers { void AsyncMutex::Lock() { Waiter::Guard guard(spinlock_); if (locked_) { - Waiter waiter(this, std::move(guard)); + Waiter waiter(this, guard); Suspend(&waiter); } else { locked_ = true; diff --git a/src/fiber/sync/async_mutex.h b/src/fiber/sync/async_mutex.h index a796239..41aca9e 100644 --- a/src/fiber/sync/async_mutex.h +++ b/src/fiber/sync/async_mutex.h @@ -8,12 +8,13 @@ #include #include +#include <components/sync/queue_spinlock.h> #include namespace NFibers { class AsyncMutex { - using Spinlock = NSync::SpinLock; + using Spinlock = NSync::QueueSpinLock; using Waiter = AsyncMutexWaiter<AsyncMutex>; friend Waiter; diff --git a/test/components/meson.build b/test/components/meson.build index 93f1b2a..91024ab 100644 --- a/test/components/meson.build +++ b/test/components/meson.build @@ -5,6 +5,7 @@ test_source_files = [ 'test_intrusive_list.cpp', 'test_ms_queue.cpp', 'test_async_mutex.cpp', + 'test_queue_spinlock.cpp', ] cpp = meson.get_compiler('cpp') diff --git a/test/components/test_queue_spinlock.cpp b/test/components/test_queue_spinlock.cpp new file mode 100644 index 0000000..5a58a7b --- /dev/null +++ b/test/components/test_queue_spinlock.cpp @@ -0,0 +1,58 @@ +// +// Created by konstantin on 28.07.24. 
+// + +#include "gtest/gtest.h" + +#include <thread> + +#include <components/sync/queue_spinlock.h> + +TEST(TestQueueSpinlock, LockUnlock) { + NSync::QueueSpinLock spinlock; + + { + NSync::QueueSpinLock::Guard guard(spinlock); // <-- Acquired + // Critical section + } // <-- Released +} + +TEST(TestQueueSpinlock, SequentialLockUnlock) { + NSync::QueueSpinLock spinlock; + + { + NSync::QueueSpinLock::Guard guard(spinlock); + // Critical section + } + + { + NSync::QueueSpinLock::Guard guard(spinlock); + // Critical section + } +} + +TEST(TestQueueSpinlock, ConcurrentIncrements) { + NSync::QueueSpinLock spinlock; + size_t counter = 0; + + const size_t kIncrementsPerThread = 1000; + + auto contender = [&] { + for (size_t i = 0; i < kIncrementsPerThread; ++i) { + NSync::QueueSpinLock::Guard guard(spinlock); + + size_t current = counter; + std::this_thread::yield(); + counter = current + 1; + } + }; + + std::thread t1(contender); + std::thread t2(contender); + t1.join(); + t2.join(); + + std::cout << "Shared counter value: " << counter << std::endl; + + ASSERT_EQ(counter, 2 * kIncrementsPerThread); +} \ No newline at end of file