[Feature] Add collect_results support for Ascend NPU #1309

Merged · 3 commits · Aug 23, 2023
56 changes: 31 additions & 25 deletions mmengine/dist/dist.py
@@ -898,10 +898,11 @@ def collect_results(results: list,
object.
size (int): Size of the results, commonly equal to length of
the results.
device (str): Device name. Optional values are 'cpu' and 'gpu'.
device (str): Device name. Optional values are 'cpu', 'gpu' or 'npu'.
tmpdir (str | None): Temporary directory for collected results to
store. If set to None, it will create a temporary directory for it.
``tmpdir`` should be None when device is 'gpu'. Defaults to None.
``tmpdir`` should be None when device is 'gpu' or 'npu'.
Defaults to None.

Returns:
list or None: The collected results.
@@ -920,13 +921,13 @@ def collect_results(results: list,
['foo', 24, {1: 2}, {'a': 'b'}] # rank 0
None # rank 1
"""
if device not in ['gpu', 'cpu']:
if device not in ['gpu', 'cpu', 'npu']:
raise NotImplementedError(
f"device must be 'cpu' or 'gpu', but got {device}")
f"device must be 'cpu' , 'gpu' or 'npu', but got {device}")

if device == 'gpu':
assert tmpdir is None, 'tmpdir should be None when device is "gpu"'
return collect_results_gpu(results, size)
if device == 'gpu' or device == 'npu':
assert tmpdir is None, f'tmpdir should be None when device is {device}'
return _collect_results_device(results, size)
else:
return collect_results_cpu(results, size, tmpdir)
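For context, a minimal usage sketch of the updated dispatch (not part of the diff); `model`, `data_loader`, and `dataset` are placeholders, and a distributed job is assumed to already be running:

```python
# Hypothetical evaluation loop exercising the new device='npu' option.
# `model`, `data_loader` and `dataset` are placeholders, not from this PR.
from mmengine.dist import collect_results, get_rank

results = []
for data in data_loader:
    results.append(model(data))  # per-rank partial results

# 'npu' behaves like 'gpu': results are exchanged via all_gather_object,
# so tmpdir must stay None.
all_results = collect_results(results, size=len(dataset), device='npu')

if get_rank() == 0:
    assert len(all_results) == len(dataset)  # only rank 0 gets the full list
```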

@@ -1018,6 +1019,28 @@ def collect_results_cpu(result_part: list,
return ordered_results


def _collect_results_device(result_part: list, size: int) -> Optional[list]:
"""Collect results under gpu or npu mode."""
rank, world_size = get_dist_info()
if world_size == 1:
return result_part[:size]

# gather all result part. Note that NCCL does not support gather so use
# all_gather_object instead.
part_list = all_gather_object(result_part)

if rank == 0:
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
return ordered_results
else:
return None
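As a side note on the gather-and-reorder logic above, here is a small self-contained illustration (not from the PR) of why `zip(*part_list)` followed by truncation to `size` restores sample order and drops sampler padding:

```python
# Two ranks, 5 real samples handed out round-robin; rank 1 padded its
# last batch by repeating sample 0. Values are illustrative only.
part_list = [
    ['s0', 's2', 's4'],  # gathered from rank 0
    ['s1', 's3', 's0'],  # gathered from rank 1 ('s0' is padding)
]

ordered_results = []
for res in zip(*part_list):
    ordered_results.extend(list(res))
# ordered_results == ['s0', 's1', 's2', 's3', 's4', 's0']
print(ordered_results[:5])  # truncating to size drops the padded duplicate
```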


def collect_results_gpu(result_part: list, size: int) -> Optional[list]:
"""Collect results under gpu mode.

@@ -1048,24 +1071,7 @@ def collect_results_gpu(result_part: list, size: int) -> Optional[list]:
['foo', 24, {1: 2}, {'a': 'b'}] # rank 0
None # rank 1
"""
rank, world_size = get_dist_info()
if world_size == 1:
return result_part[:size]

# gather all result part. Note that NCCL does not support gather so use
# all_gather_object instead.
part_list = all_gather_object(result_part)

if rank == 0:
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
return ordered_results
else:
return None
return _collect_results_device(result_part, size)
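The refactor keeps the public API intact: `collect_results_gpu` retains its signature and simply delegates to the shared helper. A hedged sanity-check sketch (placeholder `results` and `dataset`):

```python
# Sketch only: collect_results_gpu is called exactly as before the refactor;
# it now forwards to _collect_results_device internally.
from mmengine.dist import collect_results_gpu

gathered = collect_results_gpu(results, size=len(dataset))  # unchanged call site
```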


def _all_reduce_coalesced(tensors: List[torch.Tensor],