From 18b14ad54a595c018fa9c3174c46ef4096d12f17 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Thu, 12 Aug 2021 12:14:51 -0500 Subject: [PATCH 1/4] Fix eager API thunk retention --- src/sch/Sch.jl | 91 ++++++++------------------------ src/sch/dynamic.jl | 101 +++++++++++++++++++++++------------ src/sch/eager.jl | 8 +-- src/sch/fault-handler.jl | 2 +- src/sch/util.jl | 110 ++++++++++++++++++++++++++++++--------- src/thunk.jl | 35 ++++++++----- 6 files changed, 204 insertions(+), 143 deletions(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 7ad414ad8..be7caf0f3 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -1,11 +1,11 @@ module Sch using Distributed -import MemPool: DRef +import MemPool: DRef, poolset import ..Dagger -import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope -import ..Dagger: order, free!, dependents, noffspring, istask, inputs, unwrap_weak, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, AnyScope +import ..Dagger: order, free!, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor, constrain, cputhreadtime const OneToMany = Dict{Thunk, Set{Thunk}} @@ -45,7 +45,7 @@ Fields: - `cache::Dict{Thunk, Any}` - Maps from a finished `Thunk` to it's cached result, often a DRef - `running::Set{Thunk}` - The set of currently-running `Thunk`s - `running_on::Dict{Thunk,OSProc}` - Map from `Thunk` to the OS process executing it -- `thunk_dict::Dict{Int, Any}` - Maps from thunk IDs to a `Thunk` +- `thunk_dict::Dict{Int, WeakThunk}` - Maps from thunk IDs to a `Thunk` - `node_order::Any` - Function that returns the order of a thunk - `worker_pressure::Dict{Int,Dict{Type,UInt64}}` - Cache of worker pressure - `worker_capacity::Dict{Int,Dict{Type,UInt64}}` - Maps from worker ID to capacity @@ -67,7 +67,7 @@ struct ComputeState cache::WeakKeyDict{Thunk, Any} running::Set{Thunk} running_on::Dict{Thunk,OSProc} - thunk_dict::Dict{Int, Any} + thunk_dict::Dict{Int, WeakThunk} node_order::Any worker_pressure::Dict{Int,Dict{Type,UInt64}} worker_capacity::Dict{Int,Dict{Type,UInt64}} @@ -90,7 +90,7 @@ function start_state(deps::Dict, node_order, chan) Dict{Thunk, Any}(), Set{Thunk}(), Dict{Thunk,OSProc}(), - Dict{Int, Thunk}(), + Dict{Int, WeakThunk}(), node_order, Dict{Int,Dict{Type,UInt64}}(), Dict{Int,Dict{Type,UInt64}}(), @@ -333,9 +333,9 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) # setup thunk_dict mappings for node in filter(istask, keys(deps)) - state.thunk_dict[node.id] = node + state.thunk_dict[node.id] = WeakThunk(node) for dep in deps[node] - state.thunk_dict[dep.id] = dep + state.thunk_dict[dep.id] = WeakThunk(dep) end end @@ -407,14 +407,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end continue else - if ctx.options.allow_errors || state.thunk_dict[thunk_id].options.allow_errors + if ctx.options.allow_errors || unwrap_weak_checked(state.thunk_dict[thunk_id]).options.allow_errors thunk_failed = true else throw(res) end end end - node = state.thunk_dict[thunk_id] + node = unwrap_weak_checked(state.thunk_dict[thunk_id]) if metadata !== nothing state.worker_pressure[pid][typeof(proc)] = metadata.pressure state.worker_loadavg[pid] = metadata.loadavg @@ -572,7 +572,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) # Calculate scope scope = AnyScope() - for input in unwrap_weak.(task.inputs) + for input in unwrap_weak_checked.(task.inputs) chunk = if istask(input) state.cache[input] elseif input isa Chunk @@ -741,69 +741,23 @@ function finish_task!(ctx, state, node, thunk_failed) if node.cache node.cache_ref = state.cache[node] end - for dep in sort!(collect(get(()->Set{Thunk}(), state.waiting_data, node)), by=state.node_order) - dep_isready = false - if haskey(state.waiting, dep) - set = state.waiting[dep] - node in set && pop!(set, node) - dep_isready = isempty(set) - if dep_isready - delete!(state.waiting, dep) - end - else - dep_isready = true - end - if dep_isready - if !thunk_failed - push!(state.ready, dep) - end - end - end - if haskey(state.futures, node) - # Notify any listening thunks - for future in state.futures[node] - put!(future, state.cache[node]; error=thunk_failed) - end - delete!(state.futures, node) - end + schedule_dependents!(state, node, thunk_failed) + fill_registered_futures!(state, node, thunk_failed) - # Chunk clean-up - to_evict = Set{Chunk}() - for inp in filter(t->istask(t) || (t isa Chunk), unwrap_weak.(node.inputs)) - if inp in keys(state.waiting_data) - w = state.waiting_data[inp] - if node in w - pop!(w, node) - end - if isempty(w) - delete!(state.waiting_data, inp) - if istask(inp) && haskey(state.cache, inp) - _node = state.cache[inp] - if _node isa Chunk - push!(to_evict, _node) - end - GC.@preserve inp begin - pop!(state.cache, inp) - if haskey(state.errored, inp) - pop!(state.errored, inp) - end - end - elseif inp isa Chunk - push!(to_evict, inp) - end - end - end - end + to_evict = cleanup_inputs!(state, node) if haskey(state.waiting_data, node) && isempty(state.waiting_data[node]) delete!(state.waiting_data, node) end + evict_all_chunks!(ctx, to_evict) +end + +function evict_all_chunks!(ctx, to_evict) if !isempty(to_evict) @sync for w in map(p->p.pid, procs_to_use(ctx)) @async remote_do(evict_chunks!, w, to_evict) end end end - function evict_chunks!(chunks::Set{Chunk}) for chunk in chunks haskey(CHUNK_CACHE, chunk) && delete!(CHUNK_CACHE, chunk) @@ -852,19 +806,20 @@ function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) end ids = map(enumerate(thunk.inputs)) do (idx,x) - istask(x) ? unwrap_weak(x).id : -idx + istask(x) ? unwrap_weak_checked(x).id : -idx end data = map(thunk.inputs) do x - istask(x) ? state.cache[unwrap_weak(x)] : x + istask(x) ? state.cache[unwrap_weak_checked(x)] : x end toptions = thunk.options !== nothing ? thunk.options : ThunkOptions() options = merge(ctx.options, toptions) @assert (options.single == 0) || (gproc.pid == options.single) - sch_handle = SchedulerHandle(ThunkID(thunk.id), state.worker_chans[gproc.pid]...) + # TODO: Set `sch_handle.tid.ref` to the right `DRef` + sch_handle = SchedulerHandle(ThunkID(thunk.id, nothing), state.worker_chans[gproc.pid]...) state.worker_pressure[gproc.pid][typeof(proc)] += util - # FIXME: De-dup common fields (log_sink, uid, etc.) + # TODO: De-dup common fields (log_sink, uid, etc.) push!(to_send, (util, thunk.id, thunk.f, data, thunk.get_result, thunk.persist, thunk.cache, thunk.meta, options, ids, (log_sink=ctx.log_sink, profile=ctx.profile), diff --git a/src/sch/dynamic.jl b/src/sch/dynamic.jl index 4a4624a25..0ddca9333 100644 --- a/src/sch/dynamic.jl +++ b/src/sch/dynamic.jl @@ -1,10 +1,12 @@ export SchedulerHaltedException export sch_handle, halt!, exec!, get_dag_ids, add_thunk! -"Identifies a thunk by its ID." +"Identifies a thunk by its ID, and preserves the thunk in the scheduler." struct ThunkID id::Int + ref::Union{DRef,Nothing} end +ThunkID(id::Int) = ThunkID(id, nothing) "A handle to the scheduler, used by dynamic thunks." struct SchedulerHandle @@ -52,7 +54,13 @@ function dynamic_listener!(ctx, state) if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException, ProcessExitedException, InvalidStateException}) - @error exception=(err,catch_backtrace()) + iob = IOContext(IOBuffer(), :color=>true) + println(iob, "Error in sending dynamic request:") + Base.showerror(iob, err) + Base.show_backtrace(iob, catch_backtrace()) + println(iob) + seek(iob.io, 0) + write(stderr, iob) end break end @@ -69,7 +77,13 @@ function dynamic_listener!(ctx, state) if !(unwrap_nested_exception(err) isa Union{SchedulerHaltedException, ProcessExitedException, InvalidStateException}) - @error exception=(err,catch_backtrace()) + iob = IOContext(IOBuffer(), :color=>true) + println(iob, "Error in sending dynamic result:") + Base.showerror(iob, err) + Base.show_backtrace(iob, catch_backtrace()) + println(iob) + seek(iob.io, 0) + write(stderr, iob) end end end @@ -110,25 +124,34 @@ end "Waits on a thunk to complete, and fetches its result." function Base.fetch(h::SchedulerHandle, id::ThunkID) future = ThunkFuture(Future(1)) - exec!(_register_future!, h, future, id.id) + exec!(_register_future!, h, future, id) fetch(future; proc=thunk_processor()) end "Waits on a thunk to complete, and fetches its result." -function register_future!(h::SchedulerHandle, id::ThunkID, future::ThunkFuture) - exec!(_register_future!, h, future, id.id) -end -function _register_future!(ctx, state, task, tid, (future, id)) - tid != id || throw(DynamicThunkException("Cannot fetch own result")) - thunk = state.thunk_dict[id] - ownthunk = state.thunk_dict[tid] - dominates(target, t) = (t == target) || any(_t->dominates(target, _t), filter(istask, unwrap_weak.(t.inputs))) - !dominates(ownthunk, thunk) || throw(DynamicThunkException("Cannot fetch result of dominated thunk")) - # TODO: Assert that future will be fulfilled - if haskey(state.cache, thunk) - put!(future, state.cache[thunk]; error=state.errored[thunk]) - else - futures = get!(()->ThunkFuture[], state.futures, thunk) - push!(futures, future) +register_future!(h::SchedulerHandle, id::ThunkID, future::ThunkFuture) = + exec!(_register_future!, h, future, id) +function _register_future!(ctx, state, task, tid, (future, id)::Tuple{ThunkFuture,ThunkID}) + tid != id.id || throw(DynamicThunkException("Cannot fetch own result")) + GC.@preserve id begin + thunk = unwrap_weak_checked(state.thunk_dict[id.id]) + ownthunk = unwrap_weak_checked(state.thunk_dict[tid]) + function dominates(target, t) + t == target && return true + # N.B. Skips expired tasks + task_inputs = filter(istask, Dagger.unwrap_weak.(t.inputs)) + if any(_t->dominates(target, _t), task_inputs) + return true + end + return false + end + !dominates(ownthunk, thunk) || throw(DynamicThunkException("Cannot fetch result of dominated thunk")) + # TODO: Assert that future will be fulfilled + if haskey(state.cache, thunk) + put!(future, state.cache[thunk]; error=state.errored[thunk]) + else + futures = get!(()->ThunkFuture[], state.futures, thunk) + push!(futures, future) + end end nothing end @@ -146,27 +169,39 @@ get_dag_ids(h::SchedulerHandle) = function _get_dag_ids(ctx, state, task, tid, _) deps = Dict{ThunkID,Set{ThunkID}}() for (id,thunk) in state.thunk_dict + thunk = unwrap_weak_checked(thunk) + # TODO: Get at `thunk_ref` for `thunk_id.ref` + thunk_id = ThunkID(id, nothing) if haskey(state.waiting_data, thunk) - deps[ThunkID(id)] = Set(map(t->ThunkID(t.id), collect(state.waiting_data[thunk]))) + deps[thunk_id] = Set(map(t->ThunkID(t.id, nothing), collect(state.waiting_data[thunk]))) else - deps[ThunkID(id)] = Set{ThunkID}() + deps[thunk_id] = Set{ThunkID}() end end deps end "Adds a new Thunk to the DAG." -add_thunk!(f, h::SchedulerHandle, args...; future=nothing, kwargs...) = - ThunkID(exec!(_add_thunk!, h, f, args, kwargs, future)) -function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future)) - _args = map(arg->arg isa ThunkID ? Dagger.WeakThunk(state.thunk_dict[arg.id]) : arg, args) - thunk = Thunk(f, _args...; kwargs...) - state.thunk_dict[thunk.id] = thunk - @assert reschedule_inputs!(state, thunk) - if future !== nothing - # Ensure we attach a future before the thunk is scheduled - _register_future!(ctx, state, task, tid, (future, thunk.id)) +add_thunk!(f, h::SchedulerHandle, args...; future=nothing, ref=nothing, kwargs...) = + exec!(_add_thunk!, h, f, args, kwargs, future, ref) +function _add_thunk!(ctx, state, task, tid, (f, args, kwargs, future, ref)) + _args = map(arg->arg isa ThunkID ? state.thunk_dict[arg.id] : arg, args) + GC.@preserve _args begin + thunk = Thunk(f, _args...; kwargs...) + # Create a `DRef` to `thunk` so that the caller can preserve it + thunk_ref = poolset(thunk) + thunk_id = ThunkID(thunk.id, thunk_ref) + state.thunk_dict[thunk.id] = WeakThunk(thunk) + reschedule_inputs!(state, thunk) + if future !== nothing + # Ensure we attach a future before the thunk is scheduled + _register_future!(ctx, state, task, tid, (future, thunk_id)) + end + if ref !== nothing + # Preserve the `EagerThunkFinalizer` through `thunk` + thunk.eager_ref = ref + end + put!(state.chan, RescheduleSignal()) + return thunk_id end - put!(state.chan, RescheduleSignal()) - return thunk.id::Int end diff --git a/src/sch/eager.jl b/src/sch/eager.jl index 8001c85bc..7c0a55ee9 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -74,14 +74,14 @@ function eager_thunk() adjust_pressure!(h, Dagger.ThreadProc, -util) while isopen(EAGER_THUNK_CHAN) try - ev, future, uid, f, args, opts = take!(EAGER_THUNK_CHAN) + added_future, future, uid, ref, f, args, opts = take!(EAGER_THUNK_CHAN) # preserve inputs until they enter the scheduler tid = GC.@preserve args begin - args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid]) : x, args) - add_thunk!(f, h, args...; future=future, opts...) + _args = map(x->x isa Dagger.EagerThunk ? ThunkID(EAGER_ID_MAP[x.uid], x.thunk_ref) : x, args) + add_thunk!(f, h, _args...; future=future, ref=ref, opts...) end EAGER_ID_MAP[uid] = tid.id - notify(ev) + put!(added_future, tid.ref) catch err iob = IOContext(IOBuffer(), :color=>true) println(iob, "Error in eager listener:") diff --git a/src/sch/fault-handler.jl b/src/sch/fault-handler.jl index 090df7dd8..9c5329d9b 100644 --- a/src/sch/fault-handler.jl +++ b/src/sch/fault-handler.jl @@ -49,7 +49,7 @@ function handle_fault(ctx, state, deadproc) end # Reschedule inputs from deadlist - seen = Dict{Thunk,Bool}() + seen = Set{Thunk}() for t in deadlist reschedule_inputs!(state, t, seen) end diff --git a/src/sch/util.jl b/src/sch/util.jl index 52b3e422d..75b5b7841 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -9,29 +9,92 @@ unwrap_nested_exception(err::RemoteException) = unwrap_nested_exception(err.captured) unwrap_nested_exception(err) = err -"Prepares the scheduler to schedule `thunk`." -function reschedule_inputs!(state, thunk, seen=Dict{Thunk,Bool}()) - haskey(seen, thunk) && return seen[thunk] - haskey(state.cache, thunk) && return false +"Fills the result for all registered futures of `node`." +function fill_registered_futures!(state, node, failed) + if haskey(state.futures, node) + # Notify any listening thunks + for future in state.futures[node] + put!(future, state.cache[node]; error=failed) + end + delete!(state.futures, node) + end +end + +"Cleans up any inputs that aren't needed any longer, and returns a `Set{Chunk}` +of all chunks that can now be evicted from workers." +function cleanup_inputs!(state, node) + to_evict = Set{Chunk}() + for inp in unwrap_weak_checked.(node.inputs) + if !istask(inp) && !(inp isa Chunk) + continue + end + if inp in keys(state.waiting_data) + w = state.waiting_data[inp] + if node in w + pop!(w, node) + end + if isempty(w) + if istask(inp) && haskey(state.cache, inp) + _node = state.cache[inp] + if _node isa Chunk + push!(to_evict, _node) + end + elseif inp isa Chunk + push!(to_evict, inp) + end + delete!(state.waiting_data, inp) + end + end + end + to_evict +end + +"Schedules any dependents that may be ready to execute." +function schedule_dependents!(state, node, failed) + for dep in sort!(collect(get(()->Set{Thunk}(), state.waiting_data, node)), by=state.node_order) + dep_isready = false + if haskey(state.waiting, dep) + set = state.waiting[dep] + node in set && pop!(set, node) + dep_isready = isempty(set) + if dep_isready + delete!(state.waiting, dep) + end + else + dep_isready = true + end + if dep_isready + if !failed + push!(state.ready, dep) + end + end + end +end + +""" +Prepares the scheduler to schedule `thunk`, including scheduling `thunk` if +its inputs are satisfied. +""" +function reschedule_inputs!(state, thunk, seen=Set{Thunk}()) + thunk in seen && return + push!(seen, thunk) + if haskey(state.cache, thunk) || (thunk in state.ready) || (thunk in state.running) + return + end w = get!(()->Set{Thunk}(), state.waiting, thunk) - scheduled = false - for input in unwrap_weak.(thunk.inputs) + for input in thunk.inputs + input = unwrap_weak_checked(input) if istask(input) || (input isa Chunk) push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk) end istask(input) || continue if get(state.errored, input, false) set_failed!(state, input, thunk) - scheduled = true end haskey(state.cache, input) && continue - if (input in state.running) || - (input in state.ready) || - reschedule_inputs!(state, input, seen) - push!(w, input) - scheduled = true - else - error("Failed to reschedule $(input.id) for $(thunk.id)") + push!(w, input) + if !((input in state.running) || (input in state.ready)) + reschedule_inputs!(state, input, seen) end end if isempty(w) @@ -40,9 +103,6 @@ function reschedule_inputs!(state, thunk, seen=Dict{Thunk,Bool}()) if !get(state.errored, thunk, false) push!(state.ready, thunk) end - return true - else - return scheduled end end @@ -51,12 +111,7 @@ function set_failed!(state, origin, thunk=origin) filter!(x->x!==thunk, state.ready) state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin]) state.errored[thunk] = true - if haskey(state.futures, thunk) - for future in state.futures[thunk] - put!(future, state.cache[thunk]; error=true) - end - delete!(state.futures, thunk) - end + fill_registered_futures!(state, thunk, true) if haskey(state.waiting_data, thunk) for dep in state.waiting_data[thunk] haskey(state.waiting, dep) && @@ -103,6 +158,13 @@ function print_sch_status(io::IO, state, thunk; offset=0, limit=5, max_inputs=3) end println(io, "$(thunk.id): $(thunk.f)") for (idx,input) in enumerate(thunk.inputs) + if input isa WeakThunk + input = unwrap_weak(input) + if input === nothing + println(io, repeat(' ', offset+2), "(???)") + continue + end + end input isa Thunk || continue if idx > max_inputs println(io, repeat(' ', offset+2), "…") @@ -142,7 +204,7 @@ function fetch_report(task) end function signature(task::Thunk, state) - inputs = map(x->istask(x) ? state.cache[x] : x, unwrap_weak.(task.inputs)) + inputs = map(x->istask(x) ? state.cache[x] : x, unwrap_weak_checked.(task.inputs)) Tuple{typeof(task.f), map(x->x isa Chunk ? x.chunktype : typeof(x), inputs)...} end diff --git a/src/thunk.jl b/src/thunk.jl index 26f2d29f9..cd12e8a74 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -42,6 +42,7 @@ mutable struct Thunk # cache it cache_ref::Any affinity::Union{Nothing, Vector{Pair{OSProc, Int}}} + eager_ref::Union{DRef,Nothing} options::Any # stores scheduler-specific options function Thunk(f, xs...; id::Int=next_id(), @@ -51,16 +52,17 @@ mutable struct Thunk cache::Bool=false, cache_ref=nothing, affinity=nothing, + eager_ref=nothing, options=nothing, kwargs... ) if options !== nothing @assert isempty(kwargs) new(f, xs, id, get_result, meta, persist, cache, cache_ref, - affinity, options) + affinity, eager_ref, options) else new(f, xs, id, get_result, meta, persist, cache, cache_ref, - affinity, Sch.ThunkOptions(;kwargs...)) + affinity, eager_ref, Sch.ThunkOptions(;kwargs...)) end end end @@ -144,6 +146,12 @@ end istask(::WeakThunk) = true unwrap_weak(t::WeakThunk) = t.x.value unwrap_weak(t) = t +function unwrap_weak_checked(t::WeakThunk) + t = unwrap_weak(t) + @assert t !== nothing + t +end +unwrap_weak_checked(t) = t Base.show(io::IO, t::WeakThunk) = (print(io, "~"); Base.show(io, t.x.value)) Base.convert(::Type{WeakThunk}, t::Thunk) = WeakThunk(t) @@ -177,7 +185,8 @@ be `fetch`'d or `wait`'d on at any time. mutable struct EagerThunk uid::UInt future::ThunkFuture - ref::DRef + finalizer_ref::DRef + thunk_ref::DRef end Base.isready(t::EagerThunk) = isready(t.future) Base.wait(t::EagerThunk) = wait(t.future) @@ -200,11 +209,11 @@ function _spawn(f, args...; kwargs...) Dagger.Sch.init_eager() uid = rand(UInt) future = ThunkFuture() - ref = poolset(EagerThunkFinalizer(uid)) - ev = Base.Event() - put!(Dagger.Sch.EAGER_THUNK_CHAN, (ev, future, uid, f, (args...,), (kwargs...,))) - wait(ev) - return (uid, future, ref) + finalizer_ref = poolset(EagerThunkFinalizer(uid)) + added_future = Future() + put!(Dagger.Sch.EAGER_THUNK_CHAN, (added_future, future, uid, finalizer_ref, f, (args...,), (kwargs...,))) + thunk_ref = fetch(added_future) + return (uid, future, finalizer_ref, thunk_ref) end """ spawn(f, args...; kwargs...) -> EagerThunk @@ -213,12 +222,12 @@ Spawns a task with `f` as the function and `args` as the arguments, returning an `EagerThunk`. Uses a scheduler running in the background to execute code. """ function spawn(f, args...; kwargs...) - uid, future, ref = if myid() == 1 + uid, future, finalizer_ref, thunk_ref = if myid() == 1 _spawn(f, args...; kwargs...) else remotecall_fetch(_spawn, 1, f, args...; kwargs...) end - return EagerThunk(uid, future, ref) + return EagerThunk(uid, future, finalizer_ref, thunk_ref) end """ @@ -309,10 +318,10 @@ Base.hash(x::Thunk, h::UInt) = hash(x.id, hash(h, 0x7ad3bac49089a05f % UInt)) Base.isequal(x::Thunk, y::Thunk) = x.id==y.id function Base.show(io::IO, z::Thunk) - lvl = get(io, :lazy_level, 1) + lvl = get(io, :lazy_level, 2) print(io, "Thunk[$(z.id)]($(z.f), ") - if lvl < 2 - show(IOContext(io, :lazy_level => lvl+1), z.inputs) + if lvl > 0 + show(IOContext(io, :lazy_level => lvl-1), z.inputs) else print(io, "...") end From 6dd29b64106420b4b0b1607a29c059c6f77cee0b Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Thu, 12 Aug 2021 22:49:48 -0500 Subject: [PATCH 2/4] Fixes for fault tolerance --- src/sch/Sch.jl | 4 ++-- src/sch/fault-handler.jl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index be7caf0f3..892fc7f11 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -899,7 +899,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c unlock(TASK_SYNC) else # Under-subscribed, calculate extra utilization and execute thunk - @debug "($(myid())) ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap" + @debug "($(myid())) $f ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap" extra_util = if extra_util isa MaxUtilization count(c->typeof(c)===typeof(to_proc), children(from_proc)) else @@ -937,7 +937,7 @@ function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, c lock(TASK_SYNC) do real_util[] -= extra_util end - @debug "($(myid())) ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" + @debug "($(myid())) $f ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" lock(TASK_SYNC) do notify(TASK_SYNC) end diff --git a/src/sch/fault-handler.jl b/src/sch/fault-handler.jl index 9c5329d9b..401f47096 100644 --- a/src/sch/fault-handler.jl +++ b/src/sch/fault-handler.jl @@ -33,6 +33,7 @@ function handle_fault(ctx, state, deadproc) if pid == deadproc.pid push!(deadlist, t) delete!(state.running_on, t) + pop!(state.running, t) end end # Clear thunk.cache_ref @@ -53,5 +54,4 @@ function handle_fault(ctx, state, deadproc) for t in deadlist reschedule_inputs!(state, t, seen) end - schedule!(ctx, state) end From b6dfe9e813f5a586f4cdc23a990eae426a0e9d14 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Fri, 13 Aug 2021 07:03:13 -0500 Subject: [PATCH 3/4] Improve dead worker detection --- src/sch/Sch.jl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 892fc7f11..55eacb1b8 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -341,7 +341,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) # Initialize procs, pressure, and capacity @sync for p in procs_to_use(ctx) - @async init_proc(state, p) + @async begin + try + init_proc(state, p) + catch err + @error "Error initializing worker $p" exception=(err,catch_backtrace()) + remove_dead_proc!(ctx, state, p) + end + end end # setup dynamic listeners @@ -357,6 +364,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) # Loop while we still have thunks to execute while !isempty(state.ready) || !isempty(state.running) + procs_state = assign_new_procs!(ctx, state, procs_state) if !isempty(state.ready) # Nothing running, so schedule up to N thunks, 1 per N workers schedule!(ctx, state, procs_state) From 03f7aa47a16df89a0d432bbc78f9f10eb1875655 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Fri, 13 Aug 2021 11:22:51 -0500 Subject: [PATCH 4/4] Bump to 0.12.4 --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index ea728f624..70f4f76c2 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "Dagger" uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54" -version = "0.12.3" +version = "0.12.4" [deps] Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"