From 20a48997f6d16fb6f994cc2f15fa806b34831f2a Mon Sep 17 00:00:00 2001
From: Aleksandr Borzunov
Date: Thu, 29 Jul 2021 00:33:31 +0300
Subject: [PATCH 1/9] Set default DHTNode.num_workers = 8

Co-authored-by: justheuristic
Co-authored-by: Denis Mazur
---
 hivemind/dht/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index 6d126d56b..1354fc82e 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -110,7 +110,7 @@ async def create(
         cache_refresh_before_expiry: float = 5,
         cache_on_store: bool = True,
         reuse_get_requests: bool = True,
-        num_workers: int = 1,
+        num_workers: int = 8,
         chunk_size: int = 16,
         blacklist_time: float = 5.0,
         backoff_rate: float = 2.0,

From 2a02de02474f5cfcef3c347570f3821ff9c4a850 Mon Sep 17 00:00:00 2001
From: Denis Mazur
Date: Thu, 29 Jul 2021 18:19:30 +0300
Subject: [PATCH 2/9] Add environment variable for DHT max workers

---
 hivemind/dht/__init__.py |  2 +-
 hivemind/dht/node.py     | 11 +++++++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/hivemind/dht/__init__.py b/hivemind/dht/__init__.py
index fe045f077..ab764e532 100644
--- a/hivemind/dht/__init__.py
+++ b/hivemind/dht/__init__.py
@@ -106,7 +106,7 @@ def run(self) -> None:
         async def _run():
             self._node = await DHTNode.create(
                 initial_peers=self.initial_peers,
-                num_workers=self.max_workers or 1,
+                num_workers=self.max_workers,
                 record_validator=self._record_validator,
                 **self.kwargs,
             )

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index 1354fc82e..eb10021e1 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -2,6 +2,7 @@

 import asyncio
 import dataclasses
+import os
 import random
 from collections import defaultdict, Counter
 from dataclasses import dataclass, field
@@ -154,7 +155,7 @@ async def create(
         :param backoff_rate: blacklist time will be multiplied by :backoff_rate: for each successive non-response
         :param validate: if True, use initial peers to validate that this node is accessible and synchronized
         :param strict: if True, any error encountered in validation will interrupt the creation of DHTNode
-        :param client_mode: if False (default), this node will accept incoming requests as a full DHT "citzen"
+        :param client_mode: if False (default), this node will accept incoming requests as a full DHT "citizen"
          if True, this node will refuse any incoming requests, effectively being only a client
         :param record_validator: instance of RecordValidatorBase used for signing and validating stored records
         :param authorizer: instance of AuthorizerBase used for signing and validating requests and response
@@ -164,7 +165,13 @@ async def create(
         """
         self = cls(_initialized_with_create=True)
         self.node_id = node_id if node_id is not None else DHTID.generate()
-        self.num_replicas, self.num_workers, self.chunk_size = num_replicas, num_workers, chunk_size
+
+        if num_workers is None:
+            self.num_workers = int(os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8))
+        else:
+            self.num_workers = num_workers
+
+        self.num_replicas, self.chunk_size = num_replicas, chunk_size
         self.is_alive = True  # if set to False, cancels all background jobs such as routing table refresh

         self.reuse_get_requests = reuse_get_requests
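
Taken together, patch 2 gives DHTNode.create a three-step resolution order for its worker count: an explicit num_workers argument wins, otherwise the HIVEMIND_DHT_NUM_WORKERS environment variable applies, and only then the built-in default of 8. A minimal standalone sketch of that order (resolve_num_workers is an illustrative name, not a hivemind API):

    import os
    from typing import Optional

    def resolve_num_workers(num_workers: Optional[int] = None) -> int:
        # An explicit argument takes precedence over the environment.
        if num_workers is not None:
            return num_workers
        # Otherwise fall back to the environment variable, then to the default of 8.
        return int(os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8))

    assert resolve_num_workers(2) == 2           # explicit argument wins
    os.environ["HIVEMIND_DHT_NUM_WORKERS"] = "32"
    assert resolve_num_workers() == 32           # env var applies when the arg is None
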
From 2e0671e203fcee817c020a1f38fa93b1a8518fca Mon Sep 17 00:00:00 2001
From: Denis Mazur
Date: Thu, 29 Jul 2021 22:50:46 +0300
Subject: [PATCH 3/9] add dht_max_workers argument for dht benchmark and
 refactor dht

---
 benchmarks/benchmark_dht.py |  7 ++++++-
 hivemind/dht/__init__.py    |  4 ++--
 hivemind/dht/node.py        | 13 +++++--------
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/benchmarks/benchmark_dht.py b/benchmarks/benchmark_dht.py
index 101946b03..be086ce84 100644
--- a/benchmarks/benchmark_dht.py
+++ b/benchmarks/benchmark_dht.py
@@ -5,6 +5,7 @@
 from tqdm import trange

 import hivemind
+from hivemind.dht import DHT_DEFAULT_NUM_WORKERS
 from hivemind.moe.server import declare_experts, get_experts
 from hivemind.utils.limits import increase_file_limit

@@ -28,6 +29,7 @@ def benchmark_dht(
     wait_before_read: float,
     wait_timeout: float,
     expiration: float,
+    dht_max_workers: int,
 ):
     random.seed(random_seed)

@@ -37,7 +39,9 @@ def benchmark_dht(
     neighbors = sum(
         [peer.get_visible_maddrs() for peer in random.sample(peers, min(initial_peers, len(peers)))], []
     )
-    peer = hivemind.DHT(initial_peers=neighbors, start=True, wait_timeout=wait_timeout)
+    peer = hivemind.DHT(
+        initial_peers=neighbors, start=True, wait_timeout=wait_timeout, max_workers=dht_max_workers
+    )
     peers.append(peer)

     store_peer, get_peer = peers[-2:]
@@ -118,6 +122,7 @@
     parser.add_argument("--wait_before_read", type=float, default=0, required=False)
     parser.add_argument("--wait_timeout", type=float, default=5, required=False)
     parser.add_argument("--random_seed", type=int, default=random.randint(1, 1000))
+    parser.add_argument("--dht_max_workers", type=int, default=DHT_DEFAULT_NUM_WORKERS)
     parser.add_argument("--increase_file_limit", action="store_true")

     args = vars(parser.parse_args())

diff --git a/hivemind/dht/__init__.py b/hivemind/dht/__init__.py
index ab764e532..171f730b8 100644
--- a/hivemind/dht/__init__.py
+++ b/hivemind/dht/__init__.py
@@ -23,7 +23,7 @@

 from multiaddr import Multiaddr

-from hivemind.dht.node import DHTNode
+from hivemind.dht.node import DHT_DEFAULT_NUM_WORKERS, DHTNode
 from hivemind.dht.routing import DHTID, DHTKey, DHTValue, Subkey
 from hivemind.dht.validation import CompositeValidator, RecordValidatorBase
 from hivemind.p2p import P2P, PeerID
@@ -62,7 +62,7 @@ def __init__(
         *,
         start: bool,
         daemon: bool = True,
-        max_workers: Optional[int] = None,
+        max_workers: int = DHT_DEFAULT_NUM_WORKERS,
         record_validators: Iterable[RecordValidatorBase] = (),
         shutdown_timeout: float = 3,
         await_ready: bool = True,

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index eb10021e1..6ce3a0f64 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -39,6 +39,9 @@
 logger = get_logger(__name__)


+DHT_DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)
+
+
 class DHTNode:
     """
     Asyncio-based class that represents one DHT participant. Created via await DHTNode.create(...)
@@ -111,7 +114,7 @@ async def create(
         cache_refresh_before_expiry: float = 5,
         cache_on_store: bool = True,
         reuse_get_requests: bool = True,
-        num_workers: int = 8,
+        num_workers: int = DHT_DEFAULT_NUM_WORKERS,
         chunk_size: int = 16,
         blacklist_time: float = 5.0,
         backoff_rate: float = 2.0,
@@ -165,13 +168,7 @@ async def create(
         """
         self = cls(_initialized_with_create=True)
         self.node_id = node_id if node_id is not None else DHTID.generate()
-
-        if num_workers is None:
-            self.num_workers = int(os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8))
-        else:
-            self.num_workers = num_workers
-
-        self.num_replicas, self.chunk_size = num_replicas, chunk_size
+        self.num_replicas, self.num_workers, self.chunk_size = num_replicas, num_workers, chunk_size
         self.is_alive = True  # if set to False, cancels all background jobs such as routing table refresh

         self.reuse_get_requests = reuse_get_requests
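
One consequence of hoisting the lookup into a module-level constant: DHT_DEFAULT_NUM_WORKERS is evaluated exactly once, when hivemind.dht.node is first imported, so the environment variable only takes effect if it is set before that import. A small illustration of import-time evaluation, using a plain assignment in place of the actual hivemind import:

    import os

    os.environ["HIVEMIND_DHT_NUM_WORKERS"] = "16"

    # Equivalent to the module-level line added above -- evaluated once, at import time:
    DHT_DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)

    # Later changes to the environment no longer affect the constant:
    os.environ["HIVEMIND_DHT_NUM_WORKERS"] = "64"
    print(DHT_DEFAULT_NUM_WORKERS)  # still "16"
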
From af09245fffb7c7dabb1038c9eb9bc62bb5b62a7b Mon Sep 17 00:00:00 2001
From: Denis Mazur
Date: Fri, 30 Jul 2021 04:53:29 +0300
Subject: [PATCH 4/9] fix suggestions

---
 benchmarks/benchmark_dht.py | 6 ++----
 hivemind/dht/__init__.py    | 4 ++--
 hivemind/dht/node.py        | 4 ++--
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/benchmarks/benchmark_dht.py b/benchmarks/benchmark_dht.py
index be086ce84..9b3e04abd 100644
--- a/benchmarks/benchmark_dht.py
+++ b/benchmarks/benchmark_dht.py
@@ -5,7 +5,7 @@
 from tqdm import trange

 import hivemind
-from hivemind.dht import DHT_DEFAULT_NUM_WORKERS
+from hivemind.dht import DEFAULT_NUM_WORKERS
 from hivemind.moe.server import declare_experts, get_experts
 from hivemind.utils.limits import increase_file_limit

@@ -29,7 +29,6 @@ def benchmark_dht(
     wait_before_read: float,
     wait_timeout: float,
     expiration: float,
-    dht_max_workers: int,
 ):
     random.seed(random_seed)

@@ -40,7 +39,7 @@ def benchmark_dht(
     neighbors = sum(
         [peer.get_visible_maddrs() for peer in random.sample(peers, min(initial_peers, len(peers)))], []
     )
     peer = hivemind.DHT(
-        initial_peers=neighbors, start=True, wait_timeout=wait_timeout, max_workers=dht_max_workers
+        initial_peers=neighbors, start=True, wait_timeout=wait_timeout
     )
     peers.append(peer)
@@ -122,7 +121,6 @@
     parser.add_argument("--wait_before_read", type=float, default=0, required=False)
     parser.add_argument("--wait_timeout", type=float, default=5, required=False)
     parser.add_argument("--random_seed", type=int, default=random.randint(1, 1000))
-    parser.add_argument("--dht_max_workers", type=int, default=DHT_DEFAULT_NUM_WORKERS)
     parser.add_argument("--increase_file_limit", action="store_true")

     args = vars(parser.parse_args())

diff --git a/hivemind/dht/__init__.py b/hivemind/dht/__init__.py
index 171f730b8..93c89bd5a 100644
--- a/hivemind/dht/__init__.py
+++ b/hivemind/dht/__init__.py
@@ -23,7 +23,7 @@

 from multiaddr import Multiaddr

-from hivemind.dht.node import DHT_DEFAULT_NUM_WORKERS, DHTNode
+from hivemind.dht.node import DEFAULT_NUM_WORKERS, DHTNode
 from hivemind.dht.routing import DHTID, DHTKey, DHTValue, Subkey
 from hivemind.dht.validation import CompositeValidator, RecordValidatorBase
 from hivemind.p2p import P2P, PeerID
@@ -62,7 +62,7 @@ def __init__(
         *,
         start: bool,
         daemon: bool = True,
-        max_workers: int = DHT_DEFAULT_NUM_WORKERS,
+        max_workers: int = DEFAULT_NUM_WORKERS,
         record_validators: Iterable[RecordValidatorBase] = (),
         shutdown_timeout: float = 3,
         await_ready: bool = True,

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index 6ce3a0f64..18650b8e2 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -39,7 +39,7 @@
 logger = get_logger(__name__)


-DHT_DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)
+DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)


 class DHTNode:
@@ -114,7 +114,7 @@ async def create(
         cache_refresh_before_expiry: float = 5,
         cache_on_store: bool = True,
         reuse_get_requests: bool = True,
-        num_workers: int = DHT_DEFAULT_NUM_WORKERS,
+        num_workers: int = DEFAULT_NUM_WORKERS,
         chunk_size: int = 16,
         blacklist_time: float = 5.0,
         backoff_rate: float = 2.0,
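
One wrinkle survives the rename: os.environ.get returns the variable's value as a string whenever it is set, while the fallback 8 stays an int, so DEFAULT_NUM_WORKERS silently changes type depending on the environment. A short demonstration of the mismatch, which patch 9 below corrects by wrapping the lookup in int(...):

    import os

    os.environ["HIVEMIND_DHT_NUM_WORKERS"] = "4"
    value = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)
    print(repr(value))   # '4' -- a str, not an int
    print(value == 4)    # False: "4" != 4

    # Wrapping the lookup in int() normalizes both branches:
    print(int(os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)))  # 4
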
From 1bc5e8f7e2089a4d5fc3d5bf1445ff01475c2cf5 Mon Sep 17 00:00:00 2001
From: Denis Mazur
Date: Fri, 30 Jul 2021 04:55:20 +0300
Subject: [PATCH 5/9] run black on benchmarks

---
 benchmarks/benchmark_dht.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/benchmarks/benchmark_dht.py b/benchmarks/benchmark_dht.py
index 9b3e04abd..48d2772b2 100644
--- a/benchmarks/benchmark_dht.py
+++ b/benchmarks/benchmark_dht.py
@@ -38,9 +38,7 @@ def benchmark_dht(
     neighbors = sum(
         [peer.get_visible_maddrs() for peer in random.sample(peers, min(initial_peers, len(peers)))], []
     )
-    peer = hivemind.DHT(
-        initial_peers=neighbors, start=True, wait_timeout=wait_timeout
-    )
+    peer = hivemind.DHT(initial_peers=neighbors, start=True, wait_timeout=wait_timeout)
     peers.append(peer)

     store_peer, get_peer = peers[-2:]

From c278a8a5ea37d19c57d48ff2a99753f8108c002e Mon Sep 17 00:00:00 2001
From: Alexander Borzunov
Date: Fri, 30 Jul 2021 16:31:11 +0300
Subject: [PATCH 6/9] Remove excess import

Already discussed with @deniskamazur
---
 benchmarks/benchmark_dht.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/benchmarks/benchmark_dht.py b/benchmarks/benchmark_dht.py
index 48d2772b2..101946b03 100644
--- a/benchmarks/benchmark_dht.py
+++ b/benchmarks/benchmark_dht.py
@@ -5,7 +5,6 @@
 from tqdm import trange

 import hivemind
-from hivemind.dht import DEFAULT_NUM_WORKERS
 from hivemind.moe.server import declare_experts, get_experts
 from hivemind.utils.limits import increase_file_limit

From 90a81c44586e1d74611e87e6b0ec11b8eb95df08 Mon Sep 17 00:00:00 2001
From: Alexander Borzunov
Date: Fri, 30 Jul 2021 23:27:58 +0300
Subject: [PATCH 7/9] Set default to 4

---
 hivemind/dht/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index 18650b8e2..11f34731c 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -39,7 +39,7 @@
 logger = get_logger(__name__)


-DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 8)
+DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 4)


 class DHTNode:
From 75052028624c3384f302ee988d08e1515ed3897d Mon Sep 17 00:00:00 2001
From: Alexander Borzunov
Date: Fri, 30 Jul 2021 23:40:49 +0300
Subject: [PATCH 8/9] Rename max_workers -> num_workers

---
 hivemind/dht/__init__.py           | 8 ++++----
 hivemind/moe/client/beam_search.py | 6 +++---
 hivemind/moe/server/dht_handler.py | 4 ++--
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/hivemind/dht/__init__.py b/hivemind/dht/__init__.py
index 93c89bd5a..a7acad9a5 100644
--- a/hivemind/dht/__init__.py
+++ b/hivemind/dht/__init__.py
@@ -43,7 +43,7 @@ class DHT(mp.Process):
     :param initial_peers: multiaddrs of one or more active DHT peers (if you want to join an existing DHT)
     :param start: if True, automatically starts the background process on creation. Otherwise await manual start
     :param daemon: if True, the background process is marked as daemon and automatically terminated after main process
-    :param max_workers: declare_experts and get_experts will use up to this many parallel workers
+    :param num_workers: declare_experts and get_experts will use up to this many parallel workers
       (but no more than one per key)
     :param expiration: experts declared from this node expire after this many seconds (default = 5 minutes)
     :param record_validators: instances of RecordValidatorBase used for signing and validating stored records.
@@ -62,7 +62,7 @@ def __init__(
         *,
         start: bool,
         daemon: bool = True,
-        max_workers: int = DEFAULT_NUM_WORKERS,
+        num_workers: int = DEFAULT_NUM_WORKERS,
         record_validators: Iterable[RecordValidatorBase] = (),
         shutdown_timeout: float = 3,
         await_ready: bool = True,
@@ -81,7 +81,7 @@ def __init__(
             raise TypeError("initial_peers should be of type Optional[Sequence[Union[Multiaddr, str]]]")
         self.initial_peers = initial_peers
         self.kwargs = kwargs
-        self.max_workers = max_workers
+        self.num_workers = num_workers

         self._record_validator = CompositeValidator(record_validators)
         self._inner_pipe, self._outer_pipe = mp.Pipe(duplex=True)
@@ -106,7 +106,7 @@ def run(self) -> None:
         async def _run():
             self._node = await DHTNode.create(
                 initial_peers=self.initial_peers,
-                num_workers=self.max_workers,
+                num_workers=self.num_workers,
                 record_validator=self._record_validator,
                 **self.kwargs,
             )

diff --git a/hivemind/moe/client/beam_search.py b/hivemind/moe/client/beam_search.py
index 04a75ff74..228ea6da4 100644
--- a/hivemind/moe/client/beam_search.py
+++ b/hivemind/moe/client/beam_search.py
@@ -125,7 +125,7 @@ async def _get_initial_beam(
         cache_expiration: DHTExpiration,
         num_workers: Optional[int] = None,
     ) -> List[Tuple[Score, ExpertPrefix, Dict[Coordinate, UidEndpoint]]]:
-        num_workers = num_workers or dht.max_workers or beam_size
+        num_workers = num_workers or dht.num_workers or beam_size
         beam: List[Tuple[Score, ExpertPrefix, Dict[Coordinate, UidEndpoint]]] = []
         unattempted_indices: List[Coordinate] = sorted(
             range(len(scores)), key=scores.__getitem__
@@ -206,7 +206,7 @@ async def _get_active_successors(
         num_workers: Optional[int] = None,
     ) -> Dict[ExpertPrefix, Dict[Coordinate, UidEndpoint]]:
         grid_size = grid_size or float("inf")
-        num_workers = num_workers or min(len(prefixes), dht.max_workers or len(prefixes))
+        num_workers = num_workers or min(len(prefixes), dht.num_workers or len(prefixes))
         dht_responses = await node.get_many(keys=prefixes, num_workers=num_workers)
         successors: Dict[ExpertPrefix, Dict[Coordinate, UidEndpoint]] = {}
         for prefix, found in dht_responses.items():
@@ -270,7 +270,7 @@ async def _find_best_experts(
         cache_expiration: DHTExpiration,
         num_workers: Optional[int] = None,
     ) -> List[RemoteExpert]:
-        num_workers = num_workers or min(beam_size, dht.max_workers or beam_size)
+        num_workers = num_workers or min(beam_size, dht.num_workers or beam_size)

         # form initial beam from top-k active L1 prefixes, each row is (score, uid prefix, possible suffixes)
         beam: List[Tuple[Score, ExpertPrefix, Dict[Coordinate, UidEndpoint]]] = await cls._get_initial_beam(

diff --git a/hivemind/moe/server/dht_handler.py b/hivemind/moe/server/dht_handler.py
index 7bd29e6ef..135b42110 100644
--- a/hivemind/moe/server/dht_handler.py
+++ b/hivemind/moe/server/dht_handler.py
@@ -56,7 +56,7 @@ def declare_experts(
 async def _declare_experts(
     dht: DHT, node: DHTNode, uids: List[ExpertUID], endpoint: Endpoint, expiration: DHTExpiration
 ) -> Dict[ExpertUID, bool]:
-    num_workers = len(uids) if dht.max_workers is None else min(len(uids), dht.max_workers)
+    num_workers = len(uids) if dht.num_workers is None else min(len(uids), dht.num_workers)
     expiration_time = get_dht_time() + expiration
     data_to_store: Dict[Tuple[ExpertPrefix, Optional[Coordinate]], DHTValue] = {}
     for uid in uids:
@@ -89,7 +89,7 @@ async def _get_experts(
 ) -> List[Optional[RemoteExpert]]:
     if expiration_time is None:
         expiration_time = get_dht_time()
-    num_workers = len(uids) if dht.max_workers is None else min(len(uids), dht.max_workers)
+    num_workers = len(uids) if dht.num_workers is None else min(len(uids), dht.num_workers)
     found: Dict[ExpertUID, DHTValue] = await node.get_many(uids, expiration_time, num_workers=num_workers)

     experts: List[Optional[RemoteExpert]] = [None] * len(uids)
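
Across the renamed call sites, parallelism is derived the same way: at most one worker per key, capped by dht.num_workers when it is set, matching the docstring's "no more than one per key" note. A hedged sketch of that clamping rule as a standalone function (the name and signature are illustrative, not part of hivemind):

    from typing import Optional, Sequence

    def clamp_workers(keys: Sequence[str], num_workers: Optional[int]) -> int:
        # At most one worker per key; num_workers caps total parallelism.
        if num_workers is None:
            return len(keys)
        return min(len(keys), num_workers)

    assert clamp_workers(["expert.0", "expert.1", "expert.2"], None) == 3
    assert clamp_workers(["expert.0", "expert.1", "expert.2"], 2) == 2
    assert clamp_workers(["expert.0"], 8) == 1
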
From 10ad6e8e0c5ae3b9a6c17ca4c62549dc61a9e300 Mon Sep 17 00:00:00 2001
From: Alexander Borzunov
Date: Sat, 31 Jul 2021 21:31:25 +0300
Subject: [PATCH 9/9] Fix bug in reading env variable

Co-authored-by: Max Ryabinin
---
 hivemind/dht/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hivemind/dht/node.py b/hivemind/dht/node.py
index 7054f9ee9..5e9192b3a 100644
--- a/hivemind/dht/node.py
+++ b/hivemind/dht/node.py
@@ -39,7 +39,7 @@
 logger = get_logger(__name__)


-DEFAULT_NUM_WORKERS = os.environ.get("HIVEMIND_DHT_NUM_WORKERS", 4)
+DEFAULT_NUM_WORKERS = int(os.getenv("HIVEMIND_DHT_NUM_WORKERS", 4))


 class DHTNode:
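
With int(os.getenv(...)), both branches of the lookup now yield an int, and the override path works end to end. A usage sketch, assuming hivemind is installed and the variable is exported before hivemind is imported:

    import os

    os.environ["HIVEMIND_DHT_NUM_WORKERS"] = "16"  # must be set before the import below

    import hivemind

    dht = hivemind.DHT(start=True)  # DHTNode.create now defaults to num_workers=16
    print(dht.num_workers)          # 16, as an int
    dht.shutdown()
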