From df68317945c9e514aac2c721e18768064beb1fe8 Mon Sep 17 00:00:00 2001 From: Denis Mazur Date: Mon, 2 Aug 2021 19:21:47 +0300 Subject: [PATCH 01/15] speed up p2p creation --- hivemind/p2p/p2p_daemon.py | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index 33ede7c53..ba7a796f2 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -5,7 +5,7 @@ from contextlib import closing, suppress from dataclasses import dataclass from importlib.resources import path -from subprocess import Popen +from subprocess import Popen, PIPE from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence, Tuple, TypeVar, Union from multiaddr import Multiaddr @@ -90,7 +90,6 @@ async def create( use_relay_discovery: bool = False, use_auto_relay: bool = False, relay_hop_limit: int = 0, - quiet: bool = True, ping_n_attempts: int = 5, ping_delay: float = 0.4, ) -> "P2P": @@ -113,7 +112,6 @@ async def create( :param use_relay_discovery: enables passive discovery for relay :param use_auto_relay: enables autorelay :param relay_hop_limit: sets the hop limit for hop relays - :param quiet: make the daemon process quiet :param ping_n_attempts: try to ping the daemon with this number of attempts after starting it :param ping_delay: wait for ``ping_delay * (2 ** (k - 1))`` seconds before the k-th attempt to ping the daemon (in particular, wait for ``ping_delay`` seconds before the first attempt) @@ -157,37 +155,18 @@ async def create( autoRelay=use_auto_relay, relayHopLimit=relay_hop_limit, b=need_bootstrap, - q=quiet, + q=False, **process_kwargs, ) - self._child = Popen(args=proc_args, encoding="utf8") + self._child = Popen(args=proc_args, stdout=PIPE, encoding="utf8") self._alive = True self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) - await self._ping_daemon_with_retries(ping_n_attempts, ping_delay) + await self._wait_until_ready() return self - async def _ping_daemon_with_retries(self, ping_n_attempts: int, ping_delay: float) -> None: - for try_number in range(ping_n_attempts): - await asyncio.sleep(ping_delay * (2 ** try_number)) - - if self._child.poll() is not None: # Process died - break - - try: - await self._ping_daemon() - break - except Exception as e: - if try_number == ping_n_attempts - 1: - logger.exception("Failed to ping p2pd that has just started") - await self.shutdown() - raise - - if self._child.returncode is not None: - raise RuntimeError(f"The p2p daemon has died with return code {self._child.returncode}") - @classmethod async def replicate(cls, daemon_listen_maddr: Multiaddr) -> "P2P": """ @@ -502,6 +481,14 @@ def _convert_process_arg_type(val: Any) -> Any: def _maddrs_to_str(maddrs: List[Multiaddr]) -> str: return ",".join(str(addr) for addr in maddrs) + async def _wait_until_ready(self): + while True: + line = self._child.stdout.readline().rstrip() + if line.startswith("Peer ID:"): + break + + self.peer_id, self._visible_maddrs = await self._client.identify() + class P2PInterruptedError(Exception): pass From 77b92b3e50545bdcc7c69dd742df76da65517a77 Mon Sep 17 00:00:00 2001 From: Denis Mazur Date: Mon, 2 Aug 2021 19:31:44 +0300 Subject: [PATCH 02/15] sort imports --- hivemind/p2p/p2p_daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index ba7a796f2..828697165 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -5,7 +5,7 @@ from contextlib import closing, suppress from dataclasses import dataclass from importlib.resources import path -from subprocess import Popen, PIPE +from subprocess import PIPE, Popen from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence, Tuple, TypeVar, Union from multiaddr import Multiaddr From 4849943de291b946cbf87af2c39fe516cc2c3f20 Mon Sep 17 00:00:00 2001 From: Denis Mazur Date: Mon, 2 Aug 2021 19:52:29 +0300 Subject: [PATCH 03/15] Update hivemind/p2p/p2p_daemon.py Co-authored-by: Alexander Borzunov --- hivemind/p2p/p2p_daemon.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index 828697165..af36ac6bb 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -155,7 +155,6 @@ async def create( autoRelay=use_auto_relay, relayHopLimit=relay_hop_limit, b=need_bootstrap, - q=False, **process_kwargs, ) From f311b592dc02fb59c69c39fda8ddc7b3189294d8 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 05:05:54 +0300 Subject: [PATCH 04/15] Fix asyncio warnings --- hivemind/p2p/p2p_daemon.py | 4 +++- tests/conftest.py | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index 33ede7c53..3e9e646d6 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -331,7 +331,9 @@ async def _process_stream() -> None: await P2P.send_protobuf(response, writer) except Exception as e: logger.warning("Exception while processing stream and sending responses:", exc_info=True) - await P2P.send_protobuf(RPCError(message=str(e)), writer) + # Sometimes `e` is a connection error, so we won't be able to report the error to the caller + with suppress(Exception): + await P2P.send_protobuf(RPCError(message=str(e)), writer) with closing(writer): processing_task = asyncio.create_task(_process_stream()) diff --git a/tests/conftest.py b/tests/conftest.py index 62c76cdd0..1b684e114 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import asyncio import gc import multiprocessing as mp from contextlib import suppress @@ -11,6 +12,22 @@ logger = get_logger(__name__) +@pytest.fixture +def event_loop(): + """ + This overrides the ``event_loop`` fixture from pytest-asyncio + (e.g. to make it compatible with ``asyncio.subprocess``). + + This fixture is identical to the original one but does not call ``loop.close()`` in the end. + Indeed, at this point, the loop is already stopped (i.e. next tests are free to create new loops). + However, finalizers of objects created in the current test may reference the current loop and fail if it is closed. + For example, this happens while using ``asyncio.subprocess`` (the ``asyncio.subprocess.Process`` finalizer + fails if the loop is closed, but works if the loop is only stopped). + """ + + yield asyncio.get_event_loop() + + @pytest.fixture(autouse=True, scope="session") def cleanup_children(): yield From b00bf584db7a80fea417bc973eeaae340810d240 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 05:29:49 +0300 Subject: [PATCH 05/15] Use asyncio.subprocess in hivemind.p2p.P2P --- hivemind/p2p/p2p_daemon.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index 87784f9ad..ebceab1c9 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -5,7 +5,6 @@ from contextlib import closing, suppress from dataclasses import dataclass from importlib.resources import path -from subprocess import PIPE, Popen from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence, Tuple, TypeVar, Union from multiaddr import Multiaddr @@ -158,7 +157,8 @@ async def create( **process_kwargs, ) - self._child = Popen(args=proc_args, stdout=PIPE, encoding="utf8") + self._child = await asyncio.subprocess.create_subprocess_exec( + *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) self._alive = True self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) @@ -452,9 +452,8 @@ async def shutdown(self) -> None: def _terminate(self) -> None: self._alive = False - if self._child is not None and self._child.poll() is None: + if self._child is not None and self._child.returncode is None: self._child.terminate() - self._child.wait() logger.debug(f"Terminated p2pd with id = {self.peer_id}") with suppress(FileNotFoundError): @@ -484,7 +483,7 @@ def _maddrs_to_str(maddrs: List[Multiaddr]) -> str: async def _wait_until_ready(self): while True: - line = self._child.stdout.readline().rstrip() + line = (await self._child.stdout.readline()).rstrip().decode() if line.startswith("Peer ID:"): break From b96e7ba1dd19c942724cb3992e3d8de083901128 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 06:28:02 +0300 Subject: [PATCH 06/15] Catch daemon errors and read outputs until process is closed --- hivemind/p2p/__init__.py | 2 +- hivemind/p2p/p2p_daemon.py | 27 ++++++++++++++++++++------- tests/test_p2p_daemon.py | 8 +++++++- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/hivemind/p2p/__init__.py b/hivemind/p2p/__init__.py index 15964dbb6..383121a77 100644 --- a/hivemind/p2p/__init__.py +++ b/hivemind/p2p/__init__.py @@ -1,3 +1,3 @@ -from hivemind.p2p.p2p_daemon import P2P, P2PContext, P2PHandlerError +from hivemind.p2p.p2p_daemon import P2P, P2PContext, P2PDaemonError, P2PHandlerError from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo from hivemind.p2p.servicer import ServicerBase, StubBase diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index ebceab1c9..cbbad34aa 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -67,6 +67,7 @@ def __init__(self): self.peer_id = None self._child = None self._alive = False + self._reader_task = None self._listen_task = None self._server_stopped = asyncio.Event() @@ -160,10 +161,13 @@ async def create( self._child = await asyncio.subprocess.create_subprocess_exec( *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) self._alive = True - self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) - await self._wait_until_ready() + ready = asyncio.Future() + self._reader_task = asyncio.create_task(self._read_outputs(ready)) + await ready + self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) + await self._ping_daemon() return self @classmethod @@ -451,6 +455,9 @@ async def shutdown(self) -> None: await asyncio.get_event_loop().run_in_executor(None, self._terminate) def _terminate(self) -> None: + if self._reader_task is not None: + self._reader_task.cancel() + self._alive = False if self._child is not None and self._child.returncode is None: self._child.terminate() @@ -481,16 +488,22 @@ def _convert_process_arg_type(val: Any) -> Any: def _maddrs_to_str(maddrs: List[Multiaddr]) -> str: return ",".join(str(addr) for addr in maddrs) - async def _wait_until_ready(self): + async def _read_outputs(self, ready: asyncio.Future) -> None: + last_line = None while True: - line = (await self._child.stdout.readline()).rstrip().decode() - if line.startswith("Peer ID:"): + line = await self._child.stdout.readline() + if not line: # Stream closed break + last_line = line.rstrip().decode(errors='ignore') - self.peer_id, self._visible_maddrs = await self._client.identify() + if last_line.startswith("Peer ID:"): + ready.set_result(None) + + if not ready.done(): + ready.set_exception(P2PDaemonError(f'Daemon failed to start: {last_line}')) -class P2PInterruptedError(Exception): +class P2PDaemonError(RuntimeError): pass diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index 3cb1611d6..a105107b5 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -9,7 +9,7 @@ import pytest from multiaddr import Multiaddr -from hivemind.p2p import P2P, P2PHandlerError +from hivemind.p2p import P2P, P2PDaemonError, P2PHandlerError from hivemind.proto import dht_pb2 from hivemind.utils.serializer import MSGPackSerializer @@ -33,6 +33,12 @@ async def test_daemon_killed_on_del(): assert not is_process_running(child_pid) +@pytest.mark.asyncio +async def test_startup_error_message(): + with pytest.raises(P2PDaemonError, match=r'Failed to connect to bootstrap peers'): + await P2P.create(initial_peers=['/ip4/127.0.0.1/tcp/666/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1']) + + @pytest.mark.parametrize( "host_maddrs", [ From 921317d6efa9c2a20f6f3256e804dc70471d7899 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 06:32:24 +0300 Subject: [PATCH 07/15] Blackify --- hivemind/p2p/p2p_daemon.py | 7 ++++--- tests/test_p2p_daemon.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index cbbad34aa..be9de76ea 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -159,7 +159,8 @@ async def create( ) self._child = await asyncio.subprocess.create_subprocess_exec( - *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT) + *proc_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT + ) self._alive = True ready = asyncio.Future() @@ -494,13 +495,13 @@ async def _read_outputs(self, ready: asyncio.Future) -> None: line = await self._child.stdout.readline() if not line: # Stream closed break - last_line = line.rstrip().decode(errors='ignore') + last_line = line.rstrip().decode(errors="ignore") if last_line.startswith("Peer ID:"): ready.set_result(None) if not ready.done(): - ready.set_exception(P2PDaemonError(f'Daemon failed to start: {last_line}')) + ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}")) class P2PDaemonError(RuntimeError): diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index a105107b5..de4ce0ccb 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -35,8 +35,8 @@ async def test_daemon_killed_on_del(): @pytest.mark.asyncio async def test_startup_error_message(): - with pytest.raises(P2PDaemonError, match=r'Failed to connect to bootstrap peers'): - await P2P.create(initial_peers=['/ip4/127.0.0.1/tcp/666/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1']) + with pytest.raises(P2PDaemonError, match=r"Failed to connect to bootstrap peers"): + await P2P.create(initial_peers=["/ip4/127.0.0.1/tcp/666/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"]) @pytest.mark.parametrize( From 1275fc9714ac262170ae0990889d3a826d4a1a99 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 06:39:09 +0300 Subject: [PATCH 08/15] Fix waiting for child --- hivemind/p2p/p2p_daemon.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index be9de76ea..e5c9c33bf 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -453,7 +453,9 @@ def is_alive(self) -> bool: async def shutdown(self) -> None: await self._stop_listening() - await asyncio.get_event_loop().run_in_executor(None, self._terminate) + self._terminate() + if self._child is not None: + await self._child.wait() def _terminate(self) -> None: if self._reader_task is not None: From 92d193f43d839eaedbc495941645ef68ca7a6862 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 06:51:58 +0300 Subject: [PATCH 09/15] Simplify P2P._terminate() --- hivemind/p2p/p2p_daemon.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index e5c9c33bf..ec385ac98 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -69,7 +69,6 @@ def __init__(self): self._alive = False self._reader_task = None self._listen_task = None - self._server_stopped = asyncio.Event() @classmethod async def create( @@ -420,20 +419,10 @@ def iterate_protobuf_handler( def _start_listening(self) -> None: async def listen() -> None: async with self._client.listen(): - await self._server_stopped.wait() + await asyncio.Future() # Block until this task will be cancelled in _terminate() self._listen_task = asyncio.create_task(listen()) - async def _stop_listening(self) -> None: - if self._listen_task is not None: - self._server_stopped.set() - self._listen_task.cancel() - try: - await self._listen_task - except asyncio.CancelledError: - self._listen_task = None - self._server_stopped.clear() - async def add_binary_stream_handler(self, name: str, handler: p2pclient.StreamHandler) -> None: if self._listen_task is None: self._start_listening() @@ -452,12 +441,13 @@ def is_alive(self) -> bool: return self._alive async def shutdown(self) -> None: - await self._stop_listening() self._terminate() if self._child is not None: await self._child.wait() def _terminate(self) -> None: + if self._listen_task is not None: + self._listen_task.cancel() if self._reader_task is not None: self._reader_task.cancel() From cdf5e8f4372e54c252b3967743ef5bc631a2f17c Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 06:55:00 +0300 Subject: [PATCH 10/15] Improve comment --- hivemind/p2p/p2p_daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index ec385ac98..bbc9d24c0 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -419,7 +419,7 @@ def iterate_protobuf_handler( def _start_listening(self) -> None: async def listen() -> None: async with self._client.listen(): - await asyncio.Future() # Block until this task will be cancelled in _terminate() + await asyncio.Future() # Wait until this task will be cancelled in _terminate() self._listen_task = asyncio.create_task(listen()) From 3adb96c97861d852971efe636a5ebbbc36ee0bde Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 19:00:26 +0300 Subject: [PATCH 11/15] Raise P2PDaemonError if the daemon does not start in `startup_timeout` seconds --- hivemind/p2p/p2p_daemon.py | 13 +++++++------ tests/test_utils/dht_swarms.py | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hivemind/p2p/p2p_daemon.py b/hivemind/p2p/p2p_daemon.py index bbc9d24c0..922aa4f40 100644 --- a/hivemind/p2p/p2p_daemon.py +++ b/hivemind/p2p/p2p_daemon.py @@ -89,8 +89,7 @@ async def create( use_relay_discovery: bool = False, use_auto_relay: bool = False, relay_hop_limit: int = 0, - ping_n_attempts: int = 5, - ping_delay: float = 0.4, + startup_timeout: float = 15, ) -> "P2P": """ Start a new p2pd process and connect to it. @@ -111,9 +110,7 @@ async def create( :param use_relay_discovery: enables passive discovery for relay :param use_auto_relay: enables autorelay :param relay_hop_limit: sets the hop limit for hop relays - :param ping_n_attempts: try to ping the daemon with this number of attempts after starting it - :param ping_delay: wait for ``ping_delay * (2 ** (k - 1))`` seconds before the k-th attempt to ping the daemon - (in particular, wait for ``ping_delay`` seconds before the first attempt) + :param startup_timeout: raise a P2PDaemonError if the daemon does not start in ``startup_timeout`` seconds :return: a wrapper for the p2p daemon """ @@ -164,7 +161,11 @@ async def create( ready = asyncio.Future() self._reader_task = asyncio.create_task(self._read_outputs(ready)) - await ready + try: + await asyncio.wait_for(ready, startup_timeout) + except asyncio.TimeoutError: + await self.shutdown() + raise P2PDaemonError(f"Daemon failed to start in {startup_timeout:.1f} seconds") self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) await self._ping_daemon() diff --git a/tests/test_utils/dht_swarms.py b/tests/test_utils/dht_swarms.py index 0c086cf35..804bf0cda 100644 --- a/tests/test_utils/dht_swarms.py +++ b/tests/test_utils/dht_swarms.py @@ -18,7 +18,7 @@ def run_node(initial_peers: List[Multiaddr], info_queue: mp.Queue, **kwargs): asyncio.set_event_loop(asyncio.new_event_loop()) loop = asyncio.get_event_loop() - node = loop.run_until_complete(DHTNode.create(initial_peers=initial_peers, ping_n_attempts=10, **kwargs)) + node = loop.run_until_complete(DHTNode.create(initial_peers=initial_peers, **kwargs)) maddrs = loop.run_until_complete(node.get_visible_maddrs()) info_queue.put((node.node_id, node.peer_id, maddrs)) From db3e9f461f9a61fd0eee7e6fc175eefc64a05945 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 19:15:25 +0300 Subject: [PATCH 12/15] Add test for startup_timeout --- tests/test_p2p_daemon.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index de4ce0ccb..146ff8929 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -38,6 +38,9 @@ async def test_startup_error_message(): with pytest.raises(P2PDaemonError, match=r"Failed to connect to bootstrap peers"): await P2P.create(initial_peers=["/ip4/127.0.0.1/tcp/666/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"]) + with pytest.raises(P2PDaemonError, match=r"Daemon failed to start in .+ seconds"): + await P2P.create(startup_timeout=0.1) # Test that startup_timeout works + @pytest.mark.parametrize( "host_maddrs", From 491bbad0457bbd96ca60e54d2c3044aff4aa9025 Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 19:38:41 +0300 Subject: [PATCH 13/15] Use unused_tcp_port instead of port 666 in tests --- tests/test_p2p_daemon.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index 146ff8929..a0ef49d95 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -34,9 +34,11 @@ async def test_daemon_killed_on_del(): @pytest.mark.asyncio -async def test_startup_error_message(): +async def test_startup_error_message(unused_tcp_port: int): with pytest.raises(P2PDaemonError, match=r"Failed to connect to bootstrap peers"): - await P2P.create(initial_peers=["/ip4/127.0.0.1/tcp/666/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"]) + await P2P.create( + initial_peers=[f"/ip4/127.0.0.1/tcp/{unused_tcp_port}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"] + ) with pytest.raises(P2PDaemonError, match=r"Daemon failed to start in .+ seconds"): await P2P.create(startup_timeout=0.1) # Test that startup_timeout works From cee03d81b32f8330a24d53c94281551f671bf07b Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 19:46:31 +0300 Subject: [PATCH 14/15] Use find_open_port() instead of unused_tcp_port --- tests/test_p2p_daemon.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index a0ef49d95..6ead81c6f 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -11,6 +11,7 @@ from hivemind.p2p import P2P, P2PDaemonError, P2PHandlerError from hivemind.proto import dht_pb2 +from hivemind.utils.networking import find_open_port from hivemind.utils.serializer import MSGPackSerializer @@ -34,10 +35,10 @@ async def test_daemon_killed_on_del(): @pytest.mark.asyncio -async def test_startup_error_message(unused_tcp_port: int): +async def test_startup_error_message(): with pytest.raises(P2PDaemonError, match=r"Failed to connect to bootstrap peers"): await P2P.create( - initial_peers=[f"/ip4/127.0.0.1/tcp/{unused_tcp_port}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"] + initial_peers=[f"/ip4/127.0.0.1/tcp/{find_open_port()}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"] ) with pytest.raises(P2PDaemonError, match=r"Daemon failed to start in .+ seconds"): From a355c9dd2c499dcfbb3b5688e5a2543244ad068e Mon Sep 17 00:00:00 2001 From: Alexander Borzunov Date: Tue, 3 Aug 2021 19:55:24 +0300 Subject: [PATCH 15/15] Rename find_open_port() -> get_free_port() --- benchmarks/benchmark_throughput.py | 4 ++-- hivemind/moe/server/__init__.py | 4 ++-- hivemind/utils/networking.py | 2 +- tests/test_p2p_daemon.py | 4 ++-- tests/test_utils/p2p_daemon.py | 8 ++++---- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/benchmarks/benchmark_throughput.py b/benchmarks/benchmark_throughput.py index 09cac42f9..e279108aa 100644 --- a/benchmarks/benchmark_throughput.py +++ b/benchmarks/benchmark_throughput.py @@ -7,7 +7,7 @@ import torch import hivemind -from hivemind import find_open_port +from hivemind import get_free_port from hivemind.moe.server import layers from hivemind.utils.limits import increase_file_limit from hivemind.utils.logging import get_logger @@ -66,7 +66,7 @@ def benchmark_throughput( or torch.device(device) == torch.device("cpu") ) assert expert_cls in layers.name_to_block - port = port or find_open_port() + port = port or get_free_port() max_batch_size = max_batch_size or batch_size * 4 num_handlers = max(1, num_handlers or num_clients // 2) benchmarking_failed = mp.Event() diff --git a/hivemind/moe/server/__init__.py b/hivemind/moe/server/__init__.py index 55dcd3c42..08c713294 100644 --- a/hivemind/moe/server/__init__.py +++ b/hivemind/moe/server/__init__.py @@ -27,7 +27,7 @@ ) from hivemind.moe.server.runtime import Runtime from hivemind.proto.runtime_pb2 import CompressionType -from hivemind.utils import BatchTensorDescriptor, Endpoint, find_open_port, get_logger, get_port, replace_port +from hivemind.utils import BatchTensorDescriptor, Endpoint, get_free_port, get_logger, get_port, replace_port logger = get_logger(__name__) @@ -68,7 +68,7 @@ def __init__( super().__init__() self.dht, self.experts, self.update_period = dht, expert_backends, update_period if get_port(listen_on) is None: - listen_on = replace_port(listen_on, new_port=find_open_port()) + listen_on = replace_port(listen_on, new_port=get_free_port()) self.listen_on, self.port = listen_on, get_port(listen_on) self.conn_handlers = [ConnectionHandler(listen_on, self.experts) for _ in range(num_connection_handlers)] diff --git a/hivemind/utils/networking.py b/hivemind/utils/networking.py index de8025abf..c637b30d9 100644 --- a/hivemind/utils/networking.py +++ b/hivemind/utils/networking.py @@ -30,7 +30,7 @@ def strip_port(endpoint: Endpoint) -> Hostname: return endpoint[: endpoint.rindex(":")] if maybe_port.isdigit() or maybe_port == "*" else endpoint -def find_open_port(params=(socket.AF_INET, socket.SOCK_STREAM), opt=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)): +def get_free_port(params=(socket.AF_INET, socket.SOCK_STREAM), opt=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)): """Finds a tcp port that can be occupied with a socket with *params and use *opt options""" try: with closing(socket.socket(*params)) as sock: diff --git a/tests/test_p2p_daemon.py b/tests/test_p2p_daemon.py index 6ead81c6f..dd981c12d 100644 --- a/tests/test_p2p_daemon.py +++ b/tests/test_p2p_daemon.py @@ -11,7 +11,7 @@ from hivemind.p2p import P2P, P2PDaemonError, P2PHandlerError from hivemind.proto import dht_pb2 -from hivemind.utils.networking import find_open_port +from hivemind.utils.networking import get_free_port from hivemind.utils.serializer import MSGPackSerializer @@ -38,7 +38,7 @@ async def test_daemon_killed_on_del(): async def test_startup_error_message(): with pytest.raises(P2PDaemonError, match=r"Failed to connect to bootstrap peers"): await P2P.create( - initial_peers=[f"/ip4/127.0.0.1/tcp/{find_open_port()}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"] + initial_peers=[f"/ip4/127.0.0.1/tcp/{get_free_port()}/p2p/QmdaK4LUeQaKhqSFPRu9N7MvXUEWDxWwtCvPrS444tCgd1"] ) with pytest.raises(P2PDaemonError, match=r"Daemon failed to start in .+ seconds"): diff --git a/tests/test_utils/p2p_daemon.py b/tests/test_utils/p2p_daemon.py index 5602115c6..57ca176ff 100644 --- a/tests/test_utils/p2p_daemon.py +++ b/tests/test_utils/p2p_daemon.py @@ -10,7 +10,7 @@ from multiaddr import Multiaddr, protocols from pkg_resources import resource_filename -from hivemind import find_open_port +from hivemind import get_free_port from hivemind.p2p.p2p_daemon_bindings.p2pclient import Client TIMEOUT_DURATION = 30 # seconds @@ -57,7 +57,7 @@ def _start_logging(self): def _run(self): cmd_list = [P2PD_PATH, f"-listen={str(self.control_maddr)}"] - cmd_list += [f"-hostAddrs=/ip4/127.0.0.1/tcp/{find_open_port()}"] + cmd_list += [f"-hostAddrs=/ip4/127.0.0.1/tcp/{get_free_port()}"] if self.enable_connmgr: cmd_list += ["-connManager=true", "-connLo=1", "-connHi=2", "-connGrace=0"] if self.enable_dht: @@ -129,8 +129,8 @@ async def make_p2pd_pair_unix(enable_control, enable_connmgr, enable_dht, enable @asynccontextmanager async def make_p2pd_pair_ip4(enable_control, enable_connmgr, enable_dht, enable_pubsub): - control_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}") - listen_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{find_open_port()}") + control_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{get_free_port()}") + listen_maddr = Multiaddr(f"/ip4/127.0.0.1/tcp/{get_free_port()}") async with _make_p2pd_pair( control_maddr=control_maddr, listen_maddr=listen_maddr,