Undeadlock the backoff #28

Merged · 4 commits · Dec 11, 2019
Changes from all commits
2 changes: 1 addition & 1 deletion benchmarks/vendor/tasking-2.0
67 changes: 47 additions & 20 deletions weave/channels/event_notifiers.nim
@@ -48,6 +48,18 @@ import
../config

type
ConsumerState = enum
# We need 4 states to solve a race condition (see https://github.com/mratsim/weave/issues/27):
# a child signals its parent that it is going to sleep via a steal request,
# the parent tries to wake it up, but the child is not sleeping yet,
# then the child goes to sleep and the system is deadlocked.
# Instead, before signaling via the channel, the child should also announce
# its intent to sleep, and then commit/rollback that intent once its message has been sent.
Busy
IntendToSleep
Parked
ShouldWakeup

EventNotifier* = object
## Multi Producers, Single Consumer event notification
## This is wait-free for producers and avoids spending time
@@ -61,46 +73,61 @@ type
## See also: binary semaphores, eventcount
## On Windows: ManuallyResetEvent and AutoResetEvent
cond{.align: WV_CacheLinePadding.}: Cond
lock: Lock
waiting: Atomic[bool]
lock: Lock # The lock is never used, it's just there for the condition variable
consumerState: Atomic[ConsumerState]

func initialize*(en: var EventNotifier) =
en.cond.initCond()
en.lock.initLock()
en.waiting.store(false, moRelaxed)
en.consumerState.store(Busy, moRelaxed)

func `=destroy`*(en: var EventNotifier) =
en.cond.deinitCond()
en.lock.deinitLock()

func intendToSleep*(en: var EventNotifier) {.inline.} =
## The consumer intends to sleep soon.
## This must be called before the formal notification
## via a channel.
assert en.consumerState.load(moRelaxed) == Busy

fence(moRelease)
en.consumerState.store(IntendToSleep, moRelaxed)

func wait*(en: var EventNotifier) {.inline.} =
## Wait until we are signaled of an event
## Thread is parked and does not consume CPU resources
assert not en.waiting.load(moRelaxed)

fence(moRelease)
en.waiting.store(true, moRelaxed)
var expected = IntendToSleep
if compareExchange(en.consumerState, expected, Parked, moAcquireRelease):
while en.consumerState.load(moRelaxed) == Parked:
# We only used the lock for the condition variable, we protect via atomics otherwise
fence(moAcquire)
en.cond.wait(en.lock)

en.lock.acquire()
while en.waiting.load(moRelaxed):
fence(moAcquire)
en.cond.wait(en.lock)
en.lock.release()

assert not en.waiting.load(moRelaxed)
# If we failed to sleep or just woke up
# we return to the busy state
fence(moRelease)
en.consumerState.store(Busy, moRelaxed)

# TODO there is a deadlock at the moment as a worker sending a
# WAITING steal request and then actually waiting is not atomic
# see https://github.com/mratsim/weave/issues/27
# and https://github.com/mratsim/weave/pull/28
func notify*(en: var EventNotifier) {.inline.} =
## Signal a thread that it can be unparked

# No thread waiting, return
if not en.waiting.load(moRelaxed):
let consumerState = en.consumerState.load(moRelaxed)
if consumerState in {Busy, ShouldWakeup}:
fence(moAcquire)
return

fence(moRelease)
en.waiting.store(false, moRelaxed)
en.cond.signal()
en.consumerState.store(ShouldWakeup, moRelaxed)
while true:
# We might set "ShouldWakeup" after the consumer's check
# but just before it waits, so we keep signaling in a loop
# until we are sure the consumer is back to the Busy state.
fence(moAcquire)
en.cond.signal()
if en.consumerState.load(moAcquire) != Busy:
cpuRelax()
else:
break
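
To make the four-state handshake above concrete, here is a minimal, self-contained sketch (not part of the PR) of one child/parent round: the child announces its intent to sleep, sends its request over a channel, then parks; the parent receives the request and calls notify, which cannot be lost because the state is already IntendToSleep or Parked. The names Notifier, child, parent and sleepRequests are illustrative, and unlike the PR the lock is actually held around the condition wait to keep the sketch self-evidently correct.

# Sketch only: simplified re-implementation of the protocol for illustration.
# Compile with: nim c --threads:on -r sleep_handshake.nim
import std/[atomics, locks]

type
  ConsumerState = enum
    Busy, IntendToSleep, Parked, ShouldWakeup

  Notifier = object
    cond: Cond
    lock: Lock
    state: Atomic[ConsumerState]

proc initNotifier(n: var Notifier) =
  n.cond.initCond()
  n.lock.initLock()
  n.state.store(Busy, moRelaxed)

proc intendToSleep(n: var Notifier) =
  # Called by the consumer *before* it tells its parent it is going to sleep.
  n.state.store(IntendToSleep, moRelease)

proc wait(n: var Notifier) =
  # Park only if nobody tried to wake us since intendToSleep.
  var expected = IntendToSleep
  if n.state.compareExchange(expected, Parked, moAcquireRelease):
    n.lock.acquire()
    while n.state.load(moAcquire) == Parked:
      n.cond.wait(n.lock)          # releases the lock while sleeping
    n.lock.release()
  # Whether we parked or the wakeup arrived first, we are busy again.
  n.state.store(Busy, moRelease)

proc notify(n: var Notifier) =
  if n.state.load(moAcquire) in {Busy, ShouldWakeup}:
    return                          # consumer is not trying to sleep
  n.state.store(ShouldWakeup, moRelease)
  while n.state.load(moAcquire) != Busy:
    n.cond.signal()                 # keep signaling so the wakeup cannot be lost

var
  notifier: Notifier
  sleepRequests: Channel[int]       # stands in for the steal-request channel

proc child() {.thread.} =
  notifier.intendToSleep()          # 1. announce the intent to sleep
  sleepRequests.send(1)             # 2. formal notification via the channel
  notifier.wait()                   # 3. park (skipped if already woken)
  echo "child woke up"

proc parent() {.thread.} =
  discard sleepRequests.recv()      # sees the child's WAITING request
  notifier.notify()                 # wake it up

initNotifier(notifier)
sleepRequests.open()
var workers: array[2, Thread[void]]
createThread(workers[0], child)
createThread(workers[1], parent)
joinThreads(workers)
sleepRequests.close()
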
9 changes: 9 additions & 0 deletions weave/config.nim
@@ -50,6 +50,11 @@ const WV_StealEarly* {.intdefine.} = 0
## steal requests in advance. This might help hide stealing latencies
## or worsen message overhead.

const WV_EnableBackoff* {.booldefine.} = true
## Workers that fail to find work will sleep. This saves CPU at the price
## of a slight latency increase, as the parents of sleeping workers must manage
## their steal request queues, and waking a worker takes time.

type
StealKind* {.pure.}= enum
one
@@ -95,6 +100,10 @@ template EagerFV*(body: untyped): untyped =
when not defined(WV_LazyFlowvar):
body

template Backoff*(body: untyped): untyped =
when WV_EnableBackoff:
body

# Dynamic defines
# ----------------------------------------------------------------------------------

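For reference, the Backoff template introduced above is a compile-time switch: the wrapped statements are emitted only when WV_EnableBackoff is true, so every call site below compiles down to nothing when backoff is disabled. A minimal stand-alone sketch of the same pattern (the names backoff and enableBackoff are illustrative, not from the repository):

const enableBackoff {.booldefine.} = true   # override with -d:enableBackoff=false

template backoff(body: untyped): untyped =
  when enableBackoff:
    body

backoff:
  echo "parking the worker"                 # removed entirely when disabled

echo "always executed"
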
30 changes: 15 additions & 15 deletions weave/contexts.nim
@@ -7,13 +7,16 @@

import
./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
./memory/[persistacks, lookaside_lists, memory_pools, allocs],
./config,
system/ansi_c,
./instrumentation/[profilers, loggers, contracts],
./primitives/barriers

Backoff:
import ./channels/event_notifiers

# Contexts
# ----------------------------------------------------------------------------------

@@ -76,20 +79,17 @@ template myMetrics*: untyped =
metrics:
localCtx.counters

# Backoff is deactivated, see
# - https://github.com/mratsim/weave/issues/27
# - https://github.com/mratsim/weave/pull/28
#
# template myParking*: EventNotifier =
# globalCtx.com.parking[localCtx.worker.ID]
#
# template wakeup*(target: WorkerID) =
# mixin notify
# debugTermination:
# log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
# globalCtx.com.parking[target].notify()
#
# export event_notifiers.wait
Backoff:
template myParking*: EventNotifier =
globalCtx.com.parking[localCtx.worker.ID]

template wakeup*(target: WorkerID) =
mixin notify
debugTermination:
log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
globalCtx.com.parking[target].notify()

export event_notifiers.wait, event_notifiers.intendToSleep, event_notifiers.initialize, event_notifiers.EventNotifier

# Task caching
# ----------------------------------------------------------------------------------
2 changes: 1 addition & 1 deletion weave/datatypes/binary_worker_trees.nim
@@ -139,7 +139,7 @@ iterator traverseBreadthFirst*(start, maxID: WorkerID): WorkerID =
## - O(1) space, O(n) operations
## - traversal can start from any subtrees

assert start in 0 .. maxID
preCondition: start in 0 .. maxID

var
levelStart = start # Index of the node starting the current depth
10 changes: 5 additions & 5 deletions weave/datatypes/context_global.nim
@@ -8,12 +8,14 @@
import
../channels/channels_mpsc_unbounded_batch,
../channels/channels_spsc_single_ptr,
../channels/event_notifiers,
../memory/[persistacks, memory_pools],
../config,
../primitives/barriers,
./sync_types, ./binary_worker_trees

when WV_EnableBackoff:
import ../channels/event_notifiers

# Global / inter-thread communication channels
# ----------------------------------------------------------------------------------

@@ -36,10 +38,8 @@ type
# Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
# parking*: ptr UncheckedArray[EventNotifier]
# Backoff is deactivated, see
# - https://github.com/mratsim/weave/issues/27
# - https://github.com/mratsim/weave/pull/28
when static(WV_EnableBackoff):
parking*: ptr UncheckedArray[EventNotifier]

GlobalContext* = object
com*: ComChannels
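Because the parking array is only meaningful with backoff enabled, the field itself is compiled in or out with a `when` branch inside the object definition, so the global context pays no memory or maintenance cost otherwise. A small illustrative sketch with hypothetical names (Notifier stands in for EventNotifier):

const enableBackoff {.booldefine.} = true

type
  Notifier = object                 # stand-in for EventNotifier
    parked: bool

  ComChannels = object
    thefts: int                     # stand-in for the real channel arrays
    when enableBackoff:
      parking: ptr UncheckedArray[Notifier]

var com: ComChannels
echo sizeof(com)                    # smaller when built with -d:enableBackoff=false
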
11 changes: 6 additions & 5 deletions weave/runtime.nim
@@ -12,7 +12,7 @@ import
./instrumentation/[contracts, profilers, loggers],
./contexts, ./config,
./datatypes/[sync_types, prell_deques, binary_worker_trees],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
./memory/[persistacks, lookaside_lists, allocs, memory_pools],
./scheduler, ./signals, ./workers, ./thieves, ./victims,
# Low-level primitives
@@ -40,7 +40,8 @@ proc init*(_: type Weave) =
globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce())
globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce())
# globalCtx.com.parking = wv_alloc(EventNotifier, workforce())
Backoff:
globalCtx.com.parking = wv_alloc(EventNotifier, workforce())
discard pthread_barrier_init(globalCtx.barrier, nil, workforce())

# Lead thread - pinned to CPU 0
@@ -216,9 +217,9 @@ proc loadBalance*(_: type Weave) {.gcsafe.} =
shareWork()

# Check also channels on behalf of the children workers that are managed.
if myThieves().peek() != 0:
# myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
# myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
if myThieves().peek() != 0 or
myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
var req: StealRequest
while recv(req):
dispatchTasks(req)
8 changes: 5 additions & 3 deletions weave/scheduler.nim
@@ -9,7 +9,7 @@ import
./instrumentation/[contracts, profilers, loggers],
./primitives/barriers,
./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets, binary_worker_trees, bounded_queues],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single, event_notifiers],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single],
./memory/[persistacks, lookaside_lists, allocs, memory_pools],
./contexts, ./config,
./victims, ./loop_splitting,
@@ -84,7 +84,8 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
myWorker().deque.initialize()
myWorker().workSharingRequests.initialize()

# myParking().initialize() - Backoff deactivated
Backoff:
myParking().initialize()

myTodoBoxes().initialize()
for i in 0 ..< myTodoBoxes().len:
@@ -232,7 +233,8 @@ proc threadLocalCleanup*() {.gcsafe.} =
ascertain: myTodoBoxes().access(i).isEmpty()
localCtx.stealCache.access(i).victims.delete()
myTodoBoxes().delete()
# `=destroy`(myParking()) # Backoff deactivated
Backoff:
`=destroy`(myParking())

# The task cache is full of tasks
delete(localCtx.taskCache)
7 changes: 4 additions & 3 deletions weave/signals.nim
@@ -49,11 +49,12 @@ proc signalTerminate*(_: pointer) {.gcsafe.} =
if myWorker().left != Not_a_worker:
# Send the terminate signal
asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0))
# Wake the worker up so that it can process the terminate signal
# wakeup(myWorker().left) - Backoff deactivated
Backoff: # Wake the worker up so that it can process the terminate signal
wakeup(myWorker().left)
if myWorker().right != Not_a_worker:
asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0))
# wakeup(myWorker().right) - Backoff deactivated
Backoff:
wakeup(myWorker().right)

Worker:
# When processing this signal for our queue, it was counted
7 changes: 5 additions & 2 deletions weave/thieves.nim
@@ -9,7 +9,7 @@ import
./datatypes/[sparsesets, sync_types, context_thread_local, binary_worker_trees],
./contexts, ./targets,
./instrumentation/[contracts, profilers, loggers],
./channels/[channels_mpsc_unbounded_batch, event_notifiers],
./channels/channels_mpsc_unbounded_batch,
./memory/persistacks,
./config, ./signals,
std/atomics
@@ -195,7 +195,10 @@ proc lastStealAttemptFailure*(req: sink StealRequest) =
req.state = Waiting
debugTermination:
log("Worker %2d: sends state passively WAITING to its parent worker %d\n", myID(), myWorker().parent)
Backoff:
myParking().intendToSleep()
sendShare(req)
ascertain: not myWorker().isWaiting
myWorker().isWaiting = true
# myParking().wait() # Thread is blocked here until woken up.
Backoff: # Thread is blocked here until woken up.
myParking().wait()