Skip to content

Commit

Permalink
Rewrote lapply_async to automatically adjust number of cores when g…
Browse files Browse the repository at this point in the history
…lobals are too large
  • Loading branch information
dipterix committed Nov 24, 2024
1 parent f22cb75 commit 99bc752
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 23 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: raveio
Type: Package
Title: File-System Toolbox for RAVE Project
Version: 0.9.0.74
Version: 0.9.0.75
Language: en-US
Authors@R: c(
person("Zhengjia", "Wang", email = "[email protected]", role = c("aut", "cre", "cph")),
Expand Down
120 changes: 98 additions & 22 deletions R/aaa.R
Original file line number Diff line number Diff line change
Expand Up @@ -527,39 +527,115 @@ lapply_async <- function(
x, FUN, FUN.args = list(), callback = NULL, ncores = NULL,
on_failure = "multisession", ...) {

future <- asNamespace("future")
dipsaus <- asNamespace("dipsaus")

# Determine the number of cores
# get max number of cores
max_workers <- as.integer(raveio_getopt("max_worker", 1L))
if(!isTRUE(is.finite(max_workers)) || max_workers < 1L) {
# fall back to a single worker if `max_workers` is not correctly set
max_workers <- 1L
}
if(length(ncores) == 1) {
if(ncores < 1) { ncores <- 1}
max_workers <- raveio_getopt("max_worker", 1L)
if(ncores < max_workers) {
chunk_size <- ceiling(length(x) / ncores)
} else {
chunk_size <- NULL
# user specified number of cores
ncores <- as.integer(ncores)
if(isTRUE(ncores > 0) && isTRUE(ncores < max_workers)) {
max_workers <- ncores
}
} else {
chunk_size <- NULL
ncores <- raveio_getopt("max_worker", 1L)
}
ncores <- max_workers
if(ncores > length(x)) {
ncores <- max(length(x), 1L)
}


get_globals_size <- dipsaus$get_globals_size
if( ncores > 1 && is.function(get_globals_size)) {

# check only when parallel computing is needed
globals_size <- get_globals_size(FUN)

if(length(globals_size) == 1 && is.numeric(globals_size) && is.finite(globals_size)) {
# We expect a globals_size to at least double in the child nodes
child_mem <- ceiling(globals_size * 4)

# determine the max number of cores to make sure the memory is not exhausted
max_mem <- raveio_getopt("max_mem", default = 8) * 1024^3

ncores_changed <- FALSE
max_size_changed <- FALSE

tryCatch({
max_ncores <- max(floor(max_mem / child_mem), 1L, na.rm = TRUE)
if( max_ncores < ncores ) {
ncores_changed <- TRUE
ncores <- max_ncores
}
}, error = function(e) {})

if( ncores > 1 ) {

# check if option `future.globals.maxSize` is too low
# 500 MB default max globals
max_size <- getOption("future.globals.maxSize", default = 524288000)
if(isTRUE(child_mem >= max(max_size, 1)) && is.finite(child_mem)) {
max_size_changed <- TRUE
options(future.globals.maxSize = child_mem)
}
}

if( ncores_changed || max_size_changed ) {
msg <- "Large variables detected. Temporarily"
if( ncores_changed ) {
msg <- c(msg, sprintf("reduced parallel cores to %.0f", ncores))
if( max_size_changed ) {
msg <- c(msg, "and")
}
}
if( max_size_changed ) {
msg <- c(msg, sprintf("raised limit for globals to %s", dipsaus::to_ram_size(child_mem)))
}
message(paste(msg, collapse = " "))
}
}

}

restore_future <- FALSE
plan <- getOption("raveio.auto.parallel", default = TRUE)
current_plan <- "sequential"

on.exit({
if(restore_future) {
future$plan(current_plan, .skip = TRUE)
}
})

# check if fork is disabled
if(isTRUE(plan)) {
dipsaus::make_forked_clusters(
workers = ncores,
on_failure = on_failure, clean = FALSE, ...
)
on.exit({
future <- asNamespace("future")
if( ncores == 1 ) {
future$plan("sequential")
}, add = FALSE)
} else {
# check if fork is disabled
dipsaus::make_forked_clusters(
workers = ncores,
on_failure = on_failure, clean = FALSE, ...
)
restore_future <- TRUE
}
} else {
restore_future <- TRUE
current_plan <- future$plan("list")
}

# print(ncores)
chunk_size <- ceiling(length(x) / ncores)
re <- dipsaus::lapply_async2(x, FUN = FUN, FUN.args = FUN.args, callback = callback,
plan = FALSE, future.chunk.size = chunk_size)
plan = FALSE, future.chunk.size = chunk_size)

if(isTRUE(plan)) {
future <- asNamespace("future")
future$plan("sequential")
on.exit({}, add = FALSE)
if( restore_future ) {
future$plan(current_plan)
restore_future <- FALSE
}

re
Expand Down

0 comments on commit 99bc752

Please sign in to comment.