Skip to content

Commit

Permalink
Merge pull request #559 from JuliaParallel/sch-errormonitor
Browse files Browse the repository at this point in the history
Replace all uses of @async with Threads.@Spawn
  • Loading branch information
jpsamaroo authored Aug 9, 2024
2 parents 67a7435 + 88bb1fe commit 368df2e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
for (name, t) in tracked
if t.state == :runnable
@warn "Waiting on $name"
@async Base.throwto(t, InterruptException())
Threads.@spawn Base.throwto(t, InterruptException())
end
end
end
Expand Down
15 changes: 7 additions & 8 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ function init_proc(state, p, log_sink)
lock(WORKER_MONITOR_LOCK) do
wid = p.pid
if !haskey(WORKER_MONITOR_TASKS, wid)
t = @async begin
t = Threads.@spawn begin
try
# Wait until this connection is terminated
remotecall_fetch(sleep, wid, typemax(UInt64))
Expand Down Expand Up @@ -505,7 +505,7 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)

# Initialize workers
@sync for p in procs_to_use(ctx)
@async begin
Threads.@spawn begin
try
init_proc(state, p, ctx.log_sink)
catch err
Expand All @@ -521,7 +521,7 @@ function scheduler_init(ctx, state::ComputeState, d::Thunk, options, deps)
end

# Listen for new workers
@async begin
Threads.@spawn begin
try
monitor_procs_changed!(ctx, state)
catch err
Expand Down Expand Up @@ -632,7 +632,7 @@ function scheduler_exit(ctx, state::ComputeState, options)
@dagdebug nothing :global "Tearing down scheduler" uid=state.uid

@sync for p in procs_to_use(ctx)
@async cleanup_proc(state, p, ctx.log_sink)
Threads.@spawn cleanup_proc(state, p, ctx.log_sink)
end

lock(state.lock) do
Expand Down Expand Up @@ -988,7 +988,7 @@ end
function evict_all_chunks!(ctx, to_evict)
if !isempty(to_evict)
@sync for w in map(p->p.pid, procs_to_use(ctx))
@async remote_do(evict_chunks!, w, ctx.log_sink, to_evict)
Threads.@spawn remote_do(evict_chunks!, w, ctx.log_sink, to_evict)
end
end
end
Expand Down Expand Up @@ -1075,8 +1075,7 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state)
# know which task failed.
tasks = Task[]
for ts in to_send
# TODO: errormonitor
@async begin
task = Threads.@spawn begin
timespan_start(ctx, :fire, (;worker=gproc.pid), nothing)
try
remotecall_wait(do_tasks, gproc.pid, proc, state.chan, [ts]);
Expand Down Expand Up @@ -1523,7 +1522,7 @@ function do_task(to_proc, task_desc)
(data, ids)
end
fetch_tasks = map(Iterators.zip(_data,_ids)) do (x, id)
@async begin
Threads.@spawn begin
timespan_start(ctx, :move, (;thunk_id, id, processor=to_proc), (;f, data=x))
#= FIXME: This isn't valid if x is written to
x = if x isa Chunk
Expand Down
8 changes: 4 additions & 4 deletions src/sch/dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ end
function dynamic_listener!(ctx, state, wid)
task = current_task() # The scheduler's main task
inp_chan, out_chan = state.worker_chans[wid]
listener_task = @async begin
listener_task = Threads.@spawn begin
while !state.halt.set
tid, f, data = try
take!(inp_chan)
Expand Down Expand Up @@ -98,11 +98,11 @@ function dynamic_listener!(ctx, state, wid)
end
end
errormonitor_tracked("dynamic_listener! $wid", listener_task)
errormonitor_tracked("dynamic_listener! (halt+throw) $wid", @async begin
errormonitor_tracked("dynamic_listener! (halt+throw) $wid", Threads.@spawn begin
wait(state.halt)
# TODO: Not sure why we need the @async here, but otherwise we
# TODO: Not sure why we need the Threads.@spawn here, but otherwise we
# don't stop all the listener tasks
@async Base.throwto(listener_task, SchedulerHaltedException())
Threads.@spawn Base.throwto(listener_task, SchedulerHaltedException())
end)
end

Expand Down

0 comments on commit 368df2e

Please sign in to comment.