Commit

Merge pull request #164 from JuliaParallel/jps/async-arg-fetch

Make argument fetching asynchronous

jpsamaroo authored Dec 2, 2020
2 parents 429ff8c + d6d070a commit 61ff302

Showing 9 changed files with 103 additions and 169 deletions.
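
The central change is that per-argument fetching now happens concurrently: each argument is resolved inside an `@async` task and the results are gathered with a broadcast `fetch`. A minimal standalone sketch of that pattern in plain Julia (the `fetch_args_async`/`resolve_arg` names and the `sleep` are illustrative, not part of this commit):

# Spawn one task per argument, then wait on all of them together,
# instead of resolving arguments one after another.
function fetch_args_async(args)
    fetch.(map(args) do x
        @async begin
            resolve_arg(x)      # placeholder for the real collect/move work
        end
    end)
end

resolve_arg(x) = (sleep(0.1); x)   # simulated slow fetch

fetch_args_async([1, 2, 3])        # takes ~0.1s total instead of ~0.3s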
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,6 +1,6 @@
name = "Dagger"
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
version = "0.10.0"
version = "0.10.1"

[deps]
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
31 changes: 21 additions & 10 deletions src/chunks.jl
@@ -32,13 +32,13 @@ domain(x::Any) = UnitDomain()
A chunk with some data
"""
mutable struct Chunk{T, H, P<:Processor}
chunktype::Type
chunktype::Type{T}
domain
handle::H
processor::P
persist::Bool
function (::Type{Chunk{T,H,P}})(chunktype, domain, handle, processor, persist) where {T,H,P}
c = new{T,H,P}(chunktype, domain, handle, processor, persist)
function (::Type{Chunk{T,H,P}})(::Type{T}, domain, handle, processor, persist) where {T,H,P}
c = new{T,H,P}(T, domain, handle, processor, persist)
finalizer(x -> @async(myid() == 1 && free!(x)), c)
c
end
@@ -80,10 +80,21 @@ end
collect(ctx::Context, ref::DRef; options=nothing) =
move(OSProc(ref.owner), OSProc(), ref)
collect(ctx::Context, ref::FileRef; options=nothing) =
poolget(ref)
move(from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) =
poolget(ref)

poolget(ref) # FIXME: Do move call

# Unwrap Chunk, DRef, and FileRef by default
move(from_proc::Processor, to_proc::Processor, x::Chunk) =
move(from_proc, to_proc, x.handle)
move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef}) =
move(from_proc, to_proc, poolget(x))

# Determine from_proc when unspecified
move(to_proc::Processor, chunk::Chunk) =
move(chunk.processor, to_proc, chunk)
move(to_proc::Processor, d::DRef) =
move(OSProc(d.owner), to_proc, d)
move(to_proc::Processor, x) =
move(OSProc(), to_proc, x)

### ChunkIO
affinity(r::DRef) = Pair{OSProc, UInt64}[OSProc(r.owner) => r.size]
@@ -102,9 +113,9 @@ end
Create a chunk from sequential object `x` which resides on `proc`.
"""
function tochunk(x, proc::P=OSProc(); persist=false, cache=false) where P
function tochunk(x::X, proc::P=OSProc(); persist=false, cache=false) where {X,P}
ref = poolset(x, destroyonevict=persist ? false : cache)
Chunk{Any, typeof(ref), P}(typeof(x), domain(x), ref, proc, persist)
Chunk{X, typeof(ref), P}(X, domain(x), ref, proc, persist)
end
tochunk(x::Union{Chunk, Thunk}, proc=nothing) = x

@@ -129,7 +140,7 @@ function savechunk(data, dir, f)
return position(io)
end
fr = FileRef(f, sz)
Chunk{Any, typeof(fr), P}(typeof(data), domain(data), fr, OSProc(), true)
Chunk{typeof(data), typeof(fr), P}(typeof(data), domain(data), fr, OSProc(), true)
end


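Two things change in src/chunks.jl: `Chunk` now records its element type in the `T` parameter (so `chunktype` is available without fetching the data), and the new `move` fallbacks unwrap `Chunk`, `DRef`, and `FileRef` automatically, filling in `from_proc` from the chunk itself when it is not given. A hedged sketch of how a call resolves under the definitions above (the `1:10` payload is arbitrary; assumes a single-process session):

import Dagger: tochunk, move, OSProc

c = tochunk(collect(1:10))   # after this commit: a Chunk{Vector{Int64}, ...}
# move(to_proc, chunk) supplies from_proc = chunk.processor, then the generic
# Processor fallback unwraps the handle and poolgets the underlying DRef:
x = move(OSProc(), c)        # returns the underlying Vector{Int64}
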
90 changes: 20 additions & 70 deletions src/processor.jl
@@ -24,18 +24,18 @@ calls differently than normal Julia.
execute!

"""
iscompatible(proc::Processor, opts, f, args...) -> Bool
iscompatible(proc::Processor, opts, f, Targs...) -> Bool
Indicates whether `proc` can execute `f` over `args` given `opts`. `Processor`
Indicates whether `proc` can execute `f` over `Targs` given `opts`. `Processor`
subtypes should overload this function to return `true` if and only if it is
essentially guaranteed that `f(args...)` is supported. Additionally,
essentially guaranteed that `f(::Targs...)` is supported. Additionally,
`iscompatible_func` and `iscompatible_arg` can be overridden to determine
compatibility of `f` and `args` individually. The default implementation
compatibility of `f` and `Targs` individually. The default implementation
returns `false`.
"""
iscompatible(proc::Processor, opts, f, args...) =
iscompatible(proc::Processor, opts, f, Targs...) =
iscompatible_func(proc, opts, f) &&
all(x->iscompatible_arg(proc, opts, x), args)
all(x->iscompatible_arg(proc, opts, x), Targs)
iscompatible_func(proc::Processor, opts, f) = false
iscompatible_arg(proc::Processor, opts, x) = false

@@ -70,58 +70,10 @@ get_parent
Moves and/or converts `x` such that it's available and suitable for usage on
the `to_proc` processor. This function can be overloaded by `Processor`
subtypes to transport arguments and convert them to an appropriate form before
being used for execution. This is additionally called on thunk results when
moving back to `from_proc` before being serialized over the wire as needed.
The default implementation breaks a single `move` call down into a sequence of
`move` calls, and is not intended to be maximally efficient.
being used for execution. Subtypes of `Processor` wishing to implement efficient
data movement should provide implementations where `x::Chunk`.
"""
function move(from_proc::Processor, to_proc::Processor, x)
if from_proc == to_proc
return x
end
@debug "Initiating generic move"
# Move to remote OSProc
@debug "(Remote) moving $parent_proc to $grandparent_proc"
root = get_parent_osproc(from_proc)
x, parent_proc = remotecall_fetch(move_to_osproc, root.pid, from_proc, x)

# Move to local OSProc
remote_proc = parent_proc
local_proc = OSProc()
@debug "(Network) moving $remote_proc to $local_proc"
x = move(remote_proc, local_proc, x)

# Move to to_proc
parent_proc = get_parent(to_proc)
path = Processor[to_proc, parent_proc]
while parent_proc != local_proc && !(parent_proc isa OSProc)
parent_proc = get_parent(parent_proc)
push!(path, parent_proc)
end
last_proc = local_proc
while !isempty(path)
next_proc = pop!(path)
@debug "(Local) moving $last_proc to $next_proc"
x = move(last_proc, next_proc, x)
last_proc = next_proc
end
return x
end
function get_parent_osproc(proc)
while !(proc isa OSProc)
proc = get_parent(proc)
end
proc
end
function move_to_osproc(parent_proc, x)
ctx = Context()
while !(parent_proc isa OSProc)
grandparent_proc = get_parent(parent_proc)
x = move(parent_proc, grandparent_proc, x)
parent_proc = grandparent_proc
end
return x, parent_proc
end
move(from_proc::Processor, to_proc::Processor, x) = x

"""
capacity(proc::Processor=OSProc()) -> Int
@@ -174,18 +126,19 @@ iscompatible_arg(proc::OSProc, opts, args...) =
proc.children)
get_processors(proc::OSProc) =
vcat((get_processors(child) for child in proc.children)...,)
function choose_processor(from_proc::OSProc, options, f, args)
if isempty(from_proc.queue)
for child in from_proc.children
function choose_processor(options, f, Targs)
osproc = OSProc()
if isempty(osproc.queue)
for child in osproc.children
grandchildren = get_processors(child)
append!(from_proc.queue, grandchildren)
append!(osproc.queue, grandchildren)
end
end
@assert !isempty(from_proc.queue)
for i in 1:length(from_proc.queue)
proc = popfirst!(from_proc.queue)
push!(from_proc.queue, proc)
if !iscompatible(proc, options, f, args...)
@assert !isempty(osproc.queue)
for i in 1:length(osproc.queue)
proc = popfirst!(osproc.queue)
push!(osproc.queue, proc)
if !iscompatible(proc, options, f, Targs...)
continue
end
if default_enabled(proc) && isempty(options.proctypes)
@@ -194,7 +147,7 @@ function choose_processor(from_proc::OSProc, options, f, args)
return proc
end
end
throw(ProcessorSelectionException(options.proctypes, from_proc.queue, f, args))
throw(ProcessorSelectionException(options.proctypes, osproc.queue, f, Targs))
end
struct ProcessorSelectionException <: Exception
proctypes::Vector{Type}
@@ -210,7 +163,6 @@ function Base.show(io::IO, pex::ProcessorSelectionException)
print(io, " Arguments: $(pex.args)")
end

move(from_proc::OSProc, to_proc::OSProc, x) = x
execute!(proc::OSProc, f, args...) = f(args...)
default_enabled(proc::OSProc) = true

@@ -226,8 +178,6 @@ end
iscompatible(proc::ThreadProc, opts, f, args...) = true
iscompatible_func(proc::ThreadProc, opts, f) = true
iscompatible_arg(proc::ThreadProc, opts, x) = true
move(from_proc::OSProc, to_proc::ThreadProc, x) = x
move(from_proc::ThreadProc, to_proc::OSProc, x) = x
@static if VERSION >= v"1.3.0-DEV.573"
execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn begin
task_local_storage(:processor, proc)
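With the generic multi-hop `move` implementation removed, the docstrings above spell out what a `Processor` subtype still provides: `iscompatible_func`/`iscompatible_arg` over argument types for scheduling, `move` for conversion on arrival, and `execute!` to run the call. A hypothetical sketch under those assumptions (the `FancyArrayProc` name and `to_fancy` conversion are invented for illustration and are not part of Dagger):

import Dagger: Processor, OSProc, get_parent, iscompatible_func, iscompatible_arg,
               move, execute!, default_enabled

struct FancyArrayProc <: Processor   # hypothetical accelerator-style processor
    owner::Int
end
get_parent(proc::FancyArrayProc) = OSProc(proc.owner)
default_enabled(proc::FancyArrayProc) = false

# Scheduling: claim any function, but only over array arguments. After this
# commit, Chunk arguments are presented as their chunktype, so match types too.
iscompatible_func(proc::FancyArrayProc, opts, f) = true
iscompatible_arg(proc::FancyArrayProc, opts, x::AbstractArray) = true
iscompatible_arg(proc::FancyArrayProc, opts, ::Type{<:AbstractArray}) = true

# Data movement: Chunks/DRefs are already unwrapped by the fallbacks in
# chunks.jl, so only the final conversion step needs a method here.
move(from_proc::Processor, to_proc::FancyArrayProc, x::AbstractArray) = to_fancy(x)
to_fancy(x) = copy(x)   # stand-in conversion

execute!(proc::FancyArrayProc, f, args...) = f(args...)
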
74 changes: 40 additions & 34 deletions src/scheduler.jl
@@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, choose_processor, execute!, rmprocs!, addprocs!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, choose_processor, execute!, rmprocs!, addprocs!

include("fault-handler.jl")

@@ -146,14 +146,15 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

proc, thunk_id, res, metadata = take!(chan) # get result of completed thunk
pid, thunk_id, (res, metadata) = take!(chan) # get result of completed thunk
proc = OSProc(pid)
lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task
if isa(res, CapturedException) || isa(res, RemoteException)
if check_exited_exception(res)
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
remove_dead_proc!(ctx, proc)
remove_dead_proc!(ctx, state, proc)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan)
continue
@@ -163,7 +164,6 @@ end
end
state.worker_pressure[proc.pid] = metadata.pressure
node = state.thunk_dict[thunk_id]
@logmsg("WORKER $(proc.pid) - $node ($(node.f)) input:$(node.inputs)")
state.cache[node] = res

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
@@ -173,7 +173,7 @@ end
end
@dbg timespan_end(ctx, :scheduler, thunk_id, master)
end
state.cache[d]
state.cache[d] # TODO: move(OSProc(), state.cache[d])
end

function procs_to_use(ctx, options=ctx.options)
@@ -236,6 +236,13 @@ function assign_new_procs!(ctx, state, chan, assignedprocs)
# Must track individual procs to handle the case when procs are removed
diffps = setdiff(ps, assignedprocs)
if !isempty(diffps)
for p in diffps
state.worker_pressure[p.pid] = 0
state.worker_capacity[p.pid] = 1
@async begin
state.worker_capacity[p.pid] = remotecall_fetch(capacity, p.pid)
end
end
schedule!(ctx, state, chan, diffps)
end
return ps
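# Note: a newly added worker is seeded with pressure 0 and a conservative
# capacity of 1; the @async remotecall_fetch above then fills in the real
# capacity in the background, so scheduling on the new worker can begin
# immediately.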
@@ -244,9 +251,11 @@ end
# Might be a good policy to not remove the proc if immediate_next
shall_remove_proc(ctx, proc) = proc procs_to_use(ctx)

function remove_dead_proc!(ctx, proc, options=ctx.options)
function remove_dead_proc!(ctx, state, proc, options=ctx.options)
@assert options.single !== proc.pid "Single worker failed, cannot continue."
rmprocs!(ctx, [proc])
delete!(state.worker_pressure, proc.pid)
delete!(state.worker_capacity, proc.pid)
end

function pop_with_affinity!(ctx, tasks, proc, immediate_next)
Expand Down Expand Up @@ -301,16 +310,16 @@ function fire_task!(ctx, thunk, proc, state, chan)
data = unrelease(thunk.cache_ref) # ask worker to keep the data around
# till this compute cycle frees it
if data !== nothing
@logmsg("cache hit: $(thunk.cache_ref)")
# cache hit
state.cache[thunk] = data
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
pop_and_fire!(ctx, state, chan, proc; immediate_next=immediate_next)
end
return
else
# cache miss
thunk.cache_ref = nothing
@logmsg("cache miss: $(thunk.cache_ref) recomputing $(thunk)")
end
end

@@ -320,20 +329,22 @@ function fire_task!(ctx, thunk, proc, state, chan)
if thunk.meta
# Run it on the parent node, do not move data.
p = OSProc(myid())
fetched = map(Iterators.zip(thunk.inputs,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk.id, id), (thunk.f, id))
x = istask(x) ? state.cache[x] : x
@dbg timespan_end(ctx, :comm, (thunk.id, id), (thunk.f, id))
return x
end
fetched = fetch.(map(Iterators.zip(thunk.inputs,ids)) do (x, id)
@async begin
@dbg timespan_start(ctx, :comm, (thunk.id, id), (thunk.f, id))
x = istask(x) ? state.cache[x] : x
@dbg timespan_end(ctx, :comm, (thunk.id, id), (thunk.f, id))
return x
end
end)

@dbg timespan_start(ctx, :compute, thunk.id, thunk.f)
Threads.atomic_add!(ACTIVE_TASKS, 1)
res = thunk.f(fetched...)
Threads.atomic_sub!(ACTIVE_TASKS, 1)
@dbg timespan_end(ctx, :compute, thunk.id, (thunk.f, p, typeof(res), sizeof(res)))

#push!(state.running, thunk)
# TODO: push!(state.running, thunk) (when the scheduler becomes multithreaded)
state.cache[thunk] = res
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
@@ -420,22 +431,17 @@ end

@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, log_sink)
ctx = Context(Processor[]; log_sink=log_sink)
from_proc = OSProc()
fetched = map(Iterators.zip(data,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk_id, id), (f, id))
x = x isa Union{Chunk,Thunk} ? collect(ctx, x) : x
@dbg timespan_end(ctx, :comm, (thunk_id, id), (f, id))
return x
end

# TODO: Time choose_processor?
to_proc = choose_processor(from_proc, options, f, fetched)
fetched = map(Iterators.zip(fetched,ids)) do (x, id)
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
x = move(from_proc, to_proc, x)
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
return x
end
# TODO: Time choose_processor
Tdata = map(x->x isa Chunk ? chunktype(x) : x, data)
to_proc = choose_processor(options, f, Tdata)
fetched = fetch.(map(Iterators.zip(data,ids)) do (x, id)
@async begin
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
x = move(to_proc, x)
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
return x
end
end)
@dbg timespan_start(ctx, :compute, thunk_id, f)
res = nothing
result_meta = try
@@ -455,16 +461,16 @@ end
end
@dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc, typeof(res), sizeof(res)))
metadata = (pressure=ACTIVE_TASKS[],)
(from_proc, thunk_id, result_meta, metadata)
(result_meta, metadata)
end

@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, log_sink)
@async begin
try
put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink))
put!(chan, (p.pid, thunk_id, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink)))
catch ex
bt = catch_backtrace()
put!(chan, (p, thunk_id, CapturedException(ex, bt), nothing))
put!(chan, (p.pid, thunk_id, (CapturedException(ex, bt), nothing)))
end
nothing
end
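On the result channel, workers now report `(pid, thunk_id, (result, metadata))` and the scheduler rebuilds the processor with `OSProc(pid)`, rather than shipping the processor object itself. A hedged sketch of the producer/consumer shape implied by the changes above (the channel size, thunk id, and dummy payload are illustrative):

using Distributed

chan = Channel{Any}(32)

# Producer side (cf. async_apply): report pid, thunk id, and (result, metadata)
put!(chan, (myid(), 42, ("some result", (pressure=0,))))

# Consumer side (cf. compute_dag): unpack, then rebuild the processor lazily
pid, thunk_id, (res, metadata) = take!(chan)
# proc = OSProc(pid)   # in Dagger proper; shown here only for context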

2 comments on commit 61ff302

@jpsamaroo
Member Author

@JuliaRegistrator register()

@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/25664

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the GitHub interface, or via:

git tag -a v0.10.1 -m "<description of version>" 61ff3027c3634756701be735a41c19643786b0c7
git push origin v0.10.1
