Merge pull request #730 from martin-springer/dask_add_batch_runner_help
Dask help - Add batch runner section
yandthj authored Jan 28, 2025
2 parents b418204 + be8289b commit 469b6a7
Showing 1 changed file with 92 additions and 0 deletions.
92 changes: 92 additions & 0 deletions docs/Documentation/Development/Languages/Python/dask.md
@@ -142,6 +142,98 @@ python dask_slurm_example.py

Note that although 2 jobs are requested, Dask launches the jobs dynamically, so depending on the status of the job queue, your results may indicate that only a single node was used.
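
If you want to confirm how many nodes were actually used, you can ask the scheduler which hosts its workers are running on. A minimal sketch, assuming `client` is the `dask.distributed.Client` connected to the cluster in the example above:

```python
# Illustrative check (not part of the original example): list the hosts
# currently running Dask workers for this cluster.
worker_info = client.scheduler_info()["workers"]
hosts = sorted({info["host"] for info in worker_info.values()})
print(f"{len(worker_info)} workers on {len(hosts)} node(s): {hosts}")
```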

### Batch Runners

Alternatively, the [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) library provides [`batch runners`](https://jobqueue.dask.org/en/stable/runners-overview.html), which are designed to make it simple to launch Python scripts as multi-node HPC jobs. In contrast to the dynamic cluster, a batch runner starts only once all requested nodes are available.

The following example was adapted from the official [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) documentation to reflect usage on Kestrel.

??? example "`dask_slurm_runner_example.py`"

    ```python
    # dask_slurm_runner_example.py
    import os
    import getpass
    import random
    from dask.distributed import Client
    from dask_jobqueue.slurm import SLURMRunner

    user_name = getpass.getuser()
    job_id = int(os.environ["SLURM_JOB_ID"])
    n_tasks = int(os.environ["SLURM_NTASKS"])
    n_nodes = int(os.environ["SLURM_NNODES"])
    try:
        # SLURM_MEM_PER_NODE is reported in MB; convert it to bytes and divide by
        # the number of workers per node to set the correct memory limit per worker.
        mem_per_node = int(os.environ["SLURM_MEM_PER_NODE"])
        mem_worker = (1e6 * mem_per_node) / (n_tasks / n_nodes)
    except (KeyError, ValueError):
        print("Couldn't determine SLURM worker memory.")
        raise

    # When entering the SLURMRunner context manager, each process decides whether it
    # should be the client, the scheduler, or a worker.
    # Only process ID 1 executes the contents of the context manager.
    # All other processes start the Dask components and then block here forever.
    with SLURMRunner(
        scheduler_file=f"/scratch/{user_name}/scheduler-{job_id}.json",
        scheduler_options={
            "dashboard_address": f":{random.randint(30000, 40000)}",
            "interface": "hsn0",
        },
        worker_options={
            "memory_limit": mem_worker,
            "local_directory": f"/scratch/{user_name}",
            "interface": "hsn0",
        },
    ) as runner:
        # The runner object contains the scheduler address info and can be used to construct a client.
        with Client(runner) as client:

            # Wait for all the workers to be ready before continuing.
            client.wait_for_workers(runner.n_workers)

            print(f"Dask cluster dashboard at: {client.dashboard_link}")
            print(f"Dask cluster scheduler address: {client.scheduler.address}")

            # Then we can submit some work to the Dask scheduler.
            assert client.submit(lambda x: x + 1, 10).result() == 11
            assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21

            print("Dask SLURMRunner is working!")

    # When process ID 1 exits the SLURMRunner context manager, it sends a graceful shutdown to the Dask processes.
    ```

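Inside the `Client` context, the trivial `submit` calls above can be replaced by any ordinary Dask workload. A minimal sketch, assuming the `dask-env` environment includes `dask[array]`; the array sizes below are illustrative only:

```python
# Illustrative workload that could replace the submit calls inside the
# `with Client(runner) as client:` block above.
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
result = (x + x.T).mean().compute()  # computed across the Slurm-allocated workers
print(f"Mean of symmetrized array: {result:.6f}")
```
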
The Python script is submitted to SLURM via an `sbatch` script. Note that because this example job requests two partial nodes, it is submitted to the [`shared` partition](../../../Systems/Kestrel/Running/index.md#shared-node-partition). Be sure to replace `your-HPC-account` accordingly.

??? example "`dask_slurm_runner_launcher.sh`"

    ```bash
    #!/bin/bash
    #SBATCH --nodes=2
    #SBATCH --ntasks=4
    #SBATCH --mem=8G
    #SBATCH --partition=shared
    #SBATCH --time=10
    #SBATCH --account=your-HPC-account # replace with your HPC account
    #SBATCH --output=slurm-%j.log

    export MALLOC_TRIM_THRESHOLD_=65536

    ml conda
    conda activate /path/to/dask-env
    srun -n 4 python -u dask_slurm_runner_example.py
    ```

Note that `SLURMRunner` does not start a distributed nanny process, which would normally set the limits for resources consumed by Dask workers. Thus, you must manually export the `MALLOC_TRIM_THRESHOLD_` variable, which sets the minimum amount of contiguous free memory required to trigger a release of memory back to the system from each worker. You can find further details on worker memory management [here](https://distributed.dask.org/en/stable/worker-memory.html).

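If you want to verify that the variable actually reached each worker process, one option is to query it from the client inside the job. A minimal sketch, assuming it is placed inside the `Client` context of the example above:

```python
# Illustrative check: report MALLOC_TRIM_THRESHOLD_ as seen by every worker.
import os

trim_settings = client.run(lambda: os.environ.get("MALLOC_TRIM_THRESHOLD_"))
print(trim_settings)  # maps each worker address to its value, e.g. "65536"
```
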
The job is then launched as:

```bash
sbatch dask_slurm_runner_launcher.sh
```
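
Because the runner writes a scheduler file to `/scratch/<user_name>/scheduler-<job_id>.json`, a separate process can attach to the running cluster while the job is still active. A minimal sketch, assuming the connecting process can reach the scheduler's `hsn0` address (for example, from another job on a compute node) and that the path below is adjusted to your username and job ID:

```python
# Illustrative attach-from-elsewhere example; the scheduler file path is hypothetical.
from dask.distributed import Client

client = Client(scheduler_file="/scratch/<user_name>/scheduler-<job_id>.json")
print(client.scheduler_info()["workers"].keys())  # addresses of the running workers
client.close()  # disconnects the client without shutting down the cluster
```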



## Dask MPI

