Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing deepspeed distributed call #9540

Merged
merged 7 commits into master on
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Fixed error reporting in DDP process reconciliation when processes are launched by an external agent ([#9389](https://github.com/PyTorchLightning/pytorch-lightning/pull/9389))


- Fixed missing deepspeed distributed call ([#9540](https://github.com/PyTorchLightning/pytorch-lightning/pull/9540))
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved


## [1.4.5] - 2021-08-31

- Fixed reduction using `self.log(sync_dict=True, reduce_fx={mean,max})` ([#9142](https://github.com/PyTorchLightning/pytorch-lightning/pull/9142))
Expand Down
21 changes: 17 additions & 4 deletions pytorch_lightning/plugins/training_type/deepspeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from pytorch_lightning.utilities.distributed import log, rank_zero_info, rank_zero_only
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from pytorch_lightning.utilities.imports import _DEEPSPEED_AVAILABLE
from pytorch_lightning.utilities.seed import reset_seed
from pytorch_lightning.utilities.types import _PATH, LRSchedulerTypeTuple
from pytorch_lightning.utilities.warnings import rank_zero_warn, WarningCache

Expand Down Expand Up @@ -334,16 +335,28 @@ def _load_config(self, config):
return config

def setup_distributed(self):
super().setup_distributed()
reset_seed()

# determine which process we are and world size
self.set_world_ranks()

# set warning rank
rank_zero_only.rank = self.global_rank
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved

self.init_deepspeed_distributed()

# set the ranks and devices
self.dist.rank = self.global_rank
self.dist.device = self.root_device
if not self._config_initialized:
self._format_config()
self._config_initialized = True

def init_ddp_connection(self, global_rank: Optional[int] = None, world_size: Optional[int] = None) -> None:
def init_deepspeed_distributed(self) -> None:
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
if platform.system() != "Windows":
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
# do not set env variables on windows, allow deepspeed to control setup
global_rank = global_rank if global_rank is not None else self.cluster_environment.global_rank()
world_size = world_size if world_size is not None else self.cluster_environment.world_size()
global_rank = self.global_rank if self.global_rank is not None else self.cluster_environment.global_rank()
world_size = self.world_size if self.world_size is not None else self.cluster_environment.world_size()
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
self._set_node_environment_variables(global_rank, world_size)
log.info(
"initializing deepspeed distributed: "
Expand Down
10 changes: 7 additions & 3 deletions tests/plugins/test_deepspeed_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from tests.helpers.runif import RunIf

if _DEEPSPEED_AVAILABLE:
import deepspeed
from deepspeed.utils.zero_to_fp32 import convert_zero_checkpoint_to_fp32_state_dict


Expand Down Expand Up @@ -383,12 +384,15 @@ def on_before_accelerator_backend_setup(self, trainer, pl_module) -> None:

@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu(tmpdir):
"""Test to ensure that DeepSpeed with multiple GPUs works."""
"""Test to ensure that DeepSpeed with multiple GPUs works and deepspeed distributed is initialized
correctly."""
model = BoringModel()
trainer = Trainer(
default_root_dir=tmpdir, plugins=[DeepSpeedPlugin(stage=3)], gpus=2, fast_dev_run=True, precision=16
)
trainer.fit(model)
with mock.patch("deepspeed.init_distributed", wraps=deepspeed.init_distributed) as mock_deepspeed_distributed:
trainer.fit(model)
mock_deepspeed_distributed.assert_called_once()
SeanNaren marked this conversation as resolved.
Show resolved Hide resolved
trainer.test(model)

_assert_save_model_is_equal(model, tmpdir, trainer)
Expand Down Expand Up @@ -810,7 +814,7 @@ def test_deepspeed_plugin_env_variables(mock_deepspeed_distributed, tmpdir, plat
plugin = trainer.training_type_plugin
assert isinstance(plugin, DeepSpeedPlugin)
with mock.patch("platform.system", return_value=platform) as mock_platform:
plugin.init_ddp_connection()
plugin.init_deepspeed_distributed()
mock_deepspeed_distributed.assert_called()
mock_platform.assert_called()
if platform == "Windows":
Expand Down