diff --git a/benchmarks/vendor/tasking-2.0 b/benchmarks/vendor/tasking-2.0
index 303ce3e..0df848f 160000
--- a/benchmarks/vendor/tasking-2.0
+++ b/benchmarks/vendor/tasking-2.0
@@ -1 +1 @@
-Subproject commit 303ce3e09a7c118241b9d5482219e3e3b848212e
+Subproject commit 0df848f56a21b93afd5ea36754732e9ff05c8a70
diff --git a/weave/channels/event_notifiers.nim b/weave/channels/event_notifiers.nim
index fa6432a..4abee1d 100644
--- a/weave/channels/event_notifiers.nim
+++ b/weave/channels/event_notifiers.nim
@@ -48,6 +48,18 @@ import ../config

 type
+  ConsumerState = enum
+    # We need 4 states to solve a race condition, see (https://github.com/mratsim/weave/issues/27)
+    # A child signals its parent that it goes to sleep via a steal request.
+    # Its parent tries to wake it up but the child is not sleeping
+    # The child goes to sleep and system is deadlocked.
+    # Instead, before signaling via the channel it should also notify
+    # its intent to sleep, and then commit/rollback its intent once it has sent its message.
+    Busy
+    IntendToSleep
+    Parked
+    ShouldWakeup
+
   EventNotifier* = object
     ## Multi Producers, Single Consumer event notification
     ## This is wait-free for producers and avoid spending time
@@ -61,46 +73,61 @@ type
     ## See also: binary semaphores, eventcount
     ## On Windows: ManuallyResetEvent and AutoResetEvent
     cond{.align: WV_CacheLinePadding.}: Cond
-    lock: Lock
-    waiting: Atomic[bool]
+    lock: Lock # The lock is never used, it's just there for the condition variable
+    consumerState: Atomic[ConsumerState]

 func initialize*(en: var EventNotifier) =
   en.cond.initCond()
   en.lock.initLock()
-  en.waiting.store(false, moRelaxed)
+  en.consumerState.store(Busy, moRelaxed)

 func `=destroy`*(en: var EventNotifier) =
   en.cond.deinitCond()
   en.lock.deinitLock()

+func intendToSleep*(en: var EventNotifier) {.inline.} =
+  ## The consumer intends to sleep soon.
+  ## This must be called before the formal notification
+  ## via a channel.
+  assert en.consumerState.load(moRelaxed) == Busy
+
+  fence(moRelease)
+  en.consumerState.store(IntendToSleep, moRelaxed)
+
 func wait*(en: var EventNotifier) {.inline.} =
   ## Wait until we are signaled of an event
   ## Thread is parked and does not consume CPU resources
-  assert not en.waiting.load(moRelaxed)
-  fence(moRelease)
-  en.waiting.store(true, moRelaxed)
+  var expected = IntendToSleep
+  if compareExchange(en.consumerState, expected, Parked, moAcquireRelease):
+    while en.consumerState.load(moRelaxed) == Parked:
+      # We only used the lock for the condition variable, we protect via atomics otherwise
+      fence(moAcquire)
+      en.cond.wait(en.lock)

-  en.lock.acquire()
-  while en.waiting.load(moRelaxed):
-    fence(moAcquire)
-    en.cond.wait(en.lock)
-  en.lock.release()
-
-  assert not en.waiting.load(moRelaxed)
+  # If we failed to sleep or just woke up
+  # we return to the busy state
+  fence(moRelease)
+  en.consumerState.store(Busy, moRelaxed)

-# TODO there is a deadlock at the moment as a worker sending a
-# WAITING steal request and then actually waiting is not atomic
-# see https://github.com/mratsim/weave/issues/27
-# and https://github.com/mratsim/weave/pull/28
 func notify*(en: var EventNotifier) {.inline.} =
   ## Signal a thread that it can be unparked

   # No thread waiting, return
-  if not en.waiting.load(moRelaxed):
+  let consumerState = en.consumerState.load(moRelaxed)
+  if consumerState in {Busy, ShouldWakeup}:
     fence(moAcquire)
     return

   fence(moRelease)
-  en.waiting.store(false, moRelaxed)
-  en.cond.signal()
+  en.consumerState.store(ShouldWakeup, moRelaxed)
+  while true:
+    # We might signal "ShouldWakeUp" after the consumer check
+    # and just before it waits so we need to loop the signal
+    # until it's sure the consumer is back to busy.
+    fence(moAcquire)
+    en.cond.signal()
+    if en.consumerState.load(moAcquire) != Busy:
+      cpuRelax()
+    else:
+      break
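
A reading aid, not part of the patch: the lost-wakeup interleaving from
https://github.com/mratsim/weave/issues/27 replayed against the four states above. The
parent-side calls (reception of the WAITING request, then wakeup/notify) are an assumption
about how the later hunks in thieves.nim and victims.nim drive this module.

  # Child (consumer)                                 Parent (producer)
  # -----------------------------------------------  ----------------------------------------
  # intendToSleep()      Busy -> IntendToSleep
  # sendShare(req)       announces "I will sleep"
  #                                                  receives req with req.state == Waiting
  #                                                  notify(): IntendToSleep -> ShouldWakeup
  # wait(): the CAS IntendToSleep -> Parked fails,
  #         so the child never blocks and stores
  #         Busy on its way out
  #                                                  notify()'s signal loop sees Busy, stops
  #
  # With the previous two-state version the child only flagged itself inside wait(), so a
  # notify() landing between the send and the wait was dropped and the child then parked
  # forever.
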
diff --git a/weave/config.nim b/weave/config.nim
index e4bc877..e0060b9 100644
--- a/weave/config.nim
+++ b/weave/config.nim
@@ -50,6 +50,11 @@ const WV_StealEarly* {.intdefine.} = 0
   ## steal requests in advance. This might help hide stealing latencies
   ## or worsen message overhead.

+const WV_EnableBackoff* {.booldefine.} = true
+  ## Workers that fail to find work will sleep. This saves CPU at the price
+  ## of slight latency as the workers' parent nodes need to manage their
+  ## steal requests queues when they sleep and there is latency to wake up.
+
 type
   StealKind* {.pure.}= enum
     one
@@ -95,6 +100,10 @@ template EagerFV*(body: untyped): untyped =
   when not defined(WV_LazyFlowvar):
     body

+template Backoff*(body: untyped): untyped =
+  when WV_EnableBackoff:
+    body
+
 # Dynamic defines
 # ----------------------------------------------------------------------------------
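
For review convenience, a short sketch of what the new guard does. This is not part of the
patch; the call in the body is just an example borrowed from a later hunk:

  # `Backoff:` keeps its body only when WV_EnableBackoff is true...
  Backoff:
    myParking().intendToSleep()

  # ...i.e. it is equivalent to writing
  when WV_EnableBackoff:
    myParking().intendToSleep()

  # WV_EnableBackoff is a {.booldefine.}, so it can be overridden at compile time,
  # e.g. with -d:WV_EnableBackoff=false, and every Backoff block then compiles to nothing.
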
diff --git a/weave/contexts.nim b/weave/contexts.nim
index 58093cd..b4e4ed6 100644
--- a/weave/contexts.nim
+++ b/weave/contexts.nim
@@ -7,13 +7,16 @@ import
   ./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees],
-  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers],
+  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
   ./memory/[persistacks, lookaside_lists, memory_pools, allocs],
   ./config,
   system/ansi_c,
   ./instrumentation/[profilers, loggers, contracts],
   ./primitives/barriers

+Backoff:
+  import ./channels/event_notifiers
+
 # Contexts
 # ----------------------------------------------------------------------------------
@@ -76,20 +79,17 @@ template myMetrics*: untyped =
   metrics:
     localCtx.counters

-# Backoff is deactivated, see
-# - https://github.com/mratsim/weave/issues/27
-# - https://github.com/mratsim/weave/pull/28
-#
-# template myParking*: EventNotifier =
-#   globalCtx.com.parking[localCtx.worker.ID]
-#
-# template wakeup*(target: WorkerID) =
-#   mixin notify
-#   debugTermination:
-#     log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
-#   globalCtx.com.parking[target].notify()
-#
-# export event_notifiers.wait
+Backoff:
+  template myParking*: EventNotifier =
+    globalCtx.com.parking[localCtx.worker.ID]
+
+  template wakeup*(target: WorkerID) =
+    mixin notify
+    debugTermination:
+      log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
+    globalCtx.com.parking[target].notify()
+
+  export event_notifiers.wait, event_notifiers.intendToSleep, event_notifiers.initialize, event_notifiers.EventNotifier

 # Task caching
 # ----------------------------------------------------------------------------------
diff --git a/weave/datatypes/binary_worker_trees.nim b/weave/datatypes/binary_worker_trees.nim
index fffc7ea..1728472 100644
--- a/weave/datatypes/binary_worker_trees.nim
+++ b/weave/datatypes/binary_worker_trees.nim
@@ -139,7 +139,7 @@ iterator traverseBreadthFirst*(start, maxID: WorkerID): WorkerID =
   ## - O(1) space, O(n) operations
   ## - traversal can start from any subtrees

-  assert start in 0 .. maxID
+  preCondition: start in 0 .. maxID

   var
     levelStart = start # Index of the node starting the current depth
diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim
index fe92cdd..266395e 100644
--- a/weave/datatypes/context_global.nim
+++ b/weave/datatypes/context_global.nim
@@ -8,12 +8,14 @@ import
   ../channels/channels_mpsc_unbounded_batch,
   ../channels/channels_spsc_single_ptr,
-  ../channels/event_notifiers,
   ../memory/[persistacks, memory_pools],
   ../config,
   ../primitives/barriers,
   ./sync_types,
   ./binary_worker_trees

+when WV_EnableBackoff:
+  import ../channels/event_notifiers
+
 # Global / inter-thread communication channels
 # ----------------------------------------------------------------------------------
@@ -36,10 +38,8 @@ type
     # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
     thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
     tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
-    # parking*: ptr UncheckedArray[EventNotifier]
-    # Backoff is deactivated, see
-    # - https://github.com/mratsim/weave/issues/27
-    # - https://github.com/mratsim/weave/pull/28
+    when static(WV_EnableBackoff):
+      parking*: ptr UncheckedArray[EventNotifier]

   GlobalContext* = object
     com*: ComChannels
diff --git a/weave/runtime.nim b/weave/runtime.nim
index d0e6a5b..5fa1c33 100644
--- a/weave/runtime.nim
+++ b/weave/runtime.nim
@@ -12,7 +12,7 @@ import
   ./instrumentation/[contracts, profilers, loggers],
   ./contexts, ./config,
   ./datatypes/[sync_types, prell_deques, binary_worker_trees],
-  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers],
+  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
   ./memory/[persistacks, lookaside_lists, allocs, memory_pools],
   ./scheduler, ./signals, ./workers, ./thieves, ./victims,
   # Low-level primitives
@@ -40,7 +40,8 @@ proc init*(_: type Weave) =
   globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
   globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce())
   globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce())
-  # globalCtx.com.parking = wv_alloc(EventNotifier, workforce())
+  Backoff:
+    globalCtx.com.parking = wv_alloc(EventNotifier, workforce())
   discard pthread_barrier_init(globalCtx.barrier, nil, workforce())

   # Lead thread - pinned to CPU 0
@@ -216,9 +217,9 @@ proc loadBalance*(_: type Weave) {.gcsafe.} =
       shareWork()

     # Check also channels on behalf of the children workers that are managed.
-    if myThieves().peek() != 0:
-      # myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
-      # myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
+    if myThieves().peek() != 0 or
+       myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
+       myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
       var req: StealRequest
       while recv(req):
         dispatchTasks(req)
diff --git a/weave/scheduler.nim b/weave/scheduler.nim
index 22e75b0..3e91ce8 100644
--- a/weave/scheduler.nim
+++ b/weave/scheduler.nim
@@ -9,7 +9,7 @@ import
   ./instrumentation/[contracts, profilers, loggers],
   ./primitives/barriers,
   ./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets, binary_worker_trees, bounded_queues],
-  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single, event_notifiers],
+  ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single],
   ./memory/[persistacks, lookaside_lists, allocs, memory_pools],
   ./contexts, ./config,
   ./victims, ./loop_splitting,
@@ -84,7 +84,8 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
   myWorker().deque.initialize()
   myWorker().workSharingRequests.initialize()
-  # myParking().initialize() - Backoff deactivated
+  Backoff:
+    myParking().initialize()

   myTodoBoxes().initialize()
   for i in 0 ..< myTodoBoxes().len:
@@ -232,7 +233,8 @@ proc threadLocalCleanup*() {.gcsafe.} =
     ascertain: myTodoBoxes().access(i).isEmpty()
     localCtx.stealCache.access(i).victims.delete()
   myTodoBoxes().delete()
-  # `=destroy`(myParking()) # Backoff deactivated
+  Backoff:
+    `=destroy`(myParking())

   # The task cache is full of tasks
   delete(localCtx.taskCache)
diff --git a/weave/signals.nim b/weave/signals.nim
index eb5379e..5e08d20 100644
--- a/weave/signals.nim
+++ b/weave/signals.nim
@@ -49,11 +49,12 @@ proc signalTerminate*(_: pointer) {.gcsafe.} =
   if myWorker().left != Not_a_worker:
     # Send the terminate signal
     asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0))
-    # Wake the worker up so that it can process the terminate signal
-    # wakeup(myWorker().left) - Backoff deactivated
+    Backoff: # Wake the worker up so that it can process the terminate signal
+      wakeup(myWorker().left)
   if myWorker().right != Not_a_worker:
     asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0))
-    # wakeup(myWorker().right) - Backoff deactivated
+    Backoff:
+      wakeup(myWorker().right)

   Worker:
     # When processing this signal for our queue, it was counted
diff --git a/weave/thieves.nim b/weave/thieves.nim
index 8ea64e4..3454090 100644
--- a/weave/thieves.nim
+++ b/weave/thieves.nim
@@ -9,7 +9,7 @@ import
   ./datatypes/[sparsesets, sync_types, context_thread_local, binary_worker_trees],
   ./contexts, ./targets,
   ./instrumentation/[contracts, profilers, loggers],
-  ./channels/[channels_mpsc_unbounded_batch, event_notifiers],
+  ./channels/channels_mpsc_unbounded_batch,
   ./memory/persistacks,
   ./config, ./signals,
   std/atomics
@@ -195,7 +195,10 @@ proc lastStealAttemptFailure*(req: sink StealRequest) =
     req.state = Waiting
     debugTermination:
       log("Worker %2d: sends state passively WAITING to its parent worker %d\n", myID(), myWorker().parent)
+    Backoff:
+      myParking().intendToSleep()
     sendShare(req)
     ascertain: not myWorker().isWaiting
     myWorker().isWaiting = true
-    # myParking().wait() # Thread is blocked here until woken up.
+    Backoff: # Thread is blocked here until woken up.
+      myParking().wait()
diff --git a/weave/victims.nim b/weave/victims.nim
index d1d3c06..e918390 100644
--- a/weave/victims.nim
+++ b/weave/victims.nim
@@ -28,19 +28,20 @@ proc hasThievesProxy*(worker: WorkerID): bool =
       return true
   return false

-proc recvProxy(req: var StealRequest, worker: WorkerID): bool =
-  ## Receives steal requests on behalf of child workers
-  ## Note that on task reception, children are waken up
-  ## and tasks are sent to them before thieves so this should happen rarely
-  if worker == Not_a_worker:
-    return false
+Backoff:
+  proc recvProxy(req: var StealRequest, worker: WorkerID): bool =
+    ## Receives steal requests on behalf of child workers
+    ## Note that on task reception, children are waken up
+    ## and tasks are sent to them before thieves so this should happen rarely
+    if worker == Not_a_worker:
+      return false

-  profile(send_recv_req):
-    for w in traverseBreadthFirst(worker, maxID()):
-      result = getThievesOf(w).tryRecv(req)
-      if result:
-        return true
-  return false
+    profile(send_recv_req):
+      for w in traverseBreadthFirst(worker, maxID()):
+        result = getThievesOf(w).tryRecv(req)
+        if result:
+          return true
+    return false

 # Victims - Adaptative task splitting
 # ----------------------------------------------------------------------------------
@@ -53,14 +54,16 @@ proc approxNumThieves(): int32 {.inline.} =
   # - We already read 1 steal request before trying to split so need to add it back.
   # - Workers may send steal requests before actually running out-of-work
   result = 1 + myThieves().peek()
-  debug: log("Worker %2d: has %ld steal requests\n", myID(), approxNumThieves)
+  debug: log("Worker %2d: has %ld steal requests\n", myID(), result)

-proc approxNumThievesProxy(worker: WorkerID): int32 =
-  # Estimate the number of idle workers of a worker subtree
-  result = 0
-  for w in traverseBreadthFirst(worker, maxID()):
-    result += getThievesOf(w).peek()
-  debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker)
+Backoff:
+  proc approxNumThievesProxy(worker: WorkerID): int32 =
+    # Estimate the number of idle workers of a worker subtree
+    if worker == Not_a_worker: return 0
+    result = 0
+    for w in traverseBreadthFirst(worker, maxID()):
+      result += getThievesOf(w).peek()
+    debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker)

 # Victims - Steal requests handling
 # ----------------------------------------------------------------------------------
@@ -108,13 +111,14 @@ proc recv*(req: var StealRequest): bool {.inline.} =
     # Check the next steal request
     result = myThieves().tryRecv(req)

-  # # When a child thread backs off, it is parked by the OS
-  # # We need to handle steal requests on its behalf to avoid latency
-  # if not result and myWorker().leftIsWaiting:
-  #   result = recvProxy(req, myWorker().left)
+  Backoff:
+    # When a child thread backs off, it is parked by the OS
+    # We need to handle steal requests on its behalf to avoid latency
+    if not result and myWorker().leftIsWaiting:
+      result = recvProxy(req, myWorker().left)

-  # if not result and myWorker().rightIsWaiting:
-  #   result = recvProxy(req, myWorker().right)
+    if not result and myWorker().rightIsWaiting:
+      result = recvProxy(req, myWorker().right)

   postCondition: not result or (result and req.state != Waiting)

@@ -254,10 +258,11 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =
   # Split iteration range according to given strategy
   # [start, stop) => [start, split) + [split, end)
   var guessThieves = approxNumThieves()
-  # if myWorker().leftIsWaiting:
-  #   guessThieves += approxNumThievesProxy(myWorker().left)
-  # if myWorker().rightIsWaiting:
-  #   guessThieves += approxNumThievesProxy(myWorker().right)
+  Backoff:
+    if myWorker().leftIsWaiting:
+      guessThieves += approxNumThievesProxy(myWorker().left)
+    if myWorker().rightIsWaiting:
+      guessThieves += approxNumThievesProxy(myWorker().right)
   let split = split(task, guessThieves)

   # New task gets the upper half
@@ -339,7 +344,8 @@ proc shareWork*() {.inline.} =
       else:
         ascertain: myWorker().rightIsWaiting
         myWorker().rightIsWaiting = false
-      # wakeup(req.thiefID) - backoff is deactivated
+      Backoff:
+        wakeup(req.thiefID)
       # Now we can dequeue as we found work
       # We cannot access the steal request anymore or
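
Finally, a minimal two-thread sketch of the new event notifier API, for reviewers who want
to poke at it outside the scheduler. This is not part of the patch: the module path, the
stand-in Atomic[bool] for the steal-request channel, and compiling the module standalone
with --threads:on are all assumptions.

  import std/atomics
  import ./weave/channels/event_notifiers

  var
    en: EventNotifier
    announced: Atomic[bool]   # stands in for the WAITING steal request

  proc child() {.thread.} =
    en.intendToSleep()                  # 1. declare the intent to sleep (Busy -> IntendToSleep)
    announced.store(true, moRelease)    # 2. "send" the formal message
    en.wait()                           # 3. park, or return at once if notify() already ran

  proc main() =
    en.initialize()
    var worker: Thread[void]
    createThread(worker, child)
    # The parent only notifies after it has seen the message,
    # mirroring how WAITING steal requests are handled in victims.nim.
    while not announced.load(moAcquire):
      discard
    en.notify()                         # cannot be lost between steps 2 and 3
    joinThread(worker)

  main()
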