diff --git a/docs/Documentation/Development/Languages/Python/dask.md b/docs/Documentation/Development/Languages/Python/dask.md
index aaceb0cb1..0d7cdac26 100644
--- a/docs/Documentation/Development/Languages/Python/dask.md
+++ b/docs/Documentation/Development/Languages/Python/dask.md
@@ -142,6 +142,98 @@ python dask_slurm_example.py
 
 Note that although 2 jobs are requested, Dask launches the jobs dynamically, so depending on the status of the job queue, your results may indicate that only a single node was used.
 
+### Batch Runners
+
+Alternatively, the [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) library provides [`batch runners`](https://jobqueue.dask.org/en/stable/runners-overview.html) that are designed to make it simple to kick off Python scripts as multi-node HPC jobs. In contrast to the dynamic cluster, the batch runner starts only once all of the requested nodes are available.
+
+The following example was adapted from the official [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) documentation to reflect usage on Kestrel.
+
+??? example "`dask_slurm_runner_example.py`"
+
+    ```python
+    # dask_slurm_runner_example.py
+    import os
+    import getpass
+    import random
+    from dask.distributed import Client
+    from dask_jobqueue.slurm import SLURMRunner
+
+    user_name = getpass.getuser()
+    job_id = int(os.environ["SLURM_JOB_ID"])
+    n_tasks = int(os.environ["SLURM_NTASKS"])
+    n_nodes = int(os.environ["SLURM_NNODES"])
+    try:
+        # This is necessary to specify the correct amount of memory for each worker
+        mem_per_node = int(os.environ["SLURM_MEM_PER_NODE"])
+        mem_worker = (1e6 * mem_per_node) / (n_tasks / n_nodes)
+    except (KeyError, ValueError):
+        print("Couldn't determine SLURM worker memory.")
+        raise
+
+    # When entering the SLURMRunner context manager, each process decides whether it should be
+    # the client, the scheduler, or a worker.
+    # Only process ID 1 executes the contents of the context manager.
+    # All other processes start the Dask components and then block here forever.
+    with SLURMRunner(
+        scheduler_file=f"/scratch/{user_name}/scheduler-{job_id}.json",
+        scheduler_options={
+            "dashboard_address": f":{random.randint(30000, 40000)}",
+            "interface": "hsn0",
+        },
+        worker_options={
+            "memory_limit": mem_worker,
+            "local_directory": f"/scratch/{user_name}",
+            "interface": "hsn0",
+        },
+    ) as runner:
+        # The runner object contains the scheduler address info and can be used to construct a client.
+        with Client(runner) as client:
+
+            # Wait for all the workers to be ready before continuing.
+            client.wait_for_workers(runner.n_workers)
+
+            print(f"Dask cluster dashboard at: {client.dashboard_link}")
+            print(f"Dask cluster scheduler address: {client.scheduler.address}")
+
+            # Then we can submit some work to the Dask scheduler.
+            assert client.submit(lambda x: x + 1, 10).result() == 11
+            assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21
+
+            print("Dask SLURMRunner is working!")
+
+    # When process ID 1 exits the SLURMRunner context manager, it sends a graceful shutdown to the Dask processes.
+    ```
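+
+Because the runner writes its connection details to the `scheduler_file` specified above, a separate process (for example, one running inside the same allocation) can attach an additional client to the running cluster by pointing at that file. The snippet below is a minimal sketch rather than part of the official example; replace the placeholder path with the scheduler file your job actually writes, and note that the connecting process must be able to reach the scheduler over the `hsn0` network.
+
+```python
+# Illustrative sketch: attach an extra client to the running SLURMRunner cluster
+# by reading the scheduler file it wrote. The path below is a placeholder; use
+# the actual file, e.g. /scratch/<username>/scheduler-<job_id>.json.
+from dask.distributed import Client
+
+client = Client(scheduler_file="/scratch/<username>/scheduler-<job_id>.json")
+
+# List the addresses of the currently connected workers.
+print(list(client.scheduler_info()["workers"]))
+```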
+
+The Python script is submitted to SLURM via an sbatch script. Note that because this example job requests two partial nodes, it is submitted to the [`shared` partition](../../../Systems/Kestrel/Running/index.md#shared-node-partition). Be sure to replace `your-HPC-account` accordingly.
+
+??? example "`dask_slurm_runner_launcher.sh`"
+
+    ```bash
+    #!/bin/bash
+    #SBATCH --nodes=2
+    #SBATCH --ntasks=4
+    #SBATCH --mem=8G
+    #SBATCH --partition=shared
+    #SBATCH --time=10
+    #SBATCH --account=your-HPC-account  # replace with your HPC account
+    #SBATCH --output=slurm-%j.log
+
+    export MALLOC_TRIM_THRESHOLD_=65536
+
+    ml conda
+    conda activate /path/to/dask-env
+    srun -n 4 python -u dask_slurm_runner_example.py
+    ```
+
+Note that `SLURMRunner` does not start a distributed nanny process, which would normally set the limits on the resources consumed by each Dask worker. You must therefore export the `MALLOC_TRIM_THRESHOLD_` variable manually; it sets the minimum amount of contiguous free memory required to trigger a release of memory from each worker back to the system. You can find further details on worker memory management [`here`](https://distributed.dask.org/en/stable/worker-memory.html).
+
+The job is then launched as:
+
+```bash
+sbatch dask_slurm_runner_launcher.sh
+```
+
 
 ## Dask MPI