Dask help - Add batch runner section #730

Merged · 2 commits · Jan 28, 2025

92 changes: 92 additions & 0 deletions docs/Documentation/Development/Languages/Python/dask.md
@@ -142,6 +142,98 @@ python dask_slurm_example.py

Note that although 2 jobs are requested, Dask launches the jobs dynamically, so depending on the status of the job queue, your results may indicate that only a single node was used.
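To confirm how many distinct compute nodes your workers actually landed on, one option is to ask each worker for its hostname through the client. The snippet below is a minimal sketch, not part of the official example; it assumes an already-connected `Client` object named `client`, such as the one created in the example above.

```python
import socket

# client.run executes the given function on every worker and
# returns a dict of {worker_address: result}.
hostnames = client.run(socket.gethostname)

# The number of unique hostnames is the number of compute nodes that hosted workers.
nodes = sorted(set(hostnames.values()))
print(f"Workers ran on {len(nodes)} node(s): {nodes}")
```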

### Batch Runners

Alternatively, the [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) library provides [`batch runners`](https://jobqueue.dask.org/en/stable/runners-overview.html) that are designed to make it simple to launch Python scripts as multi-node HPC jobs. In contrast to the dynamic cluster, the batch runner starts only once all requested nodes are available.

The following example was adapted from the official [`dask-jobqueue`](https://jobqueue.dask.org/en/latest/index.html#) documentation to reflect usage on Kestrel.

??? example "`dask_slurm_runner_example.py`"

    ```python
    # dask_slurm_runner_example.py
    import os
    import getpass
    import random
    from dask.distributed import Client
    from dask_jobqueue.slurm import SLURMRunner

    user_name = getpass.getuser()
    job_id = int(os.environ["SLURM_JOB_ID"])
    n_tasks = int(os.environ["SLURM_NTASKS"])
    n_nodes = int(os.environ["SLURM_NNODES"])
    try:
        # This is necessary to specify the correct amount of memory for each worker
        mem_per_node = int(os.environ["SLURM_MEM_PER_NODE"])
        mem_worker = (1e6 * mem_per_node) / (n_tasks / n_nodes)
    except (KeyError, ValueError):
        print("Couldn't determine SLURM worker memory.")
        raise

    # When entering the SLURMRunner context manager, each process decides whether it should be
    # the client, the scheduler, or a worker.
    # Only process ID 1 executes the contents of the context manager.
    # All other processes start the Dask components and then block here forever.
    with SLURMRunner(
        scheduler_file=f"/scratch/{user_name}/scheduler-{job_id}.json",
        scheduler_options={
            "dashboard_address": f":{random.randint(30000, 40000)}",
            "interface": "hsn0",
        },
        worker_options={
            "memory_limit": mem_worker,
            "local_directory": f"/scratch/{user_name}",
            "interface": "hsn0",
        },
    ) as runner:
        # The runner object contains the scheduler address info and can be used to construct a client.
        with Client(runner) as client:

            # Wait for all the workers to be ready before continuing.
            client.wait_for_workers(runner.n_workers)

            print(f"Dask cluster dashboard at: {client.dashboard_link}")
            print(f"Dask cluster scheduler address: {client.scheduler.address}")

            # Then we can submit some work to the Dask scheduler.
            assert client.submit(lambda x: x + 1, 10).result() == 11
            assert client.submit(lambda x: x + 1, 20, workers=2).result() == 21

            print("Dask SLURMRunner is working!")

    # When process ID 1 exits the SLURMRunner context manager, it sends a graceful shutdown to the other Dask processes.
    ```

The Python script is submitted to SLURM via an sbatch script. Note that because this example job requests two partial nodes, it is submitted to the [`shared` partition](../../../Systems/Kestrel/Running/index.md#shared-node-partition). Be sure to replace `your-HPC-account` accordingly.

??? example "`dask_slurm_runner_launcher.sh`"

    ```bash
    #!/bin/bash
    #SBATCH --nodes=2
    #SBATCH --ntasks=4
    #SBATCH --mem=8G
    #SBATCH --partition=shared
    #SBATCH --time=10
    #SBATCH --account=your-HPC-account # replace with your HPC account
    #SBATCH --output=slurm-%j.log

    export MALLOC_TRIM_THRESHOLD_=65536

    ml conda
    conda activate /path/to/dask-env
    srun -n 4 python -u dask_slurm_runner_example.py
    ```

Note that `SLURMRunner` does not start a distributed nanny process, which would normally set the limits for resources consumed by Dask workers. Thus, you must manually export the `MALLOC_TRIM_THRESHOLD_` variable, which sets the minimum amount of contiguous free memory required to trigger a release of memory from each worker back to the system. You can find further details in the [Dask worker memory management documentation](https://distributed.dask.org/en/stable/worker-memory.html).
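As a quick sanity check, you can confirm from the client that each worker picked up the intended memory limit and that `MALLOC_TRIM_THRESHOLD_` reached the worker environments. The following is a minimal sketch, not part of the official example; it assumes the lines are placed inside the `with Client(runner) as client:` block of `dask_slurm_runner_example.py` above.

```python
import os

# Per-worker metadata tracked by the scheduler; "memory_limit" is reported in bytes.
for address, info in client.scheduler_info()["workers"].items():
    print(address, info["memory_limit"])

# Run a small function on every worker to confirm the exported variable is visible there.
print(client.run(lambda: os.environ.get("MALLOC_TRIM_THRESHOLD_")))
```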

The job is then launched as:

```bash
sbatch dask_slurm_runner_launcher.sh
```



## Dask MPI
