diff --git a/docs/src/api-dagger/functions.md b/docs/src/api-dagger/functions.md index 8da1e9008..57d16e14b 100644 --- a/docs/src/api-dagger/functions.md +++ b/docs/src/api-dagger/functions.md @@ -59,13 +59,13 @@ addprocs! rmprocs! ``` -## Thunk Execution Environment Functions +## DTask Execution Environment Functions -These functions are used within the function called by a `Thunk`. +These functions are used within the function called by a `DTask`. ```@docs -in_thunk -thunk_processor +in_task +task_processor ``` ### Dynamic Scheduler Control Functions diff --git a/docs/src/darray.md b/docs/src/darray.md index 2a4ec6e6a..c7b813e4b 100644 --- a/docs/src/darray.md +++ b/docs/src/darray.md @@ -357,7 +357,7 @@ julia> Dagger.chunks(DZ) DTask (finished) DTask (finished) julia> Dagger.chunks(fetch(DZ)) -2×2 Matrix{Union{Thunk, Dagger.Chunk}}: +2×2 Matrix{Union{DTask, Dagger.Chunk}}: Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(4, 8, 0x0000000000004e20), ThreadProc(4, 1), AnyScope(), true) … Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(2, 5, 0x0000000000004e20), ThreadProc(2, 1), AnyScope(), true) Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(5, 5, 0x0000000000004e20), ThreadProc(5, 1), AnyScope(), true) Chunk{Matrix{Float64}, DRef, ThreadProc, AnyScope}(Matrix{Float64}, ArrayDomain{2}((1:50, 1:50)), DRef(3, 3, 0x0000000000004e20), ThreadProc(3, 1), AnyScope(), true) ``` diff --git a/docs/src/scopes.md b/docs/src/scopes.md index 70a08b277..c089a7991 100644 --- a/docs/src/scopes.md +++ b/docs/src/scopes.md @@ -23,7 +23,7 @@ using VideoIO, Distributed function get_handle() handle = VideoIO.opencamera() - proc = Dagger.thunk_processor() + proc = Dagger.task_processor() scope = Dagger.scope(worker=myid()) # constructs a `ProcessScope` return Dagger.tochunk(handle, proc, scope) end @@ -78,7 +78,7 @@ function generate() fill!(arr, 1) Mmap.sync!(arr) # Note: Dagger.scope() does not yet support node scopes - Dagger.tochunk(path, Dagger.thunk_processor(), NodeScope()) + Dagger.tochunk(path, Dagger.task_processor(), NodeScope()) end function consume(path) @@ -120,7 +120,7 @@ function generate_secrets() secrets = open("/shared/secret_results.txt", "r") do io String(read(io)) end - Dagger.tochunk(secrets, Dagger.thunk_processor(), secrets_scope) + Dagger.tochunk(secrets, Dagger.task_processor(), secrets_scope) end summarize(secrets) = occursin("QA Pass", secrets) @@ -144,7 +144,7 @@ constraints). For example: ps2 = ProcessScope(2) ps3 = ProcessScope(3) -generate(scope) = Dagger.tochunk(rand(64), Dagger.thunk_processor(), scope) +generate(scope) = Dagger.tochunk(rand(64), Dagger.task_processor(), scope) d2 = Dagger.@spawn generate(ps2) # Run on process 2 d3 = Dagger.@spawn generate(ps3) # Run on process 3 diff --git a/src/array/alloc.jl b/src/array/alloc.jl index 4a933905b..f67c927de 100644 --- a/src/array/alloc.jl +++ b/src/array/alloc.jl @@ -25,11 +25,11 @@ function partition(p::AbstractBlocks, dom::ArrayDomain) end function allocate_array(f, T, idx, sz) - new_f = allocate_array_func(thunk_processor(), f) + new_f = allocate_array_func(task_processor(), f) return new_f(idx, T, sz) end function allocate_array(f, T, sz) - new_f = allocate_array_func(thunk_processor(), f) + new_f = allocate_array_func(task_processor(), f) return new_f(T, sz) end allocate_array_func(::Processor, f) = f diff --git a/src/array/cholesky.jl b/src/array/cholesky.jl index 1a371ff91..4ba6ae4a3 100644 --- a/src/array/cholesky.jl +++ b/src/array/cholesky.jl @@ -1,6 +1,6 @@ LinearAlgebra.cholcopy(A::DArray{T,2}) where T = copy(A) function potrf_checked!(uplo, A, info_arr) - _A, info = move(thunk_processor(), LAPACK.potrf!)(uplo, A) + _A, info = move(task_processor(), LAPACK.potrf!)(uplo, A) if info != 0 fill!(info_arr, info) throw(PosDefException(info)) @@ -41,7 +41,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{UpperTriangular}) where T end end catch err - err isa ThunkFailedException || rethrow() + err isa DTaskFailedException || rethrow() err = Dagger.Sch.unwrap_nested_exception(err.ex) err isa PosDefException || rethrow() end @@ -82,7 +82,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{LowerTriangular}) where T end end catch err - err isa ThunkFailedException || rethrow() + err isa DTaskFailedException || rethrow() err = Dagger.Sch.unwrap_nested_exception(err.ex) err isa PosDefException || rethrow() end diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 54c8eab96..30e778787 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -8,8 +8,8 @@ import Random: randperm import Base: @invokelatest import ..Dagger -import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, ThunkFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject -import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, thunk_processor, constrain, cputhreadtime +import ..Dagger: Context, Processor, Thunk, WeakThunk, ThunkFuture, DTaskFailedException, Chunk, WeakChunk, OSProc, AnyScope, DefaultScope, LockedObject +import ..Dagger: order, dependents, noffspring, istask, inputs, unwrap_weak_checked, affinity, tochunk, timespan_start, timespan_finish, procs, move, chunktype, processor, get_processors, get_parent, execute!, rmprocs!, task_processor, constrain, cputhreadtime import ..Dagger: @dagdebug, @safe_lock_spin1 import DataStructures: PriorityQueue, enqueue!, dequeue_pair!, peek diff --git a/src/sch/dynamic.jl b/src/sch/dynamic.jl index df78a1dd1..a847dfc78 100644 --- a/src/sch/dynamic.jl +++ b/src/sch/dynamic.jl @@ -132,7 +132,7 @@ end function Base.fetch(h::SchedulerHandle, id::ThunkID) future = ThunkFuture(Future(1)) exec!(_register_future!, h, future, id, true) - fetch(future; proc=thunk_processor()) + fetch(future; proc=task_processor()) end """ Waits on a thunk to complete, and fetches its result. If `check` is set to diff --git a/src/sch/eager.jl b/src/sch/eager.jl index 2e8886c5e..1ee72829f 100644 --- a/src/sch/eager.jl +++ b/src/sch/eager.jl @@ -61,10 +61,10 @@ Allows a thunk to safely wait on another thunk by temporarily reducing its effective occupancy to 0, which allows a newly-spawned task to run. """ function thunk_yield(f) - if Dagger.in_thunk() + if Dagger.in_task() h = sch_handle() tls = Dagger.get_tls() - proc = Dagger.thunk_processor() + proc = Dagger.task_processor() proc_istate = proc_states(tls.sch_uid) do states states[proc].state end diff --git a/src/sch/util.jl b/src/sch/util.jl index 17b0fbfbe..2cd96c38e 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -175,7 +175,11 @@ end "Marks `thunk` and all dependent thunks as failed." function set_failed!(state, origin, thunk=origin) filter!(x->x!==thunk, state.ready) - state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin]) + ex = state.cache[origin] + if ex isa RemoteException + ex = ex.captured + end + state.cache[thunk] = DTaskFailedException(thunk, origin, ex) state.errored[thunk] = true finish_failed!(state, thunk, origin) end diff --git a/src/submission.jl b/src/submission.jl index 2c5ee042e..6e61f86d7 100644 --- a/src/submission.jl +++ b/src/submission.jl @@ -159,7 +159,7 @@ end # Local -> Remote function eager_submit!(ntasks, uid, future, finalizer_ref, f, args, options) - if Dagger.in_thunk() + if Dagger.in_task() h = Dagger.sch_handle() return exec!(eager_submit_internal!, h, ntasks, uid, future, finalizer_ref, f, args, options, true) elseif myid() != 1 diff --git a/src/task-tls.jl b/src/task-tls.jl index fb42dfbc9..ea188e004 100644 --- a/src/task-tls.jl +++ b/src/task-tls.jl @@ -1,18 +1,20 @@ # In-Thunk Helpers """ - thunk_processor() + task_processor() -Get the current processor executing the current thunk. +Get the current processor executing the current Dagger task. """ -thunk_processor() = task_local_storage(:_dagger_processor)::Processor +task_processor() = task_local_storage(:_dagger_processor)::Processor +@deprecate thunk_processor() task_processor() """ - in_thunk() + in_task() -Returns `true` if currently in a [`Thunk`](@ref) process, else `false`. +Returns `true` if currently executing in a [`DTask`](@ref), else `false`. """ -in_thunk() = haskey(task_local_storage(), :_dagger_sch_uid) +in_task() = haskey(task_local_storage(), :_dagger_sch_uid) +@deprecate in_thunk() in_task() """ get_tls() @@ -22,7 +24,7 @@ Gets all Dagger TLS variable as a `NamedTuple`. get_tls() = ( sch_uid=task_local_storage(:_dagger_sch_uid), sch_handle=task_local_storage(:_dagger_sch_handle), - processor=thunk_processor(), + processor=task_processor(), task_spec=task_local_storage(:_dagger_task_spec), ) diff --git a/src/thunk.jl b/src/thunk.jl index cea8a9d60..02353fd17 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -220,22 +220,23 @@ function Base.convert(::Type{ThunkSummary}, t::WeakThunk) return t end -struct ThunkFailedException{E<:Exception} <: Exception +struct DTaskFailedException{E<:Exception} <: Exception thunk::ThunkSummary origin::ThunkSummary ex::E end -ThunkFailedException(thunk, origin, ex::E) where E = - ThunkFailedException{E}(convert(ThunkSummary, thunk), +DTaskFailedException(thunk, origin, ex::E) where E = + DTaskFailedException{E}(convert(ThunkSummary, thunk), convert(ThunkSummary, origin), ex) -function Base.showerror(io::IO, ex::ThunkFailedException) +@deprecate ThunkFailedException DTaskFailedException +function Base.showerror(io::IO, ex::DTaskFailedException) t = ex.thunk # Find root-cause thunk last_tfex = ex failed_tasks = Union{ThunkSummary,Nothing}[] - while last_tfex.ex isa ThunkFailedException + while last_tfex.ex isa DTaskFailedException push!(failed_tasks, last_tfex.thunk) last_tfex = last_tfex.ex end @@ -246,7 +247,7 @@ function Base.showerror(io::IO, ex::ThunkFailedException) Tinputs = Any[] for (_, input) in t.inputs if istask(input) - push!(Tinputs, "Thunk(id=$(input.id))") + push!(Tinputs, "DTask(id=$(input.id))") else push!(Tinputs, input) end @@ -256,28 +257,28 @@ function Base.showerror(io::IO, ex::ThunkFailedException) else "$(t.f)($(length(Tinputs)) inputs...)" end - return "Thunk(id=$(t.id), $t_sig)" + return "DTask(id=$(t.id), $t_sig)" end t_str = thunk_string(t) o_str = thunk_string(o) - println(io, "ThunkFailedException:") - println(io, " Root Exception Type: $(typeof(root_ex))") + println(io, "DTaskFailedException:") + println(io, " Root Exception Type: $(typeof(Sch.unwrap_nested_exception(root_ex)))") println(io, " Root Exception:") Base.showerror(io, root_ex); println(io) if t.id !== o.id - println(io, " Root Thunk: $o_str") + println(io, " Root Task: $o_str") if length(failed_tasks) <= 4 for i in failed_tasks i_str = thunk_string(i) - println(io, " Inner Thunk: $i_str") + println(io, " Inner Task: $i_str") end else println(io, " ...") - println(io, " $(length(failed_tasks)) Inner Thunks...") + println(io, " $(length(failed_tasks)) Inner Tasks...") println(io, " ...") end end - print(io, " This Thunk: $t_str") + print(io, " This Task: $t_str") end """ diff --git a/test/datadeps.jl b/test/datadeps.jl index 271e2c667..ff5136b6b 100644 --- a/test/datadeps.jl +++ b/test/datadeps.jl @@ -396,7 +396,7 @@ function test_datadeps(;args_chunks::Bool, # Scope exec_procs = fetch.(Dagger.spawn_datadeps(;aliasing) do - [Dagger.@spawn Dagger.thunk_processor() for i in 1:10] + [Dagger.@spawn Dagger.task_processor() for i in 1:10] end) unique!(exec_procs) scope = Dagger.get_options(:scope) diff --git a/test/mutation.jl b/test/mutation.jl index 56d2661f5..b6ac7143b 100644 --- a/test/mutation.jl +++ b/test/mutation.jl @@ -48,7 +48,7 @@ end x = Dagger.@mutable worker=w Ref{Int}() @test fetch(Dagger.@spawn mutable_update!(x)) == w wo_scope = Dagger.ProcessScope(wo) - @test_throws_unwrap Dagger.ThunkFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x)) + @test_throws_unwrap Dagger.DTaskFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x)) end end # @testset "@mutable" diff --git a/test/processors.jl b/test/processors.jl index 88bd29667..e97a1d239 100644 --- a/test/processors.jl +++ b/test/processors.jl @@ -37,9 +37,9 @@ end end @testset "Processor exhaustion" begin opts = ThunkOptions(proclist=[OptOutProc]) - @test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3])) + @test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3])) opts = ThunkOptions(proclist=(proc)->false) - @test_throws_unwrap Dagger.ThunkFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3])) + @test_throws_unwrap Dagger.DTaskFailedException ex isa Dagger.Sch.SchedulingException ex.reason="No processors available, try widening scope" collect(delayed(sum; options=opts)([1,2,3])) opts = ThunkOptions(proclist=nothing) @test collect(delayed(sum; options=opts)([1,2,3])) == 6 end @@ -89,7 +89,7 @@ end @testset "Processor TLS accessor" begin @everywhere function mythunk(x) - typeof(Dagger.thunk_processor()) + typeof(Dagger.task_processor()) end @test collect(delayed(mythunk)(1)) === ThreadProc end diff --git a/test/scheduler.jl b/test/scheduler.jl index 96daa491d..0e7bf046f 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -182,7 +182,7 @@ end @testset "allow errors" begin opts = ThunkOptions(;allow_errors=true) a = delayed(error; options=opts)("Test") - @test_throws_unwrap Dagger.ThunkFailedException collect(a) + @test_throws_unwrap Dagger.DTaskFailedException collect(a) end end diff --git a/test/scopes.jl b/test/scopes.jl index be6e557f6..5f82a71a0 100644 --- a/test/scopes.jl +++ b/test/scopes.jl @@ -56,7 +56,7 @@ # Different nodes for (ch1, ch2) in [(ns1_ch, ns2_ch), (ns2_ch, ns1_ch)] - @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Process Scope" begin @@ -75,7 +75,7 @@ # Different process for (ch1, ch2) in [(ps1_ch, ps2_ch), (ps2_ch, ps1_ch)] - @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process and node @@ -83,11 +83,11 @@ # Different process and node for (ch1, ch2) in [(ps1_ch, ns2_ch), (ns2_ch, ps1_ch)] - @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Exact Scope" begin - @everywhere exact_scope_test(ch...) = Dagger.thunk_processor() + @everywhere exact_scope_test(ch...) = Dagger.task_processor() @test es1.parent.wid == wid1 @test es1.parent.parent.uuid == Dagger.system_uuid(wid1) @test es2.parent.wid == wid2 @@ -104,14 +104,14 @@ # Different process, different processor for (ch1, ch2) in [(es1_ch, es2_ch), (es2_ch, es1_ch)] - @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end # Same process, different processor es1_2 = ExactScope(Dagger.ThreadProc(wid1, 2)) es1_2_ch = Dagger.tochunk(nothing, OSProc(), es1_2) for (ch1, ch2) in [(es1_ch, es1_2_ch), (es1_2_ch, es1_ch)] - @test_throws_unwrap Dagger.ThunkFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) + @test_throws_unwrap Dagger.DTaskFailedException ex.reason<"Scopes are not compatible:" fetch(Dagger.@spawn ch1 + ch2) end end @testset "Union Scope" begin diff --git a/test/thunk.jl b/test/thunk.jl index 82f5c84f5..22a755dba 100644 --- a/test/thunk.jl +++ b/test/thunk.jl @@ -69,7 +69,7 @@ end A = rand(4, 4) @test fetch(@spawn sum(A; dims=1)) ≈ sum(A; dims=1) - @test_throws_unwrap Dagger.ThunkFailedException fetch(@spawn sum(A; fakearg=2)) + @test_throws_unwrap Dagger.DTaskFailedException fetch(@spawn sum(A; fakearg=2)) @test fetch(@spawn reduce(+, A; dims=1, init=2.0)) ≈ reduce(+, A; dims=1, init=2.0) @@ -187,7 +187,7 @@ end a = @spawn error("Test") wait(a) @test isready(a) - @test_throws_unwrap Dagger.ThunkFailedException fetch(a) + @test_throws_unwrap Dagger.DTaskFailedException fetch(a) b = @spawn 1+2 @test fetch(b) == 3 end @@ -202,9 +202,9 @@ end end ex = Dagger.Sch.unwrap_nested_exception(ex) ex_str = sprint(io->Base.showerror(io,ex)) - @test occursin(r"^ThunkFailedException:", ex_str) + @test occursin(r"^DTaskFailedException:", ex_str) @test occursin("Test", ex_str) - @test !occursin("Root Thunk", ex_str) + @test !occursin("Root Task", ex_str) ex = try fetch(b) @@ -214,33 +214,33 @@ end ex = Dagger.Sch.unwrap_nested_exception(ex) ex_str = sprint(io->Base.showerror(io,ex)) @test occursin("Test", ex_str) - @test occursin("Root Thunk", ex_str) + @test occursin("Root Task", ex_str) end @testset "single dependent" begin a = @spawn error("Test") b = @spawn a+2 - @test_throws_unwrap Dagger.ThunkFailedException fetch(a) + @test_throws_unwrap Dagger.DTaskFailedException fetch(a) end @testset "multi dependent" begin a = @spawn error("Test") b = @spawn a+2 c = @spawn a*2 - @test_throws_unwrap Dagger.ThunkFailedException fetch(b) - @test_throws_unwrap Dagger.ThunkFailedException fetch(c) + @test_throws_unwrap Dagger.DTaskFailedException fetch(b) + @test_throws_unwrap Dagger.DTaskFailedException fetch(c) end @testset "dependent chain" begin a = @spawn error("Test") - @test_throws_unwrap Dagger.ThunkFailedException fetch(a) + @test_throws_unwrap Dagger.DTaskFailedException fetch(a) b = @spawn a+1 - @test_throws_unwrap Dagger.ThunkFailedException fetch(b) + @test_throws_unwrap Dagger.DTaskFailedException fetch(b) c = @spawn b+2 - @test_throws_unwrap Dagger.ThunkFailedException fetch(c) + @test_throws_unwrap Dagger.DTaskFailedException fetch(c) end @testset "single input" begin a = @spawn 1+1 b = @spawn (a->error("Test"))(a) @test fetch(a) == 2 - @test_throws_unwrap Dagger.ThunkFailedException fetch(b) + @test_throws_unwrap Dagger.DTaskFailedException fetch(b) end @testset "multi input" begin a = @spawn 1+1 @@ -248,7 +248,7 @@ end c = @spawn ((a,b)->error("Test"))(a,b) @test fetch(a) == 2 @test fetch(b) == 4 - @test_throws_unwrap Dagger.ThunkFailedException fetch(c) + @test_throws_unwrap Dagger.DTaskFailedException fetch(c) end @testset "diamond" begin a = @spawn 1+1 @@ -258,7 +258,7 @@ end @test fetch(a) == 2 @test fetch(b) == 3 @test fetch(c) == 4 - @test_throws_unwrap Dagger.ThunkFailedException fetch(d) + @test_throws_unwrap Dagger.DTaskFailedException fetch(d) end end @testset "remote spawn" begin @@ -276,7 +276,7 @@ end t1 = Dagger.@spawn 1+"fail" Dagger.@spawn t1+1 end - @test_throws_unwrap Dagger.ThunkFailedException fetch(t2) + @test_throws_unwrap Dagger.DTaskFailedException fetch(t2) end @testset "undefined function" begin # Issues #254, #255