From f5d8533c0e0fa3762a4762aee0b48299b18a1315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mamy=20Andr=C3=A9-Ratsimbazafy?= Date: Thu, 5 Dec 2019 01:32:48 +0100 Subject: [PATCH 1/4] failed tentative to undeadlock the backoff --- weave/channels/event_notifiers.nim | 79 ++++++++++++++++++++++++------ weave/thieves.nim | 1 + 2 files changed, 64 insertions(+), 16 deletions(-) diff --git a/weave/channels/event_notifiers.nim b/weave/channels/event_notifiers.nim index fa6432a..a9c27be 100644 --- a/weave/channels/event_notifiers.nim +++ b/weave/channels/event_notifiers.nim @@ -48,6 +48,20 @@ import ../config type + # Need to workaround Enum not being Atomics, pending: https://github.com/nim-lang/Nim/issues/12812 + # so we enforce its size and will cast before use + ConsumerState {.size: sizeof(uint8).}= enum + # We need 4 states to solve a race condition, see (https://github.com/mratsim/weave/issues/27) + # A child signals its parent that it goes to sleep via a steal request. + # Its parent tries to wake it up but the child is not sleeping + # The child goes to sleep and system is deadlocked. + # Instead, before signaling via the channel it should also notify + # its intent to sleep, and then commit/rollback its intent once it has sent its message. + Busy + IntendToSleep + Parked + ShouldWakeup + EventNotifier* = object ## Multi Producers, Single Consumer event notification ## This is wait-free for producers and avoid spending time @@ -61,33 +75,56 @@ type ## See also: binary semaphores, eventcount ## On Windows: ManuallyResetEvent and AutoResetEvent cond{.align: WV_CacheLinePadding.}: Cond - lock: Lock - waiting: Atomic[bool] + lock: Lock # The lock is never used, it's just there for the condition variable + consumerState: ConsumerState + +template load(c: ConsumerState, mo: MemoryOrder): ConsumerState = + cast[ConsumerState](cast[var Atomic[uint8]](c.addr).load(mo)) + +template store(c: var ConsumerState, val: ConsumerState, mo: MemoryOrder) = + cast[var Atomic[uint8]](c.addr).store(cast[uint8](val), mo) + +template compareExchange(dst, expected: var ConsumerState, target: ConsumerState, mo: MemoryOrder): bool = + compareExchange( + cast[var Atomic[uint8]](dst.addr), + cast[var uint8](expected.addr), + cast[uint8](target), mo + ) func initialize*(en: var EventNotifier) = en.cond.initCond() en.lock.initLock() - en.waiting.store(false, moRelaxed) + en.consumerState.store(Busy, moRelaxed) func `=destroy`*(en: var EventNotifier) = en.cond.deinitCond() en.lock.deinitLock() +func intendToSleep*(en: var EventNotifier) {.inline.} = + ## The consumer intends to sleep soon. + ## This must be called before the formal notification + ## via a channel. 
+ assert en.consumerState.load(moRelaxed) == Busy + + fence(moRelease) + en.consumerState.store(IntendToSleep, moRelaxed) + + func wait*(en: var EventNotifier) {.inline.} = ## Wait until we are signaled of an event ## Thread is parked and does not consume CPU resources - assert not en.waiting.load(moRelaxed) - - fence(moRelease) - en.waiting.store(true, moRelaxed) - en.lock.acquire() - while en.waiting.load(moRelaxed): - fence(moAcquire) - en.cond.wait(en.lock) - en.lock.release() + var expected = IntendToSleep + if compareExchange(en.consumerState, expected, Parked, moAcquireRelease): + while en.consumerState.load(moRelaxed) == Parked: + # We only used the lock for the condition variable, we protect via atomics otherwise + fence(moAcquire) + en.cond.wait(en.lock) - assert not en.waiting.load(moRelaxed) + # If we failed to sleep or just woke up + # we return to the busy state + fence(moRelease) + en.consumerState.store(Busy, moRelaxed) # TODO there is a deadlock at the moment as a worker sending a # WAITING steal request and then actually waiting is not atomic @@ -97,10 +134,20 @@ func notify*(en: var EventNotifier) {.inline.} = ## Signal a thread that it can be unparked # No thread waiting, return - if not en.waiting.load(moRelaxed): + let consumerState = en.consumerState.load(moRelaxed) + if consumerState in {Busy, ShouldWakeup}: fence(moAcquire) return fence(moRelease) - en.waiting.store(false, moRelaxed) - en.cond.signal() + en.consumerState.store(ShouldWakeup, moRelaxed) + while true: + # We might signal "ShouldWakeUp" after the consumer check + # and just before it waits so we need to loop the signal + # until it's sure the consumer is back to busy. + fence(moAcquire) + en.cond.signal() + if en.consumerState.load(moAcquire) != Busy: + cpuRelax() + else: + break diff --git a/weave/thieves.nim b/weave/thieves.nim index 8ea64e4..a40f13c 100644 --- a/weave/thieves.nim +++ b/weave/thieves.nim @@ -195,6 +195,7 @@ proc lastStealAttemptFailure*(req: sink StealRequest) = req.state = Waiting debugTermination: log("Worker %2d: sends state passively WAITING to its parent worker %d\n", myID(), myWorker().parent) + myParking().intendToSleep() sendShare(req) ascertain: not myWorker().isWaiting myWorker().isWaiting = true From 29a7d208fa00286670f7dd63839985f09216f8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mamy=20Andr=C3=A9-Ratsimbazafy?= Date: Wed, 11 Dec 2019 21:02:04 +0100 Subject: [PATCH 2/4] It seems like my attempt was actually working --- benchmarks/vendor/tasking-2.0 | 2 +- weave/contexts.nim | 20 ++++++++++---------- weave/datatypes/binary_worker_trees.nim | 2 +- weave/datatypes/context_global.nim | 2 +- weave/runtime.nim | 8 ++++---- weave/scheduler.nim | 4 ++-- weave/signals.nim | 4 ++-- weave/thieves.nim | 2 +- weave/victims.nim | 25 +++++++++++++------------ 9 files changed, 35 insertions(+), 34 deletions(-) diff --git a/benchmarks/vendor/tasking-2.0 b/benchmarks/vendor/tasking-2.0 index 303ce3e..0df848f 160000 --- a/benchmarks/vendor/tasking-2.0 +++ b/benchmarks/vendor/tasking-2.0 @@ -1 +1 @@ -Subproject commit 303ce3e09a7c118241b9d5482219e3e3b848212e +Subproject commit 0df848f56a21b93afd5ea36754732e9ff05c8a70 diff --git a/weave/contexts.nim b/weave/contexts.nim index 58093cd..b34a688 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -80,16 +80,16 @@ template myMetrics*: untyped = # - https://github.com/mratsim/weave/issues/27 # - https://github.com/mratsim/weave/pull/28 # -# template myParking*: EventNotifier = -# globalCtx.com.parking[localCtx.worker.ID] -# -# template 
wakeup*(target: WorkerID) = -# mixin notify -# debugTermination: -# log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target) -# globalCtx.com.parking[target].notify() -# -# export event_notifiers.wait +template myParking*: EventNotifier = + globalCtx.com.parking[localCtx.worker.ID] + +template wakeup*(target: WorkerID) = + mixin notify + debugTermination: + log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target) + globalCtx.com.parking[target].notify() + +export event_notifiers.wait # Task caching # ---------------------------------------------------------------------------------- diff --git a/weave/datatypes/binary_worker_trees.nim b/weave/datatypes/binary_worker_trees.nim index fffc7ea..1728472 100644 --- a/weave/datatypes/binary_worker_trees.nim +++ b/weave/datatypes/binary_worker_trees.nim @@ -139,7 +139,7 @@ iterator traverseBreadthFirst*(start, maxID: WorkerID): WorkerID = ## - O(1) space, O(n) operations ## - traversal can start from any subtrees - assert start in 0 .. maxID + preCondition: start in 0 .. maxID var levelStart = start # Index of the node starting the current depth diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index fe92cdd..95b3480 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -36,7 +36,7 @@ type # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker" thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]] tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] - # parking*: ptr UncheckedArray[EventNotifier] + parking*: ptr UncheckedArray[EventNotifier] # Backoff is deactivated, see # - https://github.com/mratsim/weave/issues/27 # - https://github.com/mratsim/weave/pull/28 diff --git a/weave/runtime.nim b/weave/runtime.nim index d0e6a5b..6527ae6 100644 --- a/weave/runtime.nim +++ b/weave/runtime.nim @@ -40,7 +40,7 @@ proc init*(_: type Weave) = globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce()) globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce()) globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce()) - # globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) + globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) discard pthread_barrier_init(globalCtx.barrier, nil, workforce()) # Lead thread - pinned to CPU 0 @@ -216,9 +216,9 @@ proc loadBalance*(_: type Weave) {.gcsafe.} = shareWork() # Check also channels on behalf of the children workers that are managed. 
- if myThieves().peek() != 0: - # myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or - # myWorker().rightIsWaiting and hasThievesProxy(myWorker().right): + if myThieves().peek() != 0 or + myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or + myWorker().rightIsWaiting and hasThievesProxy(myWorker().right): var req: StealRequest while recv(req): dispatchTasks(req) diff --git a/weave/scheduler.nim b/weave/scheduler.nim index 22e75b0..456621e 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -84,7 +84,7 @@ proc init*(ctx: var TLContext) {.gcsafe.} = myWorker().deque.initialize() myWorker().workSharingRequests.initialize() - # myParking().initialize() - Backoff deactivated + myParking().initialize() # - Backoff deactivated myTodoBoxes().initialize() for i in 0 ..< myTodoBoxes().len: @@ -232,7 +232,7 @@ proc threadLocalCleanup*() {.gcsafe.} = ascertain: myTodoBoxes().access(i).isEmpty() localCtx.stealCache.access(i).victims.delete() myTodoBoxes().delete() - # `=destroy`(myParking()) # Backoff deactivated + `=destroy`(myParking()) # Backoff deactivated # The task cache is full of tasks delete(localCtx.taskCache) diff --git a/weave/signals.nim b/weave/signals.nim index eb5379e..a31c8f2 100644 --- a/weave/signals.nim +++ b/weave/signals.nim @@ -50,10 +50,10 @@ proc signalTerminate*(_: pointer) {.gcsafe.} = # Send the terminate signal asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0)) # Wake the worker up so that it can process the terminate signal - # wakeup(myWorker().left) - Backoff deactivated + wakeup(myWorker().left) # - Backoff deactivated if myWorker().right != Not_a_worker: asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0)) - # wakeup(myWorker().right) - Backoff deactivated + wakeup(myWorker().right) # - Backoff deactivated Worker: # When processing this signal for our queue, it was counted diff --git a/weave/thieves.nim b/weave/thieves.nim index a40f13c..daf8246 100644 --- a/weave/thieves.nim +++ b/weave/thieves.nim @@ -199,4 +199,4 @@ proc lastStealAttemptFailure*(req: sink StealRequest) = sendShare(req) ascertain: not myWorker().isWaiting myWorker().isWaiting = true - # myParking().wait() # Thread is blocked here until woken up. + myParking().wait() # Thread is blocked here until woken up. diff --git a/weave/victims.nim b/weave/victims.nim index d1d3c06..6737a2c 100644 --- a/weave/victims.nim +++ b/weave/victims.nim @@ -53,10 +53,11 @@ proc approxNumThieves(): int32 {.inline.} = # - We already read 1 steal request before trying to split so need to add it back. 
# - Workers may send steal requests before actually running out-of-work result = 1 + myThieves().peek() - debug: log("Worker %2d: has %ld steal requests\n", myID(), approxNumThieves) + debug: log("Worker %2d: has %ld steal requests\n", myID(), result) proc approxNumThievesProxy(worker: WorkerID): int32 = # Estimate the number of idle workers of a worker subtree + if worker == Not_a_worker: return 0 result = 0 for w in traverseBreadthFirst(worker, maxID()): result += getThievesOf(w).peek() @@ -108,13 +109,13 @@ proc recv*(req: var StealRequest): bool {.inline.} = # Check the next steal request result = myThieves().tryRecv(req) - # # When a child thread backs off, it is parked by the OS - # # We need to handle steal requests on its behalf to avoid latency - # if not result and myWorker().leftIsWaiting: - # result = recvProxy(req, myWorker().left) + # When a child thread backs off, it is parked by the OS + # We need to handle steal requests on its behalf to avoid latency + if not result and myWorker().leftIsWaiting: + result = recvProxy(req, myWorker().left) - # if not result and myWorker().rightIsWaiting: - # result = recvProxy(req, myWorker().right) + if not result and myWorker().rightIsWaiting: + result = recvProxy(req, myWorker().right) postCondition: not result or (result and req.state != Waiting) @@ -254,10 +255,10 @@ proc splitAndSend*(task: Task, req: sink StealRequest) = # Split iteration range according to given strategy # [start, stop) => [start, split) + [split, end) var guessThieves = approxNumThieves() - # if myWorker().leftIsWaiting: - # guessThieves += approxNumThievesProxy(myWorker().left) - # if myWorker().rightIsWaiting: - # guessThieves += approxNumThievesProxy(myWorker().right) + if myWorker().leftIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().left) + if myWorker().rightIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().right) let split = split(task, guessThieves) # New task gets the upper half @@ -339,7 +340,7 @@ proc shareWork*() {.inline.} = else: ascertain: myWorker().rightIsWaiting myWorker().rightIsWaiting = false - # wakeup(req.thiefID) - backoff is deactivated + wakeup(req.thiefID) # - backoff is deactivated # Now we can dequeue as we found work # We cannot access the steal request anymore or From b859fdb82f9cd708808e1917418e0859f9c447ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mamy=20Andr=C3=A9-Ratsimbazafy?= Date: Wed, 11 Dec 2019 21:26:44 +0100 Subject: [PATCH 3/4] Cleanup comment and atomics enum with the fix of https://github.com/nim-lang/Nim/issues/12812 --- weave/channels/event_notifiers.nim | 24 ++---------------------- weave/contexts.nim | 4 ---- weave/datatypes/context_global.nim | 3 --- weave/scheduler.nim | 4 ++-- weave/signals.nim | 4 ++-- weave/victims.nim | 2 +- 6 files changed, 7 insertions(+), 34 deletions(-) diff --git a/weave/channels/event_notifiers.nim b/weave/channels/event_notifiers.nim index a9c27be..4abee1d 100644 --- a/weave/channels/event_notifiers.nim +++ b/weave/channels/event_notifiers.nim @@ -48,9 +48,7 @@ import ../config type - # Need to workaround Enum not being Atomics, pending: https://github.com/nim-lang/Nim/issues/12812 - # so we enforce its size and will cast before use - ConsumerState {.size: sizeof(uint8).}= enum + ConsumerState = enum # We need 4 states to solve a race condition, see (https://github.com/mratsim/weave/issues/27) # A child signals its parent that it goes to sleep via a steal request. 
# Its parent tries to wake it up but the child is not sleeping @@ -76,20 +74,7 @@ type ## On Windows: ManuallyResetEvent and AutoResetEvent cond{.align: WV_CacheLinePadding.}: Cond lock: Lock # The lock is never used, it's just there for the condition variable - consumerState: ConsumerState - -template load(c: ConsumerState, mo: MemoryOrder): ConsumerState = - cast[ConsumerState](cast[var Atomic[uint8]](c.addr).load(mo)) - -template store(c: var ConsumerState, val: ConsumerState, mo: MemoryOrder) = - cast[var Atomic[uint8]](c.addr).store(cast[uint8](val), mo) - -template compareExchange(dst, expected: var ConsumerState, target: ConsumerState, mo: MemoryOrder): bool = - compareExchange( - cast[var Atomic[uint8]](dst.addr), - cast[var uint8](expected.addr), - cast[uint8](target), mo - ) + consumerState: Atomic[ConsumerState] func initialize*(en: var EventNotifier) = en.cond.initCond() @@ -109,7 +94,6 @@ func intendToSleep*(en: var EventNotifier) {.inline.} = fence(moRelease) en.consumerState.store(IntendToSleep, moRelaxed) - func wait*(en: var EventNotifier) {.inline.} = ## Wait until we are signaled of an event ## Thread is parked and does not consume CPU resources @@ -126,10 +110,6 @@ func wait*(en: var EventNotifier) {.inline.} = fence(moRelease) en.consumerState.store(Busy, moRelaxed) -# TODO there is a deadlock at the moment as a worker sending a -# WAITING steal request and then actually waiting is not atomic -# see https://github.com/mratsim/weave/issues/27 -# and https://github.com/mratsim/weave/pull/28 func notify*(en: var EventNotifier) {.inline.} = ## Signal a thread that it can be unparked diff --git a/weave/contexts.nim b/weave/contexts.nim index b34a688..e88758e 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -76,10 +76,6 @@ template myMetrics*: untyped = metrics: localCtx.counters -# Backoff is deactivated, see -# - https://github.com/mratsim/weave/issues/27 -# - https://github.com/mratsim/weave/pull/28 -# template myParking*: EventNotifier = globalCtx.com.parking[localCtx.worker.ID] diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index 95b3480..e107bc0 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -37,9 +37,6 @@ type thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]] tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] parking*: ptr UncheckedArray[EventNotifier] - # Backoff is deactivated, see - # - https://github.com/mratsim/weave/issues/27 - # - https://github.com/mratsim/weave/pull/28 GlobalContext* = object com*: ComChannels diff --git a/weave/scheduler.nim b/weave/scheduler.nim index 456621e..8d5acfb 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -84,7 +84,7 @@ proc init*(ctx: var TLContext) {.gcsafe.} = myWorker().deque.initialize() myWorker().workSharingRequests.initialize() - myParking().initialize() # - Backoff deactivated + myParking().initialize() myTodoBoxes().initialize() for i in 0 ..< myTodoBoxes().len: @@ -232,7 +232,7 @@ proc threadLocalCleanup*() {.gcsafe.} = ascertain: myTodoBoxes().access(i).isEmpty() localCtx.stealCache.access(i).victims.delete() myTodoBoxes().delete() - `=destroy`(myParking()) # Backoff deactivated + `=destroy`(myParking()) # The task cache is full of tasks delete(localCtx.taskCache) diff --git a/weave/signals.nim b/weave/signals.nim index a31c8f2..ebcd9e7 100644 --- a/weave/signals.nim +++ b/weave/signals.nim @@ -50,10 +50,10 @@ proc signalTerminate*(_: pointer) 
{.gcsafe.} = # Send the terminate signal asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0)) # Wake the worker up so that it can process the terminate signal - wakeup(myWorker().left) # - Backoff deactivated + wakeup(myWorker().left) if myWorker().right != Not_a_worker: asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0)) - wakeup(myWorker().right) # - Backoff deactivated + wakeup(myWorker().right) Worker: # When processing this signal for our queue, it was counted diff --git a/weave/victims.nim b/weave/victims.nim index 6737a2c..822ec24 100644 --- a/weave/victims.nim +++ b/weave/victims.nim @@ -340,7 +340,7 @@ proc shareWork*() {.inline.} = else: ascertain: myWorker().rightIsWaiting myWorker().rightIsWaiting = false - wakeup(req.thiefID) # - backoff is deactivated + wakeup(req.thiefID) # Now we can dequeue as we found work # We cannot access the steal request anymore or From f023b8ef93578f018f93c6ab2a37b720aba91bc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mamy=20Andr=C3=A9-Ratsimbazafy?= Date: Wed, 11 Dec 2019 22:12:43 +0100 Subject: [PATCH 4/4] Make the backoff opt-out --- weave/config.nim | 9 +++++ weave/contexts.nim | 22 +++++----- weave/datatypes/context_global.nim | 7 +++- weave/runtime.nim | 5 ++- weave/scheduler.nim | 8 ++-- weave/signals.nim | 7 ++-- weave/thieves.nim | 8 ++-- weave/victims.nim | 65 ++++++++++++++++-------------- 8 files changed, 79 insertions(+), 52 deletions(-) diff --git a/weave/config.nim b/weave/config.nim index e4bc877..e0060b9 100644 --- a/weave/config.nim +++ b/weave/config.nim @@ -50,6 +50,11 @@ const WV_StealEarly* {.intdefine.} = 0 ## steal requests in advance. This might help hide stealing latencies ## or worsen message overhead. +const WV_EnableBackoff* {.booldefine.} = true + ## Workers that fail to find work will sleep. This saves CPU at the price + ## of slight latency as the workers' parent nodes need to manage their + ## steal requests queues when they sleep and there is latency to wake up. 
+ type StealKind* {.pure.}= enum one @@ -95,6 +100,10 @@ template EagerFV*(body: untyped): untyped = when not defined(WV_LazyFlowvar): body +template Backoff*(body: untyped): untyped = + when WV_EnableBackoff: + body + # Dynamic defines # ---------------------------------------------------------------------------------- diff --git a/weave/contexts.nim b/weave/contexts.nim index e88758e..b4e4ed6 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -7,13 +7,16 @@ import ./datatypes/[context_global, context_thread_local, sync_types, prell_deques, binary_worker_trees], - ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers], + ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch], ./memory/[persistacks, lookaside_lists, memory_pools, allocs], ./config, system/ansi_c, ./instrumentation/[profilers, loggers, contracts], ./primitives/barriers +Backoff: + import ./channels/event_notifiers + # Contexts # ---------------------------------------------------------------------------------- @@ -76,16 +79,17 @@ template myMetrics*: untyped = metrics: localCtx.counters -template myParking*: EventNotifier = - globalCtx.com.parking[localCtx.worker.ID] +Backoff: + template myParking*: EventNotifier = + globalCtx.com.parking[localCtx.worker.ID] -template wakeup*(target: WorkerID) = - mixin notify - debugTermination: - log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target) - globalCtx.com.parking[target].notify() + template wakeup*(target: WorkerID) = + mixin notify + debugTermination: + log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target) + globalCtx.com.parking[target].notify() -export event_notifiers.wait + export event_notifiers.wait, event_notifiers.intendToSleep, event_notifiers.initialize, event_notifiers.EventNotifier # Task caching # ---------------------------------------------------------------------------------- diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index e107bc0..266395e 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -8,12 +8,14 @@ import ../channels/channels_mpsc_unbounded_batch, ../channels/channels_spsc_single_ptr, - ../channels/event_notifiers, ../memory/[persistacks, memory_pools], ../config, ../primitives/barriers, ./sync_types, ./binary_worker_trees +when WV_EnableBackoff: + import ../channels/event_notifiers + # Global / inter-thread communication channels # ---------------------------------------------------------------------------------- @@ -36,7 +38,8 @@ type # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker" thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]] tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] - parking*: ptr UncheckedArray[EventNotifier] + when static(WV_EnableBackoff): + parking*: ptr UncheckedArray[EventNotifier] GlobalContext* = object com*: ComChannels diff --git a/weave/runtime.nim b/weave/runtime.nim index 6527ae6..5fa1c33 100644 --- a/weave/runtime.nim +++ b/weave/runtime.nim @@ -12,7 +12,7 @@ import ./instrumentation/[contracts, profilers, loggers], ./contexts, ./config, ./datatypes/[sync_types, prell_deques, binary_worker_trees], - ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, event_notifiers], + ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch], ./memory/[persistacks, lookaside_lists, allocs, memory_pools], ./scheduler, ./signals, ./workers, ./thieves, 
./victims, # Low-level primitives @@ -40,7 +40,8 @@ proc init*(_: type Weave) = globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce()) globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce()) globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce()) - globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) + Backoff: + globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) discard pthread_barrier_init(globalCtx.barrier, nil, workforce()) # Lead thread - pinned to CPU 0 diff --git a/weave/scheduler.nim b/weave/scheduler.nim index 8d5acfb..3e91ce8 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -9,7 +9,7 @@ import ./instrumentation/[contracts, profilers, loggers], ./primitives/barriers, ./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets, binary_worker_trees, bounded_queues], - ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single, event_notifiers], + ./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch, channels_spsc_single], ./memory/[persistacks, lookaside_lists, allocs, memory_pools], ./contexts, ./config, ./victims, ./loop_splitting, @@ -84,7 +84,8 @@ proc init*(ctx: var TLContext) {.gcsafe.} = myWorker().deque.initialize() myWorker().workSharingRequests.initialize() - myParking().initialize() + Backoff: + myParking().initialize() myTodoBoxes().initialize() for i in 0 ..< myTodoBoxes().len: @@ -232,7 +233,8 @@ proc threadLocalCleanup*() {.gcsafe.} = ascertain: myTodoBoxes().access(i).isEmpty() localCtx.stealCache.access(i).victims.delete() myTodoBoxes().delete() - `=destroy`(myParking()) + Backoff: + `=destroy`(myParking()) # The task cache is full of tasks delete(localCtx.taskCache) diff --git a/weave/signals.nim b/weave/signals.nim index ebcd9e7..5e08d20 100644 --- a/weave/signals.nim +++ b/weave/signals.nim @@ -49,11 +49,12 @@ proc signalTerminate*(_: pointer) {.gcsafe.} = if myWorker().left != Not_a_worker: # Send the terminate signal asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0)) - # Wake the worker up so that it can process the terminate signal - wakeup(myWorker().left) + Backoff: # Wake the worker up so that it can process the terminate signal + wakeup(myWorker().left) if myWorker().right != Not_a_worker: asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0)) - wakeup(myWorker().right) + Backoff: + wakeup(myWorker().right) Worker: # When processing this signal for our queue, it was counted diff --git a/weave/thieves.nim b/weave/thieves.nim index daf8246..3454090 100644 --- a/weave/thieves.nim +++ b/weave/thieves.nim @@ -9,7 +9,7 @@ import ./datatypes/[sparsesets, sync_types, context_thread_local, binary_worker_trees], ./contexts, ./targets, ./instrumentation/[contracts, profilers, loggers], - ./channels/[channels_mpsc_unbounded_batch, event_notifiers], + ./channels/channels_mpsc_unbounded_batch, ./memory/persistacks, ./config, ./signals, std/atomics @@ -195,8 +195,10 @@ proc lastStealAttemptFailure*(req: sink StealRequest) = req.state = Waiting debugTermination: log("Worker %2d: sends state passively WAITING to its parent worker %d\n", myID(), myWorker().parent) - myParking().intendToSleep() + Backoff: + myParking().intendToSleep() sendShare(req) ascertain: not myWorker().isWaiting myWorker().isWaiting = true - myParking().wait() # Thread is blocked here until woken up. + Backoff: # Thread is blocked here until woken up. 
+ myParking().wait() diff --git a/weave/victims.nim b/weave/victims.nim index 822ec24..e918390 100644 --- a/weave/victims.nim +++ b/weave/victims.nim @@ -28,19 +28,20 @@ proc hasThievesProxy*(worker: WorkerID): bool = return true return false -proc recvProxy(req: var StealRequest, worker: WorkerID): bool = - ## Receives steal requests on behalf of child workers - ## Note that on task reception, children are waken up - ## and tasks are sent to them before thieves so this should happen rarely - if worker == Not_a_worker: - return false +Backoff: + proc recvProxy(req: var StealRequest, worker: WorkerID): bool = + ## Receives steal requests on behalf of child workers + ## Note that on task reception, children are waken up + ## and tasks are sent to them before thieves so this should happen rarely + if worker == Not_a_worker: + return false - profile(send_recv_req): - for w in traverseBreadthFirst(worker, maxID()): - result = getThievesOf(w).tryRecv(req) - if result: - return true - return false + profile(send_recv_req): + for w in traverseBreadthFirst(worker, maxID()): + result = getThievesOf(w).tryRecv(req) + if result: + return true + return false # Victims - Adaptative task splitting # ---------------------------------------------------------------------------------- @@ -55,13 +56,14 @@ proc approxNumThieves(): int32 {.inline.} = result = 1 + myThieves().peek() debug: log("Worker %2d: has %ld steal requests\n", myID(), result) -proc approxNumThievesProxy(worker: WorkerID): int32 = - # Estimate the number of idle workers of a worker subtree - if worker == Not_a_worker: return 0 - result = 0 - for w in traverseBreadthFirst(worker, maxID()): - result += getThievesOf(w).peek() - debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker) +Backoff: + proc approxNumThievesProxy(worker: WorkerID): int32 = + # Estimate the number of idle workers of a worker subtree + if worker == Not_a_worker: return 0 + result = 0 + for w in traverseBreadthFirst(worker, maxID()): + result += getThievesOf(w).peek() + debug: log("Worker %2d: found %ld steal requests addressed to its child %d and grandchildren\n", myID(), result, worker) # Victims - Steal requests handling # ---------------------------------------------------------------------------------- @@ -109,13 +111,14 @@ proc recv*(req: var StealRequest): bool {.inline.} = # Check the next steal request result = myThieves().tryRecv(req) - # When a child thread backs off, it is parked by the OS - # We need to handle steal requests on its behalf to avoid latency - if not result and myWorker().leftIsWaiting: - result = recvProxy(req, myWorker().left) + Backoff: + # When a child thread backs off, it is parked by the OS + # We need to handle steal requests on its behalf to avoid latency + if not result and myWorker().leftIsWaiting: + result = recvProxy(req, myWorker().left) - if not result and myWorker().rightIsWaiting: - result = recvProxy(req, myWorker().right) + if not result and myWorker().rightIsWaiting: + result = recvProxy(req, myWorker().right) postCondition: not result or (result and req.state != Waiting) @@ -255,10 +258,11 @@ proc splitAndSend*(task: Task, req: sink StealRequest) = # Split iteration range according to given strategy # [start, stop) => [start, split) + [split, end) var guessThieves = approxNumThieves() - if myWorker().leftIsWaiting: - guessThieves += approxNumThievesProxy(myWorker().left) - if myWorker().rightIsWaiting: - guessThieves += approxNumThievesProxy(myWorker().right) + 
Backoff: + if myWorker().leftIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().left) + if myWorker().rightIsWaiting: + guessThieves += approxNumThievesProxy(myWorker().right) let split = split(task, guessThieves) # New task gets the upper half @@ -340,7 +344,8 @@ proc shareWork*() {.inline.} = else: ascertain: myWorker().rightIsWaiting myWorker().rightIsWaiting = false - wakeup(req.thiefID) + Backoff: + wakeup(req.thiefID) # Now we can dequeue as we found work # We cannot access the steal request anymore or
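
The patches above change Weave's internals in place; as a standalone illustration of the four-state parking protocol they introduce, the sketch below re-implements EventNotifier with std/locks and std/atomics only. The state names and the intendToSleep/wait/notify split follow the patch, but everything else is simplified: the steal-request channel, the Backoff template and the rest of the runtime plumbing are left out, and the wait loop takes the conventional form that holds the lock around the condition-variable wait (the patch keeps the lock solely for the condition variable and protects the state with atomics). Atomic[ConsumerState] assumes a Nim version that includes the enum-atomics fix referenced in patch 3. This is a sketch for illustration, not Weave's actual code.

import std/[locks, atomics]

type
  ConsumerState = enum
    Busy, IntendToSleep, Parked, ShouldWakeup

  EventNotifier = object
    cond: Cond
    lock: Lock                           # used only by the condition variable
    consumerState: Atomic[ConsumerState]

proc initialize(en: var EventNotifier) =
  en.cond.initCond()
  en.lock.initLock()
  en.consumerState.store(Busy, moRelaxed)

proc intendToSleep(en: var EventNotifier) =
  # Consumer: announce the intent to sleep *before* sending the
  # WAITING steal request, so a wakeup cannot slip in unnoticed.
  en.consumerState.store(IntendToSleep, moRelease)

proc wait(en: var EventNotifier) =
  # Consumer: commit the intent to sleep, unless a wakeup already
  # arrived between intendToSleep() and this call.
  var expected = IntendToSleep
  if en.consumerState.compareExchange(expected, Parked, moAcquireRelease):
    en.lock.acquire()
    while en.consumerState.load(moAcquire) == Parked:
      en.cond.wait(en.lock)              # state is re-checked on spurious wakeups
    en.lock.release()
  # Whether we actually parked or the wakeup raced us, roll back to Busy.
  en.consumerState.store(Busy, moRelease)

proc notify(en: var EventNotifier) =
  # Producer: wake the consumer if it is parked or about to park.
  if en.consumerState.load(moAcquire) in {Busy, ShouldWakeup}:
    return
  en.consumerState.store(ShouldWakeup, moRelease)
  while en.consumerState.load(moAcquire) != Busy:
    en.cond.signal()                     # keep signalling: the consumer may not have parked yet
    cpuRelax()

Usage mirrors thieves.nim and victims.nim above: the child calls intendToSleep(), sends its WAITING steal request, then wait(); the parent that receives the request, or later finds work for the sleeping child, calls notify(). With the last patch the mechanism is compiled in by default and can presumably be disabled at build time with -d:WV_EnableBackoff=false.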