Skip to content

Commit

Permalink
Merge pull request #122 from JuliaLang/threadsafety
Browse files Browse the repository at this point in the history
Thread-safety improvements. Add tests with multiple threads. CI improvements.
  • Loading branch information
JamesWrigley authored Jan 26, 2025
2 parents ff34f4c + 8494bbd commit 1c7eb92
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 144 deletions.
25 changes: 15 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@ concurrency:

jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
name: julia -t${{ matrix.threads}} - ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
version:
- 'nightly'
os:
- ubuntu-latest
- macOS-latest
- windows-latest
arch:
- x64
- x86
threads:
# - '1'
- '4,4'
version: [nightly]
os: [ubuntu-latest, windows-latest, macOS-latest]
arch: [x64, x86, aarch64]
exclude:
- os: ubuntu-latest
arch: aarch64
- os: windows-latest
arch: aarch64
- os: macOS-latest
arch: x64
- os: macOS-latest
arch: x86
steps:
Expand All @@ -44,6 +48,7 @@ jobs:
- uses: julia-actions/julia-runtest@v1
env:
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
JULIA_NUM_THREADS: '${{ matrix.threads}}'
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v5
with:
Expand Down
64 changes: 30 additions & 34 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ function set_worker_state(w, state)
end

function check_worker_state(w::Worker)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand All @@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
t = @async exec_conn_func(w)
else
# route request via node 1
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
Expand All @@ -190,20 +190,14 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
if w.state === W_CREATED
if (@atomic w.state) === W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn Threads.threadpool() begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
end
end
errormonitor(T)
lock(w.c_state) do
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if timedwait(() -> (@atomic w.state) === W_CONNECTED, timeout) === :timed_out
# Notify any waiters on the state and throw
@lock w.c_state notify(w.c_state)
error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
end
nothing
Expand Down Expand Up @@ -258,7 +252,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
errormonitor(Threads.@spawn while isopen(sock)
errormonitor(@async while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end)
Expand Down Expand Up @@ -290,7 +284,7 @@ end


function redirect_worker_output(ident, stream)
t = Threads.@spawn while !eof(stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -329,7 +323,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = Threads.@spawn Threads.threadpool() readline(io)
readtask = @async readline(io)
yield()
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -430,7 +424,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
```julia
# On busy clusters, call `addprocs` asynchronously
t = Threads.@spawn addprocs(...)
t = @async addprocs(...)
```
```julia
Expand Down Expand Up @@ -496,13 +490,14 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
# NOTE: Must be `@async`. See FIXME above
t_launch = @async launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
Threads.@spawn Threads.threadpool() begin
@async begin # NOTE: Must be `@async`. See FIXME above
sleep(1)
notify(launch_ntfy)
end
Expand All @@ -512,7 +507,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
# NOTE: Must be `@async`. See FIXME above
@async setup_launched_worker(manager, wconfig, launched_q)
end
end
end
Expand Down Expand Up @@ -592,7 +588,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
wconfig.port = port

let wconfig=wconfig
Threads.@spawn Threads.threadpool() begin
@async begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
Expand Down Expand Up @@ -660,7 +656,7 @@ function create_worker(manager, wconfig)
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
if jw.state === W_CREATED
if (@atomic jw.state) === W_CREATED
lock(jw.c_state) do
wait(jw.c_state)
end
Expand Down Expand Up @@ -688,7 +684,7 @@ function create_worker(manager, wconfig)

for wl in wlist
lock(wl.c_state) do
if wl.state === W_CREATED
if (@atomic wl.state) === W_CREATED
# wait for wl to join
wait(wl.c_state)
end
Expand Down Expand Up @@ -758,7 +754,7 @@ function check_master_connect()
end

errormonitor(
Threads.@spawn begin
@async begin
timeout = worker_timeout()
if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
Expand Down Expand Up @@ -890,7 +886,7 @@ function nprocs()
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
n = n - 1
end
end
Expand Down Expand Up @@ -941,7 +937,7 @@ julia> procs()
function procs()
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
else
return Int[x.id for x in PGRP.workers]
end
Expand All @@ -950,7 +946,7 @@ end
function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
return true
end
end
Expand All @@ -972,7 +968,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
"""
function procs(pid::Integer)
if myid() == 1
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
else
Expand Down Expand Up @@ -1050,13 +1046,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
t = @async _rmprocs(pids, typemax(Int))
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return Threads.@spawn Threads.threadpool() nothing
return @async nothing
end
end

Expand All @@ -1079,11 +1075,11 @@ function _rmprocs(pids, waitfor)

start = time_ns()
while (time_ns() - start) < waitfor*1e9
all(w -> w.state === W_TERMINATED, rmprocset) && break
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
end

unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
if length(unremoved) > 0
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
throw(ErrorException(estr))
Expand Down Expand Up @@ -1239,7 +1235,7 @@ function interrupt(pids::AbstractVector=workers())
@assert myid() == 1
@sync begin
for pid in pids
Threads.@spawn Threads.threadpool() interrupt(pid)
@async interrupt(pid)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
for _ in 1:run_locally
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
@async Core.eval(m, ex)
end
end
nothing
Expand Down Expand Up @@ -275,7 +275,7 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
Expand Down
18 changes: 13 additions & 5 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ addprocs([
* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
holding one flag, or a collection of strings, with one element per flag.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
* `topology`: Specifies how the workers connect to each other. Sending a message between
unconnected workers results in an error.
Expand Down Expand Up @@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
Threads.@spawn Threads.threadpool() try
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down Expand Up @@ -740,16 +740,24 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
nothing
end

function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
# profile_wait = 6 is 1s for profile, 5s for the report to show
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

timer_task = Threads.@spawn Threads.threadpool() begin
timer_task = @async begin
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)

sleep(term_timeout)
Expand Down
4 changes: 2 additions & 2 deletions src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ end
function flush_gc_msgs()
try
for w in (PGRP::ProcessGroup).workers
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
flush_gc_msgs(w)
end
end
catch e
bt = catch_backtrace()
Threads.@spawn showerror(stderr, e, bt)
@async showerror(stderr, e, bt)
end
end

Expand Down
Loading

0 comments on commit 1c7eb92

Please sign in to comment.