[Feature] Add collect_results for Ascend NPU
xuuyangg committed Aug 17, 2023
1 parent 488fddc commit 7a7842f
Showing 2 changed files with 67 additions and 11 deletions.
18 changes: 10 additions & 8 deletions mmengine/dist/__init__.py
@@ -2,7 +2,8 @@
 from .dist import (all_gather_object, all_reduce, all_gather, all_reduce_dict,
                    collect_results, gather, broadcast, gather_object,
                    sync_random_seed, broadcast_object_list,
-                   collect_results_cpu, collect_results_gpu, all_reduce_params)
+                   collect_results_cpu, collect_results_gpu,
+                   collect_results_npu, all_reduce_params)
 from .utils import (get_dist_info, init_dist, init_local_group, get_backend,
                     get_world_size, get_rank, get_local_size, get_local_rank,
                     is_main_process, master_only, barrier, get_local_group,
@@ -11,11 +12,12 @@

 __all__ = [
     'all_gather_object', 'all_reduce', 'all_gather', 'all_reduce_dict',
-    'collect_results', 'collect_results_cpu', 'collect_results_gpu', 'gather',
-    'broadcast', 'gather_object', 'sync_random_seed', 'broadcast_object_list',
-    'get_dist_info', 'init_dist', 'init_local_group', 'get_backend',
-    'get_world_size', 'get_rank', 'get_local_size', 'get_local_group',
-    'get_local_rank', 'is_main_process', 'master_only', 'barrier',
-    'is_distributed', 'get_default_group', 'all_reduce_params',
-    'get_data_device', 'get_comm_device', 'cast_data_device', 'infer_launcher'
+    'collect_results', 'collect_results_cpu', 'collect_results_gpu',
+    'collect_results_npu', 'gather', 'broadcast', 'gather_object',
+    'sync_random_seed', 'broadcast_object_list', 'get_dist_info', 'init_dist',
+    'init_local_group', 'get_backend', 'get_world_size', 'get_rank',
+    'get_local_size', 'get_local_group', 'get_local_rank', 'is_main_process',
+    'master_only', 'barrier', 'is_distributed', 'get_default_group',
+    'all_reduce_params', 'get_data_device', 'get_comm_device',
+    'cast_data_device', 'infer_launcher'
 ]
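With ``__all__`` updated, the new helper is importable from the package root. A quick check (a sketch, assuming an environment with this commit installed):

import mmengine.dist as dist
from mmengine.dist import collect_results_npu  # name exported by this commit

assert 'collect_results_npu' in dist.__all__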
60 changes: 57 additions & 3 deletions mmengine/dist/dist.py
@@ -898,10 +898,11 @@ def collect_results(results: list,
             object.
         size (int): Size of the results, commonly equal to length of
             the results.
-        device (str): Device name. Optional values are 'cpu' and 'gpu'.
+        device (str): Device name. Optional values are 'cpu', 'gpu' and 'npu'.
         tmpdir (str | None): Temporal directory for collected results to
             store. If set to None, it will create a temporal directory for it.
-            ``tmpdir`` should be None when device is 'gpu'. Defaults to None.
+            ``tmpdir`` should be None when device is 'gpu' or 'npu'.
+            Defaults to None.

     Returns:
         list or None: The collected results.
@@ -920,13 +921,16 @@
         ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
         None  # rank 1
     """
-    if device not in ['gpu', 'cpu']:
+    if device not in ['gpu', 'cpu', 'npu']:
         raise NotImplementedError(
-            f"device must be 'cpu' or 'gpu', but got {device}")
+            f"device must be 'cpu', 'gpu' or 'npu', but got {device}")

     if device == 'gpu':
         assert tmpdir is None, 'tmpdir should be None when device is "gpu"'
         return collect_results_gpu(results, size)
+    elif device == 'npu':
+        assert tmpdir is None, 'tmpdir should be None when device is "npu"'
+        return collect_results_npu(results, size)
     else:
         return collect_results_cpu(results, size, tmpdir)

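With this dispatch in place, Ascend NPU runs use the same ``collect_results`` entry point as CPU and GPU. A minimal usage sketch (not part of the diff; ``run_inference``, ``model``, ``data_loader``, ``dataset``, and ``evaluate`` are hypothetical, and a distributed NPU environment is assumed):

import mmengine.dist as dist

# Each rank holds only its own shard of the evaluation results.
results = run_inference(model, data_loader)  # hypothetical per-rank helper

# device='npu' routes to collect_results_npu; tmpdir must stay None here.
# (For comparison, the cpu path accepts a temp dir, e.g.
#  dist.collect_results(results, size, device='cpu', tmpdir='/tmp/collect'),
#  where the path is purely illustrative.)
all_results = dist.collect_results(results, size=len(dataset), device='npu')

if dist.is_main_process():
    # Only rank 0 receives the full list; every other rank gets None.
    evaluate(all_results)  # hypothetical metric computation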
@@ -1068,6 +1072,56 @@ def collect_results_gpu(result_part: list, size: int) -> Optional[list]:
         return None


+def collect_results_npu(result_part: list, size: int) -> Optional[list]:
+    """Collect results under npu mode.
+
+    In npu mode, this function will encode results to npu tensors and use npu
+    communication for results collection.
+
+    Args:
+        result_part (list[object]): Result list containing result parts
+            to be collected. Each item of ``result_part`` should be a
+            picklable object.
+        size (int): Size of the results, commonly equal to length of
+            the results.
+
+    Returns:
+        list or None: The collected results.
+
+    Examples:
+        >>> # distributed environment
+        >>> # We have 2 process groups, 2 ranks.
+        >>> import mmengine.dist as dist
+        >>> if dist.get_rank() == 0:
+        ...     data = ['foo', {1: 2}]
+        ... else:
+        ...     data = [24, {'a': 'b'}]
+        >>> size = 4
+        >>> output = dist.collect_results_npu(data, size)
+        >>> output
+        ['foo', 24, {1: 2}, {'a': 'b'}]  # rank 0
+        None  # rank 1
+    """
+    rank, world_size = get_dist_info()
+    if world_size == 1:
+        return result_part[:size]
+
+    # Gather all result parts. Note that the NPU's HCCL backend, like NCCL,
+    # does not support gather, so all_gather_object is used instead.
+    part_list = all_gather_object(result_part)
+
+    if rank == 0:
+        # sort the results
+        ordered_results = []
+        for res in zip(*part_list):
+            ordered_results.extend(list(res))
+        # the dataloader may pad some samples
+        ordered_results = ordered_results[:size]
+        return ordered_results
+    else:
+        return None


 def _all_reduce_coalesced(tensors: List[torch.Tensor],
                           bucket_size_mb: int = -1,
                           op: str = 'sum',
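The zip(*part_list) reordering above assumes the round-robin sharding used by the default distributed sampler, where rank r holds samples r, r + world_size, r + 2 * world_size, and so on. A standalone illustration of the interleaving (plain Python, independent of any device):

# part_list as all_gather_object would return it with 2 ranks: rank 0 held
# the even-indexed samples, rank 1 the odd-indexed ones plus one padded copy.
part_list = [['s0', 's2', 's4'], ['s1', 's3', 's4']]

ordered_results = []
for res in zip(*part_list):
    ordered_results.extend(list(res))  # interleaves to s0, s1, s2, s3, s4, s4

# the dataloader may pad the last batch, so truncate to the true dataset size
print(ordered_results[:5])  # ['s0', 's1', 's2', 's3', 's4']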
