From 11ed1d7d01a6878d187aaaf380ae8819f5f96d82 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Fri, 29 Dec 2023 21:37:45 -0700 Subject: [PATCH] datastore: Add a RefCounters cache --- Project.toml | 2 ++ src/MemPool.jl | 1 + src/datastore.jl | 55 +++++++++++++++++++++++++++++++++++------------- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/Project.toml b/Project.toml index d511987..5ff4694 100644 --- a/Project.toml +++ b/Project.toml @@ -5,9 +5,11 @@ desc = "a simple distributed data store" version = "0.4.6" [deps] +ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Mmap = "a63ad114-7e13-5084-954f-fe012c677804" +MultiThreadedCaches = "18056a9e-ed0c-4ef3-9ad7-376a7fb08032" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" diff --git a/src/MemPool.jl b/src/MemPool.jl index 65dd3bc..98d0219 100644 --- a/src/MemPool.jl +++ b/src/MemPool.jl @@ -5,6 +5,7 @@ import Serialization: serialize, deserialize export DRef, FileRef, poolset, poolget, mmwrite, mmread, cleanup import .Threads: ReentrantLock using ScopedValues +using ConcurrentCollections ## Wrapping-unwrapping of payloads: diff --git a/src/datastore.jl b/src/datastore.jl index 99dd0ac..7595bc7 100644 --- a/src/datastore.jl +++ b/src/datastore.jl @@ -4,9 +4,9 @@ mutable struct DRef owner::Int id::Int size::UInt - function DRef(owner, id, size) + function DRef(owner, id, size; doref::Bool=true) d = new(owner, id, size) - poolref(d) + doref && poolref(d) finalizer(poolunref, d) d end @@ -123,11 +123,33 @@ struct RefCounters # Locks for access to send/recv counters tx_lock::NonReentrantLock end -RefCounters() = RefCounters(Atomic{Int}(0), +function RefCounters() + rc = maybepop!(REFCOUNTERS_CACHE) + if rc === nothing + rc = RefCounters(Atomic{Int}(0), Atomic{Int}(0), Dict{Int,Int}(), Dict{Int,Int}(), NonReentrantLock()) + else + Threads.atomic_sub!(REFCOUNTERS_STORED, 1) + rc = something(rc) + end + return rc +end +function refcounters_replace!(rc) + if REFCOUNTERS_STORED[] < REFCOUNTERS_CACHE_MAX + rc.worker_counter[] = 0 + rc.local_counter[] = 0 + empty!(rc.send_counters) + empty!(rc.recv_counters) + Threads.atomic_add!(REFCOUNTERS_STORED, 1) + push!(REFCOUNTERS_CACHE, rc) + end +end +const REFCOUNTERS_CACHE = ConcurrentStack{RefCounters}() +const REFCOUNTERS_STORED = Threads.Atomic{Int}(0) +const REFCOUNTERS_CACHE_MAX = 2^12 Base.show(io::IO, ctrs::RefCounters) = with_lock(ctrs.tx_lock) do print(io, string(ctrs)) @@ -259,7 +281,7 @@ function poolref(d::DRef, recv=false) # We've never seen this DRef, so tell the owner DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "!! (", d.owner, ", ", d.id, ") at ", myid(), "\n") if myid() == d.owner - poolref_owner(d.id) + poolref_owner(d.id, ctrs) else _enqueue_work(remotecall_wait, poolref_owner, d.owner, d.id) end @@ -274,10 +296,12 @@ function poolref(d::DRef, recv=false) end end "Called on owner when a worker first holds a reference to DRef with ID `id`." -function poolref_owner(id::Int) +function poolref_owner(id::Int, ctrs=nothing) free = false - ctrs = with_lock(()->datastore_counters[(myid(), id)], - datastore_counters_lock) + if ctrs === nothing + ctrs = with_lock(()->datastore_counters[(myid(), id)], + datastore_counters_lock) + end update_and_check_owner!(ctrs, id, 1) DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "== (", myid(), ", ", id, ")\n") end @@ -422,12 +446,12 @@ function poolset(@nospecialize(x), pid=myid(); size=approx_size(x), sstate = if !restore StorageState(Some{Any}(x), Vector{StorageLeaf}(), - CPURAMDevice()) + device) else @assert !isa(leaf_device, CPURAMDevice) "Cannot use `CPURAMDevice()` as leaf device when `restore=true`" StorageState(nothing, [StorageLeaf(leaf_device, Some{Any}(x), retain)], - leaf_device) + device) end notify(sstate) state = RefState(sstate, @@ -435,18 +459,17 @@ function poolset(@nospecialize(x), pid=myid(); size=approx_size(x), tag, leaf_tag, destructor) + rc = RefCounters() + Threads.atomic_add!(rc.local_counter, 1) + Threads.atomic_add!(rc.worker_counter, 1) with_lock(datastore_counters_lock) do - datastore_counters[(pid, id)] = RefCounters() + datastore_counters[(pid, id)] = rc end with_lock(datastore_lock) do datastore[id] = state end DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "++ (", myid(), ", ", id, ") [", x, "]\n") - d = DRef(myid(), id, size) - # Switch the root - notify(storage_rcu!(state) do sstate - StorageState(sstate; root=device) - end) + d = DRef(myid(), id, size; doref=false) if !(restore && device === leaf_device) try write_to_device!(device, state, id) @@ -502,6 +525,8 @@ end function datastore_delete(id) @safe_lock_spin datastore_counters_lock begin DEBUG_REFCOUNTING[] && _enqueue_work(Core.print, "-- (", myid(), ", ", id, ") with ", string(datastore_counters[(myid(), id)]), "\n"; gc_context=true) + ctrs = datastore_counters[(myid(), id)] + refcounters_replace!(ctrs) delete!(datastore_counters, (myid(), id)) end state = @safe_lock_spin datastore_lock begin