Skip to content

Commit

Permalink
Merge branch 'JuliaParallel:master' into matmul
Browse files Browse the repository at this point in the history
  • Loading branch information
Rabab53 authored Jun 3, 2024
2 parents 751736e + 2972cac commit 5f7b4f6
Show file tree
Hide file tree
Showing 16 changed files with 1,666 additions and 446 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "Dagger"
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
version = "0.18.9"
version = "0.18.10"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
67 changes: 64 additions & 3 deletions docs/src/datadeps.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
For many programs, the restriction that tasks cannot write to their arguments
feels overly restrictive and makes certain kinds of programs (such as in-place
linear algebra) hard to express efficiently in Dagger. Thankfully, there is a
solution: `spawn_datadeps`. This function constructs a "datadeps region",
solution called "Datadeps" (short for "data dependencies"), accessible through
the `spawn_datadeps` function. This function constructs a "datadeps region",
within which tasks are allowed to write to their arguments, with parallelism
controlled via dependencies specified via argument annotations. Let's look at
a simple example to make things concrete:
controlled via dependencies specified via argument annotations. Let's look at a
simple example to make things concrete:

```julia
A = rand(1000)
Expand Down Expand Up @@ -94,3 +95,63 @@ Additionally, we can notice a powerful feature of this model - if the
runs sequentially. This means that the structure of the program doesn't have to
change in order to use Dagger for parallelization, which can make applying
Dagger to existing algorithms quite effortless.

## Aliasing Support

Datadeps is smart enough to detect when two arguments from different tasks
actually access the same memory (we say that these arguments "alias"). There's
the obvious case where the two arguments are exactly the same object, but
Datadeps is also aware of more subtle cases, such as when two arguments are
different views into the same array, or where two arrays point to the same
underlying memory. In these cases, Datadeps will ensure that the tasks are
executed in the correct order - if one task writes to an argument which aliases
with an argument read by another task, those two tasks will be executed in
sequence, rather than in parallel.

There are two ways to specify aliasing to Datadeps. The simplest way is the most straightforward: if the argument passed to a task is a view or another supported object (such as an `UpperTriangular`-wrapped array), Datadeps will compare it with all other task's arguments to determine if they alias. This works great when you want to pass that view or `UpperTriangular` object directly to the called function. For example:

```julia
A = rand(1000)
A_l = view(A, 1:500)
A_r = view(A, 501:1000)

# inc! supports views, so we can pass A_l and A_r directly
inc!(X) = X .+= 1

Dagger.spawn_datadeps() do
# These two tasks don't alias, so they can run in parallel
Dagger.@spawn inc!(InOut(A_l))
Dagger.@spawn inc!(InOut(A_r))

# This task aliases with the previous two, so it will run after them
Dagger.@spawn inc!(InOut(A))
end
```

The other way allows you to seperate what argument is passed to the function,
from how that argument is accessed within the function. This is done with the
`Deps` wrapper, which is used like so:

```julia
A = rand(1000, 1000)

inc_upper!(X) = UpperTriangular(X) .+= 1
inc_ulower!(X) = UnitLowerTriangular(X) .+= 1
inc_diag!(X) = X[diagind(X)] .+= 1

Dagger.spawn_datadeps() do
# These two tasks don't alias, so they can run in parallel
Dagger.@spawn inc_upper!(Deps(A, InOut(UpperTriangular)))
Dagger.@spawn inc_ulower!(Deps(A, InOut(UnitLowerTriangular)))

# This task aliases with the `inc_upper!` task (`UpperTriangular` accesses the diagonal of the array)
Dagger.@spawn inc_diag!(Deps(A, InOut(Diagonal)))
end
```

You can pass any number of aliasing modifiers to `Deps`. This is particularly
useful for declaring aliasing with `Diagonal`, `Bidiagonal`, `Tridiagonal`, and
`SymTridiagonal` access, as these "wrappers" make a copy of their parent array
and thus can't be used to "mask" access to the parent like `UpperTriangular`
and `UnitLowerTriangular` can (which is valuable for writing memory-efficient,
generic algorithms in Julia).
143 changes: 143 additions & 0 deletions ext/LuxorExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
module LuxorExt

if isdefined(Base, :get_extension)
using Luxor
else
using ..Luxor
end

import Dagger
import Dagger: Chunk, Processor
import Dagger.TimespanLogging: Timespan

import .Luxor: Drawing, finish, Point, background, sethue, fontsize, rect, text

function proclt(p1::T, p2::R) where {T,R}
if p1.owner != p2.owner
return p1.owner < p2.owner
else
return repr(T) < repr(R)
end
end
function proclt(p1::T, p2::T) where {T}
if p1.owner != p2.owner
return p1.owner < p2.owner
else
for field in fieldnames(T)
f1 = getfield(p1, field)
f2 = getfield(p2, field)
if f1 != f2
return f1 < f2
end
end
end
false
end
proclt(p1::Dagger.OSProc, p2::Dagger.OSProc) = p1.pid < p2.pid
proclt(p1::Dagger.OSProc, p2) = p1.pid < p2.owner
proclt(p1, p2::Dagger.OSProc) = p1.owner < p2.pid

function update_window_logs!(window_logs, logs; root_time, window_start)
if !isempty(logs)
for id in keys(logs)
append!(window_logs, map(x->(x,), filter(x->x.category==:compute||x.category==:scheduler_init, logs[id])))
end
end
for idx in length(window_logs):-1:1
log = window_logs[idx]
if length(log) == 2
# Clear out finished events older than window start
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
if log_finish_s < window_start
@debug "Gantt: Deleted event"
deleteat!(window_logs, idx)
end
elseif log[1] isa Dagger.Event{:finish}
# Pair finish events with start events
sidx = findfirst(x->length(x) == 1 &&
x[1] isa Dagger.Event{:start} &&
x[1].id==log[1].id, window_logs)
if sidx === nothing
@debug "Gantt: Removed unpaired finish"
deleteat!(window_logs, idx)
continue
end
window_logs[sidx] = (window_logs[sidx][1], log[1])
@debug "Gantt: Paired event"
deleteat!(window_logs, idx)
end
end
end
function Dagger.render_plan(logs::Dict, ::Val{:luxor_gantt}; delay=2, width=1000, height=640, window_length=20)
root_time = time_ns()
window_logs = []
if window_length !== nothing
window_start = -window_length
else
window_start = 0
end
window_finish = 0
procs = Dagger.all_processors()
if (height/length(procs)) < 50
height = length(procs) * 50
@warn "SVG height too small; resizing to $height pixels"
end

update_window_logs!(window_logs, logs; root_time=root_time, window_start=window_start)
isempty(window_logs) && return

for proc in unique(map(x->x[1].timeline[2], filter(x->x[1].category==:compute, window_logs)))
push!(procs, proc)
end
colors = Colors.distinguishable_colors(length(procs))
procs_len = length(procs)
proc_height = height/(3procs_len)
@debug "Gantt: Start"
if isfile(svg_path)
Drawing(width, height, svg_path)
else
Drawing(width, height, joinpath(svg_path, repr(image_idx) * ".svg"))
end
background("white")
for (proc_idx, proc) in enumerate(sort(collect(procs); lt=proclt))
ypos = (proc_idx-0.5)*(height/procs_len)
sethue("grey15")
fontsize(round(Int,proc_height))
text(getname(proc), Point(width/2,ypos-(proc_height/2)))
rect(Point(1,ypos+(proc_height/3)),width-2,proc_height,:stroke)
fontsize(8)
text("$(window_start) s", Point(0,ypos); halign=:left)
text("$(window_finish) s", Point(width-8,ypos); halign=:right)
proc_color = colors[proc_idx]
for log in filter(x->x[1].timeline[2]==proc, filter(x->x[1].category==:compute, window_logs))
length(log) == 1 && log[1] isa Dagger.Event{:finish} && error("Unpaired finish!")
log_start_s = (log[1].timestamp-root_time)/(1000^3)
log_finish_s = if length(log) == 2
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
else
window_finish+1
end
xstart = ((log_start_s-window_start)/(window_finish-window_start))*width
xfinish = ((log_finish_s-window_start)/(window_finish-window_start))*width
sethue(proc_color)
rect(Point(xstart,ypos+(proc_height/3)+1),xfinish-xstart,proc_height-2,:fill)
sethue("black")
rect(Point(xstart,ypos+(proc_height/3)+1),xfinish-xstart,proc_height-2,:stroke)
end
for log in filter(x->x[1].category==:scheduler_init, window_logs)
log_start_s = (log[1].timestamp-root_time)/(1000^3)
log_finish_s = if length(log) == 2
log_finish_s = (log[2].timestamp-root_time)/(1000^3)
else
window_finish+1
end
xstart = ((log_start_s-window_start)/(window_finish-window_start))*width
xfinish = ((log_finish_s-window_start)/(window_finish-window_start))*width
sethue("red")
rect(Point(xstart,0),#=xfinish-xstart=#1,height)
end
end
finish()
end

end # module LuxorExt
4 changes: 2 additions & 2 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Distributed
import Distributed: Future, RemoteChannel, myid, workers, nworkers, procs, remotecall, remotecall_wait, remotecall_fetch

import LinearAlgebra
import LinearAlgebra: Adjoint, BLAS, Diagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, diagind, ishermitian, issymmetric
import LinearAlgebra: Adjoint, BLAS, Diagonal, Bidiagonal, Tridiagonal, LAPACK, LowerTriangular, PosDefException, Transpose, UpperTriangular, UnitLowerTriangular, UnitUpperTriangular, diagind, ishermitian, issymmetric

import UUIDs: UUID, uuid4

Expand All @@ -23,7 +23,7 @@ else
end

if !isdefined(Base, :get_extension)
using Requires
import Requires: @require
end

import TimespanLogging
Expand Down
6 changes: 3 additions & 3 deletions src/array/cholesky.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
LinearAlgebra.cholcopy(A::DArray{T,2}) where T = copy(A)
function potrf_checked!(uplo, A, info_arr)
_A, info = LAPACK.potrf!(uplo, A)
if info > 0
_A, info = move(thunk_processor(), LAPACK.potrf!)(uplo, A)
if info != 0
info_arr[1] = info
throw(PosDefException(info))
end
Expand All @@ -22,7 +22,7 @@ function LinearAlgebra._chol!(A::DArray{T,2}, ::Type{UpperTriangular}) where T

info = [convert(LinearAlgebra.BlasInt, 0)]
try
Dagger.spawn_datadeps() do
Dagger.spawn_datadeps(;aliasing=true) do
for k in range(1, mt)
Dagger.@spawn potrf_checked!(uplo, InOut(Ac[k, k]), Out(info))
for n in range(k+1, nt)
Expand Down
4 changes: 2 additions & 2 deletions src/array/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ function Base.show(io::IO, ::MIME"text/plain", A::DArray{T,N}) where {T,N}
printstyled(io, "~$(round(Int, pct_complete))% completed"; color=:yellow)
end
println(io)
with_index_caching(1) do
# FIXME: with_index_caching(1) do
Base.print_array(IOContext(io, :compact=>true), ColorArray(A))
end
# end
end

function (==)(x::ArrayOp, y::ArrayOp)
Expand Down
Loading

0 comments on commit 5f7b4f6

Please sign in to comment.