-
-
Notifications
You must be signed in to change notification settings - Fork 67
/
Copy pathchunks.jl
305 lines (265 loc) · 9.57 KB
/
chunks.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
using Serialization
export domain, UnitDomain, project, alignfirst, ArrayDomain
import Base: isempty, getindex, intersect, ==, size, length, ndims
"""
    domain(x::T)

Return metadata describing `x`. When evaluating a Thunk produces an object of
type `T`, this metadata is what ends up in the `domain` field of the resulting
`Chunk`.
"""
function domain end

"""
    UnitDomain

The default domain: it carries no information about the value.
"""
struct UnitDomain end

"""
    domain(x)

Fallback for objects without a more specific `domain` method: they get a
`UnitDomain`, which is indivisible.
"""
function domain(::Any)
    return UnitDomain()
end
###### Chunk ######
"""
    Chunk

A reference to a piece of data located on a remote worker. `Chunk`s are
typically created with `Dagger.tochunk(data)`, and the data can then be
accessed from any worker with `collect(::Chunk)`. `Chunk`s are
serialization-safe, and use distributed refcounting (provided by
`MemPool.DRef`) to ensure that the data referenced by a `Chunk` won't be GC'd,
as long as a reference exists on some worker.

Each `Chunk` is associated with a given `Dagger.Processor`, which is (in a
sense) the processor that "owns" or contains the data. Calling
`collect(::Chunk)` will perform data movement and conversions defined by that
processor to safely serialize the data to the calling worker.

## Constructors
See [`tochunk`](@ref).
"""
mutable struct Chunk{T, H, P<:Processor, S<:AbstractScope}
# Type of the referenced value (`tochunk` passes `typeof(x)` here).
chunktype::Type{T}
# Result of `domain(x)` for the referenced value; intentionally untyped.
domain
# Storage reference for the data, e.g. a `MemPool.DRef` or a `FileRef`.
handle::H
# Processor that owns the data; `collect` moves data from here.
processor::P
# Scope attached to the chunk (`tochunk` defaults this to `AnyScope()`).
scope::S
# Set by `persist!`; `savechunk` creates chunks with this already `true`.
persist::Bool
end
# Field accessors for `Chunk`.
domain(c::Chunk) = c.domain
chunktype(c::Chunk) = c.chunktype

# Mark `t` as persistent and return it (for chaining).
persist!(t::Chunk) = (t.persist = true; t)

# Return whether `p` has been marked persistent.
shouldpersist(p::Chunk) = p.persist

processor(c::Chunk) = c.processor

# A chunk's affinity is that of its underlying storage handle.
affinity(c::Chunk) = affinity(c.handle)

is_task_or_chunk(c::Chunk) = true

# Equality and hashing are both keyed on the handle, so `==` and `hash` agree.
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, hash(Chunk, x))
# Executed on the worker owning `chunk.handle` (see `collect`): load the pooled
# value and move it from the chunk's processor to the local OS process.
function collect_remote(chunk::Chunk)
    src = chunk.processor
    dst = OSProc()
    value = poolget(chunk.handle)
    return move(src, dst, value)
end
# Fetch the value referenced by `chunk` into the calling process.
function collect(ctx::Context, chunk::Chunk; options=nothing)
    handle = chunk.handle
    # DRef-backed data on a non-OS processor is converted on the handle's owner
    # worker via `collect_remote`, then shipped back.
    if handle isa DRef && !(chunk.processor isa OSProc)
        return remotecall_fetch(collect_remote, handle.owner, chunk)
    end
    # File-backed data is read straight out of the pool.
    handle isa FileRef && return poolget(handle)
    # Otherwise let the owning processor's `move` perform the conversion.
    return move(chunk.processor, OSProc(), handle)
end

# Collect a bare DRef by moving it from its owner's OS process to this one.
function collect(ctx::Context, ref::DRef; options=nothing)
    return move(OSProc(ref.owner), OSProc(), ref)
end

# Collect a bare FileRef by reading it from the pool.
function collect(ctx::Context, ref::FileRef; options=nothing)
    # FIXME: Do move call
    return poolget(ref)
end
# Retrieve the chunk's data. With `raw=true` the pooled value is returned as-is;
# otherwise it goes through `collect`, which applies processor conversions.
function Base.fetch(chunk::Chunk; raw=false)
    return raw ? poolget(chunk.handle) : collect(chunk)
end
# Unwrap Chunk, DRef, and FileRef by default
function move(from_proc::Processor, to_proc::Processor, x::Chunk)
    return move(from_proc, to_proc, x.handle)
end
function move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef})
    value = poolget(x)
    return move(from_proc, to_proc, value)
end

# Determine from_proc when unspecified
function move(to_proc::Processor, chunk::Chunk)
    return move(chunk.processor, to_proc, chunk)
end
function move(to_proc::Processor, d::DRef)
    return move(OSProc(d.owner), to_proc, d)
end
function move(to_proc::Processor, x)
    return move(OSProc(), to_proc, x)
end
### ChunkIO
# A DRef's affinity pairs the OS process of its owning worker with its size.
function affinity(r::DRef)
    return Pair(OSProc(r.owner), r.size)
end

# This previously returned a vector of every machine that had the file cached;
# it now returns a single pair (`OSProc(1)` and the file size) for consistency
# with affinity(::DRef). See #295.
function affinity(r::FileRef)
    return Pair(OSProc(1), r.size)
end
### Mutation
# Run `f`, wrap its result in a Chunk on `proc` with `scope`, and box that Chunk
# in a `Ref`; callers unwrap it with `[]` after fetching.
function _mutable_inner(@nospecialize(f), proc, scope)
    chunk = Dagger.tochunk(f(), proc, scope)
    return Ref(chunk)
end
"""
    mutable(f::Base.Callable; worker=nothing, processor=nothing, scope=nothing) -> Chunk
    @mutable [worker=1] [processor=OSProc()] [scope=ProcessScope(processor)] f()

Call `f()` on the specified worker or processor and return a `Chunk`
referencing the result with the specified scope `scope`.

# Keywords
- `worker`: id of the worker to run `f` on (becomes `OSProc(worker)`).
- `processor`: processor to run `f` on. If neither `worker` nor `processor` is
  given, `OSProc()` is used.
- `scope`: scope of the resulting `Chunk`. Defaults to `ProcessScope(processor)`
  when `processor` is an `OSProc`, and `ExactScope(processor)` otherwise.

# Throws
- `ArgumentError` if both `worker` and `processor` are given.
"""
function mutable(@nospecialize(f); worker=nothing, processor=nothing, scope=nothing)
    if processor === nothing
        processor = worker === nothing ? OSProc() : OSProc(worker)
    elseif worker !== nothing
        # Explicit check instead of `@assert`: assertions may be disabled, and
        # this validates user input rather than an internal invariant.
        throw(ArgumentError("mutable: Can't mix worker and processor"))
    end
    if scope === nothing
        scope = processor isa OSProc ? ProcessScope(processor) : ExactScope(processor)
    end
    # `_mutable_inner` returns the Chunk boxed in a `Ref`; unwrap after fetching.
    return fetch(Dagger.@spawn scope=scope _mutable_inner(f, processor, scope))[]
end
# `@mutable [key=value...] expr`: all leading arguments are forwarded as keyword
# options to `mutable`; the final argument is the expression to evaluate.
macro mutable(exs...)
# Escape the options so they are evaluated in the caller's scope.
opts = esc.(exs[1:end-1])
ex = exs[end]
# The expression is wrapped in a zero-argument closure passed as `f`.
# `$mutable` splices in the function object itself, so callers do not need
# `mutable` in scope.
quote
let f = @noinline ()->$(esc(ex))
$mutable(f; $(opts...))
end
end
end
"""
    Shard

Maps a value to one of multiple distributed "mirror" values automatically when
used as a thunk argument. Construct using `@shard` or `shard`.
"""
struct Shard
# One piece per processor. `move` selects the piece for the destination
# processor, or for its nearest ancestor that has one.
chunks::Dict{Processor,Chunk}
end
"""
    shard(f; procs=nothing, workers=nothing, per_thread=false) -> Shard

Execute `f` once on each target processor and wrap each result in a `Chunk`.
Each `Chunk` is scoped with `ProcessScope(proc)` for an `OSProc`, and
`ExactScope(proc)` otherwise. Return a `Shard` mapping each processor to its
`Chunk`.

Keyword arguments:
- `procs` -- The list of processors to create pieces on. May be any iterable container of `Processor`s.
- `workers` -- The list of workers to create pieces on. May be any iterable container of `Integer`s.
- `per_thread::Bool=false` -- If `true`, creates a piece per each thread, rather than a piece per each worker.

If neither `procs` nor `workers` is given, the processors of the scheduler's
eager context are used.

Throws an `ArgumentError` in any of these cases:
- `procs` is combined with `workers`.
- `procs` is combined with `per_thread=true`.
- The resulting set of target processors is empty.
"""
function shard(@nospecialize(f); procs=nothing, workers=nothing, per_thread=false)
    if procs === nothing
        if workers !== nothing
            procs = [OSProc(w) for w in workers]
        else
            # Snapshot the eager context's processor list while holding its lock.
            procs = lock(Sch.eager_context()) do
                copy(Sch.eager_context().procs)
            end
        end
        if per_thread
            # Replace each processor with its `ThreadProc` children.
            thread_procs = ThreadProc[]
            for parent_proc in procs
                children = get_processors(parent_proc)
                append!(thread_procs, filter(child -> child isa ThreadProc, children))
            end
            procs = thread_procs
        end
    else
        if workers !== nothing
            throw(ArgumentError("Cannot combine `procs` and `workers`"))
        elseif per_thread
            throw(ArgumentError("Cannot combine `procs` and `per_thread=true`"))
        end
    end
    isempty(procs) && throw(ArgumentError("Cannot create empty Shard"))
    shard_dict = Dict{Processor,Chunk}()
    for proc in procs
        scope = proc isa OSProc ? ProcessScope(proc) : ExactScope(proc)
        # Run `f` on `proc`; `_mutable_inner` boxes the Chunk in a `Ref`.
        thunk = Dagger.@spawn scope=scope _mutable_inner(f, proc, scope)
        shard_dict[proc] = fetch(thunk)[]
    end
    return Shard(shard_dict)
end
"Creates a `Shard` by evaluating the final expression on each target processor; leading `key=value` arguments are forwarded as keywords. See [`Dagger.shard`](@ref) for details."
macro shard(exs...)
# Escape the options so they are evaluated in the caller's scope.
opts = esc.(exs[1:end-1])
ex = exs[end]
# The expression becomes the zero-argument closure `f`; `$shard` splices in
# the function object itself, so callers do not need `shard` in scope.
quote
let f = @noinline ()->$(esc(ex))
$shard(f; $(opts...))
end
end
end
# Move the piece of `shard` belonging to `to_proc`. If `to_proc` has no piece,
# walk up its ancestors (via `Dagger.get_parent`) until one has a piece or the
# root is reached (a processor that is its own parent). Throws `KeyError` if no
# piece matches.
# N.B. This behavior may bypass the piece's scope restriction
function move(from_proc::Processor, to_proc::Processor, shard::Shard)
    candidate = to_proc
    piece = get(shard.chunks, candidate, nothing)
    piece === nothing || return move(from_proc, to_proc, piece)
    ancestor = Dagger.get_parent(candidate)
    while ancestor != candidate
        # Step up one level; the next parent is computed before the lookup.
        candidate, ancestor = ancestor, Dagger.get_parent(ancestor)
        piece = get(shard.chunks, candidate, nothing)
        piece === nothing || return move(from_proc, to_proc, piece)
    end
    throw(KeyError(to_proc))
end
# A Shard iterates over, and counts, its per-processor Chunks.
function Base.iterate(s::Shard)
    return iterate(values(s.chunks))
end
function Base.iterate(s::Shard, state)
    return iterate(values(s.chunks), state)
end
function Base.length(s::Shard)
    return length(s.chunks)
end
### Core Stuff
"""
    tochunk(x, proc::Processor, scope::AbstractScope; persist=false, device=nothing, kwargs...) -> Chunk

Create a chunk from data `x` which resides on `proc` (default `OSProc()`) and
which has scope `scope` (default `AnyScope()`).

`device` specifies a `MemPool.StorageDevice` (which is itself wrapped in a
`Chunk`) which will be used to manage the reference contained in the `Chunk`
generated by this function. If `device` is `nothing` (the default), the data
will be inspected to determine if it's safe to serialize; if so, the default
MemPool storage device will be used; if not, then a `MemPool.CPURAMDevice` will
be used.

`persist` sets the resulting chunk's `persist` flag. `cache` is accepted but is
not used by this method. All other kwargs are passed directly to
`MemPool.poolset`.

If `x` is already a `Chunk` or a `Thunk`, it is returned unchanged.
"""
function tochunk(x::X, proc::P=OSProc(), scope::S=AnyScope(); persist=false, cache=false, device=nothing, kwargs...) where {X,P,S}
    if device === nothing
        # Pick the storage device based on whether `x` is safe to serialize.
        safe = Sch.walk_storage_safe(x)
        device = safe ? MemPool.GLOBAL_DEVICE[] : MemPool.CPURAMDevice()
    end
    ref = poolset(x; device=device, kwargs...)
    return Chunk{X,typeof(ref),P,S}(X, domain(x), ref, proc, scope, persist)
end
function tochunk(x::Union{Chunk, Thunk}, proc=nothing, scope=nothing; kwargs...)
    return x
end
# Serialize `data` (wrapped in `MemPool.MMWrap`) to `joinpath(dir, f)` and return
# a persisted Chunk backed by a `FileRef` named `f`. The reference's size is the
# number of bytes written.
function savechunk(data, dir, f)
    nbytes = open(joinpath(dir, f), "w") do io
        serialize(io, MemPool.MMWrap(data))
        position(io)
    end
    fr = FileRef(f, nbytes)
    T = typeof(data)
    proc = OSProc()
    scope = AnyScope() # FIXME: Scoped to this node
    return Chunk{T,typeof(fr),typeof(proc),typeof(scope)}(T, domain(data), fr, proc, scope, true)
end
# A non-owning reference to a `Chunk`. It records the handle's owner worker id
# and id, and holds the `Chunk` itself only through a `WeakRef`.
struct WeakChunk
wid::Int
id::Int
x::WeakRef
# NOTE(review): assumes `c.handle` has `owner` and `id` fields (as a
# `MemPool.DRef` presumably does); a `FileRef`-backed chunk may fail here.
# Verify before relying on it.
function WeakChunk(c::Chunk)
return new(c.handle.owner, c.handle.id, WeakRef(c))
end
end
# Return the referenced Chunk, or `nothing` if it has been garbage-collected.
function unwrap_weak(c::WeakChunk)
    return c.x.value
end

# Like `unwrap_weak`, but asserts that the Chunk is still alive.
function unwrap_weak_checked(c::WeakChunk)
    target = unwrap_weak(c)
    @assert target !== nothing "WeakChunk expired: ($(c.wid), $(c.id))"
    return target
end

is_task_or_chunk(c::WeakChunk) = true

# WeakChunks wrap a local `WeakRef`, so refuse to serialize them.
function Serialization.serialize(io::AbstractSerializer, wc::WeakChunk)
    error("Cannot serialize a WeakChunk")
end
# Deprecated names from the former "Part" API, forwarded to their `Chunk`
# equivalents so old code keeps working (with deprecation warnings).
Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
Base.@deprecate_binding Part Chunk
Base.@deprecate parts(args...) chunks(args...)
Base.@deprecate part(args...) tochunk(args...)
Base.@deprecate parttype(args...) chunktype(args...)