diff --git a/tutorials/casper/Makefile b/tutorials/casper/Makefile
new file mode 100644
index 000000000..14caf1ff0
--- /dev/null
+++ b/tutorials/casper/Makefile
@@ -0,0 +1,19 @@
+# SmartRedis Fortran client sources (compiled locally into .o/.mod files)
+SMARTREDIS_FTN = $(SMARTREDIS_FSRC)/client.F90 \
+	$(SMARTREDIS_FSRC)/dataset.F90 \
+	$(SMARTREDIS_FSRC)/fortran_c_interop.F90
+
+SMARTREDIS_OBJ = client.o dataset.o fortran_c_interop.o
+MPIFC = mpif90
+
+# Link the example; embed the library dir as a runtime search path.
+# Note: -Wl,-rpath,$(SMARTREDIS_LIB) (comma form) so the directory is
+# passed to the linker as part of the -rpath option, not as a bare input.
+smartredis_put_get_3D: smartredis_put_get_3D.F90 $(SMARTREDIS_OBJ)
+	$(MPIFC) $< -o $@ $(SMARTREDIS_OBJ) -L$(SMARTREDIS_LIB) -lsmartredis -Wl,-rpath,$(SMARTREDIS_LIB)
+
+%.o : $(SMARTREDIS_FSRC)/%.F90
+	$(MPIFC) $< -c -o $@ -I $(SMARTREDIS_INCLUDE)
+
+# Module dependencies: client uses dataset, dataset uses fortran_c_interop
+client.o: dataset.o
+dataset.o: fortran_c_interop.o
+
+clean:
+	$(RM) *.o *.mod
diff --git a/tutorials/casper/README.md b/tutorials/casper/README.md
new file mode 100644
index 000000000..9957a9bad
--- /dev/null
+++ b/tutorials/casper/README.md
@@ -0,0 +1,60 @@
+# Casper
+
+```bash
+module purge
+module use /glade/p/cesmdata/cseg/PROGS/modulefiles/CrayLabs
+module load gnu ncarcompilers openmpi netcdf ncarenv cmake
+module load SmartRedis
+```
+
+I also needed a newer version of gmake, it's in /glade/work/jedwards/make-4.3/bin/make
+
+I am using a python environment created with:
+```
+ncar_pylib -c 20201220 /glade/work/$USER/casper_npl_clone
+```
+
+``pip install smartsim``
+
+``smart --device gpu``
+
+``pip install smartredis``
+
+First you need to build the smartredis_put_get_3D.F90 fortran example:
+```
+make
+```
+
+launch.py is the primary launch script
+```
+usage: launch.py [-h] [--db-nodes DB_NODES] [--ngpus-per-node NGPUS_PER_NODE]
+                 [--walltime WALLTIME] [--ensemble-size ENSEMBLE_SIZE]
+                 [--member-nodes MEMBER_NODES] [--account ACCOUNT]
+                 [--db-port DB_PORT]
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --db-nodes DB_NODES   Number of nodes for the SmartSim database, default=1
+  --ngpus-per-node NGPUS_PER_NODE
+                        Number of gpus per SmartSim database node, default=0
+  --walltime WALLTIME   Total walltime for
submitted job, default=00:30:00
+  --ensemble-size ENSEMBLE_SIZE
+                        Number of ensemble members to run, default=1
+  --member-nodes MEMBER_NODES
+                        Number of nodes per ensemble member, default=1
+  --account ACCOUNT     Account ID
+  --db-port DB_PORT     db port, default=6780
+```
+It creates pbs jobs from each of the 3 templates
+1. resv_job.template
+2. launch_database_cluster.template
+3. launch_client.template
+
+and submits the resv_job.sh which in turn will create a reservation large enough for the db and all the ensemble members.
+It submits those jobs in the newly created reservation. It starts the database and sets the SSDB environment variable,
+then launches each of the clients; all of this is done within the newly created reservation. The database job monitors progress of the clients and exits and removes the reservation when it is complete.
+
+Note that this launches the database and client jobs separately - the preferred method is to launch the client through SmartSim.
+
+** Currently to use this feature you must first send a note to cislhelp@ucar.edu and ask for permission to use the
+create_resv_from_job feature of PBS.
** \ No newline at end of file diff --git a/tutorials/casper/launch.py b/tutorials/casper/launch.py new file mode 100755 index 000000000..3f7629d40 --- /dev/null +++ b/tutorials/casper/launch.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +import os, sys + +import argparse, subprocess +from string import Template +from utils import run_cmd + +def parse_command_line(args, description): + parser = argparse.ArgumentParser(description=description, + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument("--db-nodes", default=1, + help="Number of nodes for the SmartSim database, default=1") + parser.add_argument("--ngpus-per-node", default=0, + help="Number of gpus per SmartSim database node, default=0") + parser.add_argument("--walltime", default="00:30:00", + help="Total walltime for submitted job, default=00:30:00") + parser.add_argument("--ensemble-size", default=1, + help="Number of ensemble members to run, default=1") + parser.add_argument("--member-nodes", default=1, + help="Number of nodes per ensemble member, default=1") + parser.add_argument("--account", default="P93300606", + help="Account ID") + parser.add_argument("--db-port", default=6780, + help="db port, default=6780") + + args = parser.parse_args(args[1:]) + ngpus = "" + if int(args.ngpus_per_node) > 0: + ngpus = ":ngpus="+args.ngpus_per_node + + + return {"db_nodes":args.db_nodes, "ngpus": ngpus, "client_nodes": args.ensemble_size*args.member_nodes, + "walltime": args.walltime, "account" : args.account, "member_nodes": args.member_nodes, + "ensemble_size": args.ensemble_size, "db_port": args.db_port, "python_sys_path": sys.path} + +def _main_func(desc): + templatevars = parse_command_line(sys.argv, desc) + + template_files = ["resv_job.template", "launch_database_cluster.template", "launch_client.template"] + + for template in template_files: + with open(template) as f: + src = Template(f.read()) + result = src.safe_substitute(templatevars) + result_file = template.replace("template","sh") + 
with open(result_file, "w") as f: + f.write(result) + + run_cmd("qsub resv_job.sh", verbose=True) + +if __name__ == "__main__": + _main_func(__doc__) diff --git a/tutorials/casper/launch_client.template b/tutorials/casper/launch_client.template new file mode 100644 index 000000000..91fe6d0a6 --- /dev/null +++ b/tutorials/casper/launch_client.template @@ -0,0 +1,10 @@ +#!/bin/bash +#PBS -N ss_client +#PBS -l select=$member_nodes:ncpus=36:mpiprocs=36 +#PBS -l walltime=$walltime +#PBS -j oe +#PBS -k oed +#PBS -A $account + +np=$(expr $member_nodes \* 36) +mpirun -np $np ./smartredis_put_get_3D diff --git a/tutorials/casper/launch_database_cluster.template b/tutorials/casper/launch_database_cluster.template new file mode 100644 index 000000000..d8f75f7dc --- /dev/null +++ b/tutorials/casper/launch_database_cluster.template @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +#PBS -N smartsimtest +#PBS -r n +#PBS -j oe +#PBS -V +#PBS -l walltime=$walltime +#PBS -A $account +##PBS -q regular +#PBS -V +#PBS -l select=$db_nodes:ncpus=1:ompthreads=1:mpiprocs=1$ngpus + +import os, sys, time + +# The python environment is not passed properly to submitted jobs on casper +_LIBDIR = $python_sys_path +sys.path.extend(_LIBDIR) + +import socket, subprocess +import numpy as np +from utils import run_cmd +from smartsim import Experiment, constants +from smartsim.database import PBSOrchestrator + +""" +Launch a distributed, in memory database cluster and use the +SmartRedis python client to send and recieve some numpy arrays. + +i.e. qsub -l select=3:ncpus=1 -l walltime=00:10:00 -A -q premium -I +""" + +def collect_db_hosts(num_hosts): + """A simple method to collect hostnames because we are using + openmpi. (not needed for aprun(ALPS), Slurm, etc. 
+ """ + + hosts = [] + if "PBS_NODEFILE" in os.environ: + node_file = os.environ["PBS_NODEFILE"] + with open(node_file, "r") as f: + for line in f.readlines(): + host = line.split(".")[0] + hosts.append(host) + else: + raise Exception("could not parse allocation nodes from PBS_NODEFILE") + + # account for mpiprocs causing repeats in PBS_NODEFILE + hosts = list(set(hosts)) + if len(hosts) >= num_hosts: + return hosts[:num_hosts] + else: + raise Exception("PBS_NODEFILE {} had {} hosts, not {}".format(node_file, len(hosts),num_hosts)) + + +def launch_cluster_orc(exp, db_hosts, port): + """Just spin up a database cluster, check the status + and tear it down""" + + print(f"Starting Orchestrator on hosts: {db_hosts}") + # batch = False to launch on existing allocation + db = PBSOrchestrator(port=port, db_nodes=len(db_hosts), batch=False, + run_command="mpirun", hosts=db_hosts) + + # generate directories for output files + # pass in objects to make dirs for + exp.generate(db, overwrite=True) + + # start the database within the reservation allocation + exp.start(db, block=True) + + # get the status of the database + statuses = exp.get_status(db) + print(f"Status of all database nodes: {statuses}") + + return db + +def monitor_client_jobs(rsvname): + jobs_done=False + while not jobs_done: + s, o, e = run_cmd("qstat -q {}".format(rsvname), verbose=True) + jobs_left = o.split()[-2:] + print("Jobs left: Running {} Queued {}".format(int(jobs_left[0]),int(jobs_left[1]))) + if int(jobs_left[0]) + int(jobs_left[1]) == 1: + jobs_done = True + else: + time.sleep(60) + + + + + +# create the experiment and specify PBS because cheyenne is a PBS system +exp = Experiment("launch_cluster_db", launcher="pbs") + +db_port = $db_port +db_hosts = collect_db_hosts($db_nodes) +# start the database +db = launch_cluster_orc(exp, db_hosts, db_port) + +rsvname = os.environ["RSVNAME"] +# stay alive until client jobs have completed +monitor_client_jobs(rsvname) + +# shutdown the database because we 
don't need it anymore +exp.stop(db) +# delete the job reservation +run_cmd("pbs_rdel {}".format(rsvname)) diff --git a/tutorials/casper/resv_job.template b/tutorials/casper/resv_job.template new file mode 100644 index 000000000..ef8baf621 --- /dev/null +++ b/tutorials/casper/resv_job.template @@ -0,0 +1,39 @@ +#!/bin/bash -x +#PBS -N resv_job +#PBS -l select=$db_nodes:ncpus=1:mpiprocs=1$ngpus+$client_nodes:ncpus=36:mpiprocs=36 +#PBS -l gpu_type=v100 +#PBS -l walltime=$walltime +#PBS -W create_resv_from_job=true +#PBS -j oe +#PBS -k oed +#PBS -q casper +#PBS -A $account + +for rsv in $(qstat -Q|awk '$1 ~ /^R/{print $1}') +do + parent_job=$(pbs_rstat -F $rsv|awk '$1 ~ /^reserve_job/{print $3}') + if [[ "${PBS_JOBID}" == "${parent_job}" ]] ; then + rsvname=$rsv + break + fi +done +if [ -z $rsvname ]; then echo "rsv is unset"; exit -1; else echo "rsv name is set to '$rsvname'"; fi + +me=$(whoami) +pbs_ralter -U $me $rsvname + +db_jobid=$(qsub -q $rsvname -vRSVNAME=$rsvname launch_database_cluster.sh) + +head_host=$(qstat -f $PBS_JOBID|awk '$1 ~ /^exec_host$/{print $3}'|cut -d\/ -f1-1) +# This gets the ib network +SSDB="$(getent hosts ${head_host}-ib|awk '{print $1}'):$db_port" +# This gets the external network +#SSDB="$(getent hosts ${head_host}.ucar.edu |awk '{print $1}'):$db_port" +export SSDB +for i in `seq 1 $ensemble_size`; +do + client_id=$(qsub -q $rsvname -v SSDB ./launch_client.sh) +done + + + diff --git a/tutorials/casper/smartredis_put_get_3D.F90 b/tutorials/casper/smartredis_put_get_3D.F90 new file mode 100644 index 000000000..199d1eae7 --- /dev/null +++ b/tutorials/casper/smartredis_put_get_3D.F90 @@ -0,0 +1,40 @@ +program main + + use mpi + use iso_c_binding + use smartredis_client, only : client_type + + implicit none + + integer, parameter :: dim1 = 10 + integer, parameter :: dim2 = 20 + integer, parameter :: dim3 = 30 + + real(kind=8), dimension(dim1, dim2, dim3) :: recv_array_real_64 + + real(kind=c_double), dimension(dim1, dim2, dim3) :: 
true_array_real_64 + + integer :: i, j, k + type(client_type) :: client + + integer :: err_code, pe_id + character(len=9) :: key_prefix + + call MPI_init( err_code ) + call MPI_comm_rank( MPI_COMM_WORLD, pe_id, err_code) + write(key_prefix, "(A,I6.6)") "pe_",pe_id + + call random_number(true_array_real_64) + + call random_number(recv_array_real_64) + + call client%initialize(.false.) + + call client%put_tensor(key_prefix//"true_array_real_64", true_array_real_64, shape(true_array_real_64)) + call client%unpack_tensor(key_prefix//"true_array_real_64", recv_array_real_64, shape(recv_array_real_64)) + if (.not. all(true_array_real_64 == recv_array_real_64)) stop 'true_array_real_64: FAILED' + + call mpi_finalize(err_code) + if (pe_id == 0) write(*,*) "SmartRedis MPI Fortran example 3D put/get finished." + +end program main diff --git a/tutorials/casper/utils.py b/tutorials/casper/utils.py new file mode 100644 index 000000000..c822d594a --- /dev/null +++ b/tutorials/casper/utils.py @@ -0,0 +1,82 @@ +import subprocess, os, io + +def _convert_to_fd(filearg, from_dir, mode="a"): + filearg = _get_path(filearg, from_dir) + + return open(filearg, mode) + + +def run_cmd(cmd, input_str=None, from_dir=None, verbose=None, + arg_stdout=subprocess.PIPE, arg_stderr=subprocess.PIPE, env=None, + combine_output=False, timeout=None, executable=None): + """ + Wrapper around subprocess to make it much more convenient to run shell commands + + >>> run_cmd('ls file_i_hope_doesnt_exist')[0] != 0 + True + """ + + # Real defaults for these value should be subprocess.PIPE + if isinstance(arg_stdout, str): + arg_stdout = _convert_to_fd(arg_stdout, from_dir) + + if combine_output: + arg_stderr = subprocess.STDOUT + elif isinstance(arg_stderr, str): + arg_stderr = _convert_to_fd(arg_stdout, from_dir) + + if verbose: + print("RUN: {}\nFROM: {}".format(cmd, os.getcwd() if from_dir is None else from_dir)) + + if (input_str is not None): + stdin = subprocess.PIPE + else: + stdin = None + + proc = 
subprocess.Popen(cmd, + shell=True, + stdout=arg_stdout, + stderr=arg_stderr, + stdin=stdin, + cwd=from_dir, + executable=executable, + env=env) + + output, errput = proc.communicate(input_str) + + # In Python3, subprocess.communicate returns bytes. We want to work with strings + # as much as possible, so we convert bytes to string (which is unicode in py3) via + # decode. + if output is not None: + try: + output = output.decode('utf-8', errors='ignore') + except AttributeError: + pass + if errput is not None: + try: + errput = errput.decode('utf-8', errors='ignore') + except AttributeError: + pass + + # Always strip outputs + if output: + output = output.strip() + if errput: + errput = errput.strip() + + stat = proc.wait() + if isinstance(arg_stdout, io.IOBase): + arg_stdout.close() # pylint: disable=no-member + if isinstance(arg_stderr, io.IOBase) and arg_stderr is not arg_stdout: + arg_stderr.close() # pylint: disable=no-member + + + if verbose: + if stat != 0: + print(" stat: {:d}\n".format(stat)) + if output: + print(" output: {}\n".format(output)) + if errput: + print(" errput: {}\n".format(errput)) + + return stat, output, errput