Skip to content

Commit

Permalink
Add the "auto-detect the current allocation" feature
Browse files Browse the repository at this point in the history
  • Loading branch information
DilumAluthge committed Jan 21, 2025
1 parent 18db175 commit 076b6b8
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ version = "0.4.8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
SlurmClusterManager = "c82cd089-7bf7-41d7-976b-6b5d413cbe0a"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
Distributed = "< 0.0.1, 1"
Logging = "< 0.0.1, 1"
Pkg = "< 0.0.1, 1"
SlurmClusterManager = "0.1.3"
Sockets = "< 0.0.1, 1"
julia = "1.2"

Expand Down
9 changes: 8 additions & 1 deletion src/ClusterManagers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,23 @@ using Distributed
using Sockets
using Pkg

import SlurmClusterManager

export launch, manage, kill, init_worker, connect
import Distributed: launch, manage, kill, init_worker, connect

# Bring some other names into scope, just for convenience:
using Distributed: addprocs


# Return the cluster cookie used to authenticate workers.
# `Distributed.init_multi()` is called first, then the value of
# `cluster_cookie()` is returned.
function worker_cookie()
    Distributed.init_multi()
    return cluster_cookie()
end
# Build the `--worker=<cookie>` command-line argument (as a `Cmd`) from the
# value returned by `worker_cookie()`.
function worker_arg()
    cookie = worker_cookie()
    return `--worker=$(cookie)`
end


# PBS doesn't have the same semantics as SGE with respect to file accumulation;
# a different solution will have to be found.
include("qsub.jl")

include("auto_detect.jl")
include("scyld.jl")
include("condor.jl")
include("slurm.jl")
Expand Down
103 changes: 103 additions & 0 deletions src/auto_detect.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
    addprocs_autodetect_current_scheduler(; kwargs...)

Detect the cluster scheduler of the current allocation and add worker
processes through the matching manager.

The scheduler is determined by `autodetect_current_scheduler()`:

- `:slurm` — calls `addprocs(SlurmClusterManager.SlurmManager(); kwargs...)`.
- `:sge`   — calls `addprocs_sge(np; kwargs...)` with `np = _sge_get_numtasks()`.
- `:pbs`   — calls `addprocs_pbs(np; kwargs...)` with `np = _torque_get_numtasks()`.

All keyword arguments are forwarded unchanged. The return value is whatever
the selected `addprocs*` call returns.

Throws an `ErrorException` if no supported scheduler is detected.
"""
function addprocs_autodetect_current_scheduler(; kwargs...)
    # Must be the Symbol-returning detector; the individual `_autodetect_is_*`
    # helpers return a Bool and can never compare equal to a scheduler Symbol.
    sched = autodetect_current_scheduler()
    if sched === :slurm
        return addprocs(SlurmClusterManager.SlurmManager(); kwargs...)
    elseif sched === :sge
        np = _sge_get_numtasks()
        return addprocs_sge(np; kwargs...)
    elseif sched === :pbs
        np = _torque_get_numtasks()
        return addprocs_pbs(np; kwargs...)
    end
    error("Unable to auto-detect cluster scheduler: $(sched)")
end

"""
    autodetect_current_scheduler()

Return `:slurm`, `:sge`, or `:pbs` according to the first scheduler probe that
succeeds (checked in that order), or `nothing` if none of them does.
"""
function autodetect_current_scheduler()
    _autodetect_is_slurm() && return :slurm
    _autodetect_is_sge() && return :sge
    _autodetect_is_pbs() && return :pbs
    return nothing
end

##### Slurm:

# Report whether either Slurm job-ID variable (`SLURM_JOB_ID` or `SLURM_JOBID`)
# passes the `_has_env_nonempty` check. Both variables are always probed.
function _autodetect_is_slurm()
    probes = map(_has_env_nonempty, ("SLURM_JOB_ID", "SLURM_JOBID"))
    return first(probes) || last(probes)
end

##### SGE (Sun Grid Engine):

# Report whether the `SGE_O_HOST` variable passes the `_has_env_nonempty` check.
function _autodetect_is_sge()
    # Important note:
    # The "job ID" environment variable in SGE is just named `JOB_ID`.
    # That name is too vague, because it is not specific to SGE.
    # Therefore, we can't use that variable for our SGE auto-detection,
    # and we look at `SGE_O_HOST` instead.
    #
    # https://docs.oracle.com/cd/E19957-01/820-0699/chp4-21/index.html
    return _has_env_nonempty("SGE_O_HOST")
end

# Return the number of tasks to use under Sun Grid Engine (SGE), taken from the
# `NHOSTS` environment variable and parsed as an integer.
#
# A warning is logged on every call, because `NHOSTS` is only a stand-in for the
# real task count. Errors from `_getenv_parse_int` propagate (variable missing,
# empty, or not an integer).
function _sge_get_numtasks()
    # https://docs.oracle.com/cd/E19957-01/820-0699/chp4-21/index.html
    name = "NHOSTS"

    # Read the value *before* building the warning, since the message
    # interpolates it.
    np = _getenv_parse_int(name)

    msg = "Because this is Sun Grid Engine (SGE), ClusterManagers.jl is not able " *
          "to correctly auto-detect the number of tasks. " *
          "Therefore, ClusterManagers.jl will instead use the value of the " *
          "NHOSTS environment variable: $(np)"
    @warn msg

    return np
end

##### PBS and Torque:

# Report whether the `PBS_JOBID` variable passes the `_has_env_nonempty` check.
function _autodetect_is_pbs()
    # https://docs.adaptivecomputing.com/torque/2-5-12/help.htm#topics/2-jobs/exportedBatchEnvVar.htm
    return _has_env_nonempty("PBS_JOBID")
end

# Return the number of tasks under PBS/Torque, taken from the `PBS_TASKNUM`
# environment variable and parsed as an integer.
#
# The detected value is logged at info level before it is returned. Errors from
# `_getenv_parse_int` propagate (variable missing, empty, or not an integer).
function _torque_get_numtasks()
    # https://docs.adaptivecomputing.com/torque/2-5-12/help.htm#topics/2-jobs/exportedBatchEnvVar.htm
    name = "PBS_TASKNUM"
    np = _getenv_parse_int(name)

    # Log before returning; a statement placed after `return` never runs.
    @info "Using auto-detected num_tasks: $(np)"

    return np
end

##### General utility functions:

# Read the environment variable `name` and parse it as an `Int`.
#
# Leading and trailing whitespace in the value is ignored.
#
# Throws an `ErrorException` (via `error`) when the variable:
#   - is not defined,
#   - is defined but empty,
#   - contains only whitespace, or
#   - cannot be parsed as an integer.
function _getenv_parse_int(name::AbstractString)
    if !haskey(ENV, name)
        msg = "Environment variable is not defined: $(name)"
        error(msg)
    end
    original_value = ENV[name]
    if isempty(original_value)
        msg = "Environment variable is defined, but is empty: $(name)"
        error(msg)
    end
    stripped_value_str = strip(original_value)
    if isempty(stripped_value_str)
        msg = "Environment variable is defined, but contains only whitespace: $(name)"
        error(msg)
    end
    # `tryparse` returns `nothing` instead of throwing on malformed input,
    # which lets us raise a more descriptive error below.
    value_int = tryparse(Int, stripped_value_str)
    if value_int === nothing
        msg =
            "Environment variable \"$(name)\" is defined, " *
            "but its value \"$(stripped_value_str)\" could not be parsed as an integer."
        error(msg)
    end
    return value_int
end
2 changes: 1 addition & 1 deletion src/qsub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ function launch(manager::Union{PBSManager, SGEManager, QRSHManager},
for i in 1:np
config = WorkerConfig()
config.io, io_proc = stream_proc[i]
config.userdata = Dict{Symbol, Any}(:task => i,
config.userdata = Dict{Symbol, Any}(:task => i,
:process => io_proc)
push!(instances_arr, config)
notify(c)
Expand Down

0 comments on commit 076b6b8

Please sign in to comment.