diff --git a/base/arraydist.jl b/base/arraydist.jl new file mode 100644 index 0000000000000..e042c55edf0ed --- /dev/null +++ b/base/arraydist.jl @@ -0,0 +1,216 @@ +# ArrayDist specifies the distribution of an array +# Specifically it should provide the following: +# dims(ad) : dimensions of the array being distributed +# pdims(ad) : dimensions of how the workers are partitoned - Number of parts in each dimension +# ngroups(ad) : Equal to the number of workers +# length(ad) : Number of tiles/partitions +# getindex(ad, i) : array of indices at index i +# locate(ad, I) : returns the partition index of the requested array index +# dmode(ad) : returns a const indicating the type of array using the distribution + + +abstract ArrayDist + +# To be implemented +#type TileDist <: ArrayDist +#end + +# Different ways in which an ArrDist may be distributed. +global const DISTMODE_DISTRIBUTED = 1 +global const DISTMODE_SHARED = 2 + +type DimDist{N} <: ArrayDist + dims::NTuple{N,Int} + + # number of parts in each dimension + pdims::Vector{Int} + + # indexes held by piece i + indexes::Array{NTuple{N,Range1{Int}},N} + + # cuts[d][i] = first index of chunk i in dimension d + cuts::Vector{Vector{Int}} + + dmode::Integer + + DimDist(dims, pdims, indexes, cuts, mode) = new(dims, pdims, indexes, cuts, mode) +end + +function DimDist(dims, ngroups, pdims; mode=DISTMODE_DISTRIBUTED) + if prod(pdims) > ngroups + error("Total requested number of chunks is greater than the number of workers") + end + idxs, cuts = chunk_idxs([dims...], pdims) + assert(dims == map(last,last(idxs))) + + DimDist{length(dims)}(dims, pdims, idxs, cuts, mode) +end + +DimDist(dims, ngroups; kwargs...) = DimDist(dims, ngroups, defaultdist(dims, ngroups); kwargs...) +DimDist(dims::(Integer...); kwargs...) = DimDist(dims, nworkers(); kwargs...) +DimDist(dims::Integer...; kwargs...) = DimDist(dims; kwargs...) + +## chunk index utilities ## + +# decide how to divide each dimension +# returns size of chunks array +# allocates largest factor to largest dim +function defaultdist(dims, ngroups) + dims = [dims...] + chunks = ones(Int, length(dims)) + f = sort!(collect(keys(factor(ngroups))), rev=true) + k = 1 + while ngroups > 1 + # repeatedly allocate largest factor to largest dim + if ngroups%f[k] != 0 + k += 1 + if k > length(f) + break + end + end + fac = f[k] + (d, dno) = findmax(dims) + # resolve ties to highest dim + dno = last(find(dims .== d)) + if dims[dno] >= fac + dims[dno] = div(dims[dno], fac) + chunks[dno] *= fac + end + ngroups = div(ngroups,fac) + end + chunks +end + +# get array of start indexes for dividing sz into nc chunks +function defaultdist(sz::Int, nc::Int) + if sz >= nc + iround(linspace(1, sz+1, nc+1)) + else + [[1:(sz+1)], zeros(Int, nc-sz)] + end +end + + +# compute indexes array for dividing dims into chunks +function chunk_idxs(dims, chunks) + cuts = map(defaultdist, dims, chunks) + n = length(dims) + idxs = Array(NTuple{n,Range1{Int}},chunks...) + cartesianmap(tuple(chunks...)) do cidx... + idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) + end + idxs, cuts +end + + +dims(dimdist::DimDist) = dimdist.dims +pdims(dimdist::DimDist) = dimdist.pdims +ngroups(dimdist::DimDist) = length(dimdist.indexes) +length(dimdist::DimDist) = length(dimdist.indexes) +dmode(dimdist::DimDist) = dimdist.dmode + +getindex(dimdist::DimDist, i::Int) = dimdist.indexes[i] +getindex(dimdist::DimDist, i::Int...) = dimdist.indexes[i...] + +# find which piece holds index (I...) +function locate{N}(dimdist::DimDist{N}, I::Int...) 
+ ntuple(N, i->searchsortedlast(dimdist.cuts[i], I[i])) +end + + +# Helper types and functions for distributed and shared arrays +type DistRefs{N} + ppmap::Vector{Int} + chunks::Array{RemoteRef,N} + + DistRefs(p, c) = new(p,c) +end + +procs(dr::DistRefs) = dr.ppmap +length(dr::DistRefs) = length(dr.chunks) + +getindex(ar::DistRefs, i::Int) = ar.chunks[i] +getindex(ar::DistRefs, i...) = ar.chunks[i...] + + +function setup_chunks(allocf, dprocs, arrdist) + if ngroups(arrdist) > length(dprocs) + error("Number of array partitions requested is more than the number of workers specified") + end + + ppmap = dprocs[1:ngroups(arrdist)] + chunks = Array(RemoteRef, pdims(arrdist)...) + for (i, p) in enumerate(ppmap) + chunks[i] = remotecall(p, allocf, arrdist[i]) + end + + dr = DistRefs{length(dims(arrdist))}(ppmap, chunks) + +# assert(size(chunks) == size(arrdist)) + assert(length(chunks) == length(ppmap)) + + dr +end + +localpartindex(dr::DistRefs) = findfirst(dr.ppmap, myid()) + + +# additional distributed/shared array constructors +function fill(v, dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->fill(v, map(length,I)), dimdist; kwargs...) + else + SharedArray(typeof(v), dimdist; init = S->fill!(localpart(S), v), kwargs...) + end +end + +# rand variant with range +function rand(TR::Union(DataType, Range1), dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->rand(TR, map(length,I)), dimdist; kwargs...) + else + if isa(TR, Range1) + SharedArray(Int, dimdist; init = S->map!((x)->rand(TR), localpart(S)), kwargs...) + else + SharedArray(TR, dimdist; init = S->map!((x)->rand(TR), localpart(S)), kwargs...) + end + end +end + +rand(dimdist::DimDist; kwargs...) = rand(Float64, dimdist; kwargs...) + +function randn(dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->randn(map(length,I)), dimdist; kwargs...) + else + SharedArray(Float64, dimdist; init = S-> map!((x)->randn(), localpart(S)), kwargs...) + end +end + +# ambiguity warning removal +similar{T}(a::Array{T, 1}, dimdist::DimDist) = make_distributed(a, T, dimdist) +similar{T}(a::Array{T, 2}, dimdist::DimDist) = make_distributed(a, T, dimdist) + +function make_distributed(a, T, dimdist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + owner = myid() + rr = RemoteRef() + put(rr, a) + DArray(dimdist; kwargs...) do I + remotecall_fetch(owner, ()->fetch(rr)[I...]) + end + else + sa = SharedArray(T, dimdist; kwargs...) + if isdefined(sa, :local_shmmap) + copy!(sa.local_shmmap, a) + else + remotecall_fetch(procs(sa)[1], SA -> copy!(SA.local_shmmap, a), SA) + end + sa + end +end + +# generic version +similar{T}(a::AbstractArray{T}, dimdist::DimDist; kwargs...) = make_distributed(a, T, dimdist; kwargs...) 
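As a rough usage sketch of the ``DimDist`` interface defined above (illustrative only; it assumes a session started with at least four workers, e.g. ``julia -p 4``, and uses only functions introduced in this file):

    dd = DimDist((100,100), 4, [1,4])   # 4 tiles: no cut along dim 1, 4 parts along dim 2
    dims(dd)                            # (100,100) -- size of the array being distributed
    length(dd)                          # 4         -- number of tiles/partitions
    dd[3]                               # (1:100, 51:75) -- index ranges held by tile 3
    locate(dd, 10, 60)                  # (1,3)     -- tile owning element (10,60)

    d = fill(0.5, dd)                                    # DArray, since dmode(dd) == DISTMODE_DISTRIBUTED
    s = rand(DimDist((100,100); mode=DISTMODE_SHARED))   # SharedArray instead (Unix only)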
+ + diff --git a/base/darray.jl b/base/darray.jl index 0e52a283d6fa2..cbbffb6e35f67 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -1,178 +1,118 @@ type DArray{T,N,A} <: AbstractArray{T,N} - dims::NTuple{N,Int} - - chunks::Array{RemoteRef,N} - - # pmap[i]==p ⇒ processor p has piece i - pmap::Vector{Int} - - # indexes held by piece i - indexes::Array{NTuple{N,Range1{Int}},N} - # cuts[d][i] = first index of chunk i in dimension d - cuts::Vector{Vector{Int}} - - function DArray(dims, chunks, pmap, indexes, cuts) - # check invariants - assert(size(chunks) == size(indexes)) - assert(length(chunks) == length(pmap)) - assert(dims == map(last,last(indexes))) - new(dims, chunks, pmap, indexes, cuts) - end + dimdist::DimDist{N} + refs::DistRefs{N} + DArray(ad, r) = new(ad, r) end + typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D} typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N}) ## core constructors ## -# dist == size(chunks) -function DArray(init, dims, procs, dist) - np = prod(dist) - procs = procs[1:np] - idxs, cuts = chunk_idxs([dims...], dist) - chunks = Array(RemoteRef, dist...) - for i = 1:np - chunks[i] = remotecall(procs[i], init, idxs[i]) - end - p = max(1, localpartindex(procs)) - A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p]) - DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts) -end - -function DArray(init, dims, procs) - if isempty(procs) - error("no processors") +function DArray(alloc_arg::Union(Type, Function), dimdist::DimDist; init=false, dprocs=workers()) + N = length(dims(dimdist)) + + if isa(alloc_arg, Type) + allocf = (idxs) -> Array(alloc_arg, map(length, idxs)) + else + allocf = alloc_arg end - DArray(init, dims, procs, defaultdist(dims,procs)) -end -DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims))]) + + refs = setup_chunks(allocf, dprocs, dimdist) -# new DArray similar to an existing one -DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...]) - -size(d::DArray) = d.dims -procs(d::DArray) = d.pmap - -chunktype{T,N,A}(d::DArray{T,N,A}) = A + p = max(1, localpartindex(refs)) + A = remotecall_fetch(procs(refs)[p], r->typeof(fetch(r)), refs[p]) + T = eltype(A) + d = DArray{T,N,A}(dimdist, refs) -## chunk index utilities ## - -# decide how to divide each dimension -# returns size of chunks array -function defaultdist(dims, procs) - dims = [dims...] - chunks = ones(Int, length(dims)) - np = length(procs) - f = sort!(collect(keys(factor(np))), rev=true) - k = 1 - while np > 1 - # repeatedly allocate largest factor to largest dim - if np%f[k] != 0 - k += 1 - if k > length(f) - break + # if present, init function is called on each of the parts + @sync begin + if isa(init, Function) + for i in procs(d) + @async remotecall_wait(i, init, d) end end - fac = f[k] - (d, dno) = findmax(dims) - # resolve ties to highest dim - dno = last(find(dims .== d)) - if dims[dno] >= fac - dims[dno] = div(dims[dno], fac) - chunks[dno] *= fac - end - np = div(np,fac) end - chunks + d end -# get array of start indexes for dividing sz into nc chunks -function defaultdist(sz::Int, nc::Int) - if sz >= nc - iround(linspace(1, sz+1, nc+1)) +function DArray(alloc_arg, dims; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + DArray(alloc_arg, DimDist(dims, length(dprocs)); kwargs...) else - [[1:(sz+1)], zeros(Int, nc-sz)] + DArray(alloc_arg, DimDist(dims); kwargs...) 
end end - -# compute indexes array for dividing dims into chunks -function chunk_idxs(dims, chunks) - cuts = map(defaultdist, dims, chunks) - n = length(dims) - idxs = Array(NTuple{n,Range1{Int}},chunks...) - cartesianmap(tuple(chunks...)) do cidx... - idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) +function DArray(alloc_arg, dims, dist; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + DArray(alloc_arg, DimDist(dims, length(dprocs), dist); kwargs...) + else + DArray(alloc_arg, DimDist(dims, length(workers()), dist); kwargs...) end - idxs, cuts end -function localpartindex(pmap::Vector{Int}) - mi = myid() - for i = 1:length(pmap) - if pmap[i] == mi - return i +# new DArray similar to an existing one, ensure length of dprocs is same as original if present +function DArray{T}(d::DArray{T}; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + if length(procs(d)) != length(dprocs) + error("Requested number of workers must be same as existing DArray") end + DArray(T, DimDist(size(d), length(dprocs), pdims(d.dimdist)); kwargs...) + else + append!(kwargs, [(:dprocs, procs(d))]) + DArray(T, DimDist(size(d), length(procs(d)), pdims(d.dimdist)); kwargs...) end - return 0 end -localpartindex(d::DArray) = localpartindex(d.pmap) +length(d::DArray) = prod(dims(d.dimdist)) +size(d::DArray) = dims(d.dimdist) +procs(d::DArray) = procs(d.refs) + +chunktype{T,N,A}(d::DArray{T,N,A}) = A + +localpartindex(d::DArray) = localpartindex(d.refs) function localpart{T,N,A}(d::DArray{T,N,A}) - lpidx = localpartindex(d) + lpidx = localpartindex(d.refs) if lpidx == 0 convert(A, Array(T, ntuple(N,i->0)))::A else - fetch(d.chunks[lpidx])::A + fetch(d.refs[lpidx])::A end end -function myindexes(d::DArray) - lpidx = localpartindex(d) + + +function myindexes(d::DArray) + lpidx = localpartindex(d.refs) if lpidx == 0 - ntuple(ndims(d), i->1:0) + ntuple(N, i->1:0) else - d.indexes[lpidx] + (d.dimdist[lpidx])[1] end end -# find which piece holds index (I...) -function locate(d::DArray, I::Int...) - ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i])) -end +locate(d::DArray, I::Int...) = locate(d.dimdist, I...) -chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A +chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.refs[i...])::A -## convenience constructors ## -dzeros(args...) = DArray(I->zeros(map(length,I)), args...) -dzeros(d::Int...) = dzeros(d) -dones(args...) = DArray(I->ones(map(length,I)), args...) -dones(d::Int...) = dones(d) -dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) -dfill(v, d::Int...) = dfill(v, d) -drand(args...) = DArray(I->rand(map(length,I)), args...) -drand(d::Int...) = drand(d) -drandn(args...) = DArray(I->randn(map(length,I)), args...) -drandn(d::Int...) = drandn(d) ## conversions ## -function distribute(a::AbstractArray) - owner = myid() - rr = RemoteRef() - put(rr, a) - DArray(size(a)) do I - remotecall_fetch(owner, ()->fetch(rr)[I...]) - end -end - convert{T,N}(::Type{Array}, d::SubOrDArray{T,N}) = convert(Array{T,N}, d) function convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) a = Array(S, size(d)) @sync begin - for i = 1:length(d.chunks) - @async a[d.indexes[i]...] = chunk(d, i) + for i = 1:length(d.dimdist) + @async a[(d.dimdist[i])...] 
= chunk(d, i) end end a @@ -183,7 +123,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) d = s.parent if isa(I,(Range1{Int}...)) && S<:T && T<:S l = locate(d, map(first, I)...) - if isequal(d.indexes[l...], I) + if isequal(d.dimdist[l...], I) # SubDArray corresponds to a chunk return chunk(d, l...) end @@ -220,23 +160,25 @@ end ## indexing ## -function getindex(r::RemoteRef, args...) +function getchunk(r::RemoteRef, args...) if r.where==myid() getindex(fetch(r), args...) else - remotecall_fetch(r.where, getindex, r, args...) + remotecall_fetch(r.where, getchunk, r, args...) end end + getindex(d::DArray, i::Int) = getindex(d, ind2sub(size(d), i)) getindex(d::DArray, i::Int...) = getindex(d, sub2ind(size(d), i...)) -function getindex{T}(d::DArray{T}, I::(Int...)) - chidx = locate(d, I...) - chunk = d.chunks[chidx...] - idxs = d.indexes[chidx...] - localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1)) - chunk[localidx...]::T +function getindex{T, N}(d::DArray{T,N}, I::(Int...)) + chidx = locate(d.dimdist, I...) + idxs = d.dimdist[chidx...] + chunk_rr = d.refs[chidx...] + localidx = ntuple(N, i->(I[i]-first(idxs[i])+1)) + + getchunk(chunk_rr, localidx...)::T end getindex(d::DArray) = d[1] @@ -250,8 +192,8 @@ copy(d::SubOrDArray) = d function setindex!(a::Array, d::DArray, I::Range1{Int}...) n = length(I) @sync begin - for i = 1:length(d.chunks) - K = d.indexes[i] + for i = 1:length(d.dimdist) + K = d.dimdist[i] @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) end end @@ -268,8 +210,8 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) end offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n] @sync begin - for i = 1:length(d.chunks) - K_c = {d.indexes[i]...} + for i = 1:length(d.dimdist) + K_c = {d.dimdist[i]...} K = [ intersect(J[j],K_c[j]) for j=1:n ] if !any(isempty, K) idxs = [ I[j][K[j]-offs[j]] for j=1:n ] @@ -278,7 +220,7 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) @async a[idxs...] = chunk(d, i) else # partial chunk - ch = d.chunks[i] + ch = d.refs[i] @async a[idxs...] = remotecall_fetch(ch.where, ()->sub(fetch(ch), [K[j]-first(K_c[j])+1 for j=1:n]...)) end end diff --git a/base/deprecated.jl b/base/deprecated.jl index cb81645b95d4d..d4d8ce81cbf2f 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -290,6 +290,7 @@ export ComplexPair @deprecate parse(str::String, pos::Int, greedy::Bool, raise::Bool) parse(str,pos,greedy=greedy,raise=raise) @deprecate parse(str::String, pos::Int, greedy::Bool) parse(str,pos,greedy=greedy) + function amap(f::Function, A::AbstractArray, axis::Integer) depwarn("amap is deprecated, use mapslices(f, A, dims) instead", :amap) dimsA = size(A) @@ -368,5 +369,33 @@ const CharString = UTF32String @deprecate with_bigfloat_rounding(f::Function, r::RoundingMode) with_rounding(f::Function, BigFloat, r) eval(Sys, :(@deprecate shlib_list dllist)) + +@deprecate dzeros(dims) fill(0.0, DimDist(dims)) +@deprecate dzeros(dims, procs) fill(0.0, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dzeros(dims, procs, dist) fill(0.0, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dzeros(d::Int...) fill(0.0, DimDist(d)) + +@deprecate dones(dims) fill(1.0, DimDist(dims)) +@deprecate dones(dims, procs) fill(1.0, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dones(dims, procs, dist) fill(1.0, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dones(d::Int...) 
fill(1.0, DimDist(d)) + +@deprecate drand(dims) rand(DimDist(dims)) +@deprecate drand(dims, procs) rand(DimDist(dims, length(procs)); dprocs=procs) +@deprecate drand(dims, procs, dist) rand(DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate drand(d::Int...) rand(DimDist(d)) + +@deprecate drandn(dims) randn(DimDist(dims)) +@deprecate drandn(dims, procs) randn(DimDist(dims, length(procs)); dprocs=procs) +@deprecate drandn(dims, procs, dist) randn(DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate drandn(d::Int...) randn(DimDist(d)) + +@deprecate dfill(v, dims) fill(v, DimDist(dims)) +@deprecate dfill(v, dims, procs) fill(v, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dfill(v, dims, procs, dist) fill(v, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dfill(v, d::Int...) fill(v, DimDist(d)) + +@deprecate distribute(a::Array) similar(a, DimDist(size(a))) + # 0.3 discontinued functions diff --git a/base/exports.jl b/base/exports.jl index fc95f3d71cc32..d19092a9ffe17 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -18,6 +18,7 @@ export AbstractVector, AbstractVecOrMat, Array, + ArrayDist, Associative, Bidiagonal, BigFloat, @@ -40,6 +41,7 @@ export Diagonal, Dict, Dims, + DimDist, EachLine, Eigen, Enumerate, @@ -89,6 +91,7 @@ export RoundUp, Schur, Set, + SharedArray, SparseMatrixCSC, StatStruct, StridedArray, @@ -155,6 +158,8 @@ export C_NULL, CPU_CORES, DL_LOAD_PATH, + DISTMODE_DISTRIBUTED, + DISTMODE_SHARED, ENDIAN_BOM, ENV, Inf, @@ -1138,7 +1143,7 @@ export pushdisplay, redisplay, -# distributed arrays +# distributed and shared arrays dfill, distribute, dones, @@ -1148,7 +1153,9 @@ export localpart, myindexes, procs, + share, + # paths and file names abspath, basename, diff --git a/base/mmap.jl b/base/mmap.jl index 2d60e5f9e7f7b..6d5060ea37a41 100644 --- a/base/mmap.jl +++ b/base/mmap.jl @@ -108,13 +108,13 @@ function mmap_stream_settings(s::IO) end # Mmapped-array constructor -function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset) +function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset; grow::Bool=true) prot, flags, iswrite = mmap_stream_settings(s) len = prod(dims)*sizeof(T) if len > typemax(Int) error("file is too large to memory-map on this platform") end - if iswrite + if iswrite && grow pmap, delta = mmap_grow(len, prot, flags, fd(s), offset) else pmap, delta = mmap(len, prot, flags, fd(s), offset) diff --git a/base/sharedarray.jl b/base/sharedarray.jl new file mode 100644 index 0000000000000..5287ccec88716 --- /dev/null +++ b/base/sharedarray.jl @@ -0,0 +1,250 @@ +type SharedArray{T,N} <: AbstractArray{T,N} + arrdist::ArrayDist + refs::DistRefs{N} + + # Local shmem map. Not to be serialized. + local_shmmap::Array{T,N} + + # local index into chunks when this SharedArray is serialized onto a different node, 0 if non-existent locally. + local_idx::Int +end + +function SharedArray(T::Type, arrdist::ArrayDist; init=false, dprocs=workers()) + N = length(dims(arrdist)) + + !isbits(T) ? 
error("Type of Shared Array elements must be bits types") : nothing + @windows_only error(" SharedArray is not supported on Windows yet.") + + onlocalhost = assert_same_host(dprocs) + + local shm_seg_name = "" + local local_shmmap + local sa = nothing + local shmmem_create_pid + try + # On OSX, the shm_seg_name length must be < 32 characters + shm_seg_name = string("/jl", getpid(), int64(time() * 10^9)) + if onlocalhost + shmmem_create_pid = myid() + local_shmmap = shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_CREAT | JL_O_RDWR) + else + # The shared array is being created on a remote machine.... + shmmem_create_pid = dprocs[1] + remotecall(dprocs[1], () -> begin shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end) + end + + func_alloc = (idxs) -> begin + basemap = shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_RDWR) + sub(basemap, idxs) + end + + refs = setup_chunks(func_alloc, dprocs, arrdist) + + # Wait till all the workers have mapped the segment + for i in 1:length(refs) + wait(refs[i]) + end + + # All good, immediately unlink the segment. + remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end) + shm_seg_name = "" + + if onlocalhost + sa = SharedArray{T,N}(arrdist, refs, local_shmmap, localpartindex(refs)) + else + sa = SharedArray{T,N}(arrdist, refs) + sa.local_idx = 0 + end + + # if present init function is called on each of the parts + @sync begin + if isa(init, Function) + for i in procs(sa) + @async remotecall_wait(i, init, sa) + end + end + end + + finally + if shm_seg_name != "" + remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end) + end + end + sa +end + + +function SharedArray(T, dims; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + SharedArray(T, DimDist(dims, length(kwargs[idx][2]); mode=DISTMODE_SHARED); kwargs...) + else + SharedArray(T, DimDist(dims, length(workers()); mode=DISTMODE_SHARED); kwargs...) + end +end + +function SharedArray(T, dims, dist; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + SharedArray(T, DimDist(dims, length(kwargs[idx][2]), dist; mode=DISTMODE_SHARED); kwargs...) + else + SharedArray(T, DimDist(dims, length(workers()), dist; mode=DISTMODE_SHARED); kwargs...) + end +end + +# new SharedArray similar to an existing one +function SharedArray{T}(sa::SharedArray{T}; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + if length(procs(d)) != length(dprocs) + error("Requested number of workers must be same as existing SharedArray") + end + SharedArray(T, size(sa), length(dprocs), pdims(sa.arrdist); kwargs...) + else + append!(kwargs, [(:dprocs, procs(d))]) + SharedArray(T, size(sa), length(dprocs), length(procs(d)), pdims(sa.arrdist); kwargs...) + end +end + + +length(sa::SharedArray) = prod(dims(sa.arrdist)) +size(sa::SharedArray) = dims(sa.arrdist) +procs(sa::SharedArray) = procs(sa.refs) + +localpartindex(sa::SharedArray) = localpartindex(sa.refs) + +function localpart{T,N}(sa::SharedArray{T,N}) + if sa.local_idx == 0 + sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0)) + else + fetch(sa.refs[sa.local_idx]) + end +end +function myindexes(sa::SharedArray) + lpidx = localpartindex(sa.refs) + if lpidx == 0 + ntuple(N, i->1:0) + else + d.dimdist[lpidx] + end + +end + +locate(sa::SharedArray, I::Int...) = locate(sa.arrdist, I...) 
+ + + +# Don't serialize local_shmmap (it is the complete array) and +# local_idx, which is relevant to the current process only +function serialize(s, sa::SharedArray) + serialize_type(s, typeof(sa)) + serialize(s, length(SharedArray.names)) + for n in SharedArray.names + if n == :local_shmmap + writetag(s, UndefRefTag) + else + serialize(s, getfield(sa, n)) + end + end +end + +function deserialize{T,N}(s, t::Type{SharedArray{T,N}}) + sa = invoke(deserialize, (Any, DataType), s, t) + + sa.local_idx = localpartindex(sa) + if (sa.local_idx > 0) + sa.local_shmmap = parent(fetch(sa.refs[sa.local_idx])) + else + error("SharedArray cannot be used on a non-participating process") + end + sa +end + +localpartindex(sa::SharedArray) = localpartindex(sa.refs) + +convert(::Type{Array}, sa::SharedArray) = sa.local_shmmap + +# avoiding ambiguity warnings +getindex(sa::SharedArray, x::Real) = getindex(sa.local_shmmap, x) +getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.local_shmmap, x) + +# pass through getindex and setindex! - they always work on the complete array unlike DArrays +getindex(sa::SharedArray, args...) = getindex(sa.local_shmmap, args...) +setindex!(sa::SharedArray, args...) = (setindex!(sa.local_shmmap, args...); sa) + +function print_shmem_limits(slen) + try + @linux_only pfx = "kernel" + @osx_only pfx = "kern.sysv" + + shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024) + page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end]) + shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024) + + println("System max size of single shmem segment(MB) : ", shmmax_MB, + "\nSystem max size of all shmem segments(MB) : ", shmall_MB, + "\nRequested size(MB) : ", div(slen, 1024*1024), + "\nPlease ensure requested size is within system limits.", + "\nIf not, increase system limits and try again." + ) + catch e + nothing # Ignore any errors in this... + end +end + +# utilities +function shm_mmap_array(T, dims, shm_seg_name, mode) + local s = nothing + local A = nothing + try + fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR) + if !(fd_mem > 0) + error("shm_open() failed") + end + + s = fdio(fd_mem, true) + + # On OSX, ftruncate must to used to set size of segment, just lseek does not work. + # and only at creation time + if (mode & JL_O_CREAT) == JL_O_CREAT + rc = ccall(:ftruncate, Int, (Int, Int), fd_mem, prod(dims)*sizeof(T)) + if rc != 0 + ec = errno() + error("ftruncate() failed, errno : ", ec) + end + end + + A = mmap_array(T, dims, s, 0, grow=false) + catch e + print_shmem_limits(prod(dims)*sizeof(T)) + rethrow(e) + + finally + if s != nothing + close(s) + end + end + A +end + +@unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name) +@unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions) + + +function assert_same_host(procs) + myip = + resp = Array(Any, length(procs)) + + @sync begin + for (i, p) in enumerate(procs) + @async resp[i] = remotecall_fetch(p, () -> getipaddr()) + end + end + + if !all(x->x==resp[1], resp) + error("SharedArray requires all requested processes to be on the same machine.") + end + + return (resp[1] != getipaddr()) ? 
false : true +end diff --git a/base/sysimg.jl b/base/sysimg.jl index e5da016db2fba..03a8a3aecb851 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -154,8 +154,10 @@ importall .Sort include("combinatorics.jl") # distributed arrays and memory-mapped arrays +include("arraydist.jl") include("darray.jl") include("mmap.jl") +include("sharedarray.jl") # utilities - version, timing, help, edit, metaprogramming include("sysinfo.jl") diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 8944977c973cc..ec95ffb381a5b 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -398,8 +398,8 @@ required, since the threads are scheduled cooperatively and not preemptively. This means context switches only occur at well-defined points: in this case, when ``remotecall_fetch`` is called. -Distributed Arrays ------------------- +Distributed and Shared Arrays +----------------------------- Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to @@ -415,21 +415,21 @@ A ``DArray`` can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a ``DArray`` is distributed by dividing the index space into some number of blocks in each dimension. -Common kinds of arrays can be constructed with functions beginning with -``d``:: +Common kinds of darrays can be constructed using the same convenience +functions for Arrays with the first parameter specified as ``DArray``:: - dzeros(100,100,10) - dones(100,100,10) - drand(100,100,10) - drandn(100,100,10) - dfill(x, 100,100,10) + zeros(DArray, 100,100,10) + ones(DArray, 100,100,10) + rand(DArray, 100,100,10) + randn(DArray, 100,100,10) + fill(x, DArray, 100,100,10) # second parameter for fill In the last case, each element will be initialized to the specified value ``x``. These functions automatically pick a distribution for you. For more control, you can specify which processors to use, and how the data should be distributed:: - dzeros((100,100), workers()[1:4], [1,4]) + zeros(DArray, (100,100), workers()[1:4], [1,4]) The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, @@ -464,11 +464,12 @@ Constructing Distributed Arrays The primitive ``DArray`` constructor has the following somewhat elaborate signature:: - DArray(init, dims[, procs, dist]) + DArray(alloc, dims[, procs, dist]; init=false) -``init`` is a function that accepts a tuple of index ranges. This function should +``alloc`` is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified -indices. ``dims`` is the overall size of the distributed array. +indices. You can also pass a type instead and the required space is allocated by DArray +itself. ``dims`` is the overall size of the distributed array. ``procs`` optionally specifies a vector of processor IDs to use. ``dist`` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. @@ -476,13 +477,14 @@ distributed array should be divided into in each dimension. The last two arguments are optional, and defaults will be used if they are omitted. 
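For illustration, the two forms of ``alloc`` could be used as follows (a sketch; it assumes worker processes have already been added)::

    # Type form: DArray allocates uninitialized chunks, and the init function
    # fills each worker's local part
    d1 = DArray(Float64, (100,100); init = D -> fill!(localpart(D), 0.0))

    # Function form: alloc receives the index ranges of a chunk and both
    # allocates and initializes it
    d2 = DArray(I -> fill(1.0, map(length, I)), (100,100))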
-As an example, here is how to turn the local array constructor ``fill`` -into a distributed array constructor:: +If a keyword argument ``init`` function is specified, it is called on all the workers +with the ``darray`` object and is expected to initialize its ``localpart``. - dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) +As an example, the DArray ``fill`` is implemented as:: -In this case the ``init`` function only needs to call ``fill`` with the -dimensions of the local piece it is creating. + fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...) + +In this case the ``alloc`` allocates and initializes the local piece it is creating. Distributed Array Operations ---------------------------- @@ -519,7 +521,7 @@ following code accomplishes this:: As you can see, we use a series of indexing expressions to fetch data into a local array ``old``. Note that the ``do`` block syntax is -convenient for passing ``init`` functions to the ``DArray`` constructor. +convenient for passing ``alloc`` functions to the ``DArray`` constructor. Next, the serial function ``life_rule`` is called to apply the update rules to the data, yielding the needed ``DArray`` chunk. Nothing about ``life_rule`` is ``DArray``\ -specific, but we list it here for completeness:: @@ -540,6 +542,20 @@ is ``DArray``\ -specific, but we list it here for completeness:: new end +SharedArrays +------------ + +SharedArrays are like DArrays in the sense that they allow computation to be more easily +spread across a bunch of worker processes. Unlike DArrays which allocate a chunk of +the Array on each participating worker, SharedArrays map the complete array into each +worker using operating system provided shared memory facilities. + +While each worker has full visibility into the SharedArray, local chunks in SharedArrays +(of type SubArray) may be used to partition work across paticipating workers. + +All participating workers in a SharedArray must be on the same host. Limits on the +size and number of SharedArray objects are system defined. + ClusterManagers --------------- diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index a37b0f1f57d0c..aec440d64bd5b 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -4425,50 +4425,66 @@ Parallel Computing Wait until all dynamically-enclosed uses of ``@async``, ``@spawn``, and ``@spawnat`` complete. -Distributed Arrays ------------------- +Distributed and Shared Arrays +----------------------------- -.. function:: DArray(init, dims, [procs, dist]) +.. function:: DArray(alloc::Union(Type, Function), dims, [procs, dist]; init=false) - Construct a distributed array. ``init`` is a function that accepts a tuple of index ranges. - This function should allocate a local chunk of the distributed array and initialize it for the specified indices. + Construct a distributed array. If ``alloc`` is a ``DataType``, local array chunks are alocated. If ``alloc`` is + a ``Function``, it should be a a function that accepts a tuple of index ranges, and should allocate a local + chunk of the distributed array. It may also optionally initialize it. ``dims`` is the overall size of the distributed array. ``procs`` optionally specifies a vector of processor IDs to use. If unspecified, the array is distributed over all worker processes only. Typically, when runnning in distributed mode, i.e., ``nprocs() > 1``, this would mean that no chunk of the distributed array exists on the process hosting the interactive julia prompt. 
``dist`` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. + If keyword argument ``init`` is specified, it should be a function that accepts a DArray object. It is expected to + initialize its ``localpart``. - For example, the ``dfill`` function that creates a distributed array and fills it with a value ``v`` is implemented as: - - ``dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)`` - -.. function:: dzeros(dims, ...) - - Construct a distributed array of zeros. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dones(dims, ...) - - Construct a distributed array of ones. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dfill(x, dims, ...) - - Construct a distributed array filled with value ``x``. Trailing arguments are the same as those accepted by ``darray``. + For example, the ``fill`` variant that creates a distributed array and fills it with a value ``v`` is implemented as: -.. function:: drand(dims, ...) + `` fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...)`` - Construct a distributed uniform random array. Trailing arguments are the same as those accepted by ``darray``. +.. function:: SharedArray(arrtype, dims, [procs, dist]; init=false) -.. function:: drandn(dims, ...) + Construct an array mapped across all participating processes using shared memory. ``arrtype`` determines the type of + the array created in shared memory. ``isbits(arrtype)`` MUST be true. + ``dims``, ``procs`` and ``dist`` are similar to those in ``DArray``. + The shared array created is initialized by calling the ``init``function if specified. It should be a function that + accepts a SharedArray object and is expected to initialize its ``localpart`` which will be a ``SubArray``. + + +.. function:: ones, zeros, infs, nans, rand, randn, fill - Construct a distributed normal random array. Trailing arguments are the same as those accepted by ``darray``. + DArray / SharedArray variants of the standard Array convenience constructors. + + Except for ``fill``, they are called by specifying the distribution type as the first parameter. + ``fill`` accepts the distribution type as the second parameter. + + ``zeros(SharedArray, dims, ...)``, ``ones(SharedArray, dims, ...)``, ``rand(DArray, dims, ...)`` and so on. + Trailing arguments are the same as those accepted by DArray and SharedArray. + + For example, + + ``rand(DArray, 1:100, (100,100))`` will created a DArray of size 100 x 100 with random integers between + 1 and 100. + + ``fill(float64(pi), SharedArray, (100,100))`` gets you an all pi SharedArray. + + .. function:: distribute(a) Convert a local array to distributed +.. function:: share(a) + + Convert a local array to shared + .. function:: localpart(d) - Get the local piece of a distributed array. Returns an empty array if no local part exists on the calling process. + Get the local piece of a distributed/shared array. Returns an empty array if no local part exists on the calling process. + If ``d`` is a SharedArray, returns a ``SubArray``. .. function:: myindexes(d) @@ -4477,7 +4493,28 @@ Distributed Arrays .. function:: procs(d) - Get the vector of processors storing pieces of ``d`` + Get the vector of processors participating in the distribution of ``d`` + + +.. function:: dzeros(dims, ...) : DEPRECATED + + Construct a distributed array of zeros. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: dones(dims, ...) 
: DEPRECATED + + Construct a distributed array of ones. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: dfill(x, dims, ...) : DEPRECATED + + Construct a distributed array filled with value ``x``. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: drand(dims, ...) : DEPRECATED + + Construct a distributed uniform random array. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: drandn(dims, ...) : DEPRECATED + + Construct a distributed normal random array. Trailing arguments are the same as those accepted by ``darray``. System ------ diff --git a/test/parallel.jl b/test/parallel.jl index 4410ebfc3e9b9..3a805eb415cfc 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -12,13 +12,59 @@ id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -d = drand((200,200), [id_me, id_other]) -s = convert(Array, d[1:150, 1:150]) + +@windows_only dist_modes = [DISTMODE_DISTRIBUTED] +@unix_only dist_modes = [DISTMODE_DISTRIBUTED, DISTMODE_SHARED] + +for dm in dist_modes + dprocs = [id_me, id_other] + d = rand(DimDist((200,200), length(dprocs); mode = dm); dprocs = dprocs) + s = convert(Array, d[1:150, 1:150]) + a = convert(Array, d) + @test a[1:150,1:150] == s + + @test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1] + @test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101] +end + +@unix_only begin +# SharedArray tests +dims = (20,20,20) +d = rand(1:100, DimDist(dims; mode=DISTMODE_SHARED)) a = convert(Array, d) -@test a[1:150,1:150] == s -@test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1] -@test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101] +partsums = Array(Int, length(procs(d))) +@sync begin + for (i, p) in enumerate(procs(d)) + @async partsums[i] = remotecall_fetch(p, D->sum(localpart(D)), d) + end +end +@test sum(a) == sum(partsums) + +d = rand(DimDist(dims; mode=DISTMODE_SHARED)) +for p in procs(d) + idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) + idxf = sub2ind(dims, map(first,idxes_in_p)...) + idxl = sub2ind(dims, map(last,idxes_in_p)...) + d[idxf] = float64(idxf) + rv = remotecall_fetch(p, (D,idxf,idxl) -> begin assert(D[idxf] == float64(idxf)); D[idxl] = float64(idxl); D[idxl]; end, d,idxf,idxl) + @test d[idxl] == rv +end + +@test ones(10, 10, 10) == fill(1.0, DimDist(10, 10, 10; mode=DISTMODE_SHARED)) +@test zeros(Int32, 10, 10, 10) == fill(0, DimDist(10, 10, 10; mode=DISTMODE_SHARED)) + +d = SharedArray(Int, DimDist(dims; mode=DISTMODE_SHARED); init = D->fill!(localpart(D), myid())) +for p in procs(d) + idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) + idxf = sub2ind(dims, map(first,idxes_in_p)...) + idxl = sub2ind(dims, map(last,idxes_in_p)...) + @test d[idxf] == p + @test d[idxl] == p +end + + +end # @unix_only(SharedArray tests) # Test @parallel load balancing - all processors should get either M or M+1 # iterations out of the loop range for some M.