Skip to content

Commit

Permalink
datastore: Add a RefCounters cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Jan 13, 2024
1 parent b6293d5 commit 11ed1d7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 15 deletions.
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ desc = "a simple distributed data store"
version = "0.4.6"

[deps]
ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
MultiThreadedCaches = "18056a9e-ed0c-4ef3-9ad7-376a7fb08032"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand Down
1 change: 1 addition & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Serialization: serialize, deserialize
export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup
import .Threads: ReentrantLock
using ScopedValues
using ConcurrentCollections

## Wrapping-unwrapping of payloads:

Expand Down
55 changes: 40 additions & 15 deletions src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ mutable struct DRef
owner::Int
id::Int
size::UInt
function DRef(owner, id, size)
function DRef(owner, id, size; doref::Bool=true)
d = new(owner, id, size)
poolref(d)
doref && poolref(d)
finalizer(poolunref, d)
d
end
Expand Down Expand Up @@ -123,11 +123,33 @@ struct RefCounters
# Locks for access to send/recv counters
tx_lock::NonReentrantLock
end
RefCounters() = RefCounters(Atomic{Int}(0),
function RefCounters()
rc = maybepop!(REFCOUNTERS_CACHE)
if rc === nothing
rc = RefCounters(Atomic{Int}(0),
Atomic{Int}(0),
Dict{Int,Int}(),
Dict{Int,Int}(),
NonReentrantLock())
else
Threads.atomic_sub!(REFCOUNTERS_STORED, 1)
rc = something(rc)
end
return rc
end
function refcounters_replace!(rc)
if REFCOUNTERS_STORED[] < REFCOUNTERS_CACHE_MAX
rc.worker_counter[] = 0
rc.local_counter[] = 0
empty!(rc.send_counters)
empty!(rc.recv_counters)
Threads.atomic_add!(REFCOUNTERS_STORED, 1)
push!(REFCOUNTERS_CACHE, rc)
end
end
const REFCOUNTERS_CACHE = ConcurrentStack{RefCounters}()
const REFCOUNTERS_STORED = Threads.Atomic{Int}(0)
const REFCOUNTERS_CACHE_MAX = 2^12

Base.show(io::IO, ctrs::RefCounters) = with_lock(ctrs.tx_lock) do
print(io, string(ctrs))
Expand Down Expand Up @@ -259,7 +281,7 @@ function poolref(d::DRef, recv=false)
# We've never seen this DRef, so tell the owner
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "!! (", d.owner, ", ", d.id, ") at ", myid(), "\n")
if myid() == d.owner
poolref_owner(d.id)
poolref_owner(d.id, ctrs)
else
_enqueue_work(remotecall_wait, poolref_owner, d.owner, d.id)
end
Expand All @@ -274,10 +296,12 @@ function poolref(d::DRef, recv=false)
end
end
"Called on owner when a worker first holds a reference to DRef with ID `id`."
function poolref_owner(id::Int)
function poolref_owner(id::Int, ctrs=nothing)
free = false
ctrs = with_lock(()->datastore_counters[(myid(), id)],
datastore_counters_lock)
if ctrs === nothing
ctrs = with_lock(()->datastore_counters[(myid(), id)],
datastore_counters_lock)
end
update_and_check_owner!(ctrs, id, 1)
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "== (", myid(), ", ", id, ")\n")
end
Expand Down Expand Up @@ -422,31 +446,30 @@ function poolset(@nospecialize(x), pid=myid(); size=approx_size(x),
sstate = if !restore
StorageState(Some{Any}(x),
Vector{StorageLeaf}(),
CPURAMDevice())
device)
else
@assert !isa(leaf_device, CPURAMDevice) "Cannot use `CPURAMDevice()` as leaf device when `restore=true`"
StorageState(nothing,
[StorageLeaf(leaf_device, Some{Any}(x), retain)],
leaf_device)
device)
end
notify(sstate)
state = RefState(sstate,
size,
tag,
leaf_tag,
destructor)
rc = RefCounters()
Threads.atomic_add!(rc.local_counter, 1)
Threads.atomic_add!(rc.worker_counter, 1)
with_lock(datastore_counters_lock) do
datastore_counters[(pid, id)] = RefCounters()
datastore_counters[(pid, id)] = rc
end
with_lock(datastore_lock) do
datastore[id] = state
end
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "++ (", myid(), ", ", id, ") [", x, "]\n")
d = DRef(myid(), id, size)
# Switch the root
notify(storage_rcu!(state) do sstate
StorageState(sstate; root=device)
end)
d = DRef(myid(), id, size; doref=false)
if !(restore && device === leaf_device)
try
write_to_device!(device, state, id)
Expand Down Expand Up @@ -502,6 +525,8 @@ end
function datastore_delete(id)
@safe_lock_spin datastore_counters_lock begin
DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "-- (", myid(), ", ", id, ") with ", string(datastore_counters[(myid(), id)]), "\n"; gc_context=true)
ctrs = datastore_counters[(myid(), id)]
refcounters_replace!(ctrs)
delete!(datastore_counters, (myid(), id))
end
state = @safe_lock_spin datastore_lock begin
Expand Down

0 comments on commit 11ed1d7

Please sign in to comment.