[Fix] Move init dist connection into the setup function #6506

Merged
merged 33 commits on Mar 18, 2021
Changes from 1 commit
Commits
33 commits
6bf721e
Move connection setup into the setup function. Call setup hook after …
Mar 13, 2021
1576176
Added CHANGELOG.md
Mar 13, 2021
7148ee6
fix setup order in callback test
awaelchli Mar 13, 2021
4fd0c02
fix input arguments in test
awaelchli Mar 13, 2021
cbfa681
Mock distributed function, remove protection to turn into training ty…
Mar 13, 2021
2a1dfbf
Remove import
Mar 13, 2021
e9c3f83
Add missing mock, ensure custom plugin does not create children process
Mar 14, 2021
2141a1f
Merge branch 'master' into fix/setup_ddp_hook
Mar 14, 2021
96ca54f
Merge branch 'master' into fix/setup_ddp_hook
SeanNaren Mar 15, 2021
ffe1c3f
Skip test on windows
Mar 15, 2021
1709cdb
Update deepspeed to init connection in setup
Mar 15, 2021
708f97f
Do not initialize distributed module
Mar 15, 2021
ec33b96
Move DeepSpeed tests to special tests since dist communication is bei…
Mar 16, 2021
d782554
Merge branch 'master' into fix/setup_ddp_hook
Mar 16, 2021
0c03487
Special the test to see if this fixes CI
Mar 16, 2021
edde60b
Delete accelerator connector test to see if its causing build to fail
Mar 16, 2021
9d31742
Delete deepspeed test
Mar 16, 2021
9db893a
Revert "Delete accelerator connector test to see if its causing build…
Mar 16, 2021
56ef252
Revert "Delete deepspeed test"
Mar 16, 2021
cad0671
Reverse hook
Mar 16, 2021
6b7d835
Reverse setup hooks to debug again
Mar 16, 2021
4651e57
Add todo so i know where i left off
Mar 17, 2021
d7ec33e
For single device move in pre_dispatch after setup function
Mar 17, 2021
72097ba
Merge branch 'master' into fix/setup_ddp_hook
Mar 17, 2021
bd2a53a
Add additional model to device hook if any additional parameters have…
Mar 17, 2021
b5450de
See if we can enable deepspeed tests
Mar 17, 2021
136ddc5
Revert "See if we can enable deepspeed tests"
Mar 17, 2021
0210f17
See if this hook approach works
Mar 18, 2021
1bae940
Introduce new granular hooks
Mar 18, 2021
69d6c32
Remove import, fix tpu spawn by moving the function to setup
Mar 18, 2021
91fff3a
Added missing special test
Mar 18, 2021
88e2e09
Merge branch 'master' into fix/setup_ddp_hook
Mar 18, 2021
3eced98
Clean up the setup comment, since its run on train and test
Mar 18, 2021
Move connection setup into the setup function. Call setup hook after we set up the accelerator
SeanNaren committed Mar 13, 2021
commit 6bf721e372199a6799d8e9e3fc5076727cdfd611
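
For orientation before the diff: the user-facing effect of this change is that `LightningModule.setup` now runs after the training type plugin has initialized the process group. Below is a minimal sketch (the `RankAwareModel` class is illustrative, not part of the PR, and assumes a multi-process DDP run) of what becomes safe to do inside the hook:

```python
from typing import Optional

import torch.distributed as dist
from pytorch_lightning import LightningModule


class RankAwareModel(LightningModule):

    def setup(self, stage: Optional[str] = None) -> None:
        # With this PR, the DDP process group is already initialized when
        # setup() runs, so rank queries and collectives are usable here.
        if dist.is_available() and dist.is_initialized():
            dist.barrier()  # e.g. wait until every rank has finished data preparation
```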
65 changes: 31 additions & 34 deletions pytorch_lightning/plugins/training_type/ddp.py
@@ -90,6 +90,8 @@ def setup(self, model):
# set the task idx
self.task_idx = self.cluster_environment.local_rank()

self._setup_distributed()

def _call_children_scripts(self):

# bookkeeping of spawned processes
@@ -161,6 +163,34 @@ def _call_children_scripts(self):
delay = np.random.uniform(1, 5, 1)[0]
sleep(delay)

def _setup_distributed(self):
# TODO: check if needed
seed = os.environ.get("PL_GLOBAL_SEED")
if seed is not None:
seed_everything(int(seed))

# determine which process we are and world size
self.set_world_ranks()

# set warning rank
rank_zero_only.rank = self.global_rank

# set up server using proc 0's ip address
# try to init for 20 times at max in case ports are taken
# where to store ip_table
self.init_ddp_connection(self.global_rank, self.world_size)

# on world_size=0 let everyone know training is starting
if self.is_global_zero and not torch.distributed.is_initialized():
log.info("-" * 100)
log.info(f"distributed_backend={self.distributed_backend}")
log.info(f"All DDP processes registered. Starting ddp with {self.world_size} processes")
log.info("-" * 100)
Comment on lines +183 to +186
Member


btw, shall we have this as a single message instead of 4 separate ones?
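
A minimal sketch of what that suggestion could look like; the helper name and logging setup are illustrative and not part of the PR:

```python
import logging

log = logging.getLogger(__name__)


def log_ddp_banner(distributed_backend: str, world_size: int) -> None:
    # Illustrative helper: emit the same banner as a single log record
    # instead of four separate log.info calls.
    divider = "-" * 100
    log.info(
        "\n".join([
            divider,
            f"distributed_backend={distributed_backend}",
            f"All DDP processes registered. Starting ddp with {world_size} processes",
            divider,
        ])
    )
```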


# set the ranks and devices
self.dist.rank = self.global_rank
self.dist.device = self.root_device

def _check_can_spawn_children(self):
if self._has_spawned_children:
raise RuntimeError(
@@ -179,9 +209,7 @@ def pre_configure_ddp(self):
# Many models require setting this parameter to True, as there are corner cases
# when not all parameter backward hooks are fired by the autograd engine even if require_grad is set to True.
# This flag does come with a performance hit, so it is suggested to disable in cases where it is possible.
self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get(
"find_unused_parameters", True
)
self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get("find_unused_parameters", True)
# todo: PyTorch 1.7.0 DDP introduces ``self.reducer._rebuild_buckets()`` breaking manual_optimization
if _TORCH_GREATER_EQUAL_1_7 and not self.lightning_module.automatic_optimization and not self._ddp_kwargs.get(
"find_unused_parameters", False
@@ -215,37 +243,6 @@ def init_ddp_connection(self, global_rank: int, world_size: int) -> None:
torch_distrib.init_process_group(self.torch_distributed_backend, rank=global_rank, world_size=world_size)

def pre_dispatch(self):
# TODO: check if needed
seed = os.environ.get("PL_GLOBAL_SEED")
if seed is not None:
seed_everything(int(seed))

# determine which process we are and world size
self.set_world_ranks()

# set warning rank
rank_zero_only.rank = self.global_rank

# set up server using proc 0's ip address
# try to init for 20 times at max in case ports are taken
# where to store ip_table
self.init_ddp_connection(self.global_rank, self.world_size)

# TODO: we moved it to the trainer.fit after calling pre_dispatch
# ... need to double check that it is the correct place
# self.trainer.call_setup_hook(self.model)

Comment on lines -231 to -235
Contributor


yeah my silly todo....
"need to double check that it is the correct place"

Thanks for double checking @SeanNaren 😄

# on world_size=0 let everyone know training is starting
if self.is_global_zero and not torch.distributed.is_initialized():
log.info("-" * 100)
log.info(f"distributed_backend={self.distributed_backend}")
log.info(f"All DDP processes registered. Starting ddp with {self.world_size} processes")
log.info("-" * 100)

# set the ranks and devices
self.dist.rank = self.global_rank
self.dist.device = self.root_device

if self.sync_batchnorm:
self.model = self.configure_sync_batchnorm(self.model)

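As an aside to the `find_unused_parameters` default kept in `pre_configure_ddp` above: a usage sketch (assuming the `DDPPlugin` constructor of this release forwards extra kwargs to `DistributedDataParallel`) of how a user can opt out of the costlier default when every parameter receives gradients:

```python
from pytorch_lightning import Trainer
from pytorch_lightning.plugins import DDPPlugin

# Sketch only: pass find_unused_parameters=False through the plugin kwargs to
# skip the unused-parameter search and avoid its performance hit.
trainer = Trainer(
    accelerator="ddp",
    gpus=2,
    plugins=[DDPPlugin(find_unused_parameters=False)],
)
```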
6 changes: 2 additions & 4 deletions pytorch_lightning/trainer/trainer.py
@@ -426,9 +426,9 @@ def fit(
# ----------------------------
# SET UP TRAINING
# ----------------------------
self.call_setup_hook(model)
self.call_hook("on_before_accelerator_backend_setup", model)
self.accelerator.setup(self, model) # note: this sets up self.lightning_module
self.call_setup_hook(model)

# ----------------------------
# INSPECT THE CORE LOOPS
@@ -922,9 +922,7 @@ def test(

# If you supply a datamodule you can't supply test_dataloaders
if test_dataloaders and datamodule:
raise MisconfigurationException(
'You cannot pass both `trainer.test(test_dataloaders=..., datamodule=...)`'
)
raise MisconfigurationException('You cannot pass both `trainer.test(test_dataloaders=..., datamodule=...)`')

model_provided = model is not None
model = model or self.lightning_module
29 changes: 28 additions & 1 deletion tests/accelerators/test_ddp.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Optional
from unittest import mock
from unittest.mock import patch

import pytest
@@ -91,7 +93,6 @@ def test_torch_distributed_backend_env_variables(tmpdir):
_environ = {"PL_TORCH_DISTRIBUTED_BACKEND": "undefined", "CUDA_VISIBLE_DEVICES": "0,1", "WORLD_SIZE": "2"}
with patch.dict(os.environ, _environ), \
patch('torch.cuda.device_count', return_value=2):

with pytest.raises(ValueError, match="Invalid backend: 'undefined'"):
model = BoringModel()
trainer = Trainer(
@@ -102,3 +103,29 @@
logger=False,
)
trainer.fit(model)


@mock.patch('torch.cuda.device_count', return_value=1)
@mock.patch('torch.cuda.is_available', return_value=True)
@mock.patch('torch.cuda.set_device')
@mock.patch.dict(os.environ, {'PL_TORCH_DISTRIBUTED_BACKEND': 'gloo'}, clear=True)
def test_ddp_torch_dist_is_available_in_setup(mock_set_device, mock_is_available, mock_device_count, tmpdir):
"""
Test to ensure torch distributed is available within the setup hook using ddp
"""

class TestModel(BoringModel):

def setup(self, stage: Optional[str] = None) -> None:
assert torch.distributed.is_initialized()
raise SystemExit()

model = TestModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
accelerator="ddp",
gpus=1,
)
with pytest.raises(SystemExit):
trainer.fit(model)