Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add migration helper #90

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,9 @@ end

function poolget(ref::DRef)
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "?? (", ref.owner, ", ", ref.id, ") at ", myid(), "\n")
return access_ref(identity, ref)
end
function access_ref(f, ref::DRef, args...; local_only::Bool=false)
original_ref = ref

# Check global redirect cache
Expand All @@ -529,11 +532,11 @@ function poolget(ref::DRef)
# Fetch the value (or a RedirectTo) from the owner
@label fetch
value = if ref.owner == myid()
_getlocal(ref.id, false)
_getlocal(f, ref.id, false, args...; local_only, from=myid())
else
forwardkeyerror() do
remotecall_fetch(ref.owner, ref) do ref
MMWrap(_getlocal(ref.id, true))
remotecall_fetch(ref.owner, ref, args) do ref, args
MMWrap(_getlocal(f, ref.id, true, args...; local_only, from=myid()))
end
end
end |> unwrap_payload
Expand All @@ -551,13 +554,17 @@ function poolget(ref::DRef)
return something(value)
end

function _getlocal(id, remote)
function _getlocal(f, id, remote, args...; local_only::Bool, from::Int)
state = with_lock(()->datastore[id], datastore_lock)
lock_read(state.lock) do
if state.redirect !== nothing
return RedirectTo(state.redirect)
end
return Some{Any}(read_from_device(state, id, true))
if local_only && from != myid()
throw(ConcurrencyViolationError("Attempted to access a DRef from a worker that does not own it"))
end
value = read_from_device(state, id, true)
return Some{Any}(f(value, args...))
end
end

Expand Down Expand Up @@ -598,10 +605,10 @@ Migrate the data referenced by `ref` to another worker `to`, returning the new
any accesses via `poolget` will seamlessly redirect to the new `DRef`, but the
data is no longer stored on the same worker as `ref`.
"""
function migrate!(ref::DRef, to::Integer)
function migrate!(ref::DRef, to::Integer; pre_migration=nothing, dest_post_migration=nothing, post_migration=nothing)
@assert ref.owner != to "Cannot migrate a DRef within the same worker"
if ref.owner != myid()
return remotecall_fetch(migrate!, ref.owner, ref, to)
return remotecall_fetch(migrate!, ref.owner, ref, to; pre_migration, dest_post_migration, post_migration)
end
state = with_lock(()->datastore[ref.id], datastore_lock)

Expand All @@ -612,14 +619,32 @@ function migrate!(ref::DRef, to::Integer)
# Read the current value of the ref
data = read_from_device(state, ref, true)

# Prepare data for migration
pre_migration_data = if pre_migration !== nothing
pre_migration(data)
else
nothing
end

# Create new ref to redirect to
new_ref = remotecall_fetch(poolset, to, data)
new_ref = remotecall_fetch(to, data) do data
new_ref = poolset(data)
if dest_post_migration !== nothing
access_ref(dest_post_migration, new_ref, pre_migration_data; local_only=true)
end
return new_ref
end

# Set the redirect to our new ref
state.redirect = new_ref

# Delete the local data
delete_from_device!(state, ref)

# Clean up old data if requested
if post_migration !== nothing
post_migration(data)
end
end

return new_ref
Expand Down
Loading