From 5fe5d5fd07801a1581aac757ba687b9274124220 Mon Sep 17 00:00:00 2001
From: Andre Guedes
Date: Thu, 12 Dec 2024 12:17:10 -0300
Subject: [PATCH 1/4] ReentrantLock: wake up a single task on unlock and add a
 short spin

---
 base/lock.jl | 172 +++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 147 insertions(+), 25 deletions(-)

diff --git a/base/lock.jl b/base/lock.jl
index a44cd4c0d63cf..0b8b2d05c8894 100644
--- a/base/lock.jl
+++ b/base/lock.jl
@@ -9,6 +9,28 @@ Get the currently running [`Task`](@ref).
 """
 current_task() = ccall(:jl_get_current_task, Ref{Task}, ())
 
+# This bit is set in the `havelock` of a `ReentrantLock` when that lock is locked by some task.
+const LOCKED_BIT = 0b01
+# This bit is set in the `havelock` of a `ReentrantLock` just before parking a task. A task is being
+# parked if it wants to lock the lock, but it is currently being held by some other task.
+const PARKED_BIT = 0b10
+
+const MAX_SPINS = 10
+
+# Spins without yielding the thread to the OS.
+#
+# Instead, the backoff is simply capped at a maximum value. This can be
+# used to improve throughput in `compare_exchange` loops that have high
+# contention.
+@inline function spin(iteration::Int)
+    next = iteration >= MAX_SPINS ? MAX_SPINS : iteration + 1
+    for _ in 1:(1 << next)
+        ccall(:jl_cpu_pause, Cvoid, ())
+    end
+    GC.safepoint()
+    return next
+end
+
 # Advisory reentrant lock
 """
     ReentrantLock()
@@ -43,7 +65,28 @@ mutable struct ReentrantLock <: AbstractLock
     # offset32 = 20, offset64 = 24
     reentrancy_cnt::UInt32
     # offset32 = 24, offset64 = 28
-    @atomic havelock::UInt8 # 0x0 = none, 0x1 = lock, 0x2 = conflict
+    #
+    # This atomic integer holds the current state of the lock instance. Only the two lowest bits
+    # are used. See `LOCKED_BIT` and `PARKED_BIT` for the bitmask for these bits.
+    #
+    # # State table:
+    #
+    # PARKED_BIT | LOCKED_BIT | Description
+    #     0      |     0      | The lock is not locked, nor is anyone waiting for it.
+    # -----------+------------+------------------------------------------------------------------
+    #     0      |     1      | The lock is locked by exactly one task. No other task is
+    #            |            | waiting for it.
+    # -----------+------------+------------------------------------------------------------------
+    #     1      |     0      | The lock is not locked. One or more tasks are parked.
+    # -----------+------------+------------------------------------------------------------------
+    #     1      |     1      | The lock is locked by exactly one task. One or more tasks are
+    #            |            | parked waiting for the lock to become available.
+    #            |            | In this state, PARKED_BIT is only ever cleared when the
+    #            |            | cond_wait lock is held (i.e. on unlock). This ensures that
+    #            |            | we never end up in a situation where there are parked tasks but
+    #            |            | PARKED_BIT is not set (which would result in those tasks
+    #            |            | potentially never getting woken up).
+    @atomic havelock::UInt8
     # offset32 = 28, offset64 = 32
     cond_wait::ThreadSynchronizer # 2 words
     # offset32 = 36, offset64 = 48
@@ -112,7 +155,7 @@ function islocked end
 # `ReentrantLock`.
 
 function islocked(rl::ReentrantLock)
-    return (@atomic :monotonic rl.havelock) != 0
+    return (@atomic :monotonic rl.havelock) & LOCKED_BIT != 0
 end
 
 """
@@ -136,23 +179,29 @@ function trylock end
 @inline function trylock(rl::ReentrantLock)
     ct = current_task()
     if rl.locked_by === ct
-        #@assert rl.havelock !== 0x00
         rl.reentrancy_cnt += 0x0000_0001
         return true
     end
     return _trylock(rl, ct)
 end
 @noinline function _trylock(rl::ReentrantLock, ct::Task)
-    GC.disable_finalizers()
-    if (@atomicreplace :acquire rl.havelock 0x00 => 0x01).success
-        #@assert rl.locked_by === nothing
-        #@assert rl.reentrancy_cnt === 0
-        rl.reentrancy_cnt = 0x0000_0001
-        @atomic :release rl.locked_by = ct
-        return true
+    state = @atomic :monotonic rl.havelock
+    while true
+        if state & LOCKED_BIT != 0
+            return false
+        end
+
+        GC.disable_finalizers()
+        result = (@atomicreplace :acquire :monotonic rl.havelock state => state | LOCKED_BIT)
+        if result.success
+            rl.reentrancy_cnt = 0x0000_0001
+            @atomic :release rl.locked_by = ct
+            return true
+        else
+            state = result.old
+        end
+        GC.enable_finalizers()
     end
-    GC.enable_finalizers()
-    return false
 end
 
 """
@@ -168,23 +217,76 @@ Each `lock` must be matched by an [`unlock`](@ref).
     trylock(rl) || (@noinline function slowlock(rl::ReentrantLock)
         Threads.lock_profiling() && Threads.inc_lock_conflict_count()
         c = rl.cond_wait
-        lock(c.lock)
-        try
-            while true
-                if (@atomicreplace rl.havelock 0x01 => 0x02).old == 0x00 # :sequentially_consistent ? # now either 0x00 or 0x02
-                    # it was unlocked, so try to lock it ourself
-                    _trylock(rl, current_task()) && break
-                else # it was locked, so now wait for the release to notify us
-                    wait(c)
+        ct = current_task()
+        iteration = 0
+        state = @atomic :monotonic rl.havelock
+        while true
+            # Grab the lock if it isn't locked, even if there is a queue on it
+            if state & LOCKED_BIT == 0
+                GC.disable_finalizers()
+                result = (@atomicreplace :acquire :monotonic rl.havelock state => (state | LOCKED_BIT))
+                if result.success
+                    rl.reentrancy_cnt = 0x0000_0001
+                    @atomic :release rl.locked_by = ct
+                    return
+                else
+                    state = result.old
                 end
+                GC.enable_finalizers()
+                continue
             end
-        finally
-            unlock(c.lock)
+
+            # If there is no queue, try spinning a few times
+            iteration = spin(iteration)
+            if state & PARKED_BIT == 0 && iteration < MAX_SPINS
+                state = @atomic :monotonic rl.havelock
+                continue
+            end
+
+            # If still not locked, set the parked bit
+            if state & PARKED_BIT == 0
+                result = (@atomicreplace :monotonic :monotonic rl.havelock state => (state | PARKED_BIT))
+                if !result.success
+                    state = result.old
+                    continue
+                end
+            end
+
+            # With the parked bit set, lock the `cond_wait`
+            lock(c.lock)
+
+            # Last check before we wait to make sure `unlock` did not win the race
+            # to the `cond_wait` lock and cleared the parked bit
+            state = @atomic :acquire rl.havelock
+            if state != LOCKED_BIT | PARKED_BIT
+                unlock(c.lock)
+                continue
+            end
+
+            # It was locked, so now wait for the unlock to notify us
+            wait_no_relock(c)
+
+            # Loop back and try locking again
+            iteration = 0
+            state = @atomic :monotonic rl.havelock
         end
     end)(rl)
     return
 end
 
+function wait_no_relock(c::GenericCondition)
+    ct = current_task()
+    _wait2(c, ct)
+    token = unlockall(c.lock)
+    try
+        return wait()
+    catch
+        ct.queue === nothing || list_deletefirst!(ct.queue, ct)
+        rethrow()
+    end
+end
+
+
 """
     unlock(lock)
 
@@ -201,18 +303,38 @@ internal counter and return immediately.
         rl.reentrancy_cnt = n
         if n == 0x0000_0000
             @atomic :monotonic rl.locked_by = nothing
-            if (@atomicswap :release rl.havelock = 0x00) == 0x02
+            result = (@atomicreplace :release :monotonic rl.havelock LOCKED_BIT => 0x00)
+            if result.success
+                return true
+            else
                 (@noinline function notifywaiters(rl)
                     cond_wait = rl.cond_wait
                     lock(cond_wait)
                     try
-                        notify(cond_wait)
+
+                        tasks_notified = notify(cond_wait, all=false)
+                        if tasks_notified == 0
+                            # Either:
+                            # - We won the race to the `cond_wait` lock as a task was about to park
+                            # - We are the last task on the queue
+                            #
+                            # Unlock anyway as any parking task will retry
+
+                            @atomic :release rl.havelock = 0x00
+                        elseif isempty(cond_wait.waitq)
+                            # We notified the last task, unlock and unset the parked bit
+                            @atomic :release rl.havelock = 0x00
+                        else
+                            # There are more tasks on the queue, we unlock and keep the parked bit set
+                            @atomic :release rl.havelock = PARKED_BIT
+                            notify(cond_wait, all=false)
+                        end
                     finally
                         unlock(cond_wait)
                     end
                 end)(rl)
+                return true
             end
-            return true
         end
         return false
     end)(rl) && GC.enable_finalizers()

From 4dc6406e642b7646c078c6ad72c01e853f89273f Mon Sep 17 00:00:00 2001
From: Andre Guedes
Date: Thu, 12 Dec 2024 16:21:06 -0300
Subject: [PATCH 2/4] Revert _trylock to the simple case to recover uncontended
 throughput

---
 base/lock.jl | 23 +++++++----------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/base/lock.jl b/base/lock.jl
index 0b8b2d05c8894..786f9188e6090 100644
--- a/base/lock.jl
+++ b/base/lock.jl
@@ -185,23 +185,14 @@ function trylock end
     return _trylock(rl, ct)
 end
 @noinline function _trylock(rl::ReentrantLock, ct::Task)
-    state = @atomic :monotonic rl.havelock
-    while true
-        if state & LOCKED_BIT != 0
-            return false
-        end
-
-        GC.disable_finalizers()
-        result = (@atomicreplace :acquire :monotonic rl.havelock state => state | LOCKED_BIT)
-        if result.success
-            rl.reentrancy_cnt = 0x0000_0001
-            @atomic :release rl.locked_by = ct
-            return true
-        else
-            state = result.old
-        end
-        GC.enable_finalizers()
+    GC.disable_finalizers()
+    if (@atomicreplace :acquire rl.havelock 0x0 => LOCKED_BIT).success
+        rl.reentrancy_cnt = 0x0000_0001
+        @atomic :release rl.locked_by = ct
+        return true
     end
+    GC.enable_finalizers()
+    return false
 end
 
 """

From fa6832dd4e90ad44e793ae85e00fa4c9cbab0683 Mon Sep 17 00:00:00 2001
From: Andre Guedes
Date: Tue, 24 Dec 2024 10:21:00 -0300
Subject: [PATCH 3/4] Make ReentrantLock fair to non-locking tasks

---
 base/lock.jl | 79 ++++++++++++++++------------------------------------
 1 file changed, 24 insertions(+), 55 deletions(-)

diff --git a/base/lock.jl b/base/lock.jl
index 786f9188e6090..59e554c01c24a 100644
--- a/base/lock.jl
+++ b/base/lock.jl
@@ -15,21 +15,7 @@ const LOCKED_BIT = 0b01
 # parked if it wants to lock the lock, but it is currently being held by some other task.
 const PARKED_BIT = 0b10
 
-const MAX_SPINS = 10
-
-# Spins without yielding the thread to the OS.
-#
-# Instead, the backoff is simply capped at a maximum value. This can be
-# used to improve throughput in `compare_exchange` loops that have high
-# contention.
-@inline function spin(iteration::Int)
-    next = iteration >= MAX_SPINS ? MAX_SPINS : iteration + 1
-    for _ in 1:(1 << next)
-        ccall(:jl_cpu_pause, Cvoid, ())
-    end
-    GC.safepoint()
-    return next
-end
+const MAX_SPIN_ITERS = 40
 
 # Advisory reentrant lock
 """
@@ -186,7 +172,8 @@ function trylock end
 end
 @noinline function _trylock(rl::ReentrantLock, ct::Task)
     GC.disable_finalizers()
-    if (@atomicreplace :acquire rl.havelock 0x0 => LOCKED_BIT).success
+    state = (@atomic :monotonic rl.havelock) & PARKED_BIT
+    if (@atomicreplace :acquire rl.havelock state => (state | LOCKED_BIT)).success
         rl.reentrancy_cnt = 0x0000_0001
         @atomic :release rl.locked_by = ct
         return true
@@ -209,9 +196,9 @@ Each `lock` must be matched by an [`unlock`](@ref).
         Threads.lock_profiling() && Threads.inc_lock_conflict_count()
         c = rl.cond_wait
         ct = current_task()
-        iteration = 0
-        state = @atomic :monotonic rl.havelock
+        iteration = 1
         while true
+            state = @atomic :monotonic rl.havelock
             # Grab the lock if it isn't locked, even if there is a queue on it
             if state & LOCKED_BIT == 0
                 GC.disable_finalizers()
@@ -220,30 +207,24 @@ Each `lock` must be matched by an [`unlock`](@ref).
                     rl.reentrancy_cnt = 0x0000_0001
                     @atomic :release rl.locked_by = ct
                     return
-                else
-                    state = result.old
                 end
                 GC.enable_finalizers()
                 continue
             end
 
-            # If there is no queue, try spinning a few times
-            iteration = spin(iteration)
-            if state & PARKED_BIT == 0 && iteration < MAX_SPINS
-                state = @atomic :monotonic rl.havelock
-                continue
-            end
-
-            # If still not locked, set the parked bit
             if state & PARKED_BIT == 0
-                result = (@atomicreplace :monotonic :monotonic rl.havelock state => (state | PARKED_BIT))
-                if !result.success
-                    state = result.old
+                # If there is no queue, try spinning a few times
+                if iteration <= MAX_SPIN_ITERS
+                    Base.yield()
+                    iteration += 1
                     continue
                 end
+
+                # If still not locked, try setting the parked bit
+                @atomicreplace :monotonic :monotonic rl.havelock state => (state | PARKED_BIT)
             end
 
-            # With the parked bit set, lock the `cond_wait`
+            # lock the `cond_wait`
             lock(c.lock)
 
             # Last check before we wait to make sure `unlock` did not win the race
@@ -258,8 +239,7 @@ Each `lock` must be matched by an [`unlock`](@ref).
             wait_no_relock(c)
 
             # Loop back and try locking again
-            iteration = 0
-            state = @atomic :monotonic rl.havelock
+            iteration = 1
         end
     end)(rl)
     return
@@ -301,28 +281,17 @@ internal counter and return immediately.
                 (@noinline function notifywaiters(rl)
                     cond_wait = rl.cond_wait
                     lock(cond_wait)
-                    try
-
-                        tasks_notified = notify(cond_wait, all=false)
-                        if tasks_notified == 0
-                            # Either:
-                            # - We won the race to the `cond_wait` lock as a task was about to park
-                            # - We are the last task on the queue
-                            #
-                            # Unlock anyway as any parking task will retry
-
-                            @atomic :release rl.havelock = 0x00
-                        elseif isempty(cond_wait.waitq)
-                            # We notified the last task, unlock and unset the parked bit
-                            @atomic :release rl.havelock = 0x00
-                        else
-                            # There are more tasks on the queue, we unlock and keep the parked bit set
-                            @atomic :release rl.havelock = PARKED_BIT
-                            notify(cond_wait, all=false)
-                        end
-                    finally
-                        unlock(cond_wait)
+
+                    notify(cond_wait, all=false)
+                    if !isempty(cond_wait.waitq)
+                        @atomic :release rl.havelock = PARKED_BIT
+                    else
+                        # We may have won the race to the `cond_wait` lock as a task was about to park
+                        # but we unlock anyway as any parking task will retry
+                        @atomic :release rl.havelock = 0x00
                     end
+
+                    unlock(cond_wait)
                 end)(rl)
                 return true
             end

From 5db392f647dfc275a4b61ff34e724bdbe6e418a8 Mon Sep 17 00:00:00 2001
From: Andre Guedes
Date: Fri, 27 Dec 2024 13:48:50 -0300
Subject: [PATCH 4/4] Minor test fix

---
 test/threads.jl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/threads.jl b/test/threads.jl
index 4d928ca05da16..179279dbab4e6 100644
--- a/test/threads.jl
+++ b/test/threads.jl
@@ -16,7 +16,8 @@ let lk = ReentrantLock()
     t2 = @async (notify(c2); trylock(lk))
     wait(c1)
     wait(c2)
-    @test t1.queue === lk.cond_wait.waitq
+    # wait for the task to park in the queue (it may be spinning)
+    @test timedwait(() -> t1.queue === lk.cond_wait.waitq, 1.0) == :ok
     @test t2.queue !== lk.cond_wait.waitq
     @test istaskdone(t2)
     @test !fetch(t2)
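
Reviewer notes (not part of the patches):

The series only changes the internals of `havelock`; the public contract of
`ReentrantLock` is unchanged. A minimal sanity check of that contract
(reentrancy, owner `trylock` without a CAS, and `islocked` now tracking only
LOCKED_BIT), which should pass on a build of this branch:

    lk = ReentrantLock()
    lock(lk)
    @assert islocked(lk)   # LOCKED_BIT is set
    @assert trylock(lk)    # owner re-acquires by bumping reentrancy_cnt, no CAS
    unlock(lk)             # reentrancy_cnt: 2 -> 1, the lock stays held
    @assert islocked(lk)
    unlock(lk)             # reentrancy_cnt: 1 -> 0, LOCKED_BIT is cleared
    @assert !islocked(lk)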
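Both slow paths hinge on `@atomicreplace`, which returns an `(old, success)`
pair: `success` says whether the CAS won, and `old` is the value to retry
with. A standalone sketch of that primitive on a two-bit state field (the
`Box` type here is illustrative only, not part of the patches):

    mutable struct Box
        @atomic state::UInt8
    end

    b = Box(0b00)
    r = @atomicreplace b.state 0b00 => 0b01   # expect "unlocked", set LOCKED_BIT
    @assert r.success && r.old == 0b00
    r = @atomicreplace b.state 0b00 => 0b01   # this CAS loses: state is 0b01 now
    @assert !r.success && r.old == 0b01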
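Finally, a hypothetical contention harness (names and sizes are arbitrary) for
the case the series targets: many tasks hammering one lock, exercising the
spin-then-park slow path in `lock` and the notify-one path in `unlock`.
Comparing the `@time` output between this branch and master should make the
throughput change visible:

    using Base.Threads

    function contended(ntasks::Int, iters::Int)
        lk = ReentrantLock()
        acc = Ref(0)
        @sync for _ in 1:ntasks
            Threads.@spawn for _ in 1:iters
                lock(lk)
                try
                    acc[] += 1   # tiny critical section, maximum lock pressure
                finally
                    unlock(lk)
                end
            end
        end
        return acc[]
    end

    @assert contended(8, 100_000) == 800_000   # correctness under contention
    @time contended(8, 100_000)                # throughput, compare across branches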