From 68ac4e01c8d2d71fe9a4b71783efcfeefa7e73c5 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Wed, 12 May 2021 00:30:10 +0300 Subject: [PATCH 01/21] auxiliary cpu peers initial commit --- hivemind/client/averaging/__init__.py | 21 +++++++---- hivemind/client/averaging/allreduce.py | 50 ++++++++++++++++++-------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index c009cb41d..9a740cd22 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -38,6 +38,8 @@ logger = get_logger(__name__) DEFAULT_CHUNK_SIZE_BYTES = 2 ** 16 +NODE, CLIENT, AUX = 0, 1, 2#TODO ENUM + class DecentralizedAverager(mp.Process, averaging_pb2_grpc.DecentralizedAveragingServicer): """ @@ -95,17 +97,22 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start: compression_type: runtime_pb2.CompressionType = runtime_pb2.CompressionType.NONE, throughput: Optional[float] = None, min_vector_size: int = 0, listen: bool = True, listen_on: Endpoint = '0.0.0.0:*', daemon: bool = True, - channel_options: Optional[Sequence[Tuple[str, Any]]] = None, **kwargs): + channel_options: Optional[Sequence[Tuple[str, Any]]] = None, auxiliary: bool = False, **kwargs): assert '.' not in prefix, "group prefix must be a string without trailing '.'" assert throughput is None or (throughput >= 0 and np.isfinite(np.float32(throughput))), \ "throughput must be a non-negative float32" if not is_power_of_two(target_group_size): logger.warning("It is recommended to set target_group_size to a power of 2.") assert initial_group_bits is None or all(bit in '01' for bit in initial_group_bits) + assert listen or not auxiliary, "auxiliary peers must accept incoming connections" super().__init__() self.dht = dht self.listen, self.listen_on, self.kwargs = listen, listen_on, kwargs + self.mode = NODE if self.listen else CLIENT + if auxiliary: + self.mode = AUX + self.channel_options = channel_options self.daemon = daemon @@ -237,6 +244,7 @@ def step(self, gather: Optional[DataForGather] = None, weight: float = 1.0, time :returns: on success, update averaged_tensors and return group info; on failure, return None """ assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}" + #TODO if aux, check that weight is default and exactly zero future, _future = MPFuture.make_pair() gather_binary = self.serializer.dumps(gather) # serialize here to avoid loading modules in the averager process self.pipe.send(('_step', [], dict(future=_future, gather_binary=gather_binary, weight=weight, @@ -253,7 +261,7 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, while not future.done(): try: self._pending_group_assembled.clear() - data_for_gather = self.serializer.dumps([weight, self._throughput, self.listen, gather_binary]) + data_for_gather = self.serializer.dumps([weight, self._throughput, self.mode, gather_binary]) group_info = await self._matchmaking.look_for_group(timeout=timeout, data_for_gather=data_for_gather) if group_info is None: @@ -263,7 +271,8 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, self._running_groups[group_id] = allreduce_runner self._pending_group_assembled.set() await asyncio.wait_for(allreduce_runner.run(), self._allreduce_timeout) - await loop.run_in_executor(None, self.update_tensors, allreduce_runner) + if self.mode != AUX: + await loop.run_in_executor(None, self.update_tensors, 
allreduce_runner) # averaging is finished, exit the loop future.set_result(allreduce_runner.gathered) @@ -297,15 +306,15 @@ async def _make_allreduce_runner(self, group_info: GroupInfo, min_vector_size: i user_gathered = dict(zip(group_info.endpoints, map(self.serializer.loads, user_gathered))) # compute optimal part sizes from peer throughputs - incoming_throughputs = [thr if listen else 0.0 for thr, listen in zip(throughputs, modes)] + incoming_throughputs = [thr if mode != CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing part_sizes = await asyncio.get_event_loop().run_in_executor( None, load_balance_peers, self.total_size, incoming_throughputs, min_vector_size) async with self.get_tensors_async() as averaged_tensors: return AllReduceRunner(group_id=group_info.group_id, tensors=averaged_tensors, endpoint=self.endpoint, ordered_group_endpoints=group_info.endpoints, part_sizes=part_sizes, - weights=weights, gathered=user_gathered, return_deltas=True, **kwargs) + weights=weights, gathered=user_gathered, return_deltas=True, modes=modes, **kwargs) except Exception as e: - raise MatchmakingException(f"Unable to create allreduce runner ({e}), group_info: {group_info}") + raise MatchmakingException(f"Unable to create allreduce runner ({e}), group_info: {weights, throughputs, modes, user_gathered}") def update_tensors(self, allreduce_group: AllReduceRunner): """ diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 06fb3e9da..04aacfd78 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -1,5 +1,5 @@ import asyncio -from typing import Sequence, Set, Dict, Tuple, Iterable, AsyncIterator, Any +from typing import Sequence, Set, Dict, Tuple, Iterable, AsyncIterator, Any, Optional import grpc import torch @@ -13,6 +13,8 @@ GroupID = bytes logger = get_logger(__name__) +NODE, CLIENT, AUX = 0, 1, 2 + class AllReduceProtocol: """ @@ -27,12 +29,15 @@ class AllReduceProtocol: """ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoint: Endpoint, - ordered_group_endpoints: Sequence[Endpoint], part_sizes: Tuple[int, ...], return_deltas: bool = False): + ordered_group_endpoints: Sequence[Endpoint], part_sizes: Tuple[int, ...], return_deltas: bool = False, + modes: Optional[Sequence[int]] = None): assert endpoint in ordered_group_endpoints, "endpoint is not a part of the group" self.group_id, self.endpoint = group_id, endpoint self.ordered_group_endpoints, self.part_sizes = ordered_group_endpoints, part_sizes - self.client_mode_endpoints = {endpoint for endpoint, part_size in zip(self.ordered_group_endpoints, part_sizes) - if part_size == 0} + if modes is None: + modes = [CLIENT if part_size == 0 else NODE for part_size in part_sizes] + self.peer_modes = dict(zip(ordered_group_endpoints, modes)) + self.local_tensor_parts = dict(zip(ordered_group_endpoints, split_into_parts(tensors, part_sizes))) self.tensor_shapes = tuple(tensor.shape for tensor in tensors) self.return_deltas = return_deltas @@ -43,8 +48,10 @@ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoi self.averaged_part: asyncio.Future[torch.Tensor] = asyncio.Future() # will be set to [accumulator / group size] self.averaged_tensor_parts: Dict[Endpoint, torch.Tensor] = {} # averaged chunks from all peers will be put here self.future: asyncio.Future[Sequence[torch.Tensor]] = asyncio.Future() # final result or exception - for endpoint in 
self.client_mode_endpoints: - self.averaged_tensor_parts[endpoint] = torch.tensor([]) + self.num_senders = len([mode for mode in modes if mode != AUX]) + for endpoint, mode in self.peer_modes.items(): + if mode == CLIENT: #TODO do we really need that + self.averaged_tensor_parts[endpoint] = torch.tensor([]) def __repr__(self): return f"{self.__class__.__name__}({self.endpoint}, group_size={self.group_size})" @@ -65,20 +72,26 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei assert not self.future.done(), f"already finished allreduce: {self.future}" assert source in self.local_tensor_parts, "unexpected source, not a part of current group" assert source not in self.accumulated_from, "duplicate source, already received that part" - assert not self.endpoint in self.client_mode_endpoints, f"{self.endpoint} is in client mode" + assert self.peer_modes[self.endpoint] != CLIENT, f"{self.endpoint} is in client mode" assert isinstance(weight, (int, float)) and weight > 0, "averaging weights must be a non-negative int/float" logger.debug(f"{self} - accumulating tensor part from {source}") + print(end=f"{self} - accumulating tensor part from {source}\n") + self.accumulator.add_(remote_part, alpha=weight) self.denominator += weight self.accumulated_from.add(source) - assert len(self.accumulated_from) <= self.group_size - if len(self.accumulated_from) == len(self.local_tensor_parts): + assert len(self.accumulated_from) <= self.num_senders + if len(self.accumulated_from) == self.num_senders: #TODO excluding AUX average_result = self.accumulator.div_(self.denominator) - self.register_averaged_part(self.endpoint, average_result) self.averaged_part.set_result(average_result) + if self.peer_modes[self.endpoint] == AUX: + self.future.set_result(None) # auxiliary mode has finished averaging + else: + self.register_averaged_part(self.endpoint, average_result) + return await self.averaged_part def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): @@ -87,6 +100,7 @@ def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): assert source not in self.averaged_tensor_parts, "already registered the average from this peer" assert averaged_part.shape == self.local_tensor_parts[source].shape, "averaged part shape mismatch" assert averaged_part.dtype == self.local_tensor_parts[source].dtype, "averaged part dtype mismatch" + assert self.peer_modes[self.endpoint] != AUX, "You hear a reasonable explanation on why this behavior is wrong" logger.debug(f"{self} - receiving averaged tensor part from {source}") self.averaged_tensor_parts[source] = averaged_part if len(self.averaged_tensor_parts) == len(self.local_tensor_parts): @@ -133,9 +147,9 @@ class AllReduceRunner(AllReduceProtocol, averaging_pb2_grpc.DecentralizedAveragi def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoint: Endpoint, ordered_group_endpoints: Sequence[Endpoint], compression_type: runtime_pb2.CompressionType, chunk_size_bytes: int, part_sizes: Tuple[int, ...], weights: Tuple[float, ...], - gathered: Dict[Endpoint, Any], return_deltas: bool = False): + gathered: Dict[Endpoint, Any], return_deltas: bool = False, **kwargs): super().__init__(group_id=group_id, tensors=tensors, endpoint=endpoint, part_sizes=part_sizes, - ordered_group_endpoints=ordered_group_endpoints, return_deltas=return_deltas) + ordered_group_endpoints=ordered_group_endpoints, return_deltas=return_deltas, **kwargs) self.compression_type, self.chunk_size_bytes, self.gathered = 
compression_type, chunk_size_bytes, gathered self.peer_weights = dict(zip(self.ordered_group_endpoints, weights)) @@ -144,6 +158,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ + assert self.peer_modes[self.endpoint] != AUX, "TODO TEXT" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) @@ -182,9 +197,14 @@ async def run(self) -> Sequence[torch.Tensor]: send allreduce requests to all peers and collect results, return the averaged tensor (or deltas) """ try: - await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) - for i, peer in enumerate(self.ordered_group_endpoints) - if peer not in self.client_mode_endpoints)) + if self.peer_modes[self.endpoint] != AUX: + print(f'{self.endpoint} - SENDING STUFF, {self.peer_modes}') + await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) + for i, peer in enumerate(self.ordered_group_endpoints) + if self.peer_modes[peer] != CLIENT)) + else: + print(f'{self.endpoint} - NOT SENDING STUFF {self.peer_modes}') + return await self except BaseException as e: code = averaging_pb2.CANCELLED if isinstance(e, asyncio.CancelledError) else averaging_pb2.INTERNAL_ERROR From 7f55bc2589dea1bf4dfa0d5a9b2236f55a7126d1 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Wed, 12 May 2021 21:27:51 +0300 Subject: [PATCH 02/21] update test_allreduce_once for new aux peers --- hivemind/client/averaging/__init__.py | 21 ++++++++--------- hivemind/client/averaging/allreduce.py | 27 ++++++++++++---------- tests/test_averaging.py | 32 +++++++++++++++----------- 3 files changed, 43 insertions(+), 37 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index e0a01c0b1..18362d936 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -20,7 +20,7 @@ import numpy as np from hivemind.dht import DHT, DHTID -from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, split_into_parts +from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, Mode from hivemind.client.averaging.load_balancing import load_balance_peers from hivemind.client.averaging.matchmaking import Matchmaking, MatchmakingException from hivemind.client.averaging.group_info import GroupInfo @@ -38,8 +38,6 @@ logger = get_logger(__name__) DEFAULT_CHUNK_SIZE_BYTES = 2 ** 16 -NODE, CLIENT, AUX = 0, 1, 2#TODO ENUM - class DecentralizedAverager(mp.Process, averaging_pb2_grpc.DecentralizedAveragingServicer): """ @@ -104,14 +102,14 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start: if not is_power_of_two(target_group_size): logger.warning("It is recommended to set target_group_size to a power of 2.") assert initial_group_bits is None or all(bit in '01' for bit in initial_group_bits) - assert listen or not auxiliary, "auxiliary peers must accept incoming connections" + assert listen or not auxiliary, f"auxiliary peers must accept incoming connections" super().__init__() self.dht = dht self.listen, self.listen_on, 
self.kwargs = listen, listen_on, kwargs - self.mode = NODE if self.listen else CLIENT + self.mode = Mode.NODE if self.listen else Mode.CLIENT if auxiliary: - self.mode = AUX + self.mode = Mode.AUX self.channel_options = channel_options self.daemon = daemon @@ -261,7 +259,7 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, while not future.done(): try: self._pending_group_assembled.clear() - data_for_gather = self.serializer.dumps([weight, self._throughput, self.mode, gather_binary]) + data_for_gather = self.serializer.dumps([weight, self._throughput, self.mode.value, gather_binary]) group_info = await self._matchmaking.look_for_group(timeout=timeout, data_for_gather=data_for_gather) if group_info is None: @@ -271,7 +269,7 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, self._running_groups[group_id] = allreduce_runner self._pending_group_assembled.set() await asyncio.wait_for(allreduce_runner.run(), self._allreduce_timeout) - if self.mode != AUX: + if self.mode != Mode.AUX: await loop.run_in_executor(None, self.update_tensors, allreduce_runner) # averaging is finished, exit the loop @@ -302,11 +300,12 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, async def _make_allreduce_runner(self, group_info: GroupInfo, min_vector_size: int, **kwargs) -> AllReduceRunner: """ Use a group description found by Matchmaking to form AllreduceRunner """ try: - weights, throughputs, modes, user_gathered = zip(*map(self.serializer.loads, group_info.gathered)) + weights, throughputs, modes_ix, user_gathered = zip(*map(self.serializer.loads, group_info.gathered)) user_gathered = dict(zip(group_info.endpoints, map(self.serializer.loads, user_gathered))) - + modes = tuple(map(Mode, modes_ix)) + # compute optimal part sizes from peer throughputs - incoming_throughputs = [thr if mode != CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing + incoming_throughputs = [thr if mode != Mode.CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing part_sizes = await asyncio.get_event_loop().run_in_executor( None, load_balance_peers, self.total_size, incoming_throughputs, min_vector_size) async with self.get_tensors_async() as averaged_tensors: diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 04aacfd78..2b0f07a6e 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -1,5 +1,6 @@ import asyncio from typing import Sequence, Set, Dict, Tuple, Iterable, AsyncIterator, Any, Optional +from enum import Enum import grpc import torch @@ -13,7 +14,9 @@ GroupID = bytes logger = get_logger(__name__) -NODE, CLIENT, AUX = 0, 1, 2 + +class Mode(Enum): + NODE, CLIENT, AUX = 0, 1, 2 class AllReduceProtocol: @@ -30,12 +33,12 @@ class AllReduceProtocol: def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoint: Endpoint, ordered_group_endpoints: Sequence[Endpoint], part_sizes: Tuple[int, ...], return_deltas: bool = False, - modes: Optional[Sequence[int]] = None): + modes: Optional[Sequence[Mode]] = None): assert endpoint in ordered_group_endpoints, "endpoint is not a part of the group" self.group_id, self.endpoint = group_id, endpoint self.ordered_group_endpoints, self.part_sizes = ordered_group_endpoints, part_sizes if modes is None: - modes = [CLIENT if part_size == 0 else NODE for part_size in part_sizes] + modes = [Mode.CLIENT 
if part_size == 0 else Mode.NODE for part_size in part_sizes] self.peer_modes = dict(zip(ordered_group_endpoints, modes)) self.local_tensor_parts = dict(zip(ordered_group_endpoints, split_into_parts(tensors, part_sizes))) @@ -48,9 +51,9 @@ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoi self.averaged_part: asyncio.Future[torch.Tensor] = asyncio.Future() # will be set to [accumulator / group size] self.averaged_tensor_parts: Dict[Endpoint, torch.Tensor] = {} # averaged chunks from all peers will be put here self.future: asyncio.Future[Sequence[torch.Tensor]] = asyncio.Future() # final result or exception - self.num_senders = len([mode for mode in modes if mode != AUX]) + self.num_senders = len([mode for mode in modes if mode != Mode.AUX]) for endpoint, mode in self.peer_modes.items(): - if mode == CLIENT: #TODO do we really need that + if mode == Mode.CLIENT: self.averaged_tensor_parts[endpoint] = torch.tensor([]) def __repr__(self): @@ -72,7 +75,7 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei assert not self.future.done(), f"already finished allreduce: {self.future}" assert source in self.local_tensor_parts, "unexpected source, not a part of current group" assert source not in self.accumulated_from, "duplicate source, already received that part" - assert self.peer_modes[self.endpoint] != CLIENT, f"{self.endpoint} is in client mode" + assert self.peer_modes[self.endpoint] != Mode.CLIENT, f"{self.endpoint} is in Mode.client mode" assert isinstance(weight, (int, float)) and weight > 0, "averaging weights must be a non-negative int/float" logger.debug(f"{self} - accumulating tensor part from {source}") @@ -83,11 +86,11 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei self.accumulated_from.add(source) assert len(self.accumulated_from) <= self.num_senders - if len(self.accumulated_from) == self.num_senders: #TODO excluding AUX + if len(self.accumulated_from) == self.num_senders: average_result = self.accumulator.div_(self.denominator) self.averaged_part.set_result(average_result) - if self.peer_modes[self.endpoint] == AUX: + if self.peer_modes[self.endpoint] == Mode.AUX: self.future.set_result(None) # auxiliary mode has finished averaging else: self.register_averaged_part(self.endpoint, average_result) @@ -100,7 +103,7 @@ def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): assert source not in self.averaged_tensor_parts, "already registered the average from this peer" assert averaged_part.shape == self.local_tensor_parts[source].shape, "averaged part shape mismatch" assert averaged_part.dtype == self.local_tensor_parts[source].dtype, "averaged part dtype mismatch" - assert self.peer_modes[self.endpoint] != AUX, "You hear a reasonable explanation on why this behavior is wrong" + assert self.peer_modes[self.endpoint] != Mode.AUX, "You hear a reasonable explanation on why this behavior is wrong" logger.debug(f"{self} - receiving averaged tensor part from {source}") self.averaged_tensor_parts[source] = averaged_part if len(self.averaged_tensor_parts) == len(self.local_tensor_parts): @@ -158,7 +161,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ - assert self.peer_modes[self.endpoint] != AUX, "TODO TEXT" + assert 
self.peer_modes[self.endpoint] != Mode.AUX, "TODO TEXT" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) @@ -197,11 +200,11 @@ async def run(self) -> Sequence[torch.Tensor]: send allreduce requests to all peers and collect results, return the averaged tensor (or deltas) """ try: - if self.peer_modes[self.endpoint] != AUX: + if self.peer_modes[self.endpoint] != Mode.AUX: print(f'{self.endpoint} - SENDING STUFF, {self.peer_modes}') await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) for i, peer in enumerate(self.ordered_group_endpoints) - if self.peer_modes[peer] != CLIENT)) + if self.peer_modes[peer] != Mode.CLIENT)) else: print(f'{self.endpoint} - NOT SENDING STUFF {self.peer_modes}') diff --git a/tests/test_averaging.py b/tests/test_averaging.py index 65bfea96d..a73c0a3e9 100644 --- a/tests/test_averaging.py +++ b/tests/test_averaging.py @@ -5,7 +5,7 @@ import torch import pytest import hivemind -from hivemind.client.averaging.allreduce import AllReduceProtocol, split_into_parts, restore_from_parts +from hivemind.client.averaging.allreduce import AllReduceProtocol, split_into_parts, restore_from_parts, Mode from hivemind.client.averaging.load_balancing import load_balance_peers from hivemind.client.averaging.key_manager import GroupKeyManager from hivemind.utils import Endpoint @@ -42,25 +42,28 @@ async def test_key_manager(): @pytest.mark.forked -@pytest.mark.parametrize("n_client_mode_peers", [0, 2]) -def test_allreduce_once(n_client_mode_peers): +@pytest.mark.parametrize("n_aux", [0, 1, 2]) +@pytest.mark.parametrize("n_clients", [0, 1, 2]) +def test_allreduce_once(n_clients, n_aux): dht = hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*') n_peers = 4 - should_listen = [False] * n_client_mode_peers + [True] * (n_peers - n_client_mode_peers) - random.shuffle(should_listen) - + modes = [Mode.CLIENT] * n_clients + [Mode.AUX] * n_aux + [Mode.NODE] * (n_peers - n_clients - n_aux) + random.shuffle(modes) + tensors1 = [torch.randn(123), torch.zeros(3)] tensors2 = [torch.rand(123), torch.ones(3)] tensors3 = [-torch.rand(123), torch.arange(3).to(torch.float32)] tensors4 = [torch.randn(123) ** 3, torch.arange(3).to(torch.float32) / 2] - - reference = [(tensors1[i] + tensors2[i] + tensors3[i] + tensors4[i]) / 4 for i in range(len(tensors1))] + peer_tensors = [tensors1, tensors2, tensors3, tensors4] + + reference = [sum(tensors[i] for tensors, mode in zip(peer_tensors, modes) + if mode != Mode.AUX) / (n_peers - n_aux) for i in range(len(tensors1))] averagers = [hivemind.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15, - prefix='mygroup', listen=listen, listen_on='127.0.0.1:*', - start=True) - for tensors, listen in zip([tensors1, tensors2, tensors3, tensors4], should_listen)] + prefix='mygroup', listen=mode != Mode.CLIENT, listen_on='127.0.0.1:*', + auxiliary=mode == Mode.AUX, start=True) + for tensors, mode in zip(peer_tensors, modes)] futures = [] for averager in averagers: @@ -71,9 +74,10 @@ def test_allreduce_once(n_client_mode_peers): assert averager.endpoint in result for averager in averagers: - with averager.get_tensors() as averaged_tensors: - for ref, our in zip(reference, averaged_tensors): - assert torch.allclose(ref, our, atol=1e-6) + if averager.mode != Mode.AUX: + with averager.get_tensors() as 
averaged_tensors: + for ref, our in zip(reference, averaged_tensors): + assert torch.allclose(ref, our, atol=1e-6) for averager in averagers: averager.shutdown() From 7cf337ececeb06d25f76dc85ca32b535671b818f Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Wed, 12 May 2021 21:49:05 +0300 Subject: [PATCH 03/21] add test_allreduce_once_edge_cases --- hivemind/client/averaging/allreduce.py | 5 +++++ tests/test_averaging.py | 20 +++++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 2b0f07a6e..dd4c6981c 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -39,6 +39,7 @@ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoi self.ordered_group_endpoints, self.part_sizes = ordered_group_endpoints, part_sizes if modes is None: modes = [Mode.CLIENT if part_size == 0 else Mode.NODE for part_size in part_sizes] + assert any(mode != Mode.CLIENT for mode in modes), "Cannot run allreduce without reducers." self.peer_modes = dict(zip(ordered_group_endpoints, modes)) self.local_tensor_parts = dict(zip(ordered_group_endpoints, split_into_parts(tensors, part_sizes))) @@ -51,7 +52,11 @@ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoi self.averaged_part: asyncio.Future[torch.Tensor] = asyncio.Future() # will be set to [accumulator / group size] self.averaged_tensor_parts: Dict[Endpoint, torch.Tensor] = {} # averaged chunks from all peers will be put here self.future: asyncio.Future[Sequence[torch.Tensor]] = asyncio.Future() # final result or exception + self.num_senders = len([mode for mode in modes if mode != Mode.AUX]) + + if self.num_senders == 0: + self.future.set_result(None) for endpoint, mode in self.peer_modes.items(): if mode == Mode.CLIENT: self.averaged_tensor_parts[endpoint] = torch.tensor([]) diff --git a/tests/test_averaging.py b/tests/test_averaging.py index a73c0a3e9..bc6c86875 100644 --- a/tests/test_averaging.py +++ b/tests/test_averaging.py @@ -41,10 +41,7 @@ async def test_key_manager(): assert len(q5) == 0 -@pytest.mark.forked -@pytest.mark.parametrize("n_aux", [0, 1, 2]) -@pytest.mark.parametrize("n_clients", [0, 1, 2]) -def test_allreduce_once(n_clients, n_aux): +def _test_allreduce_once(n_clients, n_aux): dht = hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*') n_peers = 4 @@ -58,7 +55,7 @@ def test_allreduce_once(n_clients, n_aux): peer_tensors = [tensors1, tensors2, tensors3, tensors4] reference = [sum(tensors[i] for tensors, mode in zip(peer_tensors, modes) - if mode != Mode.AUX) / (n_peers - n_aux) for i in range(len(tensors1))] + if mode != Mode.AUX) / max(1, n_peers - n_aux) for i in range(len(tensors1))] averagers = [hivemind.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15, prefix='mygroup', listen=mode != Mode.CLIENT, listen_on='127.0.0.1:*', @@ -84,6 +81,19 @@ def test_allreduce_once(n_clients, n_aux): dht.shutdown() +@pytest.mark.forked +@pytest.mark.parametrize("n_clients", [0, 1, 2]) +@pytest.mark.parametrize("n_aux", [0, 1, 2]) +def test_allreduce_once(n_clients, n_aux): + _test_allreduce_once(n_clients, n_aux) + + +@pytest.mark.forked +@pytest.mark.parametrize("n_clients, n_aux", [(0, 4), (1, 3), (0, 3)]) +def test_allreduce_once_edge_cases(n_clients, n_aux): + _test_allreduce_once(n_clients, n_aux) + + @pytest.mark.forked def test_allreduce_weighted(n_client_mode_peers: int = 2): dht = 
hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*') From 9ca834821881a0e750dd8f494ecfdc4e96e3dc2a Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Wed, 12 May 2021 21:54:40 +0300 Subject: [PATCH 04/21] resolve todo --- hivemind/client/averaging/__init__.py | 3 ++- hivemind/client/averaging/allreduce.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 18362d936..d49a54072 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -242,7 +242,8 @@ def step(self, gather: Optional[DataForGather] = None, weight: float = 1.0, time :returns: on success, update averaged_tensors and return group info; on failure, return None """ assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}" - #TODO if aux, check that weight is default and exactly zero + if self.mode == Mode.AUX and weight != 1: + logger.warning("Averager is running in auxiliary mode, weight is unused.") future, _future = MPFuture.make_pair() gather_binary = self.serializer.dumps(gather) # serialize here to avoid loading modules in the averager process self.pipe.send(('_step', [], dict(future=_future, gather_binary=gather_binary, weight=weight, diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index dd4c6981c..24ee4df42 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -166,7 +166,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ - assert self.peer_modes[self.endpoint] != Mode.AUX, "TODO TEXT" + assert self.peer_modes[self.endpoint] != Mode.AUX, "aux peers are disallowed from sending tensors" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) From ac516ec18797a8d8cc938b40aae6824a812f116c Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:14:56 +0300 Subject: [PATCH 05/21] change name for peer averaging mode enum --- hivemind/client/averaging/__init__.py | 14 +++++++------- hivemind/client/averaging/allreduce.py | 24 ++++++++++++------------ tests/test_averaging.py | 12 ++++++------ 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index d49a54072..7ca268773 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -20,7 +20,7 @@ import numpy as np from hivemind.dht import DHT, DHTID -from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, Mode +from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, AveragingMode from hivemind.client.averaging.load_balancing import load_balance_peers from hivemind.client.averaging.matchmaking import Matchmaking, MatchmakingException from hivemind.client.averaging.group_info import GroupInfo @@ -107,9 +107,9 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start: super().__init__() self.dht = dht self.listen, self.listen_on, 
self.kwargs = listen, listen_on, kwargs - self.mode = Mode.NODE if self.listen else Mode.CLIENT + self.mode = AveragingMode.NODE if self.listen else AveragingMode.CLIENT if auxiliary: - self.mode = Mode.AUX + self.mode = AveragingMode.AUX self.channel_options = channel_options self.daemon = daemon @@ -242,7 +242,7 @@ def step(self, gather: Optional[DataForGather] = None, weight: float = 1.0, time :returns: on success, update averaged_tensors and return group info; on failure, return None """ assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}" - if self.mode == Mode.AUX and weight != 1: + if self.mode == AveragingMode.AUX and weight != 1: logger.warning("Averager is running in auxiliary mode, weight is unused.") future, _future = MPFuture.make_pair() gather_binary = self.serializer.dumps(gather) # serialize here to avoid loading modules in the averager process @@ -270,7 +270,7 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, self._running_groups[group_id] = allreduce_runner self._pending_group_assembled.set() await asyncio.wait_for(allreduce_runner.run(), self._allreduce_timeout) - if self.mode != Mode.AUX: + if self.mode != AveragingMode.AUX: await loop.run_in_executor(None, self.update_tensors, allreduce_runner) # averaging is finished, exit the loop @@ -303,10 +303,10 @@ async def _make_allreduce_runner(self, group_info: GroupInfo, min_vector_size: i try: weights, throughputs, modes_ix, user_gathered = zip(*map(self.serializer.loads, group_info.gathered)) user_gathered = dict(zip(group_info.endpoints, map(self.serializer.loads, user_gathered))) - modes = tuple(map(Mode, modes_ix)) + modes = tuple(map(AveragingMode, modes_ix)) # compute optimal part sizes from peer throughputs - incoming_throughputs = [thr if mode != Mode.CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing + incoming_throughputs = [thr if mode != AveragingMode.CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing part_sizes = await asyncio.get_event_loop().run_in_executor( None, load_balance_peers, self.total_size, incoming_throughputs, min_vector_size) async with self.get_tensors_async() as averaged_tensors: diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 24ee4df42..f30f57281 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -15,7 +15,7 @@ logger = get_logger(__name__) -class Mode(Enum): +class AveragingMode(Enum): NODE, CLIENT, AUX = 0, 1, 2 @@ -33,13 +33,13 @@ class AllReduceProtocol: def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoint: Endpoint, ordered_group_endpoints: Sequence[Endpoint], part_sizes: Tuple[int, ...], return_deltas: bool = False, - modes: Optional[Sequence[Mode]] = None): + modes: Optional[Sequence[AveragingMode]] = None): assert endpoint in ordered_group_endpoints, "endpoint is not a part of the group" self.group_id, self.endpoint = group_id, endpoint self.ordered_group_endpoints, self.part_sizes = ordered_group_endpoints, part_sizes if modes is None: - modes = [Mode.CLIENT if part_size == 0 else Mode.NODE for part_size in part_sizes] - assert any(mode != Mode.CLIENT for mode in modes), "Cannot run allreduce without reducers." 
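# Role summary implied by the three averaging modes (inferred from this patch series,
# not stated verbatim in the source):
#   NODE   - holds a slice of the accumulator and also contributes its own tensors;
#   CLIENT - contributes tensors but accepts no incoming parts (its part_size is 0);
#   AUX    - accumulates and serves a slice for others without contributing tensors.
# A group made up of clients only would leave nobody to perform the reduction,
# which is exactly what the assert on the next line rules out.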
+ modes = [AveragingMode.CLIENT if part_size == 0 else AveragingMode.NODE for part_size in part_sizes] + assert any(mode != AveragingMode.CLIENT for mode in modes), "Cannot run allreduce without reducers." self.peer_modes = dict(zip(ordered_group_endpoints, modes)) self.local_tensor_parts = dict(zip(ordered_group_endpoints, split_into_parts(tensors, part_sizes))) @@ -53,12 +53,12 @@ def __init__(self, *, group_id: GroupID, tensors: Sequence[torch.Tensor], endpoi self.averaged_tensor_parts: Dict[Endpoint, torch.Tensor] = {} # averaged chunks from all peers will be put here self.future: asyncio.Future[Sequence[torch.Tensor]] = asyncio.Future() # final result or exception - self.num_senders = len([mode for mode in modes if mode != Mode.AUX]) + self.num_senders = len([mode for mode in modes if mode != AveragingMode.AUX]) if self.num_senders == 0: self.future.set_result(None) for endpoint, mode in self.peer_modes.items(): - if mode == Mode.CLIENT: + if mode == AveragingMode.CLIENT: self.averaged_tensor_parts[endpoint] = torch.tensor([]) def __repr__(self): @@ -80,7 +80,7 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei assert not self.future.done(), f"already finished allreduce: {self.future}" assert source in self.local_tensor_parts, "unexpected source, not a part of current group" assert source not in self.accumulated_from, "duplicate source, already received that part" - assert self.peer_modes[self.endpoint] != Mode.CLIENT, f"{self.endpoint} is in Mode.client mode" + assert self.peer_modes[self.endpoint] != AveragingMode.CLIENT, f"{self.endpoint} is in AveragingMode.client mode" assert isinstance(weight, (int, float)) and weight > 0, "averaging weights must be a non-negative int/float" logger.debug(f"{self} - accumulating tensor part from {source}") @@ -95,7 +95,7 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei average_result = self.accumulator.div_(self.denominator) self.averaged_part.set_result(average_result) - if self.peer_modes[self.endpoint] == Mode.AUX: + if self.peer_modes[self.endpoint] == AveragingMode.AUX: self.future.set_result(None) # auxiliary mode has finished averaging else: self.register_averaged_part(self.endpoint, average_result) @@ -108,7 +108,7 @@ def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): assert source not in self.averaged_tensor_parts, "already registered the average from this peer" assert averaged_part.shape == self.local_tensor_parts[source].shape, "averaged part shape mismatch" assert averaged_part.dtype == self.local_tensor_parts[source].dtype, "averaged part dtype mismatch" - assert self.peer_modes[self.endpoint] != Mode.AUX, "You hear a reasonable explanation on why this behavior is wrong" + assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "You hear a reasonable explanation on why this behavior is wrong" logger.debug(f"{self} - receiving averaged tensor part from {source}") self.averaged_tensor_parts[source] = averaged_part if len(self.averaged_tensor_parts) == len(self.local_tensor_parts): @@ -166,7 +166,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ - assert self.peer_modes[self.endpoint] != Mode.AUX, "aux peers are disallowed from sending tensors" + assert self.peer_modes[self.endpoint] 
!= AveragingMode.AUX, "aux peers are disallowed from sending tensors" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) @@ -205,11 +205,11 @@ async def run(self) -> Sequence[torch.Tensor]: send allreduce requests to all peers and collect results, return the averaged tensor (or deltas) """ try: - if self.peer_modes[self.endpoint] != Mode.AUX: + if self.peer_modes[self.endpoint] != AveragingMode.AUX: print(f'{self.endpoint} - SENDING STUFF, {self.peer_modes}') await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) for i, peer in enumerate(self.ordered_group_endpoints) - if self.peer_modes[peer] != Mode.CLIENT)) + if self.peer_modes[peer] != AveragingMode.CLIENT)) else: print(f'{self.endpoint} - NOT SENDING STUFF {self.peer_modes}') diff --git a/tests/test_averaging.py b/tests/test_averaging.py index bc6c86875..76c00e0b6 100644 --- a/tests/test_averaging.py +++ b/tests/test_averaging.py @@ -5,7 +5,7 @@ import torch import pytest import hivemind -from hivemind.client.averaging.allreduce import AllReduceProtocol, split_into_parts, restore_from_parts, Mode +from hivemind.client.averaging.allreduce import AllReduceProtocol, split_into_parts, restore_from_parts, AveragingMode from hivemind.client.averaging.load_balancing import load_balance_peers from hivemind.client.averaging.key_manager import GroupKeyManager from hivemind.utils import Endpoint @@ -45,7 +45,7 @@ def _test_allreduce_once(n_clients, n_aux): dht = hivemind.DHT(start=True, endpoint=f'{hivemind.LOCALHOST}:*') n_peers = 4 - modes = [Mode.CLIENT] * n_clients + [Mode.AUX] * n_aux + [Mode.NODE] * (n_peers - n_clients - n_aux) + modes = [AveragingMode.CLIENT] * n_clients + [AveragingMode.AUX] * n_aux + [AveragingMode.NODE] * (n_peers - n_clients - n_aux) random.shuffle(modes) tensors1 = [torch.randn(123), torch.zeros(3)] @@ -55,11 +55,11 @@ def _test_allreduce_once(n_clients, n_aux): peer_tensors = [tensors1, tensors2, tensors3, tensors4] reference = [sum(tensors[i] for tensors, mode in zip(peer_tensors, modes) - if mode != Mode.AUX) / max(1, n_peers - n_aux) for i in range(len(tensors1))] + if mode != AveragingMode.AUX) / max(1, n_peers - n_aux) for i in range(len(tensors1))] averagers = [hivemind.DecentralizedAverager(tensors, dht=dht, target_group_size=4, averaging_expiration=15, - prefix='mygroup', listen=mode != Mode.CLIENT, listen_on='127.0.0.1:*', - auxiliary=mode == Mode.AUX, start=True) + prefix='mygroup', listen=mode != AveragingMode.CLIENT, listen_on='127.0.0.1:*', + auxiliary=mode == AveragingMode.AUX, start=True) for tensors, mode in zip(peer_tensors, modes)] futures = [] @@ -71,7 +71,7 @@ def _test_allreduce_once(n_clients, n_aux): assert averager.endpoint in result for averager in averagers: - if averager.mode != Mode.AUX: + if averager.mode != AveragingMode.AUX: with averager.get_tensors() as averaged_tensors: for ref, our in zip(reference, averaged_tensors): assert torch.allclose(ref, our, atol=1e-6) From fefc7d2d192d0ac1db0a330d8381f1487fd24250 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:21:18 +0300 Subject: [PATCH 06/21] avoid multiple enum assignments on the same line --- hivemind/client/averaging/allreduce.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 
f30f57281..aa95c7e09 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -16,7 +16,9 @@ class AveragingMode(Enum): - NODE, CLIENT, AUX = 0, 1, 2 + NODE = 0 + CLIENT = 1 + AUX = 2 class AllReduceProtocol: From 0506dad0ae412c544ca0dd0f63d92fa3788f010b Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:28:40 +0300 Subject: [PATCH 07/21] add info about auxiliary peers to DecentralizedAverager docstring --- hivemind/client/averaging/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 7ca268773..1aa2ed45b 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -71,6 +71,8 @@ class DecentralizedAverager(mp.Process, averaging_pb2_grpc.DecentralizedAveragin :param channel_options: options for grpc.aio.insecure_channel, e.g. [('grpc.enable_retries', 0)] see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for a list of all options :param kwargs: extra parameters forwarded to grpc.aio.server + :param auxiliary: if this flag is specified, averager.step will only assist others without sending + local gradients for averaging Example: From 440e6716c8cd4cf87a946ed3808a51f0f1425db1 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:31:54 +0300 Subject: [PATCH 08/21] minor fix --- hivemind/client/averaging/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 1aa2ed45b..554bee510 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -18,6 +18,7 @@ from grpc._cython.cygrpc import InternalError import torch import numpy as np +from torch._C import Node from hivemind.dht import DHT, DHTID from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, AveragingMode @@ -104,14 +105,17 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start: if not is_power_of_two(target_group_size): logger.warning("It is recommended to set target_group_size to a power of 2.") assert initial_group_bits is None or all(bit in '01' for bit in initial_group_bits) - assert listen or not auxiliary, f"auxiliary peers must accept incoming connections" + assert listen or not auxiliary, "auxiliary peers must accept incoming connections" super().__init__() self.dht = dht self.listen, self.listen_on, self.kwargs = listen, listen_on, kwargs - self.mode = AveragingMode.NODE if self.listen else AveragingMode.CLIENT - if auxiliary: + if not self.listen: + self.mode = AveragingMode.CLIENT + elif auxiliary: self.mode = AveragingMode.AUX + else: + self.mode = AveragingMode.NODE self.channel_options = channel_options self.daemon = daemon From 11d2fda0d24f40b4a9001ca4725731c78945221e Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:34:29 +0300 Subject: [PATCH 09/21] minor fix --- hivemind/client/averaging/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 554bee510..54c432a1c 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -309,9 +309,8 @@ async def _make_allreduce_runner(self, group_info: GroupInfo, min_vector_size: i try: weights, throughputs, modes_ix, user_gathered = zip(*map(self.serializer.loads, 
group_info.gathered)) user_gathered = dict(zip(group_info.endpoints, map(self.serializer.loads, user_gathered))) - modes = tuple(map(AveragingMode, modes_ix)) - # compute optimal part sizes from peer throughputs + modes = tuple(map(AveragingMode, modes_ix)) incoming_throughputs = [thr if mode != AveragingMode.CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing part_sizes = await asyncio.get_event_loop().run_in_executor( None, load_balance_peers, self.total_size, incoming_throughputs, min_vector_size) From d67f30821e2215973e03163471d2a342c69f8051 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:35:24 +0300 Subject: [PATCH 10/21] rm debug prints --- hivemind/client/averaging/allreduce.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index aa95c7e09..cb703d0b4 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -84,10 +84,8 @@ async def accumulate_part(self, source: Endpoint, remote_part: torch.Tensor, wei assert source not in self.accumulated_from, "duplicate source, already received that part" assert self.peer_modes[self.endpoint] != AveragingMode.CLIENT, f"{self.endpoint} is in AveragingMode.client mode" assert isinstance(weight, (int, float)) and weight > 0, "averaging weights must be a non-negative int/float" - logger.debug(f"{self} - accumulating tensor part from {source}") - print(end=f"{self} - accumulating tensor part from {source}\n") - + logger.debug(f"{self} - accumulating tensor part from {source}") self.accumulator.add_(remote_part, alpha=weight) self.denominator += weight self.accumulated_from.add(source) @@ -208,7 +206,6 @@ async def run(self) -> Sequence[torch.Tensor]: """ try: if self.peer_modes[self.endpoint] != AveragingMode.AUX: - print(f'{self.endpoint} - SENDING STUFF, {self.peer_modes}') await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) for i, peer in enumerate(self.ordered_group_endpoints) if self.peer_modes[peer] != AveragingMode.CLIENT)) From e6586f1cc1b6f25c61e07df16474a051b67e5b3e Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:38:59 +0300 Subject: [PATCH 11/21] minor fix --- hivemind/client/averaging/allreduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index cb703d0b4..8f533cefe 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -108,7 +108,7 @@ def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): assert source not in self.averaged_tensor_parts, "already registered the average from this peer" assert averaged_part.shape == self.local_tensor_parts[source].shape, "averaged part shape mismatch" assert averaged_part.dtype == self.local_tensor_parts[source].dtype, "averaged part dtype mismatch" - assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "You hear a reasonable explanation on why this behavior is wrong" + assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "auxiliary peers do not have local tensors for sending" logger.debug(f"{self} - receiving averaged tensor part from {source}") self.averaged_tensor_parts[source] = averaged_part if len(self.averaged_tensor_parts) == len(self.local_tensor_parts): From 41ecf93c14082c11e618594e15b99854628d79e4 Mon Sep 17 00:00:00 2001 From: Dmitry 
Popov Date: Fri, 14 May 2021 15:40:36 +0300 Subject: [PATCH 12/21] minor fix --- hivemind/client/averaging/allreduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 8f533cefe..5dc1cebb0 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -166,7 +166,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ - assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "aux peers are disallowed from sending tensors" + assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "auxiliary peers are disallowed from sending tensors" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) From e6f0b9cee0d2bb744f665f95f4bbeb91902596f7 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:42:32 +0300 Subject: [PATCH 13/21] minor fix --- hivemind/client/averaging/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 54c432a1c..140323f68 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -73,7 +73,7 @@ class DecentralizedAverager(mp.Process, averaging_pb2_grpc.DecentralizedAveragin see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for a list of all options :param kwargs: extra parameters forwarded to grpc.aio.server :param auxiliary: if this flag is specified, averager.step will only assist others without sending - local gradients for averaging + local tensors for averaging Example: From e3623b3b9d0ec0105dc60e71253579687d0d9c4d Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 15:53:00 +0300 Subject: [PATCH 14/21] minor fix --- hivemind/client/averaging/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 140323f68..e01b7cac4 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -18,7 +18,6 @@ from grpc._cython.cygrpc import InternalError import torch import numpy as np -from torch._C import Node from hivemind.dht import DHT, DHTID from hivemind.client.averaging.allreduce import AllReduceRunner, AllreduceException, GroupID, AveragingMode From 044ff1316acf8230ad66a98aee5641c1648861f0 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 16:31:39 +0300 Subject: [PATCH 15/21] fix naming --- hivemind/client/averaging/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index e01b7cac4..293b01c7a 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -306,10 +306,10 @@ async def _step(self, *, future: MPFuture, gather_binary: bytes, weight: float, async def _make_allreduce_runner(self, group_info: GroupInfo, min_vector_size: int, **kwargs) -> AllReduceRunner: """ Use a group description found by Matchmaking to form AllreduceRunner """ 
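# A standalone sketch (illustrative values, not part of this diff) of the metadata
# round-trip handled below: each peer's data_for_gather is serialized as
# [weight, throughput, mode_id, user_binary], and zip(*...) turns the per-peer
# records back into four parallel tuples.
from enum import Enum

class AveragingMode(Enum):
    NODE = 0
    CLIENT = 1
    AUX = 2

records = [[1.0, 100.0, 0, b''], [1.0, 0.0, 1, b''], [1.0, 50.0, 2, b'']]
weights, throughputs, mode_ids, user_gathered = zip(*records)
modes = tuple(map(AveragingMode, mode_ids))
# client-mode peers accept no incoming parts, so their throughput is zeroed
# before load balancing assigns part sizes
incoming_throughputs = [thr if mode != AveragingMode.CLIENT else 0.0
                        for thr, mode in zip(throughputs, modes)]
assert incoming_throughputs == [100.0, 0.0, 50.0]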
try: - weights, throughputs, modes_ix, user_gathered = zip(*map(self.serializer.loads, group_info.gathered)) + weights, throughputs, mode_ids, user_gathered = zip(*map(self.serializer.loads, group_info.gathered)) user_gathered = dict(zip(group_info.endpoints, map(self.serializer.loads, user_gathered))) # compute optimal part sizes from peer throughputs - modes = tuple(map(AveragingMode, modes_ix)) + modes = tuple(map(AveragingMode, mode_ids)) incoming_throughputs = [thr if mode != AveragingMode.CLIENT else 0.0 for thr, mode in zip(throughputs, modes)] # TODO: replace with proper load balancing part_sizes = await asyncio.get_event_loop().run_in_executor( None, load_balance_peers, self.total_size, incoming_throughputs, min_vector_size) From d720492f46c6df6f64170b954772185ef8bf2d0c Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Fri, 14 May 2021 16:31:46 +0300 Subject: [PATCH 16/21] minor fix --- hivemind/client/averaging/allreduce.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/hivemind/client/averaging/allreduce.py b/hivemind/client/averaging/allreduce.py index 5dc1cebb0..bb35c481a 100644 --- a/hivemind/client/averaging/allreduce.py +++ b/hivemind/client/averaging/allreduce.py @@ -108,7 +108,7 @@ def register_averaged_part(self, source: Endpoint, averaged_part: torch.Tensor): assert source not in self.averaged_tensor_parts, "already registered the average from this peer" assert averaged_part.shape == self.local_tensor_parts[source].shape, "averaged part shape mismatch" assert averaged_part.dtype == self.local_tensor_parts[source].dtype, "averaged part dtype mismatch" - assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "auxiliary peers do not have local tensors for sending" + assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "Auxiliary peers do not have local tensors for sending" logger.debug(f"{self} - receiving averaged tensor part from {source}") self.averaged_tensor_parts[source] = averaged_part if len(self.averaged_tensor_parts) == len(self.local_tensor_parts): @@ -166,7 +166,7 @@ def _get_peer_stub(self, peer: Endpoint) -> averaging_pb2_grpc.DecentralizedAver async def _communicate_with_peer(self, peer_endpoint: Endpoint, local_part: torch.Tensor) -> torch.Tensor: """ Send a part of local tensors and metadata to a single peer, receive the average for that part of tensors """ - assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "auxiliary peers are disallowed from sending tensors" + assert self.peer_modes[self.endpoint] != AveragingMode.AUX, "Auxiliary peers are disallowed from sending tensors" if peer_endpoint == self.endpoint: return await self.accumulate_part(self.endpoint, local_part, weight=self.peer_weights[self.endpoint]) serialized_tensor_part = serialize_torch_tensor(local_part, self.compression_type, allow_inplace=False) @@ -209,9 +209,6 @@ async def run(self) -> Sequence[torch.Tensor]: await asyncio.gather(self, *(self._communicate_with_peer(peer, self.local_tensor_parts[peer]) for i, peer in enumerate(self.ordered_group_endpoints) if self.peer_modes[peer] != AveragingMode.CLIENT)) - else: - print(f'{self.endpoint} - NOT SENDING STUFF {self.peer_modes}') - return await self except BaseException as e: code = averaging_pb2.CANCELLED if isinstance(e, asyncio.CancelledError) else averaging_pb2.INTERNAL_ERROR From 16f42cec7fecdc9a88c2682d6e931260e3f4f42d Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Thu, 20 May 2021 21:01:20 +0300 Subject: [PATCH 17/21] fix bug --- hivemind/client/averaging/matchmaking.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/client/averaging/matchmaking.py b/hivemind/client/averaging/matchmaking.py index 8ec866e51..c325e3148 100644 --- a/hivemind/client/averaging/matchmaking.py +++ b/hivemind/client/averaging/matchmaking.py @@ -391,7 +391,7 @@ async def pop_next_leader(self) -> Endpoint: if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time: self.update_triggered.set() - if maybe_next_leader is None or entry.expiration_time >= self.declared_expiration_time: + if maybe_next_leader is None or entry.expiration_time > self.declared_expiration_time: await asyncio.wait({self.update_finished.wait(), self.declared_expiration.wait()}, return_when=asyncio.FIRST_COMPLETED) self.declared_expiration.clear() From 73703d154ae9ec9b2ce1efe96dfee3f217aa87f7 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Thu, 20 May 2021 21:11:34 +0300 Subject: [PATCH 18/21] fix bug (for good now) --- hivemind/client/averaging/matchmaking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/client/averaging/matchmaking.py b/hivemind/client/averaging/matchmaking.py index c325e3148..eb50fdf0f 100644 --- a/hivemind/client/averaging/matchmaking.py +++ b/hivemind/client/averaging/matchmaking.py @@ -391,7 +391,7 @@ async def pop_next_leader(self) -> Endpoint: if maybe_next_leader is None or self.max_assured_time <= entry.expiration_time <= self.search_end_time: self.update_triggered.set() - if maybe_next_leader is None or entry.expiration_time > self.declared_expiration_time: + if maybe_next_leader is None or (entry.expiration_time, maybe_next_leader) > (self.declared_expiration_time, self.endpoint): await asyncio.wait({self.update_finished.wait(), self.declared_expiration.wait()}, return_when=asyncio.FIRST_COMPLETED) self.declared_expiration.clear() From 761cd98f57c22f57026e3269e6183ff675483a52 Mon Sep 17 00:00:00 2001 From: Dmitry Popov Date: Thu, 20 May 2021 21:14:44 +0300 Subject: [PATCH 19/21] mv assert --- hivemind/client/averaging/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py index 293b01c7a..7851ad89c 100644 --- a/hivemind/client/averaging/__init__.py +++ b/hivemind/client/averaging/__init__.py @@ -246,9 +246,11 @@ def step(self, gather: Optional[DataForGather] = None, weight: float = 1.0, time :param wait: if True (default), return when finished. Otherwise return MPFuture and run in background. 
From 761cd98f57c22f57026e3269e6183ff675483a52 Mon Sep 17 00:00:00 2001
From: Dmitry Popov
Date: Thu, 20 May 2021 21:14:44 +0300
Subject: [PATCH 19/21] mv assert

---
 hivemind/client/averaging/__init__.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py
index 293b01c7a..7851ad89c 100644
--- a/hivemind/client/averaging/__init__.py
+++ b/hivemind/client/averaging/__init__.py
@@ -246,9 +246,11 @@ def step(self, gather: Optional[DataForGather] = None, weight: float = 1.0, time
         :param wait: if True (default), return when finished. Otherwise return MPFuture and run in background.
         :returns: on success, update averaged_tensors and return group info; on failure, return None
         """
-        assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}"
         if self.mode == AveragingMode.AUX and weight != 1:
             logger.warning("Averager is running in auxiliary mode, weight is unused.")
+        else:
+            assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}"
+
         future, _future = MPFuture.make_pair()
         gather_binary = self.serializer.dumps(gather)  # serialize here to avoid loading modules in the averager process
         self.pipe.send(('_step', [], dict(future=_future, gather_binary=gather_binary, weight=weight,
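
Context for patch 19 ("mv assert"): the weight validation in step() is reordered so that auxiliary peers, whose weight is ignored anyway, get a warning for a non-default weight instead of tripping the positivity assert. A standalone restatement of the resulting control flow; the function name is illustrative and only the branch logic mirrors the patch:

import logging
from enum import IntEnum

logger = logging.getLogger(__name__)

class AveragingMode(IntEnum):
    NODE = 0
    CLIENT = 1
    AUX = 2

def validate_step_weight(mode: AveragingMode, weight: float) -> None:
    # auxiliary peers contribute no tensors, so a custom weight is meaningless: warn, don't fail
    if mode == AveragingMode.AUX:
        if weight != 1:
            logger.warning("Averager is running in auxiliary mode, weight is unused.")
    else:
        # every other mode must pass a strictly positive numeric weight
        assert isinstance(weight, (int, float)) and weight > 0, f"Expected a positive int/float, got {type(weight)}"
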
From 24fa02d4965a1bff7210f960634a5f06d5dec0a2 Mon Sep 17 00:00:00 2001
From: justheuristic
Date: Thu, 20 May 2021 23:44:38 +0300
Subject: [PATCH 20/21] implement a flag that disables state sharing in averager

---
 hivemind/client/averaging/__init__.py | 37 +++++++++++++++++++++++----
 tests/test_averaging.py               |  7 +++++
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/hivemind/client/averaging/__init__.py b/hivemind/client/averaging/__init__.py
index 7851ad89c..6bc642485 100644
--- a/hivemind/client/averaging/__init__.py
+++ b/hivemind/client/averaging/__init__.py
@@ -73,6 +73,8 @@ class DecentralizedAverager(mp.Process, averaging_pb2_grpc.DecentralizedAveragin
     :param kwargs: extra parameters forwarded to grpc.aio.server
     :param auxiliary: if this flag is specified, averager.step will only assist others without sending
           local tensors for averaging
+    :param allow_state_sharing: if set to True, other peers can download this peer's state. Can be overwritten
+          with averager.allow_state_sharing = True / False

     Example:

@@ -96,8 +98,9 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start:
                  allreduce_timeout: Optional[float] = None, averaging_alpha: float = 1.0,
                  compression_type: runtime_pb2.CompressionType = runtime_pb2.CompressionType.NONE,
                  throughput: Optional[float] = None, min_vector_size: int = 0,
+                 auxiliary: bool = False, allow_state_sharing: Optional[bool] = None,
                  listen: bool = True, listen_on: Endpoint = '0.0.0.0:*', daemon: bool = True,
-                 channel_options: Optional[Sequence[Tuple[str, Any]]] = None, auxiliary: bool = False, **kwargs):
+                 channel_options: Optional[Sequence[Tuple[str, Any]]] = None, **kwargs):
         assert '.' not in prefix, "group prefix must be a string without trailing '.'"
         assert throughput is None or (throughput >= 0 and np.isfinite(np.float32(throughput))), \
             "throughput must be a non-negative float32"
@@ -139,6 +142,10 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start:

         self._pipe, self.pipe = mp.Pipe(duplex=True)  # a control pipe used to communicate with a background process
         self._port = mp.Value(ctypes.c_uint32, 0)  # assigned when averager starts, accessible via self.port
+
+        self._allow_state_sharing = mp.Value(ctypes.c_bool, 0)
+        self.allow_state_sharing = (listen and not auxiliary) if allow_state_sharing is None else allow_state_sharing
+
         self._averager_endpoint: Optional[Endpoint] = None
         if not self.listen:
             self._averager_endpoint = f'client::{uuid.uuid4()}'
@@ -156,6 +163,18 @@ def __init__(self, averaged_tensors: Sequence[torch.Tensor], dht: DHT, *, start:
     def port(self) -> Optional[Port]:
         return self._port.value if self._port.value != 0 else None

+    @property
+    def allow_state_sharing(self) -> bool:
+        """ if set to True, other peers can download this peer's state """
+        return bool(self._allow_state_sharing.value)
+
+    @allow_state_sharing.setter
+    def allow_state_sharing(self, value: bool):
+        if value is True and not self.listen:
+            logger.warning("Cannot allow state sharing: averager in client mode (listen=False) cannot share its state.")
+        else:
+            self._allow_state_sharing.value = value
+
     @property
     def endpoint(self) -> Optional[Endpoint]:
         if self.listen and self._averager_endpoint is None:
@@ -381,10 +400,11 @@ async def rpc_aggregate_part(self, stream: AsyncIterator[averaging_pb2.Averaging
     async def _declare_for_download_periodically(self):
         download_key = f'{self._matchmaking.group_key_manager.prefix}.all_averagers'
         while True:
-            asyncio.create_task(asyncio.wait_for(self.dht.store(
-                download_key, subkey=self.endpoint, value=self.last_updated,
-                expiration_time=get_dht_time() + self._matchmaking.averaging_expiration, return_future=True),
-                timeout=self._matchmaking.averaging_expiration))
+            if self.allow_state_sharing:
+                asyncio.create_task(asyncio.wait_for(self.dht.store(
+                    download_key, subkey=self.endpoint, value=self.last_updated,
+                    expiration_time=get_dht_time() + self._matchmaking.averaging_expiration, return_future=True),
+                    timeout=self._matchmaking.averaging_expiration))
             await asyncio.sleep(self._matchmaking.averaging_expiration)

     async def rpc_download_state(self, request: averaging_pb2.DownloadRequest, context: grpc.ServicerContext
@@ -396,6 +416,8 @@ async def rpc_download_state(self, request: averaging_pb2.DownloadRequest, conte
         - serialized_metadata is a small serialized bytestring meant to store scalars and hyperparameters
         - tensors is a sequence of pytorch tensors that represent model parameters or optimizer statistics
         """
+        if not self.allow_state_sharing:
+            return  # deny request and direct peer to the next prospective averager
         chunk_size_bytes = self.matchmaking_kwargs.get('chunk_size_bytes', DEFAULT_CHUNK_SIZE_BYTES)
         metadata, tensors = await self._get_current_state_from_host_process()

@@ -467,6 +489,11 @@ async def _load_state_from_peers(self, future: MPFuture):
                             current_tensor_parts.append(message.tensor_part)
                     if current_tensor_parts:
                         tensors.append(deserialize_torch_tensor(combine_from_streaming(current_tensor_parts)))
+
+                    if not metadata:
+                        logger.debug(f"Peer {peer} did not send its state.")
+                        continue
+
                     logger.info(f"Finished downloading state from {peer}")
                     future.set_result((metadata, tensors))
                     self.last_updated = get_dht_time()
diff --git a/tests/test_averaging.py b/tests/test_averaging.py
index 76c00e0b6..50a185acf 100644
--- a/tests/test_averaging.py
+++ b/tests/test_averaging.py
@@ -383,6 +383,13 @@ def get_current_state(self):
     assert got_metadata == super_metadata
     assert all(map(torch.allclose, got_tensors, super_tensors))

+    averager1.allow_state_sharing = False
+    assert averager2.load_state_from_peers() is None
+    averager1.allow_state_sharing = True
+    got_metadata, got_tensors = averager2.load_state_from_peers()
+    assert num_calls == 3
+    assert got_metadata == super_metadata
+

 @pytest.mark.forked
 def test_getset_bits():

From a8e71fad80f60395b977ba7453897ef1495bdb3e Mon Sep 17 00:00:00 2001
From: justheuristic
Date: Fri, 21 May 2021 00:03:01 +0300
Subject: [PATCH 21/21] more natural parameterization of batch_size vs batch_size_per_step

---
 hivemind/optim/collaborative.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hivemind/optim/collaborative.py b/hivemind/optim/collaborative.py
index 5bfde8a4f..72dc27c92 100644
--- a/hivemind/optim/collaborative.py
+++ b/hivemind/optim/collaborative.py
@@ -191,7 +191,7 @@ def step(self, batch_size: Optional[int] = None, **kwargs):
         with self.lock_local_progress:
             self.local_samples_accumulated += batch_size
             self.local_steps_accumulated += 1
-            self.performance_ema.update(num_processed=self.batch_size_per_step)
+            self.performance_ema.update(num_processed=batch_size)
             self.should_report_progress.set()

         if not self.collaboration_state.ready_for_step:
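
Two closing notes on patches 20-21. Patch 20 backs the allow_state_sharing property with a multiprocessing.Value so that flipping the flag in the host process is immediately visible inside the background averager process. A stripped-down, self-contained sketch of that pattern; the class and attribute names here are hypothetical, only the Value-behind-a-property idea comes from the patch:

import ctypes
import multiprocessing as mp

class SharedFlag:
    """ A boolean visible across processes, exposed as a plain attribute. """

    def __init__(self, initial: bool = False):
        # mp.Value allocates the boolean in shared memory, so child processes see updates
        self._value = mp.Value(ctypes.c_bool, initial)

    @property
    def enabled(self) -> bool:
        return bool(self._value.value)

    @enabled.setter
    def enabled(self, new_value: bool):
        self._value.value = new_value

# usage: the parent flips the flag, a forked worker observes the change
flag = SharedFlag(initial=False)
flag.enabled = True
assert flag.enabled

Patch 21 corrects the throughput estimate: performance_ema must be fed the batch_size actually processed in this step rather than the configured batch_size_per_step; if a step really handled, say, 192 samples while 256 was configured, the old code would overestimate samples-per-second by a factor of 256/192.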