From f502e7c82658fd8b2903287e772d485d65ab8c85 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Tue, 26 Nov 2013 17:01:14 +0530 Subject: [PATCH 1/4] Introduced ArrayDist and SharedArray --- base/arraydist.jl | 133 ++++++++++++++++++++++++++++ base/darray.jl | 177 +++++++++++-------------------------- base/exports.jl | 10 ++- base/mmap.jl | 4 +- base/multi.jl | 13 +++ base/sharedarray.jl | 210 ++++++++++++++++++++++++++++++++++++++++++++ base/sysimg.jl | 2 + 7 files changed, 419 insertions(+), 130 deletions(-) create mode 100644 base/arraydist.jl create mode 100644 base/sharedarray.jl diff --git a/base/arraydist.jl b/base/arraydist.jl new file mode 100644 index 0000000000000..ad39576f3302f --- /dev/null +++ b/base/arraydist.jl @@ -0,0 +1,133 @@ +# ArrayDist holds the chunks of an array distribution across workers +type ArrayDist{N} + dims::NTuple{N,Int} + + # ppmap[i]==p ⇒ processor p has piece i + ppmap::Vector{Int} + + # indexes held by piece i + indexes::Array{NTuple{N,Range1{Int}},N} + + # cuts[d][i] = first index of chunk i in dimension d + cuts::Vector{Vector{Int}} + + chunks::Array{RemoteRef,N} + + function ArrayDist(dims, ppmap, indexes, cuts, chunks) + # check invariants + assert(dims == map(last,last(indexes))) + assert(size(chunks) == size(indexes)) + assert(length(chunks) == length(ppmap)) + + new(dims, ppmap, indexes, cuts, chunks) + end +end + +function ArrayDist(allocf, dims, procs, dist) + np = prod(dist) + if np > length(procs) + error("Total requested number of chunks is greater than the number of workers") + end + ppmap = procs[1:np] + idxs, cuts = chunk_idxs([dims...], dist) + chunks = Array(RemoteRef, dist...) + for i = 1:np + chunks[i] = remotecall(procs[i], allocf, idxs[i]) + end + + ArrayDist{length(dims)}(dims, ppmap, idxs, cuts, chunks) +end + +## chunk index utilities ## + +# decide how to divide each dimension +# returns size of chunks array +function defaultdist(dims, procs) + dims = [dims...] + chunks = ones(Int, length(dims)) + np = length(procs) + f = sort!(collect(keys(factor(np))), rev=true) + k = 1 + while np > 1 + # repeatedly allocate largest factor to largest dim + if np%f[k] != 0 + k += 1 + if k > length(f) + break + end + end + fac = f[k] + (d, dno) = findmax(dims) + # resolve ties to highest dim + dno = last(find(dims .== d)) + if dims[dno] >= fac + dims[dno] = div(dims[dno], fac) + chunks[dno] *= fac + end + np = div(np,fac) + end + chunks +end + +# get array of start indexes for dividing sz into nc chunks +function defaultdist(sz::Int, nc::Int) + if sz >= nc + iround(linspace(1, sz+1, nc+1)) + else + [[1:(sz+1)], zeros(Int, nc-sz)] + end +end + + +# compute indexes array for dividing dims into chunks +function chunk_idxs(dims, chunks) + cuts = map(defaultdist, dims, chunks) + n = length(dims) + idxs = Array(NTuple{n,Range1{Int}},chunks...) + cartesianmap(tuple(chunks...)) do cidx... + idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) + end + idxs, cuts +end + +function localpartindex(dist::ArrayDist) + ppmap = dist.ppmap + mi = myid() + for i = 1:length(ppmap) + if ppmap[i] == mi + return i + end + end + return 0 +end + +indexes(ad::ArrayDist) = ad.indexes +procs(ad::ArrayDist) = ad.ppmap +length(ad::ArrayDist) = length(ad.chunks) +dimdist(ad::ArrayDist) = size(ad.chunks) # number of parts in each dimension + +chunk_ref(ad::ArrayDist, i::Int) = ad.chunks[i] +chunk_ref(ad::ArrayDist, i...) = ad.chunks[i...] 
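+
+# For illustration (the specific sizes and worker ids below are assumed, not part of the
+# patch): a (100,100) array distributed as a 2x2 grid over workers [2,3,4,5] would have
+#   ppmap        == [2,3,4,5]
+#   cuts         == [[1,51,101], [1,51,101]]
+#   indexes[1,1] == (1:50, 1:50)   and   indexes[2,2] == (51:100, 51:100)
+#   chunks       == a 2x2 Array of RemoteRefs, with chunk i living on ppmap[i]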
+ +function myindexes(ad::ArrayDist) + lpidx = localpartindex(ad) + if lpidx == 0 + ntuple(length(ad.dims), i->1:0) + else + ad.indexes[lpidx] + end +end + + +# find which piece holds index (I...) +function locate(ad::ArrayDist, I::Int...) + ntuple(length(ad.dims), i->searchsortedlast(ad.cuts[i], I[i])) +end + +function localindex_rr(ad::ArrayDist, I::(Int...)) + chidx = locate(ad, I...) + idxs = ad.indexes[chidx...] + chunk_rr = ad.chunks[chidx...] + chunk_rr, ntuple(length(ad.dims), i->(I[i]-first(idxs[i])+1)) +end + diff --git a/base/darray.jl b/base/darray.jl index 0e52a283d6fa2..ffda8523ea714 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -1,146 +1,72 @@ type DArray{T,N,A} <: AbstractArray{T,N} dims::NTuple{N,Int} - - chunks::Array{RemoteRef,N} - - # pmap[i]==p ⇒ processor p has piece i - pmap::Vector{Int} - - # indexes held by piece i - indexes::Array{NTuple{N,Range1{Int}},N} - # cuts[d][i] = first index of chunk i in dimension d - cuts::Vector{Vector{Int}} - - function DArray(dims, chunks, pmap, indexes, cuts) - # check invariants - assert(size(chunks) == size(indexes)) - assert(length(chunks) == length(pmap)) - assert(dims == map(last,last(indexes))) - new(dims, chunks, pmap, indexes, cuts) - end + ad::ArrayDist{N} end + typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D} typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N}) ## core constructors ## -# dist == size(chunks) -function DArray(init, dims, procs, dist) - np = prod(dist) - procs = procs[1:np] - idxs, cuts = chunk_idxs([dims...], dist) - chunks = Array(RemoteRef, dist...) - for i = 1:np - chunks[i] = remotecall(procs[i], init, idxs[i]) +function DArray(alloc_arg::Union(Type, Function), dims, dprocs, dist; init=false) + N = length(dims) + + ad = + if isa(alloc_arg, Type) + alloc_chunk = (idxs) -> Array(alloc_arg, map(length, idxs)) + ArrayDist(alloc_chunk, dims, dprocs, dist) + else + ArrayDist(alloc_arg, dims, dprocs, dist) + end + + p = max(1, localpartindex(ad)) + A = remotecall_fetch(procs(ad)[p], r->typeof(fetch(r)), chunk_ref(ad, p)) + T = eltype(A) + d = DArray{T,N,A}(dims, ad) + + # if present, init function is called on each of the parts + @sync begin + if isa(init, Function) + for i in procs(d) + @async remotecall_wait(i, init, d) + end + end end - p = max(1, localpartindex(procs)) - A = remotecall_fetch(procs[p], r->typeof(fetch(r)), chunks[p]) - DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts) + d end -function DArray(init, dims, procs) +function DArray(alloc_arg, dims, procs; kwargs...) if isempty(procs) error("no processors") end - DArray(init, dims, procs, defaultdist(dims,procs)) + DArray(alloc_arg, dims, procs, defaultdist(dims,procs); kwargs...) end -DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(),maximum(dims))]) +DArray(alloc_arg, dims; kwargs...) = DArray(alloc_arg, dims, workers()[1:min(nworkers(),maximum(dims))]; kwargs...) # new DArray similar to an existing one -DArray(init, d::DArray) = DArray(init, size(d), procs(d), [size(d.chunks)...]) +DArray{T}(d::DArray{T}; kwargs...) = DArray(T, size(d), procs(d), [dimdist(d.ad)...]; kwargs...) +length(d::DArray) = prod(d.dims) size(d::DArray) = d.dims -procs(d::DArray) = d.pmap +procs(d::DArray) = procs(d.ad) chunktype{T,N,A}(d::DArray{T,N,A}) = A -## chunk index utilities ## - -# decide how to divide each dimension -# returns size of chunks array -function defaultdist(dims, procs) - dims = [dims...] 
- chunks = ones(Int, length(dims)) - np = length(procs) - f = sort!(collect(keys(factor(np))), rev=true) - k = 1 - while np > 1 - # repeatedly allocate largest factor to largest dim - if np%f[k] != 0 - k += 1 - if k > length(f) - break - end - end - fac = f[k] - (d, dno) = findmax(dims) - # resolve ties to highest dim - dno = last(find(dims .== d)) - if dims[dno] >= fac - dims[dno] = div(dims[dno], fac) - chunks[dno] *= fac - end - np = div(np,fac) - end - chunks -end - -# get array of start indexes for dividing sz into nc chunks -function defaultdist(sz::Int, nc::Int) - if sz >= nc - iround(linspace(1, sz+1, nc+1)) - else - [[1:(sz+1)], zeros(Int, nc-sz)] - end -end - -# compute indexes array for dividing dims into chunks -function chunk_idxs(dims, chunks) - cuts = map(defaultdist, dims, chunks) - n = length(dims) - idxs = Array(NTuple{n,Range1{Int}},chunks...) - cartesianmap(tuple(chunks...)) do cidx... - idxs[cidx...] = ntuple(n, i->(cuts[i][cidx[i]]:cuts[i][cidx[i]+1]-1)) - end - idxs, cuts -end - -function localpartindex(pmap::Vector{Int}) - mi = myid() - for i = 1:length(pmap) - if pmap[i] == mi - return i - end - end - return 0 -end - -localpartindex(d::DArray) = localpartindex(d.pmap) +localpartindex(d::DArray) = localpartindex(d.ad) function localpart{T,N,A}(d::DArray{T,N,A}) lpidx = localpartindex(d) if lpidx == 0 convert(A, Array(T, ntuple(N,i->0)))::A else - fetch(d.chunks[lpidx])::A - end -end -function myindexes(d::DArray) - lpidx = localpartindex(d) - if lpidx == 0 - ntuple(ndims(d), i->1:0) - else - d.indexes[lpidx] + fetch(chunk_ref(d.ad, lpidx))::A end end +myindexes(d::DArray) = myindexes(d.ad) +locate(d::DArray, I::Int...) = locate(d.ad, I...) -# find which piece holds index (I...) -function locate(d::DArray, I::Int...) - ntuple(ndims(d), i->searchsortedlast(d.cuts[i], I[i])) -end - -chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.chunks[i...])::A +chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(chunk_ref(d.ad, i...))::A ## convenience constructors ## @@ -154,9 +80,9 @@ drand(args...) = DArray(I->rand(map(length,I)), args...) drand(d::Int...) = drand(d) drandn(args...) = DArray(I->randn(map(length,I)), args...) drandn(d::Int...) = drandn(d) - ## conversions ## + function distribute(a::AbstractArray) owner = myid() rr = RemoteRef() @@ -171,8 +97,8 @@ convert{T,N}(::Type{Array}, d::SubOrDArray{T,N}) = convert(Array{T,N}, d) function convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) a = Array(S, size(d)) @sync begin - for i = 1:length(d.chunks) - @async a[d.indexes[i]...] = chunk(d, i) + for i = 1:length(d.ad) + @async a[indexes(d.ad)[i]...] = chunk(d, i) end end a @@ -183,7 +109,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) d = s.parent if isa(I,(Range1{Int}...)) && S<:T && T<:S l = locate(d, map(first, I)...) - if isequal(d.indexes[l...], I) + if isequal(indexes(d.ad)[l...], I) # SubDArray corresponds to a chunk return chunk(d, l...) end @@ -220,11 +146,11 @@ end ## indexing ## -function getindex(r::RemoteRef, args...) +function getchunk(r::RemoteRef, args...) if r.where==myid() getindex(fetch(r), args...) else - remotecall_fetch(r.where, getindex, r, args...) + remotecall_fetch(r.where, getchunk, r, args...) end end @@ -232,11 +158,8 @@ getindex(d::DArray, i::Int) = getindex(d, ind2sub(size(d), i)) getindex(d::DArray, i::Int...) = getindex(d, sub2ind(size(d), i...)) function getindex{T}(d::DArray{T}, I::(Int...)) - chidx = locate(d, I...) - chunk = d.chunks[chidx...] - idxs = d.indexes[chidx...] 
- localidx = ntuple(ndims(d), i->(I[i]-first(idxs[i])+1)) - chunk[localidx...]::T + chunk_rr, localidx = localindex_rr(d.ad, I) + getchunk(chunk_rr, localidx...)::T end getindex(d::DArray) = d[1] @@ -250,8 +173,8 @@ copy(d::SubOrDArray) = d function setindex!(a::Array, d::DArray, I::Range1{Int}...) n = length(I) @sync begin - for i = 1:length(d.chunks) - K = d.indexes[i] + for i = 1:length(d.ad) + K = indexes(d.ad)[i] @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) end end @@ -268,8 +191,8 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) end offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n] @sync begin - for i = 1:length(d.chunks) - K_c = {d.indexes[i]...} + for i = 1:length(d.ad) + K_c = {indexes(d.ad)[i]...} K = [ intersect(J[j],K_c[j]) for j=1:n ] if !any(isempty, K) idxs = [ I[j][K[j]-offs[j]] for j=1:n ] @@ -278,7 +201,7 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) @async a[idxs...] = chunk(d, i) else # partial chunk - ch = d.chunks[i] + ch = chunk_ref(d.ad, i) @async a[idxs...] = remotecall_fetch(ch.where, ()->sub(fetch(ch), [K[j]-first(K_c[j])+1 for j=1:n]...)) end end diff --git a/base/exports.jl b/base/exports.jl index fc95f3d71cc32..9de94ffe3f1fe 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -36,6 +36,7 @@ export Complex64, Complex32, DArray, + SharedArray, DevNull, Diagonal, Dict, @@ -1138,7 +1139,7 @@ export pushdisplay, redisplay, -# distributed arrays +# distributed and shared arrays dfill, distribute, dones, @@ -1148,7 +1149,14 @@ export localpart, myindexes, procs, + sharedfill, + sharedones, + sharedrand, + sharedrandn, + sharedzeros, + share, + # paths and file names abspath, basename, diff --git a/base/mmap.jl b/base/mmap.jl index 2d60e5f9e7f7b..6d5060ea37a41 100644 --- a/base/mmap.jl +++ b/base/mmap.jl @@ -108,13 +108,13 @@ function mmap_stream_settings(s::IO) end # Mmapped-array constructor -function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset) +function mmap_array{T,N,TInt<:Integer}(::Type{T}, dims::NTuple{N,TInt}, s::IO, offset::FileOffset; grow::Bool=true) prot, flags, iswrite = mmap_stream_settings(s) len = prod(dims)*sizeof(T) if len > typemax(Int) error("file is too large to memory-map on this platform") end - if iswrite + if iswrite && grow pmap, delta = mmap_grow(len, prot, flags, fd(s), offset) else pmap, delta = mmap(len, prot, flags, fd(s), offset) diff --git a/base/multi.jl b/base/multi.jl index 0d26bd268438e..3967d79e9a269 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1665,3 +1665,16 @@ function interrupt(pids::AbstractVector=workers()) end end end + + +function islocalwrkr(id) + if (myid() == id) + return true + else + wrkr = worker_from_id(id) + if (wrkr.host == "127.0.0.1") || (wrkr.host == "localhost") || (wrkr.host == string(getipaddr())) + return true + end + end + return false +end \ No newline at end of file diff --git a/base/sharedarray.jl b/base/sharedarray.jl new file mode 100644 index 0000000000000..d48df71e30139 --- /dev/null +++ b/base/sharedarray.jl @@ -0,0 +1,210 @@ +type SharedArray{T,N,A} <: AbstractArray{T,N} + dims::NTuple{N,Int} + ad::ArrayDist{N} + + # Local shmem map. Not to be serialized. + local_shmmap::Array{T,N} + + # local index into chunks when this SharedArray is serialized onto a different node, 0 if non-existent locally. 
+ local_idx::Int +end + +function SharedArray(T::Type, dims, dprocs, dist; init=false) + N = length(dims) + + @windows_only error(" SharedArray is not available on Windows yet.") + + # Ensure that all processes are on localhost + if !(all(x->islocalwrkr(x), [dprocs, myid()])) + error("SharedArray requires all requested processes to be on the same machine.") + end + + local shm_seg_name = "" + local local_shmmap + local sa = nothing + try + # On OSX, the shm_seg_name length must be < 32 characters + shm_seg_name = string("/jl", getpid(), int64(time() * 10^9)) + + local_shmmap = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR) + + func_alloc = (idxs) -> begin + basemap = shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR) + sub(basemap, idxs) + end + + ad = ArrayDist(func_alloc, dims, dprocs, dist) + + # Wait till all the workers have mapped the segment + for i in 1:length(ad) + wait(chunk_ref(ad, i)) + end + + # All good, immediately unlink the segment. + shm_unlink(shm_seg_name) + shm_seg_name = "" ; + + + # get the typeof the chunks + A = typeof(sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0))) + sa = SharedArray{T,N,A}(dims, ad, local_shmmap, localpartindex(ad)) + + # if present init function is called on each of the parts + @sync begin + if isa(init, Function) + for i in procs(sa) + @async remotecall_wait(i, init, sa) + end + end + end + + finally + if shm_seg_name != "" + shm_unlink(shm_seg_name) + end + end + sa +end + + +function SharedArray(T, dims, procs; kwargs...) + if isempty(procs) + error("no processors") + end + SharedArray(T, dims, procs, defaultdist(dims,procs); kwargs...) +end +SharedArray(T, dims; kwargs...) = SharedArray(T, dims, workers()[1:min(nworkers(),maximum(dims))]; kwargs...) + +# new DArray similar to an existing one +SharedArray{T}(sa::SharedArray{T}; kwargs...) = SharedArray(T, size(sa), procs(sa), [dimdist(sa.ad)...]; kwargs...) + +length(sa::SharedArray) = prod(sa.dims) +size(sa::SharedArray) = sa.dims +procs(sa::SharedArray) = procs(sa.ad) + +chunktype{T,N,A}(sa::SharedArray{T,N,A}) = A + +localpartindex(sa::SharedArray) = localpartindex(sa.ad) + +function localpart{T,N,A}(sa::SharedArray{T,N,A}) + if sa.local_idx == 0 + sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0)) + else + fetch(chunk_ref(sa.ad, sa.local_idx))::A + end +end +myindexes(sa::SharedArray) = myindexes(sa.ad) +locate(sa::SharedArray, I::Int...) = locate(sa.ad, I...) + +## convenience constructors ## + +sharedzeros(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), 0.0)) +sharedzeros(d::Int...) = sharedzeros(d) +sharedones(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), 1.0)) +sharedones(d::Int...) = sharedones(d) +sharedfill(v, args...) = SharedArray(typeof(v), args...; init = S->fill!(localpart(S), v)) +sharedfill(v, d::Int...) = sharedfill(v, d) +sharedrand(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), rand())) +sharedrand(d::Int...) = sharedrand(d) +sharedrandn(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), randn())) +sharedrandn(d::Int...) 
= sharedrandn(d) +## conversions ## + + +function share{T}(a::AbstractArray{T}) + sa = SharedArray(T, size(a)) + copy!(sa.local_shmmap, a) + sa +end + + +# Don't serialize local_shmmap (it is the complete array) and +# local_idx, which is relevant to the current process only +function serialize(s, sa::SharedArray) + serialize_type(s, typeof(sa)) + serialize(s, length(SharedArray.names)) + for n in SharedArray.names + if n == :local_shmmap + writetag(s, UndefRefTag) + else + serialize(s, getfield(sa, n)) + end + end +end + +function deserialize{T,N,A}(s, t::Type{SharedArray{T,N,A}}) + sa = invoke(deserialize, (Any, DataType), s, t) + + sa.local_idx = localpartindex(sa) + if (sa.local_idx > 0) + sa.local_shmmap = parent(fetch(chunk_ref(sa.ad, sa.local_idx))) + else + error("SharedArray cannot be used on a non-participating process") + end + sa +end + +localpartindex(sa::SharedArray) = localpartindex(sa.ad) + +convert(::Type{Array}, sa::SharedArray) = sa.local_shmmap + +# avoiding ambiguity warnings +getindex(sa::SharedArray, x::Real) = getindex(sa.local_shmmap, x) +getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.local_shmmap, x) + +# pass through getindex and setindex! - they always work on the complete array unlike DArrays +getindex(sa::SharedArray, args...) = getindex(sa.local_shmmap, args...) +setindex!(sa::SharedArray, args...) = (setindex!(sa.local_shmmap, args...); sa) + + +# utilities +function shm_mmap_array(T, dims, shm_seg_name, mode) + local s = nothing + local A = nothing + try + fd_mem = shm_open(shm_seg_name, mode, S_IRUSR | S_IWUSR) + if !(fd_mem > 0) + error("shm_open() failed") + end + + s = fdio(fd_mem, true) + + # On OSX, ftruncate must to used to set size of segment, just lseek does not work. + # and only at creation time + if (mode & JL_O_CREAT) == JL_O_CREAT + rc = ccall(:ftruncate, Int, (Int, Int), fd_mem, prod(dims)*sizeof(T)) + if rc != 0 + ec = errno() + error("ftruncate() failed, errno : ", ec) + end + end + + A = mmap_array(T, dims, s, 0, grow=false) + catch e + @linux_only pfx = "kernel" + @osx_only pfx = "kern.sysv" + + shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024) + page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end]) + shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024) + + println("System max size of single shmem segment(MB) : ", shmmax_MB, + "\nSystem max size of all shmem segments(MB) : ", shmall_MB, + "\nRequested size(MB) : ", div(prod(dims)*sizeof(T), 1024*1024), + "\nPlease ensure requested size is within system limits.", + "\nIf not, increase system limits and try again." 
+ ) + + rethrow() + + finally + if s != nothing + close(s) + end + end + A +end + +@unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name) +@unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions) + diff --git a/base/sysimg.jl b/base/sysimg.jl index e5da016db2fba..03a8a3aecb851 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -154,8 +154,10 @@ importall .Sort include("combinatorics.jl") # distributed arrays and memory-mapped arrays +include("arraydist.jl") include("darray.jl") include("mmap.jl") +include("sharedarray.jl") # utilities - version, timing, help, edit, metaprogramming include("sysinfo.jl") From ea48b6a9fc2369ecdf0b2e7b5908bf9c5d822873 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Tue, 24 Dec 2013 09:47:41 +0530 Subject: [PATCH 2/4] fix conflict while rebasing --- base/arraydist.jl | 16 ++++----- base/darray.jl | 32 +++++++++++------ base/deprecated.jl | 13 +++++++ base/exports.jl | 5 --- base/multi.jl | 1 + base/sharedarray.jl | 85 ++++++++++++++++++++++++++------------------- test/parallel.jl | 55 ++++++++++++++++++++++++++--- 7 files changed, 143 insertions(+), 64 deletions(-) diff --git a/base/arraydist.jl b/base/arraydist.jl index ad39576f3302f..303fa9a22ca5b 100644 --- a/base/arraydist.jl +++ b/base/arraydist.jl @@ -1,7 +1,5 @@ # ArrayDist holds the chunks of an array distribution across workers type ArrayDist{N} - dims::NTuple{N,Int} - # ppmap[i]==p ⇒ processor p has piece i ppmap::Vector{Int} @@ -19,7 +17,7 @@ type ArrayDist{N} assert(size(chunks) == size(indexes)) assert(length(chunks) == length(ppmap)) - new(dims, ppmap, indexes, cuts, chunks) + new(ppmap, indexes, cuts, chunks) end end @@ -109,10 +107,10 @@ dimdist(ad::ArrayDist) = size(ad.chunks) # number of parts in each dimension chunk_ref(ad::ArrayDist, i::Int) = ad.chunks[i] chunk_ref(ad::ArrayDist, i...) = ad.chunks[i...] -function myindexes(ad::ArrayDist) +function myindexes{N}(ad::ArrayDist{N}) lpidx = localpartindex(ad) if lpidx == 0 - ntuple(length(ad.dims), i->1:0) + ntuple(N, i->1:0) else ad.indexes[lpidx] end @@ -120,14 +118,14 @@ end # find which piece holds index (I...) -function locate(ad::ArrayDist, I::Int...) - ntuple(length(ad.dims), i->searchsortedlast(ad.cuts[i], I[i])) +function locate{N}(ad::ArrayDist{N}, I::Int...) + ntuple(N, i->searchsortedlast(ad.cuts[i], I[i])) end -function localindex_rr(ad::ArrayDist, I::(Int...)) +function localindex_rr{N}(ad::ArrayDist{N}, I::(Int...)) chidx = locate(ad, I...) idxs = ad.indexes[chidx...] chunk_rr = ad.chunks[chidx...] - chunk_rr, ntuple(length(ad.dims), i->(I[i]-first(idxs[i])+1)) + chunk_rr, ntuple(N, i->(I[i]-first(idxs[i])+1)) end diff --git a/base/darray.jl b/base/darray.jl index ffda8523ea714..e0207baf27690 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -69,17 +69,29 @@ locate(d::DArray, I::Int...) = locate(d.ad, I...) chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(chunk_ref(d.ad, i...))::A ## convenience constructors ## +for (f) in (:zeros, :ones, :nans, :infs) + @eval begin + ($f)(::Type{DArray}, T::DataType, args...) = DArray(I->($f)(T, map(length,I)), args...) + ($f)(::Type{DArray}, args...) = ($f)(DArray, Float64, args...) + ($f)(::Type{DArray}, T::DataType, d::Int...) = ($f)(DArray, T, d) + ($f)(::Type{DArray}, d::Int...) = ($f)(DArray, Float64, d) + end +end + +# rand variant with range +rand(::Type{DArray}, TR::Union(DataType, Range1), d::Int...) 
= rand(DArray, TR, d) +rand(::Type{DArray}, TR::Union(DataType, Range1), args...) = DArray(I->rand(TR, map(length,I)), args...) +rand(::Type{DArray}, I::(Int...)) = rand(DArray, Float64, I) +rand(::Type{DArray}, args...) = rand(DArray, Float64, args...) +rand(::Type{DArray}, d::Int...) = rand(DArray, Float64, d) + +fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...) +fill(v, ::Type{DArray}, d::Int...) = fill(v, DArray, d) + +randn(::Type{DArray}, args...) = DArray(I->randn(map(length,I)), args...) +randn(::Type{DArray}, d::Int...) = randn(DArray, d) + -dzeros(args...) = DArray(I->zeros(map(length,I)), args...) -dzeros(d::Int...) = dzeros(d) -dones(args...) = DArray(I->ones(map(length,I)), args...) -dones(d::Int...) = dones(d) -dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) -dfill(v, d::Int...) = dfill(v, d) -drand(args...) = DArray(I->rand(map(length,I)), args...) -drand(d::Int...) = drand(d) -drandn(args...) = DArray(I->randn(map(length,I)), args...) -drandn(d::Int...) = drandn(d) ## conversions ## diff --git a/base/deprecated.jl b/base/deprecated.jl index cb81645b95d4d..360e605263f1e 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -290,6 +290,7 @@ export ComplexPair @deprecate parse(str::String, pos::Int, greedy::Bool, raise::Bool) parse(str,pos,greedy=greedy,raise=raise) @deprecate parse(str::String, pos::Int, greedy::Bool) parse(str,pos,greedy=greedy) + function amap(f::Function, A::AbstractArray, axis::Integer) depwarn("amap is deprecated, use mapslices(f, A, dims) instead", :amap) dimsA = size(A) @@ -368,5 +369,17 @@ const CharString = UTF32String @deprecate with_bigfloat_rounding(f::Function, r::RoundingMode) with_rounding(f::Function, BigFloat, r) eval(Sys, :(@deprecate shlib_list dllist)) +@deprecate dzeros(args...) zeros(DArray, args...) +@deprecate dzeros(d::Int...) zeros(DArray, d...) +@deprecate dones(args...) ones(DArray, args...) +@deprecate dones(d::Int...) ones(DArray, d...) +@deprecate dfill(v, args...) fill(v, DArray, args...) +@deprecate dfill(v, d::Int...) fill(v, DArray, d...) +@deprecate drand(args...) rand(DArray, args...) +@deprecate drand(d::Int...) rand(DArray, d...) +@deprecate drandn(args...) randn(DArray, args...) +@deprecate drandn(d::Int...) randn(DArray, d...) + + # 0.3 discontinued functions diff --git a/base/exports.jl b/base/exports.jl index 9de94ffe3f1fe..950411f740f15 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1149,11 +1149,6 @@ export localpart, myindexes, procs, - sharedfill, - sharedones, - sharedrand, - sharedrandn, - sharedzeros, share, diff --git a/base/multi.jl b/base/multi.jl index 3967d79e9a269..116538b509489 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1667,6 +1667,7 @@ function interrupt(pids::AbstractVector=workers()) end +# FIXME : For other workers that connect to us, we have to query socket itself to get the other endpoint - TBD... function islocalwrkr(id) if (myid() == id) return true diff --git a/base/sharedarray.jl b/base/sharedarray.jl index d48df71e30139..f77580bfceecb 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -1,4 +1,4 @@ -type SharedArray{T,N,A} <: AbstractArray{T,N} +type SharedArray{T,N} <: AbstractArray{T,N} dims::NTuple{N,Int} ad::ArrayDist{N} @@ -14,8 +14,8 @@ function SharedArray(T::Type, dims, dprocs, dist; init=false) @windows_only error(" SharedArray is not available on Windows yet.") - # Ensure that all processes are on localhost - if !(all(x->islocalwrkr(x), [dprocs, myid()])) + # Ensure that all processes are on localhost. 
Currently only checking this if pid is 1. + if (myid() == 1) && !(all(x->islocalwrkr(x), [dprocs, myid()])) error("SharedArray requires all requested processes to be on the same machine.") end @@ -46,8 +46,8 @@ function SharedArray(T::Type, dims, dprocs, dist; init=false) # get the typeof the chunks - A = typeof(sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0))) - sa = SharedArray{T,N,A}(dims, ad, local_shmmap, localpartindex(ad)) +# A = typeof(sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0))) + sa = SharedArray{T,N}(dims, ad, local_shmmap, localpartindex(ad)) # if present init function is called on each of the parts @sync begin @@ -75,39 +75,48 @@ function SharedArray(T, dims, procs; kwargs...) end SharedArray(T, dims; kwargs...) = SharedArray(T, dims, workers()[1:min(nworkers(),maximum(dims))]; kwargs...) -# new DArray similar to an existing one +# new SharedArray similar to an existing one SharedArray{T}(sa::SharedArray{T}; kwargs...) = SharedArray(T, size(sa), procs(sa), [dimdist(sa.ad)...]; kwargs...) length(sa::SharedArray) = prod(sa.dims) size(sa::SharedArray) = sa.dims procs(sa::SharedArray) = procs(sa.ad) -chunktype{T,N,A}(sa::SharedArray{T,N,A}) = A - localpartindex(sa::SharedArray) = localpartindex(sa.ad) -function localpart{T,N,A}(sa::SharedArray{T,N,A}) +function localpart{T,N}(sa::SharedArray{T,N}) if sa.local_idx == 0 sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0)) else - fetch(chunk_ref(sa.ad, sa.local_idx))::A + fetch(chunk_ref(sa.ad, sa.local_idx)) end end myindexes(sa::SharedArray) = myindexes(sa.ad) locate(sa::SharedArray, I::Int...) = locate(sa.ad, I...) ## convenience constructors ## +for (arrtype) in (:zero, :one, :inf, :nan) + f = symbol(string(arrtype, "s")) + @eval begin + ($f)(::Type{SharedArray}, d::Int...) = ($f)(SharedArray, Float64, d) + ($f)(::Type{SharedArray}, T::DataType, d::Int...) = ($f)(SharedArray, T, d) + + ($f)(::Type{SharedArray}, args...) = ($f)(SharedArray, Float64, args...) + ($f)(::Type{SharedArray}, T::DataType, args...) = SharedArray(T, args...; init = S->fill!(localpart(S), ($arrtype)(T))) + end +end + +rand(::Type{SharedArray}, I::(Int...)) = rand(SharedArray, Float64, I) +rand(::Type{SharedArray}, T::DataType, args...) = SharedArray(T, args...; init = S->map!((x)->rand(T), localpart(S))) +rand(::Type{SharedArray}, R::Range1, args...) = SharedArray(Int, args...; init = S->map!((x)->rand(R), localpart(S))) +rand(::Type{SharedArray}, args...) = rand(SharedArray, Float64, args...) + +fill(v, ::Type{SharedArray}, args...) = SharedArray(typeof(v), args...; init = S->fill!(localpart(S), v)) +fill(v, ::Type{SharedArray}, d::Int...) = fill(v, SharedArray, d) + +randn(::Type{SharedArray}, args...) = SharedArray(Float64, args...; init = S-> map!((x)->randn(), localpart(S))) +randn(::Type{SharedArray}, d::Int...) = randn(SharedArray, d) -sharedzeros(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), 0.0)) -sharedzeros(d::Int...) = sharedzeros(d) -sharedones(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), 1.0)) -sharedones(d::Int...) = sharedones(d) -sharedfill(v, args...) = SharedArray(typeof(v), args...; init = S->fill!(localpart(S), v)) -sharedfill(v, d::Int...) = sharedfill(v, d) -sharedrand(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), rand())) -sharedrand(d::Int...) = sharedrand(d) -sharedrandn(args...) = SharedArray(Float64, args...; init = S->fill!(localpart(S), randn())) -sharedrandn(d::Int...) 
= sharedrandn(d) ## conversions ## @@ -132,7 +141,7 @@ function serialize(s, sa::SharedArray) end end -function deserialize{T,N,A}(s, t::Type{SharedArray{T,N,A}}) +function deserialize{T,N}(s, t::Type{SharedArray{T,N}}) sa = invoke(deserialize, (Any, DataType), s, t) sa.local_idx = localpartindex(sa) @@ -156,6 +165,25 @@ getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.local_shmmap, x) getindex(sa::SharedArray, args...) = getindex(sa.local_shmmap, args...) setindex!(sa::SharedArray, args...) = (setindex!(sa.local_shmmap, args...); sa) +function print_shmem_limits() + try + @linux_only pfx = "kernel" + @osx_only pfx = "kern.sysv" + + shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024) + page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end]) + shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024) + + println("System max size of single shmem segment(MB) : ", shmmax_MB, + "\nSystem max size of all shmem segments(MB) : ", shmall_MB, + "\nRequested size(MB) : ", div(prod(dims)*sizeof(T), 1024*1024), + "\nPlease ensure requested size is within system limits.", + "\nIf not, increase system limits and try again." + ) + catch e + ; # Ignore any errors in this... + end +end # utilities function shm_mmap_array(T, dims, shm_seg_name, mode) @@ -181,20 +209,7 @@ function shm_mmap_array(T, dims, shm_seg_name, mode) A = mmap_array(T, dims, s, 0, grow=false) catch e - @linux_only pfx = "kernel" - @osx_only pfx = "kern.sysv" - - shmmax_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmmax`)[1]))[end]), 1024*1024) - page_size = int(split(readall(readsfrom(`getconf PAGE_SIZE`)[1]))[end]) - shmall_MB = div(int(split(readall(readsfrom(`sysctl $(pfx).shmall`)[1]))[end]) * page_size, 1024*1024) - - println("System max size of single shmem segment(MB) : ", shmmax_MB, - "\nSystem max size of all shmem segments(MB) : ", shmall_MB, - "\nRequested size(MB) : ", div(prod(dims)*sizeof(T), 1024*1024), - "\nPlease ensure requested size is within system limits.", - "\nIf not, increase system limits and try again." 
- ) - + print_shmem_limits() rethrow() finally diff --git a/test/parallel.jl b/test/parallel.jl index 4410ebfc3e9b9..fcc8b619387cf 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -12,13 +12,58 @@ id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -d = drand((200,200), [id_me, id_other]) -s = convert(Array, d[1:150, 1:150]) + +@windows_only dist_test_types = [DArray] +@unix_only dist_test_types = [DArray, SharedArray] + +for tt in dist_test_types + d = rand(tt, (200,200), [id_me, id_other]) + s = convert(Array, d[1:150, 1:150]) + a = convert(Array, d) + @test a[1:150,1:150] == s + + @test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1] + @test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101] +end + +@unix_only begin +# SharedArray tests +dims = (20,20,20) +d = rand(SharedArray, 1:100, dims) a = convert(Array, d) -@test a[1:150,1:150] == s -@test fetch(@spawnat id_me localpart(d)[1,1]) == d[1,1] -@test fetch(@spawnat id_other localpart(d)[1,1]) == d[1,101] +partsums = Array(Int, length(procs(d))) +@sync begin + for (i, p) in enumerate(procs(d)) + @async partsums[i] = remotecall_fetch(p, D->sum(localpart(D)), d) + end +end +@test sum(a) == sum(partsums) + +d = rand(SharedArray, dims) +for p in procs(d) + idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) + idxf = sub2ind(dims, map(first,idxes_in_p)...) + idxl = sub2ind(dims, map(last,idxes_in_p)...) + d[idxf] = float64(idxf) + rv = remotecall_fetch(p, (D,idxf,idxl) -> begin assert(D[idxf] == float64(idxf)); D[idxl] = float64(idxl); D[idxl]; end, d,idxf,idxl) + @test d[idxl] == rv +end + +@test ones(10, 10, 10) == ones(SharedArray, 10, 10, 10) +@test zeros(Int32, 10, 10, 10) == zeros(SharedArray, 10, 10, 10) + +d = SharedArray(Int, dims; init = D->fill!(localpart(D), myid())) +for p in procs(d) + idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) + idxf = sub2ind(dims, map(first,idxes_in_p)...) + idxl = sub2ind(dims, map(last,idxes_in_p)...) + @test d[idxf] == p + @test d[idxl] == p +end + + +end # @unix_only(SharedArray tests) # Test @parallel load balancing - all processors should get either M or M+1 # iterations out of the loop range for some M. From 39c08063b6a8786edbef92ff772da1af89391289 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Thu, 5 Dec 2013 12:31:19 +0530 Subject: [PATCH 3/4] Added doc. Checking for isbits for SharedArrays --- base/multi.jl | 12 +++-- base/sharedarray.jl | 17 +++--- doc/manual/parallel-computing.rst | 54 ++++++++++++------- doc/stdlib/base.rst | 89 ++++++++++++++++++++++--------- 4 files changed, 114 insertions(+), 58 deletions(-) diff --git a/base/multi.jl b/base/multi.jl index 116538b509489..c04f200545569 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -212,7 +212,7 @@ function add_workers(pg::ProcessGroup, ws::Array{Any,1}) end all_locs = map(x -> isa(x, Worker) ? 
(x.privhost, x.port, x.id) : ("", 0, x.id), pg.workers) for w in ws - send_msg_now(w, :join_pgrp, w.id, all_locs) + send_msg_now(w, :join_pgrp, w.id, all_locs, string(getipaddr())) end for w in ws @schedule begin @@ -859,25 +859,28 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately put(lookup_ref(oid), val) elseif is(msg, :identify_socket) otherid = deserialize(sock) - register_worker(Worker("", 0, sock, otherid)) + otherhost = deserialize(sock) + register_worker(Worker(otherhost, 0, sock, otherid)) elseif is(msg, :join_pgrp) # first connection; get process group info from client self_pid = LPROC.id = deserialize(sock) locs = deserialize(sock) + otherhost = deserialize(sock) #print("\nLocation: ",locs,"\nId:",myid(),"\n") # joining existing process group - register_worker(Worker("", 0, sock, 1)) + register_worker(Worker(otherhost, 0, sock, 1)) register_worker(LPROC) for (rhost, rport, rpid) in locs + self_ipaddr_str = string(getipaddr()) if (rpid < self_pid) && (!(rpid == 1)) # Connect to them w = Worker(rhost, rport) w.id = rpid register_worker(w) create_message_handler_loop(w.socket) - send_msg_now(w, :identify_socket, self_pid) + send_msg_now(w, :identify_socket, self_pid, self_ipaddr_str) else # Others will connect to us. Don't do anything just yet continue @@ -1667,7 +1670,6 @@ function interrupt(pids::AbstractVector=workers()) end -# FIXME : For other workers that connect to us, we have to query socket itself to get the other endpoint - TBD... function islocalwrkr(id) if (myid() == id) return true diff --git a/base/sharedarray.jl b/base/sharedarray.jl index f77580bfceecb..525ba4f7cec98 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -12,10 +12,11 @@ end function SharedArray(T::Type, dims, dprocs, dist; init=false) N = length(dims) - @windows_only error(" SharedArray is not available on Windows yet.") + !isbits(T) ? error("Type of Shared Array elements must be bits types") : nothing + @windows_only error(" SharedArray is not supported on Windows yet.") - # Ensure that all processes are on localhost. Currently only checking this if pid is 1. - if (myid() == 1) && !(all(x->islocalwrkr(x), [dprocs, myid()])) + # Ensure that all processes are on localhost. + if !(all(x->islocalwrkr(x), [dprocs, myid()])) error("SharedArray requires all requested processes to be on the same machine.") end @@ -165,7 +166,7 @@ getindex(sa::SharedArray, x::AbstractArray) = getindex(sa.local_shmmap, x) getindex(sa::SharedArray, args...) = getindex(sa.local_shmmap, args...) setindex!(sa::SharedArray, args...) = (setindex!(sa.local_shmmap, args...); sa) -function print_shmem_limits() +function print_shmem_limits(slen) try @linux_only pfx = "kernel" @osx_only pfx = "kern.sysv" @@ -176,12 +177,12 @@ function print_shmem_limits() println("System max size of single shmem segment(MB) : ", shmmax_MB, "\nSystem max size of all shmem segments(MB) : ", shmall_MB, - "\nRequested size(MB) : ", div(prod(dims)*sizeof(T), 1024*1024), + "\nRequested size(MB) : ", div(slen, 1024*1024), "\nPlease ensure requested size is within system limits.", "\nIf not, increase system limits and try again." ) catch e - ; # Ignore any errors in this... + nothing # Ignore any errors in this... 
end end @@ -209,8 +210,8 @@ function shm_mmap_array(T, dims, shm_seg_name, mode) A = mmap_array(T, dims, s, 0, grow=false) catch e - print_shmem_limits() - rethrow() + print_shmem_limits(prod(dims)*sizeof(T)) + rethrow(e) finally if s != nothing diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 8944977c973cc..ec95ffb381a5b 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -398,8 +398,8 @@ required, since the threads are scheduled cooperatively and not preemptively. This means context switches only occur at well-defined points: in this case, when ``remotecall_fetch`` is called. -Distributed Arrays ------------------- +Distributed and Shared Arrays +----------------------------- Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to @@ -415,21 +415,21 @@ A ``DArray`` can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a ``DArray`` is distributed by dividing the index space into some number of blocks in each dimension. -Common kinds of arrays can be constructed with functions beginning with -``d``:: +Common kinds of darrays can be constructed using the same convenience +functions for Arrays with the first parameter specified as ``DArray``:: - dzeros(100,100,10) - dones(100,100,10) - drand(100,100,10) - drandn(100,100,10) - dfill(x, 100,100,10) + zeros(DArray, 100,100,10) + ones(DArray, 100,100,10) + rand(DArray, 100,100,10) + randn(DArray, 100,100,10) + fill(x, DArray, 100,100,10) # second parameter for fill In the last case, each element will be initialized to the specified value ``x``. These functions automatically pick a distribution for you. For more control, you can specify which processors to use, and how the data should be distributed:: - dzeros((100,100), workers()[1:4], [1,4]) + zeros(DArray, (100,100), workers()[1:4], [1,4]) The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, @@ -464,11 +464,12 @@ Constructing Distributed Arrays The primitive ``DArray`` constructor has the following somewhat elaborate signature:: - DArray(init, dims[, procs, dist]) + DArray(alloc, dims[, procs, dist]; init=false) -``init`` is a function that accepts a tuple of index ranges. This function should +``alloc`` is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified -indices. ``dims`` is the overall size of the distributed array. +indices. You can also pass a type instead and the required space is allocated by DArray +itself. ``dims`` is the overall size of the distributed array. ``procs`` optionally specifies a vector of processor IDs to use. ``dist`` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. @@ -476,13 +477,14 @@ distributed array should be divided into in each dimension. The last two arguments are optional, and defaults will be used if they are omitted. -As an example, here is how to turn the local array constructor ``fill`` -into a distributed array constructor:: +If a keyword argument ``init`` function is specified, it is called on all the workers +with the ``darray`` object and is expected to initialize its ``localpart``. - dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) 
+As an example, the DArray ``fill`` is implemented as:: -In this case the ``init`` function only needs to call ``fill`` with the -dimensions of the local piece it is creating. + fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...) + +In this case the ``alloc`` allocates and initializes the local piece it is creating. Distributed Array Operations ---------------------------- @@ -519,7 +521,7 @@ following code accomplishes this:: As you can see, we use a series of indexing expressions to fetch data into a local array ``old``. Note that the ``do`` block syntax is -convenient for passing ``init`` functions to the ``DArray`` constructor. +convenient for passing ``alloc`` functions to the ``DArray`` constructor. Next, the serial function ``life_rule`` is called to apply the update rules to the data, yielding the needed ``DArray`` chunk. Nothing about ``life_rule`` is ``DArray``\ -specific, but we list it here for completeness:: @@ -540,6 +542,20 @@ is ``DArray``\ -specific, but we list it here for completeness:: new end +SharedArrays +------------ + +SharedArrays are like DArrays in the sense that they allow computation to be more easily +spread across a bunch of worker processes. Unlike DArrays which allocate a chunk of +the Array on each participating worker, SharedArrays map the complete array into each +worker using operating system provided shared memory facilities. + +While each worker has full visibility into the SharedArray, local chunks in SharedArrays +(of type SubArray) may be used to partition work across paticipating workers. + +All participating workers in a SharedArray must be on the same host. Limits on the +size and number of SharedArray objects are system defined. + ClusterManagers --------------- diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index a37b0f1f57d0c..aec440d64bd5b 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -4425,50 +4425,66 @@ Parallel Computing Wait until all dynamically-enclosed uses of ``@async``, ``@spawn``, and ``@spawnat`` complete. -Distributed Arrays ------------------- +Distributed and Shared Arrays +----------------------------- -.. function:: DArray(init, dims, [procs, dist]) +.. function:: DArray(alloc::Union(Type, Function), dims, [procs, dist]; init=false) - Construct a distributed array. ``init`` is a function that accepts a tuple of index ranges. - This function should allocate a local chunk of the distributed array and initialize it for the specified indices. + Construct a distributed array. If ``alloc`` is a ``DataType``, local array chunks are alocated. If ``alloc`` is + a ``Function``, it should be a a function that accepts a tuple of index ranges, and should allocate a local + chunk of the distributed array. It may also optionally initialize it. ``dims`` is the overall size of the distributed array. ``procs`` optionally specifies a vector of processor IDs to use. If unspecified, the array is distributed over all worker processes only. Typically, when runnning in distributed mode, i.e., ``nprocs() > 1``, this would mean that no chunk of the distributed array exists on the process hosting the interactive julia prompt. ``dist`` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. + If keyword argument ``init`` is specified, it should be a function that accepts a DArray object. It is expected to + initialize its ``localpart``. 
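+
+   As an illustrative sketch (any equivalent initializer works), a zero-filled distributed array
+   could be constructed with the ``init`` keyword as:
+
+   ``DArray(Float64, (100,100); init = D -> fill!(localpart(D), 0.0))``
+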
- For example, the ``dfill`` function that creates a distributed array and fills it with a value ``v`` is implemented as: - - ``dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)`` - -.. function:: dzeros(dims, ...) - - Construct a distributed array of zeros. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dones(dims, ...) - - Construct a distributed array of ones. Trailing arguments are the same as those accepted by ``darray``. - -.. function:: dfill(x, dims, ...) - - Construct a distributed array filled with value ``x``. Trailing arguments are the same as those accepted by ``darray``. + For example, the ``fill`` variant that creates a distributed array and fills it with a value ``v`` is implemented as: -.. function:: drand(dims, ...) + `` fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...)`` - Construct a distributed uniform random array. Trailing arguments are the same as those accepted by ``darray``. +.. function:: SharedArray(arrtype, dims, [procs, dist]; init=false) -.. function:: drandn(dims, ...) + Construct an array mapped across all participating processes using shared memory. ``arrtype`` determines the type of + the array created in shared memory. ``isbits(arrtype)`` MUST be true. + ``dims``, ``procs`` and ``dist`` are similar to those in ``DArray``. + The shared array created is initialized by calling the ``init``function if specified. It should be a function that + accepts a SharedArray object and is expected to initialize its ``localpart`` which will be a ``SubArray``. + + +.. function:: ones, zeros, infs, nans, rand, randn, fill - Construct a distributed normal random array. Trailing arguments are the same as those accepted by ``darray``. + DArray / SharedArray variants of the standard Array convenience constructors. + + Except for ``fill``, they are called by specifying the distribution type as the first parameter. + ``fill`` accepts the distribution type as the second parameter. + + ``zeros(SharedArray, dims, ...)``, ``ones(SharedArray, dims, ...)``, ``rand(DArray, dims, ...)`` and so on. + Trailing arguments are the same as those accepted by DArray and SharedArray. + + For example, + + ``rand(DArray, 1:100, (100,100))`` will created a DArray of size 100 x 100 with random integers between + 1 and 100. + + ``fill(float64(pi), SharedArray, (100,100))`` gets you an all pi SharedArray. + + .. function:: distribute(a) Convert a local array to distributed +.. function:: share(a) + + Convert a local array to shared + .. function:: localpart(d) - Get the local piece of a distributed array. Returns an empty array if no local part exists on the calling process. + Get the local piece of a distributed/shared array. Returns an empty array if no local part exists on the calling process. + If ``d`` is a SharedArray, returns a ``SubArray``. .. function:: myindexes(d) @@ -4477,7 +4493,28 @@ Distributed Arrays .. function:: procs(d) - Get the vector of processors storing pieces of ``d`` + Get the vector of processors participating in the distribution of ``d`` + + +.. function:: dzeros(dims, ...) : DEPRECATED + + Construct a distributed array of zeros. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: dones(dims, ...) : DEPRECATED + + Construct a distributed array of ones. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: dfill(x, dims, ...) : DEPRECATED + + Construct a distributed array filled with value ``x``. 
Trailing arguments are the same as those accepted by ``darray``. + +.. function:: drand(dims, ...) : DEPRECATED + + Construct a distributed uniform random array. Trailing arguments are the same as those accepted by ``darray``. + +.. function:: drandn(dims, ...) : DEPRECATED + + Construct a distributed normal random array. Trailing arguments are the same as those accepted by ``darray``. System ------ From cf2d18cc5de22e56446a9c3ac5f56fbd0512e051 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Tue, 31 Dec 2013 13:10:42 +0530 Subject: [PATCH 4/4] revised as per discussion --- base/arraydist.jl | 199 +++++++++++++++++++++++++++++++------------- base/darray.jl | 139 ++++++++++++++++--------------- base/deprecated.jl | 36 +++++--- base/exports.jl | 6 +- base/multi.jl | 24 +----- base/sharedarray.jl | 152 +++++++++++++++++++-------------- test/parallel.jl | 19 +++-- 7 files changed, 348 insertions(+), 227 deletions(-) diff --git a/base/arraydist.jl b/base/arraydist.jl index 303fa9a22ca5b..e042c55edf0ed 100644 --- a/base/arraydist.jl +++ b/base/arraydist.jl @@ -1,54 +1,68 @@ -# ArrayDist holds the chunks of an array distribution across workers -type ArrayDist{N} - # ppmap[i]==p ⇒ processor p has piece i - ppmap::Vector{Int} +# ArrayDist specifies the distribution of an array +# Specifically it should provide the following: +# dims(ad) : dimensions of the array being distributed +# pdims(ad) : dimensions of how the workers are partitoned - Number of parts in each dimension +# ngroups(ad) : Equal to the number of workers +# length(ad) : Number of tiles/partitions +# getindex(ad, i) : array of indices at index i +# locate(ad, I) : returns the partition index of the requested array index +# dmode(ad) : returns a const indicating the type of array using the distribution + + +abstract ArrayDist + +# To be implemented +#type TileDist <: ArrayDist +#end +# Different ways in which an ArrDist may be distributed. +global const DISTMODE_DISTRIBUTED = 1 +global const DISTMODE_SHARED = 2 + +type DimDist{N} <: ArrayDist + dims::NTuple{N,Int} + + # number of parts in each dimension + pdims::Vector{Int} + # indexes held by piece i indexes::Array{NTuple{N,Range1{Int}},N} # cuts[d][i] = first index of chunk i in dimension d cuts::Vector{Vector{Int}} - chunks::Array{RemoteRef,N} - - function ArrayDist(dims, ppmap, indexes, cuts, chunks) - # check invariants - assert(dims == map(last,last(indexes))) - assert(size(chunks) == size(indexes)) - assert(length(chunks) == length(ppmap)) + dmode::Integer - new(ppmap, indexes, cuts, chunks) - end + DimDist(dims, pdims, indexes, cuts, mode) = new(dims, pdims, indexes, cuts, mode) end -function ArrayDist(allocf, dims, procs, dist) - np = prod(dist) - if np > length(procs) +function DimDist(dims, ngroups, pdims; mode=DISTMODE_DISTRIBUTED) + if prod(pdims) > ngroups error("Total requested number of chunks is greater than the number of workers") end - ppmap = procs[1:np] - idxs, cuts = chunk_idxs([dims...], dist) - chunks = Array(RemoteRef, dist...) - for i = 1:np - chunks[i] = remotecall(procs[i], allocf, idxs[i]) - end - - ArrayDist{length(dims)}(dims, ppmap, idxs, cuts, chunks) + idxs, cuts = chunk_idxs([dims...], pdims) + assert(dims == map(last,last(idxs))) + + DimDist{length(dims)}(dims, pdims, idxs, cuts, mode) end +DimDist(dims, ngroups; kwargs...) = DimDist(dims, ngroups, defaultdist(dims, ngroups); kwargs...) +DimDist(dims::(Integer...); kwargs...) = DimDist(dims, nworkers(); kwargs...) +DimDist(dims::Integer...; kwargs...) = DimDist(dims; kwargs...) 
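+
+# For illustration (the sizes and group count below are assumed): distributing a (100,100)
+# array over 4 groups gives
+#   dd = DimDist((100,100), 4)   # pdims(dd) == [2,2]
+#   dd[1,1]                      # == (1:50, 1:50)
+#   locate(dd, 51, 1)            # == (2, 1)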
+ ## chunk index utilities ## # decide how to divide each dimension # returns size of chunks array -function defaultdist(dims, procs) +# allocates largest factor to largest dim +function defaultdist(dims, ngroups) dims = [dims...] chunks = ones(Int, length(dims)) - np = length(procs) - f = sort!(collect(keys(factor(np))), rev=true) + f = sort!(collect(keys(factor(ngroups))), rev=true) k = 1 - while np > 1 + while ngroups > 1 # repeatedly allocate largest factor to largest dim - if np%f[k] != 0 + if ngroups%f[k] != 0 k += 1 if k > length(f) break @@ -62,7 +76,7 @@ function defaultdist(dims, procs) dims[dno] = div(dims[dno], fac) chunks[dno] *= fac end - np = div(np,fac) + ngroups = div(ngroups,fac) end chunks end @@ -88,44 +102,115 @@ function chunk_idxs(dims, chunks) idxs, cuts end -function localpartindex(dist::ArrayDist) - ppmap = dist.ppmap - mi = myid() - for i = 1:length(ppmap) - if ppmap[i] == mi - return i - end + +dims(dimdist::DimDist) = dimdist.dims +pdims(dimdist::DimDist) = dimdist.pdims +ngroups(dimdist::DimDist) = length(dimdist.indexes) +length(dimdist::DimDist) = length(dimdist.indexes) +dmode(dimdist::DimDist) = dimdist.dmode + +getindex(dimdist::DimDist, i::Int) = dimdist.indexes[i] +getindex(dimdist::DimDist, i::Int...) = dimdist.indexes[i...] + +# find which piece holds index (I...) +function locate{N}(dimdist::DimDist{N}, I::Int...) + ntuple(N, i->searchsortedlast(dimdist.cuts[i], I[i])) +end + + +# Helper types and functions for distributed and shared arrays +type DistRefs{N} + ppmap::Vector{Int} + chunks::Array{RemoteRef,N} + + DistRefs(p, c) = new(p,c) +end + +procs(dr::DistRefs) = dr.ppmap +length(dr::DistRefs) = length(dr.chunks) + +getindex(ar::DistRefs, i::Int) = ar.chunks[i] +getindex(ar::DistRefs, i...) = ar.chunks[i...] + + +function setup_chunks(allocf, dprocs, arrdist) + if ngroups(arrdist) > length(dprocs) + error("Number of array partitions requested is more than the number of workers specified") + end + + ppmap = dprocs[1:ngroups(arrdist)] + chunks = Array(RemoteRef, pdims(arrdist)...) + for (i, p) in enumerate(ppmap) + chunks[i] = remotecall(p, allocf, arrdist[i]) end - return 0 + + dr = DistRefs{length(dims(arrdist))}(ppmap, chunks) + +# assert(size(chunks) == size(arrdist)) + assert(length(chunks) == length(ppmap)) + + dr end -indexes(ad::ArrayDist) = ad.indexes -procs(ad::ArrayDist) = ad.ppmap -length(ad::ArrayDist) = length(ad.chunks) -dimdist(ad::ArrayDist) = size(ad.chunks) # number of parts in each dimension +localpartindex(dr::DistRefs) = findfirst(dr.ppmap, myid()) + -chunk_ref(ad::ArrayDist, i::Int) = ad.chunks[i] -chunk_ref(ad::ArrayDist, i...) = ad.chunks[i...] +# additional distributed/shared array constructors +function fill(v, dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->fill(v, map(length,I)), dimdist; kwargs...) + else + SharedArray(typeof(v), dimdist; init = S->fill!(localpart(S), v), kwargs...) + end +end -function myindexes{N}(ad::ArrayDist{N}) - lpidx = localpartindex(ad) - if lpidx == 0 - ntuple(N, i->1:0) +# rand variant with range +function rand(TR::Union(DataType, Range1), dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->rand(TR, map(length,I)), dimdist; kwargs...) else - ad.indexes[lpidx] + if isa(TR, Range1) + SharedArray(Int, dimdist; init = S->map!((x)->rand(TR), localpart(S)), kwargs...) + else + SharedArray(TR, dimdist; init = S->map!((x)->rand(TR), localpart(S)), kwargs...) + end end end +rand(dimdist::DimDist; kwargs...) 
= rand(Float64, dimdist; kwargs...) -# find which piece holds index (I...) -function locate{N}(ad::ArrayDist{N}, I::Int...) - ntuple(N, i->searchsortedlast(ad.cuts[i], I[i])) +function randn(dimdist::DimDist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + DArray(I->randn(map(length,I)), dimdist; kwargs...) + else + SharedArray(Float64, dimdist; init = S-> map!((x)->randn(), localpart(S)), kwargs...) + end end -function localindex_rr{N}(ad::ArrayDist{N}, I::(Int...)) - chidx = locate(ad, I...) - idxs = ad.indexes[chidx...] - chunk_rr = ad.chunks[chidx...] - chunk_rr, ntuple(N, i->(I[i]-first(idxs[i])+1)) +# ambiguity warning removal +similar{T}(a::Array{T, 1}, dimdist::DimDist) = make_distributed(a, T, dimdist) +similar{T}(a::Array{T, 2}, dimdist::DimDist) = make_distributed(a, T, dimdist) + +function make_distributed(a, T, dimdist; kwargs...) + if dmode(dimdist) == DISTMODE_DISTRIBUTED + owner = myid() + rr = RemoteRef() + put(rr, a) + DArray(dimdist; kwargs...) do I + remotecall_fetch(owner, ()->fetch(rr)[I...]) + end + else + sa = SharedArray(T, dimdist; kwargs...) + if isdefined(sa, :local_shmmap) + copy!(sa.local_shmmap, a) + else + remotecall_fetch(procs(sa)[1], SA -> copy!(SA.local_shmmap, a), SA) + end + sa + end end +# generic version +similar{T}(a::AbstractArray{T}, dimdist::DimDist; kwargs...) = make_distributed(a, T, dimdist; kwargs...) + + diff --git a/base/darray.jl b/base/darray.jl index e0207baf27690..cbbffb6e35f67 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -1,6 +1,7 @@ type DArray{T,N,A} <: AbstractArray{T,N} - dims::NTuple{N,Int} - ad::ArrayDist{N} + dimdist::DimDist{N} + refs::DistRefs{N} + DArray(ad, r) = new(ad, r) end @@ -9,21 +10,21 @@ typealias SubOrDArray{T,N} Union(DArray{T,N}, SubDArray{T,N}) ## core constructors ## -function DArray(alloc_arg::Union(Type, Function), dims, dprocs, dist; init=false) - N = length(dims) +function DArray(alloc_arg::Union(Type, Function), dimdist::DimDist; init=false, dprocs=workers()) + N = length(dims(dimdist)) - ad = if isa(alloc_arg, Type) - alloc_chunk = (idxs) -> Array(alloc_arg, map(length, idxs)) - ArrayDist(alloc_chunk, dims, dprocs, dist) + allocf = (idxs) -> Array(alloc_arg, map(length, idxs)) else - ArrayDist(alloc_arg, dims, dprocs, dist) + allocf = alloc_arg end - p = max(1, localpartindex(ad)) - A = remotecall_fetch(procs(ad)[p], r->typeof(fetch(r)), chunk_ref(ad, p)) + refs = setup_chunks(allocf, dprocs, dimdist) + + p = max(1, localpartindex(refs)) + A = remotecall_fetch(procs(refs)[p], r->typeof(fetch(r)), refs[p]) T = eltype(A) - d = DArray{T,N,A}(dims, ad) + d = DArray{T,N,A}(dimdist, refs) # if present, init function is called on each of the parts @sync begin @@ -36,81 +37,82 @@ function DArray(alloc_arg::Union(Type, Function), dims, dprocs, dist; init=false d end -function DArray(alloc_arg, dims, procs; kwargs...) - if isempty(procs) - error("no processors") +function DArray(alloc_arg, dims; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + DArray(alloc_arg, DimDist(dims, length(dprocs)); kwargs...) + else + DArray(alloc_arg, DimDist(dims); kwargs...) + end +end +function DArray(alloc_arg, dims, dist; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + DArray(alloc_arg, DimDist(dims, length(dprocs), dist); kwargs...) + else + DArray(alloc_arg, DimDist(dims, length(workers()), dist); kwargs...) 
end - DArray(alloc_arg, dims, procs, defaultdist(dims,procs); kwargs...) end -DArray(alloc_arg, dims; kwargs...) = DArray(alloc_arg, dims, workers()[1:min(nworkers(),maximum(dims))]; kwargs...) -# new DArray similar to an existing one -DArray{T}(d::DArray{T}; kwargs...) = DArray(T, size(d), procs(d), [dimdist(d.ad)...]; kwargs...) +# new DArray similar to an existing one, ensure length of dprocs is same as original if present +function DArray{T}(d::DArray{T}; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + if length(procs(d)) != length(dprocs) + error("Requested number of workers must be same as existing DArray") + end + DArray(T, DimDist(size(d), length(dprocs), pdims(d.dimdist)); kwargs...) + else + append!(kwargs, [(:dprocs, procs(d))]) + DArray(T, DimDist(size(d), length(procs(d)), pdims(d.dimdist)); kwargs...) + end +end -length(d::DArray) = prod(d.dims) -size(d::DArray) = d.dims -procs(d::DArray) = procs(d.ad) +length(d::DArray) = prod(dims(d.dimdist)) +size(d::DArray) = dims(d.dimdist) +procs(d::DArray) = procs(d.refs) chunktype{T,N,A}(d::DArray{T,N,A}) = A -localpartindex(d::DArray) = localpartindex(d.ad) +localpartindex(d::DArray) = localpartindex(d.refs) function localpart{T,N,A}(d::DArray{T,N,A}) - lpidx = localpartindex(d) + lpidx = localpartindex(d.refs) if lpidx == 0 convert(A, Array(T, ntuple(N,i->0)))::A else - fetch(chunk_ref(d.ad, lpidx))::A + fetch(d.refs[lpidx])::A end end -myindexes(d::DArray) = myindexes(d.ad) -locate(d::DArray, I::Int...) = locate(d.ad, I...) - -chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(chunk_ref(d.ad, i...))::A - -## convenience constructors ## -for (f) in (:zeros, :ones, :nans, :infs) - @eval begin - ($f)(::Type{DArray}, T::DataType, args...) = DArray(I->($f)(T, map(length,I)), args...) - ($f)(::Type{DArray}, args...) = ($f)(DArray, Float64, args...) - ($f)(::Type{DArray}, T::DataType, d::Int...) = ($f)(DArray, T, d) - ($f)(::Type{DArray}, d::Int...) = ($f)(DArray, Float64, d) + + +function myindexes(d::DArray) + lpidx = localpartindex(d.refs) + if lpidx == 0 + ntuple(N, i->1:0) + else + (d.dimdist[lpidx])[1] end end -# rand variant with range -rand(::Type{DArray}, TR::Union(DataType, Range1), d::Int...) = rand(DArray, TR, d) -rand(::Type{DArray}, TR::Union(DataType, Range1), args...) = DArray(I->rand(TR, map(length,I)), args...) -rand(::Type{DArray}, I::(Int...)) = rand(DArray, Float64, I) -rand(::Type{DArray}, args...) = rand(DArray, Float64, args...) -rand(::Type{DArray}, d::Int...) = rand(DArray, Float64, d) +locate(d::DArray, I::Int...) = locate(d.dimdist, I...) -fill(v, ::Type{DArray}, args...) = DArray(I->fill(v, map(length,I)), args...) -fill(v, ::Type{DArray}, d::Int...) = fill(v, DArray, d) +chunk{T,N,A}(d::DArray{T,N,A}, i...) = fetch(d.refs[i...])::A -randn(::Type{DArray}, args...) = DArray(I->randn(map(length,I)), args...) -randn(::Type{DArray}, d::Int...) = randn(DArray, d) ## conversions ## - -function distribute(a::AbstractArray) - owner = myid() - rr = RemoteRef() - put(rr, a) - DArray(size(a)) do I - remotecall_fetch(owner, ()->fetch(rr)[I...]) - end -end - convert{T,N}(::Type{Array}, d::SubOrDArray{T,N}) = convert(Array{T,N}, d) function convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) a = Array(S, size(d)) @sync begin - for i = 1:length(d.ad) - @async a[indexes(d.ad)[i]...] = chunk(d, i) + for i = 1:length(d.dimdist) + @async a[(d.dimdist[i])...] 
= chunk(d, i) end end a @@ -121,7 +123,7 @@ function convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) d = s.parent if isa(I,(Range1{Int}...)) && S<:T && T<:S l = locate(d, map(first, I)...) - if isequal(indexes(d.ad)[l...], I) + if isequal(d.dimdist[l...], I) # SubDArray corresponds to a chunk return chunk(d, l...) end @@ -166,11 +168,16 @@ function getchunk(r::RemoteRef, args...) end end + getindex(d::DArray, i::Int) = getindex(d, ind2sub(size(d), i)) getindex(d::DArray, i::Int...) = getindex(d, sub2ind(size(d), i...)) -function getindex{T}(d::DArray{T}, I::(Int...)) - chunk_rr, localidx = localindex_rr(d.ad, I) +function getindex{T, N}(d::DArray{T,N}, I::(Int...)) + chidx = locate(d.dimdist, I...) + idxs = d.dimdist[chidx...] + chunk_rr = d.refs[chidx...] + localidx = ntuple(N, i->(I[i]-first(idxs[i])+1)) + getchunk(chunk_rr, localidx...)::T end @@ -185,8 +192,8 @@ copy(d::SubOrDArray) = d function setindex!(a::Array, d::DArray, I::Range1{Int}...) n = length(I) @sync begin - for i = 1:length(d.ad) - K = indexes(d.ad)[i] + for i = 1:length(d.dimdist) + K = d.dimdist[i] @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) end end @@ -203,8 +210,8 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) end offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n] @sync begin - for i = 1:length(d.ad) - K_c = {indexes(d.ad)[i]...} + for i = 1:length(d.dimdist) + K_c = {d.dimdist[i]...} K = [ intersect(J[j],K_c[j]) for j=1:n ] if !any(isempty, K) idxs = [ I[j][K[j]-offs[j]] for j=1:n ] @@ -213,7 +220,7 @@ function setindex!(a::Array, s::SubDArray, I::Range1{Int}...) @async a[idxs...] = chunk(d, i) else # partial chunk - ch = chunk_ref(d.ad, i) + ch = d.refs[i] @async a[idxs...] = remotecall_fetch(ch.where, ()->sub(fetch(ch), [K[j]-first(K_c[j])+1 for j=1:n]...)) end end diff --git a/base/deprecated.jl b/base/deprecated.jl index 360e605263f1e..d4d8ce81cbf2f 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -369,17 +369,33 @@ const CharString = UTF32String @deprecate with_bigfloat_rounding(f::Function, r::RoundingMode) with_rounding(f::Function, BigFloat, r) eval(Sys, :(@deprecate shlib_list dllist)) -@deprecate dzeros(args...) zeros(DArray, args...) -@deprecate dzeros(d::Int...) zeros(DArray, d...) -@deprecate dones(args...) ones(DArray, args...) -@deprecate dones(d::Int...) ones(DArray, d...) -@deprecate dfill(v, args...) fill(v, DArray, args...) -@deprecate dfill(v, d::Int...) fill(v, DArray, d...) -@deprecate drand(args...) rand(DArray, args...) -@deprecate drand(d::Int...) rand(DArray, d...) -@deprecate drandn(args...) randn(DArray, args...) -@deprecate drandn(d::Int...) randn(DArray, d...) +@deprecate dzeros(dims) fill(0.0, DimDist(dims)) +@deprecate dzeros(dims, procs) fill(0.0, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dzeros(dims, procs, dist) fill(0.0, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dzeros(d::Int...) fill(0.0, DimDist(d)) + +@deprecate dones(dims) fill(1.0, DimDist(dims)) +@deprecate dones(dims, procs) fill(1.0, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dones(dims, procs, dist) fill(1.0, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dones(d::Int...) fill(1.0, DimDist(d)) + +@deprecate drand(dims) rand(DimDist(dims)) +@deprecate drand(dims, procs) rand(DimDist(dims, length(procs)); dprocs=procs) +@deprecate drand(dims, procs, dist) rand(DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate drand(d::Int...) 
rand(DimDist(d)) + +@deprecate drandn(dims) randn(DimDist(dims)) +@deprecate drandn(dims, procs) randn(DimDist(dims, length(procs)); dprocs=procs) +@deprecate drandn(dims, procs, dist) randn(DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate drandn(d::Int...) randn(DimDist(d)) + +@deprecate dfill(v, dims) fill(v, DimDist(dims)) +@deprecate dfill(v, dims, procs) fill(v, DimDist(dims, length(procs)); dprocs=procs) +@deprecate dfill(v, dims, procs, dist) fill(v, DimDist(dims, length(procs), dist); dprocs=procs) +@deprecate dfill(v, d::Int...) fill(v, DimDist(d)) + +@deprecate distribute(a::Array) similar(a, DimDist(size(a))) # 0.3 discontinued functions diff --git a/base/exports.jl b/base/exports.jl index 950411f740f15..d19092a9ffe17 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -18,6 +18,7 @@ export AbstractVector, AbstractVecOrMat, Array, + ArrayDist, Associative, Bidiagonal, BigFloat, @@ -36,11 +37,11 @@ export Complex64, Complex32, DArray, - SharedArray, DevNull, Diagonal, Dict, Dims, + DimDist, EachLine, Eigen, Enumerate, @@ -90,6 +91,7 @@ export RoundUp, Schur, Set, + SharedArray, SparseMatrixCSC, StatStruct, StridedArray, @@ -156,6 +158,8 @@ export C_NULL, CPU_CORES, DL_LOAD_PATH, + DISTMODE_DISTRIBUTED, + DISTMODE_SHARED, ENDIAN_BOM, ENV, Inf, diff --git a/base/multi.jl b/base/multi.jl index c04f200545569..0d26bd268438e 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -212,7 +212,7 @@ function add_workers(pg::ProcessGroup, ws::Array{Any,1}) end all_locs = map(x -> isa(x, Worker) ? (x.privhost, x.port, x.id) : ("", 0, x.id), pg.workers) for w in ws - send_msg_now(w, :join_pgrp, w.id, all_locs, string(getipaddr())) + send_msg_now(w, :join_pgrp, w.id, all_locs) end for w in ws @schedule begin @@ -859,28 +859,25 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately put(lookup_ref(oid), val) elseif is(msg, :identify_socket) otherid = deserialize(sock) - otherhost = deserialize(sock) - register_worker(Worker(otherhost, 0, sock, otherid)) + register_worker(Worker("", 0, sock, otherid)) elseif is(msg, :join_pgrp) # first connection; get process group info from client self_pid = LPROC.id = deserialize(sock) locs = deserialize(sock) - otherhost = deserialize(sock) #print("\nLocation: ",locs,"\nId:",myid(),"\n") # joining existing process group - register_worker(Worker(otherhost, 0, sock, 1)) + register_worker(Worker("", 0, sock, 1)) register_worker(LPROC) for (rhost, rport, rpid) in locs - self_ipaddr_str = string(getipaddr()) if (rpid < self_pid) && (!(rpid == 1)) # Connect to them w = Worker(rhost, rport) w.id = rpid register_worker(w) create_message_handler_loop(w.socket) - send_msg_now(w, :identify_socket, self_pid, self_ipaddr_str) + send_msg_now(w, :identify_socket, self_pid) else # Others will connect to us. Don't do anything just yet continue @@ -1668,16 +1665,3 @@ function interrupt(pids::AbstractVector=workers()) end end end - - -function islocalwrkr(id) - if (myid() == id) - return true - else - wrkr = worker_from_id(id) - if (wrkr.host == "127.0.0.1") || (wrkr.host == "localhost") || (wrkr.host == string(getipaddr())) - return true - end - end - return false -end \ No newline at end of file diff --git a/base/sharedarray.jl b/base/sharedarray.jl index 525ba4f7cec98..5287ccec88716 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -1,6 +1,6 @@ type SharedArray{T,N} <: AbstractArray{T,N} - dims::NTuple{N,Int} - ad::ArrayDist{N} + arrdist::ArrayDist + refs::DistRefs{N} # Local shmem map. Not to be serialized. 
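    # On a participating process this field maps the entire shared segment, so
    # convert(Array, sa) can return it directly; deserialize re-creates it from
    # refs on the receiving process instead of shipping the data.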
local_shmmap::Array{T,N} @@ -9,47 +9,53 @@ type SharedArray{T,N} <: AbstractArray{T,N} local_idx::Int end -function SharedArray(T::Type, dims, dprocs, dist; init=false) - N = length(dims) +function SharedArray(T::Type, arrdist::ArrayDist; init=false, dprocs=workers()) + N = length(dims(arrdist)) !isbits(T) ? error("Type of Shared Array elements must be bits types") : nothing @windows_only error(" SharedArray is not supported on Windows yet.") - # Ensure that all processes are on localhost. - if !(all(x->islocalwrkr(x), [dprocs, myid()])) - error("SharedArray requires all requested processes to be on the same machine.") - end + onlocalhost = assert_same_host(dprocs) local shm_seg_name = "" local local_shmmap local sa = nothing + local shmmem_create_pid try # On OSX, the shm_seg_name length must be < 32 characters shm_seg_name = string("/jl", getpid(), int64(time() * 10^9)) + if onlocalhost + shmmem_create_pid = myid() + local_shmmap = shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_CREAT | JL_O_RDWR) + else + # The shared array is being created on a remote machine.... + shmmem_create_pid = dprocs[1] + remotecall(dprocs[1], () -> begin shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_CREAT | JL_O_RDWR); nothing end) + end - local_shmmap = shm_mmap_array(T, dims, shm_seg_name, JL_O_CREAT | JL_O_RDWR) - func_alloc = (idxs) -> begin - basemap = shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR) + basemap = shm_mmap_array(T, dims(arrdist), shm_seg_name, JL_O_RDWR) sub(basemap, idxs) end - ad = ArrayDist(func_alloc, dims, dprocs, dist) + refs = setup_chunks(func_alloc, dprocs, arrdist) # Wait till all the workers have mapped the segment - for i in 1:length(ad) - wait(chunk_ref(ad, i)) + for i in 1:length(refs) + wait(refs[i]) end # All good, immediately unlink the segment. - shm_unlink(shm_seg_name) - shm_seg_name = "" ; + remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end) + shm_seg_name = "" + if onlocalhost + sa = SharedArray{T,N}(arrdist, refs, local_shmmap, localpartindex(refs)) + else + sa = SharedArray{T,N}(arrdist, refs) + sa.local_idx = 0 + end - # get the typeof the chunks -# A = typeof(sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0))) - sa = SharedArray{T,N}(dims, ad, local_shmmap, localpartindex(ad)) - # if present init function is called on each of the parts @sync begin if isa(init, Function) @@ -61,71 +67,72 @@ function SharedArray(T::Type, dims, dprocs, dist; init=false) finally if shm_seg_name != "" - shm_unlink(shm_seg_name) + remotecall(shmmem_create_pid, () -> begin shm_unlink(shm_seg_name); nothing end) end end sa end -function SharedArray(T, dims, procs; kwargs...) - if isempty(procs) - error("no processors") +function SharedArray(T, dims; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + SharedArray(T, DimDist(dims, length(kwargs[idx][2]); mode=DISTMODE_SHARED); kwargs...) + else + SharedArray(T, DimDist(dims, length(workers()); mode=DISTMODE_SHARED); kwargs...) + end +end + +function SharedArray(T, dims, dist; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + SharedArray(T, DimDist(dims, length(kwargs[idx][2]), dist; mode=DISTMODE_SHARED); kwargs...) + else + SharedArray(T, DimDist(dims, length(workers()), dist; mode=DISTMODE_SHARED); kwargs...) end - SharedArray(T, dims, procs, defaultdist(dims,procs); kwargs...) end -SharedArray(T, dims; kwargs...) = SharedArray(T, dims, workers()[1:min(nworkers(),maximum(dims))]; kwargs...) 
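# Rough usage sketch for the keyword-style constructors above. The element types,
# dimensions, partition shape and init closure below are illustrative assumptions;
# only the SharedArray/DimDist API and the dprocs/init keywords come from this patch.
s1 = SharedArray(Int, (1000, 1000); init = S -> fill!(localpart(S), myid()))   # partitioned over all workers
s2 = SharedArray(Float64, (1000, 1000), [1, 4]; dprocs = workers()[1:4])       # 1x4 chunks on a worker subset
# Both calls construct a DimDist with mode = DISTMODE_SHARED and forward to the
# SharedArray(T, arrdist; ...) method defined earlier in this file.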
# new SharedArray similar to an existing one -SharedArray{T}(sa::SharedArray{T}; kwargs...) = SharedArray(T, size(sa), procs(sa), [dimdist(sa.ad)...]; kwargs...) +function SharedArray{T}(sa::SharedArray{T}; kwargs...) + idx = findfirst(Arg -> begin (S,_) = Arg; S == :dprocs end, kwargs) + if idx > 0 + dprocs = kwargs[idx][2] + if length(procs(d)) != length(dprocs) + error("Requested number of workers must be same as existing SharedArray") + end + SharedArray(T, size(sa), length(dprocs), pdims(sa.arrdist); kwargs...) + else + append!(kwargs, [(:dprocs, procs(d))]) + SharedArray(T, size(sa), length(dprocs), length(procs(d)), pdims(sa.arrdist); kwargs...) + end +end + -length(sa::SharedArray) = prod(sa.dims) -size(sa::SharedArray) = sa.dims -procs(sa::SharedArray) = procs(sa.ad) +length(sa::SharedArray) = prod(dims(sa.arrdist)) +size(sa::SharedArray) = dims(sa.arrdist) +procs(sa::SharedArray) = procs(sa.refs) -localpartindex(sa::SharedArray) = localpartindex(sa.ad) +localpartindex(sa::SharedArray) = localpartindex(sa.refs) function localpart{T,N}(sa::SharedArray{T,N}) if sa.local_idx == 0 sub(Array(T, ntuple(N,i->0)), ntuple(N, i->1:0)) else - fetch(chunk_ref(sa.ad, sa.local_idx)) + fetch(sa.refs[sa.local_idx]) end end -myindexes(sa::SharedArray) = myindexes(sa.ad) -locate(sa::SharedArray, I::Int...) = locate(sa.ad, I...) - -## convenience constructors ## -for (arrtype) in (:zero, :one, :inf, :nan) - f = symbol(string(arrtype, "s")) - @eval begin - ($f)(::Type{SharedArray}, d::Int...) = ($f)(SharedArray, Float64, d) - ($f)(::Type{SharedArray}, T::DataType, d::Int...) = ($f)(SharedArray, T, d) - - ($f)(::Type{SharedArray}, args...) = ($f)(SharedArray, Float64, args...) - ($f)(::Type{SharedArray}, T::DataType, args...) = SharedArray(T, args...; init = S->fill!(localpart(S), ($arrtype)(T))) +function myindexes(sa::SharedArray) + lpidx = localpartindex(sa.refs) + if lpidx == 0 + ntuple(N, i->1:0) + else + d.dimdist[lpidx] end -end - -rand(::Type{SharedArray}, I::(Int...)) = rand(SharedArray, Float64, I) -rand(::Type{SharedArray}, T::DataType, args...) = SharedArray(T, args...; init = S->map!((x)->rand(T), localpart(S))) -rand(::Type{SharedArray}, R::Range1, args...) = SharedArray(Int, args...; init = S->map!((x)->rand(R), localpart(S))) -rand(::Type{SharedArray}, args...) = rand(SharedArray, Float64, args...) - -fill(v, ::Type{SharedArray}, args...) = SharedArray(typeof(v), args...; init = S->fill!(localpart(S), v)) -fill(v, ::Type{SharedArray}, d::Int...) = fill(v, SharedArray, d) -randn(::Type{SharedArray}, args...) = SharedArray(Float64, args...; init = S-> map!((x)->randn(), localpart(S))) -randn(::Type{SharedArray}, d::Int...) = randn(SharedArray, d) - -## conversions ## +end +locate(sa::SharedArray, I::Int...) = locate(sa.arrdist, I...) 
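# Minimal sketch (not part of the patch) of per-process access through the accessors
# above; partial_sums is a hypothetical helper mirroring how the tests use localpart.
function partial_sums(sa::SharedArray)
    # each participating process reduces only the chunk it owns
    [remotecall_fetch(p, S -> sum(localpart(S)), sa) for p in procs(sa)]
end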
-function share{T}(a::AbstractArray{T}) - sa = SharedArray(T, size(a)) - copy!(sa.local_shmmap, a) - sa -end # Don't serialize local_shmmap (it is the complete array) and @@ -147,14 +154,14 @@ function deserialize{T,N}(s, t::Type{SharedArray{T,N}}) sa.local_idx = localpartindex(sa) if (sa.local_idx > 0) - sa.local_shmmap = parent(fetch(chunk_ref(sa.ad, sa.local_idx))) + sa.local_shmmap = parent(fetch(sa.refs[sa.local_idx])) else error("SharedArray cannot be used on a non-participating process") end sa end -localpartindex(sa::SharedArray) = localpartindex(sa.ad) +localpartindex(sa::SharedArray) = localpartindex(sa.refs) convert(::Type{Array}, sa::SharedArray) = sa.local_shmmap @@ -224,3 +231,20 @@ end @unix_only shm_unlink(shm_seg_name) = ccall(:shm_unlink, Cint, (Ptr{Uint8},), shm_seg_name) @unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions) + +function assert_same_host(procs) + myip = + resp = Array(Any, length(procs)) + + @sync begin + for (i, p) in enumerate(procs) + @async resp[i] = remotecall_fetch(p, () -> getipaddr()) + end + end + + if !all(x->x==resp[1], resp) + error("SharedArray requires all requested processes to be on the same machine.") + end + + return (resp[1] != getipaddr()) ? false : true +end diff --git a/test/parallel.jl b/test/parallel.jl index fcc8b619387cf..3a805eb415cfc 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -13,11 +13,12 @@ id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] @fetch begin myid() end -@windows_only dist_test_types = [DArray] -@unix_only dist_test_types = [DArray, SharedArray] +@windows_only dist_modes = [DISTMODE_DISTRIBUTED] +@unix_only dist_modes = [DISTMODE_DISTRIBUTED, DISTMODE_SHARED] -for tt in dist_test_types - d = rand(tt, (200,200), [id_me, id_other]) +for dm in dist_modes + dprocs = [id_me, id_other] + d = rand(DimDist((200,200), length(dprocs); mode = dm); dprocs = dprocs) s = convert(Array, d[1:150, 1:150]) a = convert(Array, d) @test a[1:150,1:150] == s @@ -29,7 +30,7 @@ end @unix_only begin # SharedArray tests dims = (20,20,20) -d = rand(SharedArray, 1:100, dims) +d = rand(1:100, DimDist(dims; mode=DISTMODE_SHARED)) a = convert(Array, d) partsums = Array(Int, length(procs(d))) @@ -40,7 +41,7 @@ partsums = Array(Int, length(procs(d))) end @test sum(a) == sum(partsums) -d = rand(SharedArray, dims) +d = rand(DimDist(dims; mode=DISTMODE_SHARED)) for p in procs(d) idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) idxf = sub2ind(dims, map(first,idxes_in_p)...) @@ -50,10 +51,10 @@ for p in procs(d) @test d[idxl] == rv end -@test ones(10, 10, 10) == ones(SharedArray, 10, 10, 10) -@test zeros(Int32, 10, 10, 10) == zeros(SharedArray, 10, 10, 10) +@test ones(10, 10, 10) == fill(1.0, DimDist(10, 10, 10; mode=DISTMODE_SHARED)) +@test zeros(Int32, 10, 10, 10) == fill(0, DimDist(10, 10, 10; mode=DISTMODE_SHARED)) -d = SharedArray(Int, dims; init = D->fill!(localpart(D), myid())) +d = SharedArray(Int, DimDist(dims; mode=DISTMODE_SHARED); init = D->fill!(localpart(D), myid())) for p in procs(d) idxes_in_p = remotecall_fetch(p, D->parentindexes(localpart(D)), d) idxf = sub2ind(dims, map(first,idxes_in_p)...)
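    # idxf: linear index, in the full array, of the first element of worker p's chunk
    # (its localpart's parentindexes mapped through sub2ind)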