Skip to content

Commit

Permalink
Workaround deadlock #27 by deactivating backoff (see also #28)
Browse files Browse the repository at this point in the history
  • Loading branch information
mratsim committed Dec 5, 2019
1 parent 1384095 commit 1ef0b9f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 32 deletions.
4 changes: 4 additions & 0 deletions weave/channels/event_notifiers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func wait*(en: var EventNotifier) {.inline.} =

assert not en.waiting.load(moRelaxed)

# TODO: there is currently a deadlock because a worker sending a
# WAITING steal request and then actually waiting are two separate
# steps, not a single atomic operation.
# See https://github.com/mratsim/weave/issues/27
# and https://github.com/mratsim/weave/pull/28
func notify*(en: var EventNotifier) {.inline.} =
## Signal a thread that it can be unparked

Expand Down
24 changes: 14 additions & 10 deletions weave/contexts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,20 @@ template myMetrics*: untyped =
metrics:
localCtx.counters

template myParking*: EventNotifier =
globalCtx.com.parking[localCtx.worker.ID]

template wakeup*(target: WorkerID) =
mixin notify
debugTermination:
log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
globalCtx.com.parking[target].notify()

export event_notifiers.wait
# Backoff is deactivated, see
# - https://github.com/mratsim/weave/issues/27
# - https://github.com/mratsim/weave/pull/28
#
# template myParking*: EventNotifier =
# globalCtx.com.parking[localCtx.worker.ID]
#
# template wakeup*(target: WorkerID) =
# mixin notify
# debugTermination:
# log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target)
# globalCtx.com.parking[target].notify()
#
# export event_notifiers.wait

# Task caching
# ----------------------------------------------------------------------------------
Expand Down
5 changes: 4 additions & 1 deletion weave/datatypes/context_global.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ type
# Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]]
tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]
parking*: ptr UncheckedArray[EventNotifier]
# parking*: ptr UncheckedArray[EventNotifier]
# Backoff is deactivated, see
# - https://github.com/mratsim/weave/issues/27
# - https://github.com/mratsim/weave/pull/28

GlobalContext* = object
com*: ComChannels
Expand Down
8 changes: 4 additions & 4 deletions weave/runtime.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ proc init*(_: type Weave) =
globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce())
globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce())
globalCtx.com.parking = wv_alloc(EventNotifier, workforce())
# globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) - Backoff deactivated
discard pthread_barrier_init(globalCtx.barrier, nil, workforce())

# Lead thread - pinned to CPU 0
Expand Down Expand Up @@ -216,9 +216,9 @@ proc loadBalance*(_: type Weave) =
shareWork()

# Check also channels on behalf of the children workers that are managed.
if myThieves().peek() != 0 or
myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
if myThieves().peek() != 0:
# myWorker().leftIsWaiting and hasThievesProxy(myWorker().left) or
# myWorker().rightIsWaiting and hasThievesProxy(myWorker().right):
var req: StealRequest
while recv(req):
dispatchTasks(req)
4 changes: 2 additions & 2 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
myWorker().deque.initialize()
myWorker().initialize(maxID())

myParking().initialize()
# myParking().initialize() - Backoff deactivated

myTodoBoxes().initialize()
for i in 0 ..< myTodoBoxes().len:
Expand Down Expand Up @@ -230,7 +230,7 @@ proc threadLocalCleanup*() {.gcsafe.} =
ascertain: myTodoBoxes().access(i).isEmpty()
localCtx.stealCache.access(i).victims.delete()
myTodoBoxes().delete()
`=destroy`(myParking())
# `=destroy`(myParking()) # Backoff deactivated

# The task cache is full of tasks
delete(localCtx.taskCache)
Expand Down
4 changes: 2 additions & 2 deletions weave/signals.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ proc signalTerminate*(_: pointer) =
# Send the terminate signal
asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0))
# Wake the worker up so that it can process the terminate signal
wakeup(myWorker().left)
# wakeup(myWorker().left) - Backoff deactivated
if myWorker().right != Not_a_worker:
asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0))
wakeup(myWorker().right)
# wakeup(myWorker().right) - Backoff deactivated

Worker:
# When processing this signal for our queue, it was counted
Expand Down
2 changes: 1 addition & 1 deletion weave/thieves.nim
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ proc lastStealAttemptFailure*(req: sink StealRequest) =
sendShare(req)
ascertain: not myWorker().isWaiting
myWorker().isWaiting = true
myParking().wait() # Thread is blocked here until woken up.
# myParking().wait() - Backoff deactivated: the thread would block here until woken up.
24 changes: 12 additions & 12 deletions weave/victims.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,13 @@ proc recv*(req: var StealRequest): bool {.inline.} =
# Check the next steal request
result = myThieves().tryRecv(req)

# When a child thread backs off, it is parked by the OS
# We need to handle steal requests on its behalf to avoid latency
if not result and myWorker().leftIsWaiting:
result = recvProxy(req, myWorker().left)
# # When a child thread backs off, it is parked by the OS
# # We need to handle steal requests on its behalf to avoid latency
# if not result and myWorker().leftIsWaiting:
# result = recvProxy(req, myWorker().left)


if not result and myWorker().rightIsWaiting:
result = recvProxy(req, myWorker().right)
# if not result and myWorker().rightIsWaiting:
# result = recvProxy(req, myWorker().right)

postCondition: not result or (result and req.state != Waiting)

Expand Down Expand Up @@ -274,10 +273,10 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =
# Split iteration range according to given strategy
# [start, stop) => [start, split) + [split, end)
var guessThieves = approxNumThieves()
if myWorker().leftIsWaiting:
guessThieves += approxNumThievesProxy(myWorker().left)
if myWorker().rightIsWaiting:
guessThieves += approxNumThievesProxy(myWorker().right)
# if myWorker().leftIsWaiting:
# guessThieves += approxNumThievesProxy(myWorker().left)
# if myWorker().rightIsWaiting:
# guessThieves += approxNumThievesProxy(myWorker().right)
let split = split(task, guessThieves)

# New task gets the upper half
Expand Down Expand Up @@ -350,7 +349,8 @@ proc shareWork*() {.inline.} =
else:
ascertain: myWorker().rightIsWaiting
myWorker().rightIsWaiting = false
wakeup(req.thiefID)
# wakeup(req.thiefID) - backoff is deactivated

# Now we can dequeue as we found work
# We cannot access the steal request anymore or
# we would have a race with the child worker recycling it.
Expand Down

0 comments on commit 1ef0b9f

Please sign in to comment.