Skip to content

Commit

Permalink
Failed attempt to un-deadlock the backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
mratsim committed Dec 11, 2019
1 parent 9a62e9c commit f5d8533
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 16 deletions.
79 changes: 63 additions & 16 deletions weave/channels/event_notifiers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ import
../config

type
# Workaround for enums not being supported as atomics, pending: https://github.com/nim-lang/Nim/issues/12812
# so we enforce the enum's size (one uint8) and cast it before use
ConsumerState {.size: sizeof(uint8).}= enum
# We need 4 states to solve a race condition, see https://github.com/mratsim/weave/issues/27
# A child signals to its parent, via a steal request, that it is going to sleep.
# The parent tries to wake it up, but the child is not sleeping yet.
# The child then goes to sleep and the system is deadlocked.
# Instead, before signaling via the channel, the child should first announce
# its intent to sleep, and then commit or roll back that intent once it has sent its message.
Busy
IntendToSleep
Parked
ShouldWakeup

EventNotifier* = object
## Multi Producers, Single Consumer event notification
## This is wait-free for producers and avoids spending time
Expand All @@ -61,33 +75,56 @@ type
## See also: binary semaphores, eventcount
## On Windows: ManuallyResetEvent and AutoResetEvent
cond{.align: WV_CacheLinePadding.}: Cond
lock: Lock
waiting: Atomic[bool]
lock: Lock # The lock is never used, it's just there for the condition variable
consumerState: ConsumerState

template load(c: ConsumerState, mo: MemoryOrder): ConsumerState =
  ## Atomically read the consumer state `c` using memory order `mo`.
  ## The enum's storage is reinterpreted in place as an `Atomic[uint8]`
  ## and the loaded byte is cast back to `ConsumerState`.
  cast[ConsumerState](load(cast[var Atomic[uint8]](addr c), mo))

template store(c: var ConsumerState, val: ConsumerState, mo: MemoryOrder) =
  ## Atomically write `val` into the consumer state `c` using memory order `mo`.
  ## The enum's storage is reinterpreted in place as an `Atomic[uint8]`
  ## and `val` is cast to its `uint8` representation before the store.
  store(cast[var Atomic[uint8]](addr c), cast[uint8](val), mo)

template compareExchange(dst, expected: var ConsumerState, target: ConsumerState, mo: MemoryOrder): bool =
  ## Atomic compare-and-exchange on a consumer state, forwarded to the
  ## `Atomic[uint8]` overload: `dst` is reinterpreted as `Atomic[uint8]`,
  ## `expected` as a mutable `uint8` and `target` as a `uint8`.
  ## Yields the `bool` returned by the underlying `compareExchange`.
  cast[var Atomic[uint8]](addr dst).compareExchange(
    cast[var uint8](addr expected),
    cast[uint8](target),
    mo
  )

func initialize*(en: var EventNotifier) =
## Initialize the notifier's condition variable and lock,
## then reset its state flags with relaxed stores.
##
## NOTE(review): this text comes from a rendered diff without +/- markers;
## the `waiting` store and the `consumerState` store below look like the
## removed/added pair of the hunk -- confirm which one is live.
en.cond.initCond()
en.lock.initLock()
en.waiting.store(false, moRelaxed)
en.consumerState.store(Busy, moRelaxed)

func `=destroy`*(en: var EventNotifier) =
  ## Destructor hook: tears down the condition variable first,
  ## then the lock that accompanies it.
  deinitCond(en.cond)
  deinitLock(en.lock)

func intendToSleep*(en: var EventNotifier) {.inline.} =
  ## Announce that the consumer is about to go to sleep.
  ## Call this before sending the formal notification through a channel.
  ## The state is expected to be `Busy` on entry (checked with `assert`).
  assert load(en.consumerState, moRelaxed) == Busy

  # Release fence first, then publish the new state with a relaxed store.
  fence(moRelease)
  store(en.consumerState, IntendToSleep, moRelaxed)


func wait*(en: var EventNotifier) {.inline.} =
## Wait until we are signaled of an event.
## The thread is parked on the condition variable and does not consume CPU resources.
##
## NOTE(review): this text comes from a rendered diff without +/- markers.
## Statements using `en.waiting` and statements using `en.consumerState`
## appear interleaved below and look like the removed and added versions
## of this function -- confirm against the repository which lines are live.
assert not en.waiting.load(moRelaxed)

fence(moRelease)
en.waiting.store(true, moRelaxed)

en.lock.acquire()
while en.waiting.load(moRelaxed):
fence(moAcquire)
en.cond.wait(en.lock)
en.lock.release()
# Try to move from IntendToSleep to Parked; the wait loop below is guarded by this exchange.
var expected = IntendToSleep
if compareExchange(en.consumerState, expected, Parked, moAcquireRelease):
while en.consumerState.load(moRelaxed) == Parked:
# We only used the lock for the condition variable, we protect via atomics otherwise
# NOTE(review): the only visible `en.lock.acquire()`/`en.lock.release()` pair
# ends before this loop, so `en.cond.wait(en.lock)` here appears to be called
# without holding the lock. Condition-variable waits normally require the
# caller to hold the lock -- confirm.
fence(moAcquire)
en.cond.wait(en.lock)

assert not en.waiting.load(moRelaxed)
# If we failed to sleep or just woke up
# we return to the busy state
fence(moRelease)
en.consumerState.store(Busy, moRelaxed)

# TODO there is a deadlock at the moment as a worker sending a
# WAITING steal request and then actually waiting is not atomic
Expand All @@ -97,10 +134,20 @@ func notify*(en: var EventNotifier) {.inline.} =
## Signal a thread that it can be unparked

# No thread waiting, return
if not en.waiting.load(moRelaxed):
let consumerState = en.consumerState.load(moRelaxed)
if consumerState in {Busy, ShouldWakeup}:
fence(moAcquire)
return

fence(moRelease)
en.waiting.store(false, moRelaxed)
en.cond.signal()
en.consumerState.store(ShouldWakeup, moRelaxed)
while true:
# We might set "ShouldWakeup" after the consumer's check
# and just before it waits, so we need to repeat the signal
# until we are sure the consumer is back to Busy.
fence(moAcquire)
en.cond.signal()
if en.consumerState.load(moAcquire) != Busy:
cpuRelax()
else:
break
1 change: 1 addition & 0 deletions weave/thieves.nim
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ proc lastStealAttemptFailure*(req: sink StealRequest) =
req.state = Waiting
debugTermination:
log("Worker %2d: sends state passively WAITING to its parent worker %d\n", myID(), myWorker().parent)
myParking().intendToSleep()
sendShare(req)
ascertain: not myWorker().isWaiting
myWorker().isWaiting = true
Expand Down

0 comments on commit f5d8533

Please sign in to comment.