Skip to content

Commit

Permalink
poolset: Call GC when free mem is low
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Jan 10, 2024
1 parent a74be1e commit 7a41006
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ function __init__()
DISKCACHE_CONFIG[] = diskcache_config = DiskCacheConfig()
setup_global_device!(diskcache_config)

if haskey(ENV, "JULIA_MEMPOOL_MEMORY_RESERVED")
MEM_RESERVED[] = parse(UInt, ENV["JULIA_MEMPOOL_MEMORY_RESERVED"])

Check warning on line 122 in src/MemPool.jl

View check run for this annotation

Codecov / codecov/patch

src/MemPool.jl#L122

Added line #L122 was not covered by tests
end

# Ensure we cleanup all references
atexit(exit_hook)
end
Expand Down
66 changes: 65 additions & 1 deletion src/datastore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,18 @@ end
mutable struct SendQueue
queue::Channel{Any}
@atomic task::Union{Task,Nothing}
processing::Bool
end
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing)
const SEND_QUEUE = SendQueue(Channel(typemax(Int)), nothing, false)
function _enqueue_work(f, args...; gc_context=false)
if SEND_QUEUE.task === nothing
task = Task() do
while true
try
work, _args = take!(SEND_QUEUE.queue)
SEND_QUEUE.processing = true
work(_args...)
SEND_QUEUE.processing = false
catch err
exit_flag[] && continue
err isa ProcessExitedException && continue # TODO: Remove proc from counters
Expand Down Expand Up @@ -348,12 +351,73 @@ isondisk(id::Int) =
isinmemory(x::DRef) = isinmemory(x.id)
isondisk(x::DRef) = isondisk(x.id)

const MEM_RESERVED = Ref{UInt}(512 * (1024^2)) # Reserve 512MB of RAM for OS
const MEM_RESERVE_LOCK = Threads.ReentrantLock()

"""
When called, ensures that at least `MEM_RESERVED[] + size` bytes are available
to the OS. If there is not enough memory available, then a variety of calls to
the GC are performed to free up memory until either the reservation limit is
satisfied, or `max_sweeps` number of cycles have elapsed.
"""
function ensure_memory_reserved(size::Integer=0; max_sweeps::Integer=5)
sat_sub(x::T, y::T) where T = x < y ? zero(T) : x-y

# Check whether the OS is running tight on memory
sweep_ctr = 0
while true
with(QUERY_MEM_OVERRIDE => true) do
Int(storage_available(CPURAMResource())) - size < MEM_RESERVED[]
end || break

# We need more memory! Let's encourage the GC to clear some memory...
sweep_start = time_ns()
mem_used = with(QUERY_MEM_OVERRIDE => true) do
storage_utilized(CPURAMResource())

Check warning on line 376 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L374-L376

Added lines #L374 - L376 were not covered by tests
end
if sweep_ctr == 0
@debug "Not enough memory to continue! Sweeping up unused memory..."
GC.gc(false)
elseif sweep_ctr == 1
GC.gc(true)

Check warning on line 382 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L378-L382

Added lines #L378 - L382 were not covered by tests
else
@everywhere GC.gc(true)

Check warning on line 384 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L384

Added line #L384 was not covered by tests
end

# Let finalizers run
yield()

Check warning on line 388 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L388

Added line #L388 was not covered by tests

# Wait for send queue to clear
while SEND_QUEUE.processing
yield()
end

Check warning on line 393 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L391-L393

Added lines #L391 - L393 were not covered by tests

with(QUERY_MEM_OVERRIDE => true) do
mem_freed = sat_sub(mem_used, storage_utilized(CPURAMResource()))
@debug "Freed $(Base.format_bytes(mem_freed)) bytes, available: $(Base.format_bytes(storage_available(CPURAMResource())))"

Check warning on line 397 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L395-L397

Added lines #L395 - L397 were not covered by tests
end

sweep_ctr += 1
if sweep_ctr == max_sweeps
@debug "Made too many sweeps, bailing out..."
break

Check warning on line 403 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L400-L403

Added lines #L400 - L403 were not covered by tests
end
end

Check warning on line 405 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L405

Added line #L405 was not covered by tests
if sweep_ctr > 0
@debug "Swept for $sweep_ctr cycles"

Check warning on line 407 in src/datastore.jl

View check run for this annotation

Codecov / codecov/patch

src/datastore.jl#L407

Added line #L407 was not covered by tests
end
end

function poolset(@nospecialize(x), pid=myid(); size=approx_size(x),
retain=false, restore=false,
device=GLOBAL_DEVICE[], leaf_device=initial_leaf_device(device),
tag=nothing, leaf_tag=Tag(),
destructor=nothing)
if pid == myid()
if !restore
@lock MEM_RESERVE_LOCK ensure_memory_reserved(size)
end

id = atomic_add!(id_counter, 1)
sstate = if !restore
StorageState(Some{Any}(x),
Expand Down

0 comments on commit 7a41006

Please sign in to comment.