Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delimited read/write in protocol muxer(multiselect) #248

Merged
merged 2 commits into from
Aug 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions libp2p/network/swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from libp2p.peer.peerstore_interface import IPeerStore
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect_communicator import StreamCommunicator
from libp2p.routing.interfaces import IPeerRouting
from libp2p.stream_muxer.abc import IMuxedConn, IMuxedStream
from libp2p.transport.listener_interface import IListener
Expand Down Expand Up @@ -148,7 +149,7 @@ async def new_stream(

# Perform protocol muxing to determine protocol to use
selected_protocol = await self.multiselect_client.select_one_of(
list(protocol_ids), muxed_stream
list(protocol_ids), StreamCommunicator(muxed_stream)
)

# Create a net stream with the selected protocol
Expand Down Expand Up @@ -264,7 +265,9 @@ def create_generic_protocol_handler(swarm: Swarm) -> GenericProtocolHandlerFn:

async def generic_protocol_handler(muxed_stream: IMuxedStream) -> None:
# Perform protocol muxing to determine protocol to use
protocol, handler = await multiselect.negotiate(muxed_stream)
protocol, handler = await multiselect.negotiate(
StreamCommunicator(muxed_stream)
)

net_stream = NetStream(muxed_stream)
net_stream.set_protocol(protocol)
Expand Down
11 changes: 4 additions & 7 deletions libp2p/protocol_muxer/multiselect.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Dict, Tuple

from libp2p.typing import NegotiableTransport, StreamHandlerFn, TProtocol
from libp2p.typing import StreamHandlerFn, TProtocol

from .multiselect_communicator import MultiselectCommunicator
from .multiselect_communicator_interface import IMultiselectCommunicator
from .multiselect_muxer_interface import IMultiselectMuxer

Expand Down Expand Up @@ -31,24 +30,22 @@ def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:
self.handlers[protocol] = handler

async def negotiate(
self, stream: NegotiableTransport
self, communicator: IMultiselectCommunicator
) -> Tuple[TProtocol, StreamHandlerFn]:
"""
Negotiate performs protocol selection
:param stream: stream to negotiate on
:return: selected protocol name, handler function
:raise Exception: negotiation failed exception
"""
# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)

# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)

# Read and respond to commands until a valid protocol ID is sent
while True:
# Read message
command = await communicator.read_stream_until_eof()
command = await communicator.read()

# Command is ls or a protocol
if command == "ls":
Expand Down Expand Up @@ -78,7 +75,7 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None:
await communicator.write(MULTISELECT_PROTOCOL_ID)

# Read in the protocol ID from other party
handshake_contents = await communicator.read_stream_until_eof()
handshake_contents = await communicator.read()

# Confirm that the protocols are the same
if not validate_handshake(handshake_contents):
Expand Down
19 changes: 5 additions & 14 deletions libp2p/protocol_muxer/multiselect_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from typing import Sequence

from libp2p.stream_muxer.abc import IMuxedStream
from libp2p.typing import NegotiableTransport, TProtocol
from libp2p.typing import TProtocol

from .multiselect_client_interface import IMultiselectClient
from .multiselect_communicator import MultiselectCommunicator
from .multiselect_communicator_interface import IMultiselectCommunicator

MULTISELECT_PROTOCOL_ID = "/multistream/1.0.0"
Expand All @@ -31,7 +29,7 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None:
await communicator.write(MULTISELECT_PROTOCOL_ID)

# Read in the protocol ID from other party
handshake_contents = await communicator.read_stream_until_eof()
handshake_contents = await communicator.read()

# Confirm that the protocols are the same
if not validate_handshake(handshake_contents):
Expand All @@ -40,7 +38,7 @@ async def handshake(self, communicator: IMultiselectCommunicator) -> None:
# Handshake succeeded if this point is reached

async def select_protocol_or_fail(
self, protocol: TProtocol, stream: IMuxedStream
self, protocol: TProtocol, communicator: IMultiselectCommunicator
) -> TProtocol:
"""
Send message to multiselect selecting protocol
Expand All @@ -49,9 +47,6 @@ async def select_protocol_or_fail(
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""
# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)

# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)

Expand All @@ -61,7 +56,7 @@ async def select_protocol_or_fail(
return selected_protocol

async def select_one_of(
self, protocols: Sequence[TProtocol], stream: NegotiableTransport
self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator
) -> TProtocol:
"""
For each protocol, send message to multiselect selecting protocol
Expand All @@ -71,10 +66,6 @@ async def select_one_of(
:param stream: stream to communicate with multiselect over
:return: selected protocol
"""

# Create a communicator to handle all communication across the stream
communicator = MultiselectCommunicator(stream)

# Perform handshake to ensure multiselect protocol IDs match
await self.handshake(communicator)

Expand Down Expand Up @@ -105,7 +96,7 @@ async def try_select(
await communicator.write(protocol)

# Get what counterparty says in response
response = await communicator.read_stream_until_eof()
response = await communicator.read()

# Return protocol if response is equal to protocol or raise error
if response == protocol:
Expand Down
8 changes: 5 additions & 3 deletions libp2p/protocol_muxer/multiselect_client_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
from typing import Sequence

from libp2p.stream_muxer.abc import IMuxedStream
from libp2p.protocol_muxer.multiselect_communicator_interface import (
IMultiselectCommunicator,
)
from libp2p.typing import TProtocol


Expand All @@ -13,7 +15,7 @@ class IMultiselectClient(ABC):

@abstractmethod
async def select_protocol_or_fail(
self, protocol: TProtocol, stream: IMuxedStream
self, protocol: TProtocol, communicator: IMultiselectCommunicator
) -> TProtocol:
"""
Send message to multiselect selecting protocol
Expand All @@ -25,7 +27,7 @@ async def select_protocol_or_fail(

@abstractmethod
async def select_one_of(
self, protocols: Sequence[TProtocol], stream: IMuxedStream
self, protocols: Sequence[TProtocol], communicator: IMultiselectCommunicator
) -> TProtocol:
"""
For each protocol, send message to multiselect selecting protocol
Expand Down
66 changes: 39 additions & 27 deletions libp2p/protocol_muxer/multiselect_communicator.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,47 @@
from libp2p.typing import NegotiableTransport
from libp2p.network.connection.raw_connection_interface import IRawConnection
from libp2p.stream_muxer.abc import IMuxedStream
from libp2p.stream_muxer.mplex.utils import decode_uvarint_from_stream, encode_uvarint
from libp2p.typing import StreamReader

from .multiselect_communicator_interface import IMultiselectCommunicator


class MultiselectCommunicator(IMultiselectCommunicator):
"""
Communicator helper class that ensures both the client
and multistream module will follow the same multistream protocol,
which is necessary for them to work
"""
def delim_encode(msg_str: str) -> bytes:
msg_bytes = msg_str.encode()
varint_len_msg = encode_uvarint(len(msg_bytes) + 1)
return varint_len_msg + msg_bytes + b"\n"

reader_writer: NegotiableTransport

def __init__(self, reader_writer: NegotiableTransport) -> None:
"""
MultistreamCommunicator expects a reader_writer object that has
an async read and an async write function (this could be a stream,
raw connection, or other object implementing those functions)
"""
self.reader_writer = reader_writer
async def delim_read(reader: StreamReader, timeout: int = 10) -> str:
len_msg = await decode_uvarint_from_stream(reader, timeout)
msg_bytes = await reader.read(len_msg)
return msg_bytes.decode().rstrip()


class RawConnectionCommunicator(IMultiselectCommunicator):
conn: IRawConnection

def __init__(self, conn: IRawConnection) -> None:
self.conn = conn

async def write(self, msg_str: str) -> None:
msg_bytes = delim_encode(msg_str)
self.conn.writer.write(msg_bytes)
await self.conn.writer.drain()

async def read(self) -> str:
return await delim_read(self.conn.reader)


class StreamCommunicator(IMultiselectCommunicator):
stream: IMuxedStream

def __init__(self, stream: IMuxedStream) -> None:
self.stream = stream

async def write(self, msg_str: str) -> None:
"""
Write message to reader_writer
:param msg_str: message to write
"""
await self.reader_writer.write(msg_str.encode())

async def read_stream_until_eof(self) -> str:
"""
Reads message from reader_writer until EOF
"""
read_str = (await self.reader_writer.read()).decode()
return read_str
msg_bytes = delim_encode(msg_str)
await self.stream.write(msg_bytes)

async def read(self) -> str:
return await delim_read(self.stream)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def write(self, msg_str: str) -> None:
"""

@abstractmethod
async def read_stream_until_eof(self) -> str:
async def read(self) -> str:
"""
Reads message from stream until EOF
"""
6 changes: 4 additions & 2 deletions libp2p/protocol_muxer/multiselect_muxer_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
from typing import Dict, Tuple

from libp2p.typing import NegotiableTransport, StreamHandlerFn, TProtocol
from libp2p.typing import StreamHandlerFn, TProtocol

from .multiselect_communicator_interface import IMultiselectCommunicator


class IMultiselectMuxer(ABC):
Expand All @@ -23,7 +25,7 @@ def add_handler(self, protocol: TProtocol, handler: StreamHandlerFn) -> None:

@abstractmethod
async def negotiate(
self, stream: NegotiableTransport
self, communicator: IMultiselectCommunicator
) -> Tuple[TProtocol, StreamHandlerFn]:
"""
Negotiate performs protocol selection
Expand Down
2 changes: 2 additions & 0 deletions libp2p/security/secure_conn_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


class AbstractSecureConn(ABC):
conn: IRawConnection

@abstractmethod
def get_local_peer(self) -> ID:
pass
Expand Down
8 changes: 5 additions & 3 deletions libp2p/security/security_multistream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from libp2p.peer.id import ID
from libp2p.protocol_muxer.multiselect import Multiselect
from libp2p.protocol_muxer.multiselect_client import MultiselectClient
from libp2p.protocol_muxer.multiselect_communicator import RawConnectionCommunicator
from libp2p.security.secure_conn_interface import ISecureConn
from libp2p.security.secure_transport_interface import ISecureTransport
from libp2p.typing import TProtocol
Expand Down Expand Up @@ -74,14 +75,15 @@ async def select_transport(
# instead of stream? In go repo, they pass in a raw conn
# (https://raw.githubusercontent.com/libp2p/go-conn-security-multistream/master/ssms.go)

protocol = None
protocol: TProtocol
communicator = RawConnectionCommunicator(conn)
if initiator:
# Select protocol if initiator
protocol = await self.multiselect_client.select_one_of(
list(self.transports.keys()), conn
list(self.transports.keys()), communicator
)
else:
# Select protocol if non-initiator
protocol, _ = await self.multiselect.negotiate(conn)
protocol, _ = await self.multiselect.negotiate(communicator)
# Return transport from protocol
return self.transports[protocol]
6 changes: 3 additions & 3 deletions libp2p/stream_muxer/mplex/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import struct
from typing import Tuple

from libp2p.typing import StreamReader


def encode_uvarint(number: int) -> bytes:
"""Pack `number` into varint bytes"""
Expand Down Expand Up @@ -31,9 +33,7 @@ def decode_uvarint(buff: bytes, index: int) -> Tuple[int, int]:
return result, index + 1


async def decode_uvarint_from_stream(
reader: asyncio.StreamReader, timeout: float
) -> int:
async def decode_uvarint_from_stream(reader: StreamReader, timeout: float) -> int:
shift = 0
result = 0
while True:
Expand Down
5 changes: 2 additions & 3 deletions libp2p/typing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from typing import TYPE_CHECKING, Awaitable, Callable, NewType, Union

from libp2p.network.connection.raw_connection_interface import IRawConnection

if TYPE_CHECKING:
from libp2p.network.stream.net_stream_interface import INetStream # noqa: F401
from libp2p.stream_muxer.abc import IMuxedStream # noqa: F401
Expand All @@ -10,4 +9,4 @@
StreamHandlerFn = Callable[["INetStream"], Awaitable[None]]


NegotiableTransport = Union["IMuxedStream", IRawConnection]
StreamReader = Union["IMuxedStream", asyncio.StreamReader]