Skip to content

Commit

Permalink
call_binary_stream_handler: Retry on ControlError
Browse files Browse the repository at this point in the history
  • Loading branch information
borzunov committed Jul 21, 2021
1 parent c524f96 commit b1a43a5
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions hivemind/p2p/p2p_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import hivemind.hivemind_cli as cli
import hivemind.p2p.p2p_daemon_bindings.p2pclient as p2pclient
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure
from hivemind.proto.p2pd_pb2 import RPCError
from hivemind.utils.asyncio import aiter
from hivemind.utils.logging import get_logger
Expand Down Expand Up @@ -469,9 +470,19 @@ async def add_binary_stream_handler(self, name: str, handler: p2pclient.StreamHa
await self._client.stream_handler(name, handler)

async def call_binary_stream_handler(
self, peer_id: PeerID, handler_name: str
self, peer_id: PeerID, handler_name: str, n_attempts: int = 5, delay: float = 0.5
) -> Tuple[StreamInfo, asyncio.StreamReader, asyncio.StreamWriter]:
return await self._client.stream_open(peer_id, (handler_name,))
for try_number in range(n_attempts):
try:
return await self._client.stream_open(peer_id, (handler_name,))
except ControlFailure:
if try_number == n_attempts - 1:
logger.exception(f"Failed to open stream to {peer_id} for handle `{handler_name}`. "
f"Made {n_attempts} attempts, giving up:")
raise
logger.warning(f"Failed to open stream to {peer_id} for handle `{handler_name}`:")

await asyncio.sleep(delay * (2 ** try_number))

def __del__(self):
self._terminate()
Expand Down

0 comments on commit b1a43a5

Please sign in to comment.