-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize unary handlers with persistent connections to P2P daemon #328
Changes from all commits
3f7c2e3
779bcee
914fa51
013938f
03ad71a
a476e3c
72ad4cc
9ba6a51
044b232
632c20c
864cb3e
e87d862
14ed9d5
81d115a
a0c534f
6a61895
cbcbae7
010b466
0d1b02b
fca7d69
86c10dc
56d0efd
c92e633
b49b9bd
71ca067
56ade26
463fba9
0358a75
e08177b
9c59a4d
70c61c4
58887d2
86d01c8
b1a2ca2
38a182c
0e3a4b1
cb07a45
d315709
8322485
f271c76
1925723
1feb884
a39a157
96c59b9
b058e6e
90592e0
e6e6dde
8d4f4a4
bca5b9f
56cfb4e
2292d62
97c6c17
619931d
7d8eb40
72d3705
f7d43f7
ab5f3ec
1e96285
105c104
635d1fe
5a84ba7
ff55d4a
8ceb45c
595362c
e02b762
cfde3f5
55fcefb
3b5619a
eca939d
a2fe880
e6db7f0
ae40827
3c6ec13
527f0ad
b01bac7
26ee55e
62294cc
36cc66c
5f1a65f
209d3e1
2758410
d8e0ae1
995ce49
73d52ba
9676cf4
602ffdc
bbc7ee8
938aeaa
2e53b47
eacf5d7
6b6c38a
18e8c38
d044882
85b9e0a
1ded110
4336578
f4ce4c3
3855df6
f730559
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ coverage.xml | |
.project | ||
.pydevproject | ||
.idea | ||
.vscode | ||
.ipynb_checkpoints | ||
|
||
# Rope | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,12 +5,14 @@ | |||||
from contextlib import closing, suppress | ||||||
from dataclasses import dataclass | ||||||
from importlib.resources import path | ||||||
from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Sequence, Tuple, TypeVar, Union | ||||||
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union | ||||||
|
||||||
from google.protobuf.message import Message | ||||||
from multiaddr import Multiaddr | ||||||
|
||||||
import hivemind.hivemind_cli as cli | ||||||
import hivemind.p2p.p2p_daemon_bindings.p2pclient as p2pclient | ||||||
from hivemind.p2p.p2p_daemon_bindings.control import P2PDaemonError, P2PHandlerError | ||||||
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo | ||||||
from hivemind.proto.p2pd_pb2 import RPCError | ||||||
from hivemind.utils.asyncio import aiter, asingle | ||||||
|
@@ -27,7 +29,6 @@ class P2PContext(object): | |||||
handle_name: str | ||||||
local_id: PeerID | ||||||
remote_id: PeerID = None | ||||||
remote_maddr: Multiaddr = None | ||||||
|
||||||
|
||||||
class P2P: | ||||||
|
@@ -65,6 +66,7 @@ class P2P: | |||||
|
||||||
def __init__(self): | ||||||
self.peer_id = None | ||||||
self._client = None | ||||||
self._child = None | ||||||
self._alive = False | ||||||
self._reader_task = None | ||||||
|
@@ -90,6 +92,7 @@ async def create( | |||||
use_auto_relay: bool = False, | ||||||
relay_hop_limit: int = 0, | ||||||
startup_timeout: float = 15, | ||||||
idle_timeout: float = 30, | ||||||
) -> "P2P": | ||||||
""" | ||||||
Start a new p2pd process and connect to it. | ||||||
|
@@ -111,6 +114,8 @@ async def create( | |||||
:param use_auto_relay: enables autorelay | ||||||
:param relay_hop_limit: sets the hop limit for hop relays | ||||||
:param startup_timeout: raise a P2PDaemonError if the daemon does not start in ``startup_timeout`` seconds | ||||||
:param idle_timeout: kill daemon if client has been idle for a given number of | ||||||
seconds before opening persistent streams | ||||||
:return: a wrapper for the p2p daemon | ||||||
""" | ||||||
|
||||||
|
@@ -150,6 +155,7 @@ async def create( | |||||
relayDiscovery=use_relay_discovery, | ||||||
autoRelay=use_auto_relay, | ||||||
relayHopLimit=relay_hop_limit, | ||||||
idleTimeout=f"{idle_timeout}s", | ||||||
b=need_bootstrap, | ||||||
**process_kwargs, | ||||||
) | ||||||
|
@@ -167,7 +173,7 @@ async def create( | |||||
await self.shutdown() | ||||||
raise P2PDaemonError(f"Daemon failed to start in {startup_timeout:.1f} seconds") | ||||||
|
||||||
self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) | ||||||
self._client = await p2pclient.Client.create(self._daemon_listen_maddr, self._client_listen_maddr) | ||||||
await self._ping_daemon() | ||||||
return self | ||||||
|
||||||
|
@@ -189,7 +195,7 @@ async def replicate(cls, daemon_listen_maddr: Multiaddr) -> "P2P": | |||||
self._daemon_listen_maddr = daemon_listen_maddr | ||||||
self._client_listen_maddr = Multiaddr(cls._UNIX_SOCKET_PREFIX + f"p2pclient-{socket_uid}.sock") | ||||||
|
||||||
self._client = p2pclient.Client(self._daemon_listen_maddr, self._client_listen_maddr) | ||||||
self._client = await p2pclient.Client.create(self._daemon_listen_maddr, self._client_listen_maddr) | ||||||
|
||||||
await self._ping_daemon() | ||||||
return self | ||||||
|
@@ -258,7 +264,7 @@ async def send_protobuf(protobuf: Union[TOutputProtobuf, RPCError], writer: asyn | |||||
|
||||||
@staticmethod | ||||||
async def receive_protobuf( | ||||||
input_protobuf_type: type, reader: asyncio.StreamReader | ||||||
input_protobuf_type: Type[Message], reader: asyncio.StreamReader | ||||||
) -> Tuple[Optional[TInputProtobuf], Optional[RPCError]]: | ||||||
msg_type = await reader.readexactly(1) | ||||||
if msg_type == P2P.MESSAGE_MARKER: | ||||||
|
@@ -279,7 +285,7 @@ async def _add_protobuf_stream_handler( | |||||
self, | ||||||
name: str, | ||||||
handler: Callable[[TInputStream, P2PContext], TOutputStream], | ||||||
input_protobuf_type: type, | ||||||
input_protobuf_type: Type[Message], | ||||||
max_prefetch: int = 5, | ||||||
) -> None: | ||||||
""" | ||||||
|
@@ -297,7 +303,6 @@ async def _handle_stream( | |||||
handle_name=name, | ||||||
local_id=self.peer_id, | ||||||
remote_id=stream_info.peer_id, | ||||||
remote_maddr=stream_info.addr, | ||||||
) | ||||||
requests = asyncio.Queue(max_prefetch) | ||||||
|
||||||
|
@@ -349,7 +354,7 @@ async def _process_stream() -> None: | |||||
await self.add_binary_stream_handler(name, _handle_stream) | ||||||
|
||||||
async def _iterate_protobuf_stream_handler( | ||||||
self, peer_id: PeerID, name: str, requests: TInputStream, output_protobuf_type: type | ||||||
self, peer_id: PeerID, name: str, requests: TInputStream, output_protobuf_type: Type[Message] | ||||||
) -> TOutputStream: | ||||||
_, reader, writer = await self.call_binary_stream_handler(peer_id, name) | ||||||
|
||||||
|
@@ -381,15 +386,22 @@ async def add_protobuf_handler( | |||||
handler: Callable[ | ||||||
[Union[TInputProtobuf, TInputStream], P2PContext], Union[Awaitable[TOutputProtobuf], TOutputStream] | ||||||
], | ||||||
input_protobuf_type: type, | ||||||
input_protobuf_type: Type[Message], | ||||||
*, | ||||||
stream_input: bool = False, | ||||||
stream_output: bool = False, | ||||||
) -> None: | ||||||
""" | ||||||
:param stream_input: If True, assume ``handler`` to take ``TInputStream`` | ||||||
(not just ``TInputProtobuf``) as input. | ||||||
:param stream_output: If True, assume ``handler`` to return ``TOutputStream`` | ||||||
dvmazur marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
(not ``Awaitable[TOutputProtobuf]``). | ||||||
""" | ||||||
|
||||||
if not stream_input and not stream_output: | ||||||
await self._add_protobuf_unary_handler(name, handler, input_protobuf_type) | ||||||
mryab marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return | ||||||
dvmazur marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
async def _stream_handler(requests: P2P.TInputStream, context: P2PContext) -> P2P.TOutputStream: | ||||||
input = requests if stream_input else await asingle(requests) | ||||||
output = handler(input, context) | ||||||
|
@@ -402,23 +414,65 @@ async def _stream_handler(requests: P2P.TInputStream, context: P2PContext) -> P2 | |||||
|
||||||
await self._add_protobuf_stream_handler(name, _stream_handler, input_protobuf_type) | ||||||
|
||||||
async def _add_protobuf_unary_handler( | ||||||
self, | ||||||
handle_name: str, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, we both use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, if possible, let's make a note of it somewhere (as an issue or as a TODO in the top message of this PR) |
||||||
handler: Callable[[TInputProtobuf, P2PContext], Awaitable[TOutputProtobuf]], | ||||||
input_protobuf_type: Type[Message], | ||||||
) -> None: | ||||||
""" | ||||||
Register a request-response (unary) handler. Unary requests and responses | ||||||
are sent through persistent multiplexed connections to the daemon for the | ||||||
sake of reducing the number of open files. | ||||||
:param handle_name: name of the handler (protocol id) | ||||||
:param handler: function handling the unary requests | ||||||
:param input_protobuf_type: protobuf type of the request | ||||||
""" | ||||||
|
||||||
async def _unary_handler(request: bytes, remote_id: PeerID) -> bytes: | ||||||
input_serialized = input_protobuf_type.FromString(request) | ||||||
context = P2PContext( | ||||||
handle_name=handle_name, | ||||||
local_id=self.peer_id, | ||||||
remote_id=remote_id, | ||||||
) | ||||||
|
||||||
response = await handler(input_serialized, context) | ||||||
return response.SerializeToString() | ||||||
|
||||||
await self._client.add_unary_handler(handle_name, _unary_handler) | ||||||
|
||||||
async def call_protobuf_handler( | ||||||
self, | ||||||
peer_id: PeerID, | ||||||
name: str, | ||||||
input: Union[TInputProtobuf, TInputStream], | ||||||
output_protobuf_type: type, | ||||||
output_protobuf_type: Type[Message], | ||||||
) -> Awaitable[TOutputProtobuf]: | ||||||
requests = input if isinstance(input, AsyncIterableABC) else aiter(input) | ||||||
responses = self._iterate_protobuf_stream_handler(peer_id, name, requests, output_protobuf_type) | ||||||
|
||||||
if not isinstance(input, AsyncIterableABC): | ||||||
return await self._call_unary_protobuf_handler(peer_id, name, input, output_protobuf_type) | ||||||
|
||||||
responses = self._iterate_protobuf_stream_handler(peer_id, name, input, output_protobuf_type) | ||||||
return await asingle(responses) | ||||||
|
||||||
async def _call_unary_protobuf_handler( | ||||||
self, | ||||||
peer_id: PeerID, | ||||||
handle_name: str, | ||||||
input: TInputProtobuf, | ||||||
output_protobuf_type: Type[Message], | ||||||
) -> Awaitable[TOutputProtobuf]: | ||||||
serialized_input = input.SerializeToString() | ||||||
response = await self._client.call_unary_handler(peer_id, handle_name, serialized_input) | ||||||
return output_protobuf_type.FromString(response) | ||||||
|
||||||
def iterate_protobuf_handler( | ||||||
self, | ||||||
peer_id: PeerID, | ||||||
name: str, | ||||||
input: Union[TInputProtobuf, TInputStream], | ||||||
output_protobuf_type: type, | ||||||
output_protobuf_type: Type[Message], | ||||||
) -> TOutputStream: | ||||||
requests = input if isinstance(input, AsyncIterableABC) else aiter(input) | ||||||
return self._iterate_protobuf_stream_handler(peer_id, name, requests, output_protobuf_type) | ||||||
|
@@ -453,6 +507,8 @@ async def shutdown(self) -> None: | |||||
await self._child.wait() | ||||||
|
||||||
def _terminate(self) -> None: | ||||||
if self._client is not None: | ||||||
self._client.close() | ||||||
if self._listen_task is not None: | ||||||
self._listen_task.cancel() | ||||||
if self._reader_task is not None: | ||||||
|
@@ -501,11 +557,3 @@ async def _read_outputs(self, ready: asyncio.Future) -> None: | |||||
|
||||||
if not ready.done(): | ||||||
ready.set_exception(P2PDaemonError(f"Daemon failed to start: {last_line}")) | ||||||
|
||||||
|
||||||
class P2PDaemonError(RuntimeError): | ||||||
pass | ||||||
|
||||||
|
||||||
class P2PHandlerError(Exception): | ||||||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To future reviewers: I and @deniskamazur have agreed to remove this because
remote_maddr
simplifies the unary handler implementation