Commit 7435b2f

Ability to initialize distributed backend outside deepspeed runtime (#608)
1 parent: fd2f970

13 files changed, +175 -136 lines

DeepSpeedExamples (submodule)

deepspeed/__init__.py (+1)

@@ -14,6 +14,7 @@
 from .runtime.activation_checkpointing import checkpointing
 from .ops.transformer import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
 from .utils import log_dist
+from .utils.distributed import init_distributed
 
 from .pipe import PipelineModule
 
deepspeed/constants.py (+8, new file)

@@ -0,0 +1,8 @@
+'''
+Copyright 2020 The Microsoft DeepSpeed Team
+'''
+
+#############################################
+# Torch distributed constants
+#############################################
+TORCH_DISTRIBUTED_DEFAULT_PORT = 29500

deepspeed/launcher/constants.py (-5)

@@ -1,10 +1,5 @@
 # Copyright 2020 The Microsoft DeepSpeed Team
 
-#############################################
-# Torch distributed constants
-#############################################
-TORCH_DISTRIBUTED_DEFAULT_PORT = 29500
-
 PDSH_LAUNCHER = 'pdsh'
 PDSH_MAX_FAN_OUT = 1024

deepspeed/launcher/launch.py (+1, -1)

@@ -16,7 +16,7 @@
 from collections import defaultdict
 from argparse import ArgumentParser, REMAINDER
 
-from .constants import TORCH_DISTRIBUTED_DEFAULT_PORT
+from ..constants import TORCH_DISTRIBUTED_DEFAULT_PORT
 from ..utils import logger
 

deepspeed/launcher/runner.py (+2, -2)

@@ -19,8 +19,8 @@
 import torch.cuda
 
 from .multinode_runner import PDSHRunner, OpenMPIRunner, MVAPICHRunner
-from .constants import TORCH_DISTRIBUTED_DEFAULT_PORT, \
-    PDSH_LAUNCHER, OPENMPI_LAUNCHER, MVAPICH_LAUNCHER
+from .constants import PDSH_LAUNCHER, OPENMPI_LAUNCHER, MVAPICH_LAUNCHER
+from ..constants import TORCH_DISTRIBUTED_DEFAULT_PORT
 from ..utils import logger
 
 DLTS_HOSTFILE = "/job/hostfile"

deepspeed/runtime/constants.py (-5)

@@ -73,11 +73,6 @@
 ZERO_ALLOW_UNTESTED_OPTIMIZER = "zero_allow_untested_optimizer"
 ZERO_ALLOW_UNTESTED_OPTIMIZER_DEFAULT = False
 
-#############################################
-# Torch distributed constants
-#############################################
-TORCH_DISTRIBUTED_DEFAULT_PORT = "29500"
-
 # Steps
 STEPS_PER_PRINT = "steps_per_print"
 STEPS_PER_PRINT_DEFAULT = 10

deepspeed/runtime/engine.py (+6, -102)

@@ -24,12 +24,12 @@
 from deepspeed.runtime.dataloader import DeepSpeedDataLoader
 from deepspeed.runtime.constants import \
     ROUTE_TRAIN, ROUTE_PREDICT, ROUTE_EVAL, \
-    TORCH_DISTRIBUTED_DEFAULT_PORT, PLD_THETA, PLD_GAMMA
+    PLD_THETA, PLD_GAMMA
 from deepspeed.runtime.zero.constants import \
     ZERO_OPTIMIZATION_OPTIMIZER_STATES, ZERO_OPTIMIZATION_GRADIENTS
 from deepspeed.runtime.csr_tensor import CSRTensor
 import deepspeed.runtime.lr_schedules as lr_schedules
-from deepspeed.utils import logger, log_dist
+from deepspeed.utils import logger, log_dist, init_distributed
 from deepspeed.utils.timer import ThroughputTimer, SynchronizedWallClockTimer
 from deepspeed.runtime.progressive_layer_drop import ProgressiveLayerDrop
 

@@ -130,29 +130,14 @@ def __init__(self,
         if dist_init_required is False:
             assert (dist.is_initialized()==True), "Torch distributed not initialized. Please set dist_init_required to True or initialize before calling deepspeed.initialize()"
 
-        # DeepSpeed will initialize torch distributed only if the user has not already intialized it.
-        if dist_init_required and not dist.is_initialized():
-            # discover using mpi4py if user specifies the flag
-            if hasattr(args, 'deepspeed_mpi') and args.deepspeed_mpi:
-                # if in Azure ML environment and user specified this flag, notify the user to remove the flag.
-                if self._in_aml():
-                    logger.warning(
-                        "Please remove the --deepspeed_mpi flag if running on AzureML.")
-                self._mpi_check(args, dist_init_required)
-            else:
-                # detect if we are in Azure ML environment
-                if self._in_aml():
-                    self._set_environment_variables_for_nccl_backend(args)
-
-            logger.info("Initializing torch distributed with backend: {}".format(
-                self.dist_backend))
-            dist.init_process_group(backend=self.dist_backend)
+        # Initialize torch distributed if needed
+        init_distributed(dist_backend=self.dist_backend)
 
         self._do_args_sanity_check(args)
         self._configure_with_arguments(args, mpu)
         self._do_sanity_check()
 
-        self._init_distributed(dist_init_required)
+        self._set_distributed_vars()
 
         if self.tensorboard_enabled() and self.global_rank == 0:
             self.summary_writer = self.get_summary_writer()

@@ -209,87 +194,6 @@ def __init__(self,
         self.flatten = util_ops.flatten
         self.unflatten = util_ops.unflatten
 
-    def _in_aml(self):
-        # read AzureML environment variable to detect if we are using an Azure ML environment
-        if 'AZUREML_EXPERIMENT_ID' in os.environ:
-            return True
-        else:
-            return False
-
-    def _set_environment_variables_for_nccl_backend(self,
-                                                    args,
-                                                    master_port=6105,
-                                                    verbose=True):
-        """Helper routine to get and set environment variables.
-        This is adapted from Azure ML's documentation available from:
-        https://azure.github.io/azureml-web/docs/cheatsheet/distributed-training/#environment-variables-from-openmpi
-        """
-        os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
-        os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]
-        single_node = int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]) == int(
-            os.environ["WORLD_SIZE"])
-        if not single_node:
-            master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
-            os.environ["MASTER_ADDR"] = master_node_params[0]
-            # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
-            if "MASTER_PORT" not in os.environ:
-                os.environ["MASTER_PORT"] = str(master_port)
-        else:
-            os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
-            os.environ["MASTER_PORT"] = "54965"
-        print("NCCL_SOCKET_IFNAME original value = {}".format(
-            os.environ["NCCL_SOCKET_IFNAME"]))
-
-        os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
-        args.local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])
-
-        if verbose:
-            logger.info(
-                "Discovered AzureML settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
-                .format(os.environ['RANK'],
-                        args.local_rank,
-                        os.environ['WORLD_SIZE'],
-                        os.environ['MASTER_ADDR'],
-                        os.environ['MASTER_PORT']))
-
-    def _mpi_check(self, args, dist_init_required):
-        from mpi4py import MPI
-        import subprocess
-        comm = MPI.COMM_WORLD
-        rank = comm.Get_rank()
-        world_size = comm.Get_size()
-
-        master_addr = None
-        if rank == 0:
-            hostname_cmd = ["hostname -I"]
-            result = subprocess.check_output(hostname_cmd, shell=True)
-            master_addr = result.decode('utf-8').split()[0]
-        master_addr = comm.bcast(master_addr, root=0)
-
-        # Determine local rank by assuming hostnames are unique
-        proc_name = MPI.Get_processor_name()
-        all_procs = comm.allgather(proc_name)
-        local_rank = sum([i == proc_name for i in all_procs[:rank]])
-
-        os.environ['RANK'] = str(rank)
-        os.environ['WORLD_SIZE'] = str(world_size)
-        args.local_rank = local_rank
-        os.environ['MASTER_ADDR'] = master_addr
-        os.environ['MASTER_PORT'] = TORCH_DISTRIBUTED_DEFAULT_PORT
-
-        logger.info(
-            "Discovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
-            .format(os.environ['RANK'],
-                    args.local_rank,
-                    os.environ['WORLD_SIZE'],
-                    os.environ['MASTER_ADDR'],
-                    os.environ['MASTER_PORT']))
-
-        if not dist_init_required and dist.is_initialized():
-            assert dist.get_rank() == rank, "MPI rank {} does not match torch rank {}".format(rank, dist.get_rank())
-            assert dist.get_world_size() == world_size, "MPI world size {} does not match torch world size {}".format(
-                world_size, dist.get_world_size())
-
     def pld_enabled(self):
         return self._config.pld_enabled

@@ -497,7 +401,7 @@ def _scheduler_from_config(self, optimizer):
         else:
             return None
 
-    def _init_distributed(self, dist_init_required):
+    def _set_distributed_vars(self):
        if self.local_rank >= 0:
            torch.cuda.set_device(self.local_rank)
            self.device = torch.device("cuda", self.local_rank)
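Worth noting about this refactor: `init_distributed` only calls `torch.distributed.init_process_group` when no process group exists yet, so user-managed initialization keeps working with the engine. A minimal sketch of that interaction (the backend choice is just an example, not something this commit mandates):

```python
import torch
import deepspeed

# User-managed initialization (env:// rendezvous via RANK/WORLD_SIZE/MASTER_ADDR/MASTER_PORT).
torch.distributed.init_process_group(backend="nccl")

# Safe to call afterwards: auto_mpi_discovery=False skips the env/MPI probing, and the
# function returns without re-initializing because torch.distributed.is_initialized() is True.
deepspeed.init_distributed(dist_backend="nccl", auto_mpi_discovery=False)
```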

deepspeed/utils/__init__.py (+2, -1)

@@ -1,2 +1,3 @@
-from deepspeed.utils.logging import logger, log_dist
+from .logging import logger, log_dist
+from .distributed import init_distributed
 from deepspeed.runtime.dataloader import RepeatingLoader

deepspeed/utils/distributed.py (+129, new file)

@@ -0,0 +1,129 @@
+'''
+Copyright 2020 The Microsoft DeepSpeed Team
+'''
+import os
+import torch
+
+from .logging import logger
+from ..constants import TORCH_DISTRIBUTED_DEFAULT_PORT
+
+
+def init_distributed(dist_backend="nccl",
+                     auto_mpi_discovery=True,
+                     distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT,
+                     verbose=True):
+    """
+    Initialize torch.distributed backend, potentially performing MPI discovery if needed
+    Arguments:
+        dist_backend (str): torch distributed backend, e.g., nccl, mpi, gloo
+        auto_mpi_discovery (bool): if distributed environment variables are not set, attempt to discover them from MPI
+        distributed_port (int, optional): torch distributed backend port
+        verbose (bool, optional): verbose logging
+    """
+
+    required_env = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "LOCAL_RANK"]
+    if auto_mpi_discovery and not all(map(lambda v: v in os.environ, required_env)):
+        if verbose:
+            logger.info(
+                "Not using the DeepSpeed or torch.distributed launchers, attempting to detect MPI environment..."
+            )
+        if in_aml() and not in_dlts():
+            patch_aml_env_for_torch_nccl_backend(verbose=verbose)
+        else:
+            mpi_discovery(distributed_port=distributed_port, verbose=verbose)
+
+    if not torch.distributed.is_initialized():
+        if verbose:
+            logger.info(
+                "Initializing torch distributed with backend: {}".format(dist_backend))
+        torch.distributed.init_process_group(backend=dist_backend)
+
+
+def mpi_discovery(distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT, verbose=True):
+    """
+    Discovery MPI environment via mpi4py and map to relevant torch.distributed state
+    """
+    from mpi4py import MPI
+    import subprocess
+    comm = MPI.COMM_WORLD
+    rank = comm.Get_rank()
+    world_size = comm.Get_size()
+
+    master_addr = None
+    if rank == 0:
+        hostname_cmd = ["hostname -I"]
+        result = subprocess.check_output(hostname_cmd, shell=True)
+        master_addr = result.decode('utf-8').split()[0]
+    master_addr = comm.bcast(master_addr, root=0)
+
+    # Determine local rank by assuming hostnames are unique
+    proc_name = MPI.Get_processor_name()
+    all_procs = comm.allgather(proc_name)
+    local_rank = sum([i == proc_name for i in all_procs[:rank]])
+
+    os.environ['RANK'] = str(rank)
+    os.environ['WORLD_SIZE'] = str(world_size)
+    os.environ['LOCAL_RANK'] = str(local_rank)
+    os.environ['MASTER_ADDR'] = master_addr
+    os.environ['MASTER_PORT'] = str(distributed_port)
+
+    if verbose:
+        logger.info(
+            "Discovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
+            .format(os.environ['RANK'],
+                    os.environ['LOCAL_RANK'],
+                    os.environ['WORLD_SIZE'],
+                    os.environ['MASTER_ADDR'],
+                    os.environ['MASTER_PORT']))
+
+    if torch.distributed.is_initialized():
+        assert dist.get_rank() == rank, "MPI rank {} does not match torch rank {}".format(rank, dist.get_rank())
+        assert dist.get_world_size() == world_size, "MPI world size {} does not match torch world size {}".format(
+            world_size, dist.get_world_size())
+
+
+def in_aml():
+    # Are we running inside an Azure Machine Learning (AML) environment?
+    return 'AZUREML_EXPERIMENT_ID' in os.environ
+
+
+def in_dlts():
+    # Are we running on a DLTS cluster?
+    return 'DLTS_JOB_ID' in os.environ
+
+
+def patch_aml_env_for_torch_nccl_backend(master_port=6105, verbose=True):
+    """Helper routine to get and set environment variables.
+    This is adapted from Azure ML's documentation available from:
+    https://azure.github.io/azureml-web/docs/cheatsheet/distributed-training/#environment-variables-from-openmpi
+    """
+    os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
+    os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]
+    single_node = int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]) == int(
+        os.environ["WORLD_SIZE"])
+
+    if not single_node:
+        master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
+        os.environ["MASTER_ADDR"] = master_node_params[0]
+        # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
+        if "MASTER_PORT" not in os.environ:
+            os.environ["MASTER_PORT"] = str(master_port)
+    else:
+        os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
+        os.environ["MASTER_PORT"] = "54965"
+
+    if verbose:
+        logger.info("NCCL_SOCKET_IFNAME original value = {}".format(
+            os.environ["NCCL_SOCKET_IFNAME"]))
+
+    os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
+    os.environ['LOCAL_RANK'] = os.environ["OMPI_COMM_WORLD_LOCAL_RANK"]
+
+    if verbose:
+        logger.info(
+            "Discovered AzureML settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
+            .format(os.environ['RANK'],
+                    os.environ['LOCAL_RANK'],
+                    os.environ['WORLD_SIZE'],
+                    os.environ['MASTER_ADDR'],
+                    os.environ['MASTER_PORT']))
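For context, here is a hedged usage sketch of the new entry point from a training script launched with mpirun rather than the `deepspeed` launcher; the script name and launch command are illustrative, not part of this commit:

```python
# Illustrative training-script snippet, launched e.g. as:
#   mpirun -np 4 python train.py --deepspeed --deepspeed_config ds_config.json
# (train.py and the flags are placeholders for user code.)
import torch
import deepspeed

# With the launcher env vars absent, this discovers RANK/LOCAL_RANK/WORLD_SIZE/
# MASTER_ADDR/MASTER_PORT via mpi4py, then initializes the NCCL process group.
deepspeed.init_distributed(dist_backend="nccl")

# torch.distributed is now usable before deepspeed.initialize(), e.g. for
# rank-dependent setup.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
```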

docs/_tutorials/getting-started.md (+16, -14)

@@ -216,25 +216,27 @@ DeepSpeed will then make sure that these environment variables are set when
 launching each process on every node across their training job.
 
 
-### MPI Compatibility
+### MPI and AzureML Compatibility
 As described above, DeepSpeed provides its own parallel launcher to help launch
 multi-node/multi-gpu training jobs. If you prefer to launch your training job
 using MPI (e.g., mpirun), we provide support for this. It should be noted that
 DeepSpeed will still use the torch distributed NCCL backend and *not* the MPI
-backend. To launch your training job with mpirun + DeepSpeed you simply pass us
-an additional flag `--deepspeed_mpi`. DeepSpeed will then use
-[mpi4py](https://pypi.org/project/mpi4py/) to discover the MPI environment (e.g.,
-rank, world size) and properly initialize torch distributed for training. In this
-case you will explicitly invoke `python` to launch your model script instead of using
-the `deepspeed` launcher, here is an example:
-```bash
-mpirun <mpi-args> python \
-    <client_entry.py> <client args> \
-    --deepspeed_mpi --deepspeed --deepspeed_config ds_config.json
-```
+backend.
+
+To launch your training job with mpirun + DeepSpeed or with AzureML (which uses
+mpirun as a launcher backend) you simply need to install the
+[mpi4py](https://pypi.org/project/mpi4py/) python package. DeepSpeed will use
+this to discover the MPI environment and pass the necessary state (e.g., world
+size, rank) to the torch distributed backend.
 
-If you want to use this feature of DeepSpeed, please ensure that mpi4py is
-installed via `pip install mpi4py`.
+If you are using model parallelism, pipeline parallelism, or otherwise require
+torch.distributed calls before calling `deepspeed.initialize(..)` we provide
+the same MPI support with an additional DeepSpeed API call. Replace your initial
+`torch.distributed.init_process_group(..)` call with:
+
+```python
+deepspeed.init_distributed()
+```
 
 ## Resource Configuration (single-node)
 In the case that we are only running on a single node (with one or more GPUs)
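To make the documented call order concrete, a short sketch under assumed user code; `build_model`, `get_mpu`, and `cmd_args` are placeholders, not DeepSpeed APIs:

```python
import deepspeed

# Replaces the former torch.distributed.init_process_group(backend="nccl") call.
deepspeed.init_distributed()

# Any model-parallel or pipeline-parallel setup that needs torch.distributed goes here.
model = build_model()   # placeholder user function
mpu = get_mpu()         # placeholder model-parallel unit

model_engine, optimizer, _, _ = deepspeed.initialize(args=cmd_args,
                                                     model=model,
                                                     model_parameters=model.parameters(),
                                                     mpu=mpu)
```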

install.sh (+1, -1)

@@ -171,5 +171,5 @@ else
   pdcp -w $hosts dist/deepspeed*.whl $tmp_wheel_path/
   pdsh -w $hosts "$PIP_SUDO $PIP_INSTALL $tmp_wheel_path/deepspeed*.whl"
   pdsh -w $hosts "ds_report"
-  pdsh -w $hosts "if [ -d $tmp_wheel_path ]; then rm $tmp_wheel_path/*.whl; rmdir $tmp_wheel_path; fi"
+  pdsh -w $hosts "if [ -d $tmp_wheel_path ]; then rm $tmp_wheel_path/*.whl; rm $tmp_wheel_path/*.txt; rmdir $tmp_wheel_path; fi"
 fi
