"""
    SendQueue

State of the background work queue.

# Fields
- `queue`: channel of pending work items; the consumer task takes `(work, args)`
  pairs from it and calls `work(args...)`.
- `task`: the consumer task, or `nothing` while it has not been created yet. The
  field is atomic, so it has to be accessed with `@atomic`.
- `processing`: set to `true` by the consumer task right before it runs a work
  item and back to `false` once that item returns. This is a plain (non-atomic)
  field.
"""
mutable struct SendQueue
    # Pending work items.
    queue::Channel{Any}
    # Consumer task; `nothing` until it is first needed.
    @atomic task::Union{Nothing,Task}
    # Whether a work item is being executed right now.
    processing::Bool
end

# Global queue instance: effectively unbounded channel, no consumer task yet, idle.
const SEND_QUEUE = SendQueue(Channel{Any}(typemax(Int)), nothing, false)
const MEM_RESERVED = Ref{UInt}(512 * (1024^2)) # Reserve 512MB of RAM for OS
const MEM_RESERVE_LOCK = Threads.ReentrantLock()

"""
    ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)

Try to make sure that at least `MEM_RESERVED[] + size` bytes stay available to
the OS.

While the available CPU RAM minus `size` is below `MEM_RESERVED[]`, increasingly
aggressive garbage collections are performed:

1. an incremental collection on this process (`GC.gc(false)`),
2. a full collection on this process (`GC.gc(true)`),
3. a full collection on every process (`@everywhere GC.gc(true)`) for each
   further sweep.

After every collection the function yields so that finalizers get a chance to
run, and then waits until `SEND_QUEUE` is no longer processing a work item.

# Arguments
- `size`: number of bytes about to be allocated on top of the reservation.

# Keywords
- `max_sweeps`: upper bound on the number of GC sweeps. A value `<= 0` disables
  sweeping altogether.

# Returns
`nothing`. No error is raised when the reservation still cannot be satisfied
after `max_sweeps` sweeps; the situation is only reported via `@debug`.
"""
function ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)
    # Saturating subtraction: memory usage may have grown during a sweep, in
    # which case "freed" is reported as zero instead of wrapping around.
    sat_sub(x::T, y::T) where T = x < y ? zero(T) : x-y

    sweep_ctr = 0
    while true
        # Check whether the OS is running tight on memory. The override makes
        # `_query_mem_periodically` refresh its value instead of serving one
        # that may be up to `QUERY_MEM_PERIOD` old.
        low_on_memory = with(QUERY_MEM_OVERRIDE => true) do
            Int(storage_available(CPURAMResource())) - size < MEM_RESERVED[]
        end
        low_on_memory || break

        # Enforce the sweep budget *before* sweeping, using `>=`, so that a
        # non-positive `max_sweeps` cannot turn this into an unbounded loop.
        if sweep_ctr >= max_sweeps
            @debug "Made too many sweeps, bailing out..."
            break
        end

        # We need more memory! Let's encourage the GC to clear some memory...
        mem_used = with(QUERY_MEM_OVERRIDE => true) do
            storage_utilized(CPURAMResource())
        end
        if sweep_ctr == 0
            @debug "Not enough memory to continue! Sweeping up unused memory..."
            GC.gc(false)
        elseif sweep_ctr == 1
            GC.gc(true)
        else
            @everywhere GC.gc(true)
        end

        # Let finalizers run
        yield()

        # Wait for the in-flight send queue work item (if any) to finish.
        # NOTE(review): `processing` is only reset by the send queue task after
        # a work item returns normally; if an item throws, the flag appears to
        # stay `true` until the next item completes, and this loop would keep
        # spinning meanwhile. Confirm and fix in `_enqueue_work` (try/finally).
        while SEND_QUEUE.processing
            yield()
        end

        with(QUERY_MEM_OVERRIDE => true) do
            mem_freed = sat_sub(mem_used, storage_utilized(CPURAMResource()))
            @debug "Freed $(Base.format_bytes(mem_freed)) bytes, available: $(Base.format_bytes(storage_available(CPURAMResource())))"
        end

        sweep_ctr += 1
    end
    if sweep_ctr > 0
        @debug "Swept for $sweep_ctr cycles"
    end
    return nothing
end