Skip to content

Commit

Permalink
p2p: Propose transport agnostic abstractions (#904)
Browse files Browse the repository at this point in the history
Describes broadly a new set of abstractions to support a transport
agnostic p2p stack. Bundled together are the types and traits in
`p2p::transport`. Other parts like the `Supervisor` will land in
follow-up change-sets.

Closes #692

Signed-off-by: Alexander Simmerl <[email protected]>
  • Loading branch information
xla authored Jun 15, 2021
1 parent f6f8720 commit 05da904
Show file tree
Hide file tree
Showing 3 changed files with 339 additions and 0 deletions.
193 changes: 193 additions & 0 deletions docs/architecture/adr-009-transport-agnostic-peer-abstraction.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# ADR 009: Transport agnostic Peer abstraction

## Changelog
* 2021-02-05: drafted

## Context

With the opportunity to design and implement the peer-to-peer stack from
scratch in the context of the Tendermint implementation in Rust, a lot of the
learnings of the shortcomings of the original Go implementation can be used to
prevent certain mistakes. Namely two:

* Leakage of physical concerns into the core domain
* Flexibility to adopt different wire protocols for transport of messages

For that, the first set of newly introduced concepts will attempt to be generic
over which transport is used to connect and converse with other peers. Given
strongly tailored abstract interfaces, concrete implementations will be easy to
spin up and plug into the machinery which lifts bytes from the wire into the
core domain and transports messages into the rest of the system.

## Decision

### Transport

Wraping the design is the `Transport`. Modelled with the properties of
a physical network endpoint in mind, which can be bound and stopped. It should
strongly correspond to the acquisition and lifecycle management of network
resources on the system.

``` rust
pub trait Transport {
type Connection: Connection;
type Endpoint: Endpoint<Connection = <Self as Transport>::Connection>;
type Incoming: Iterator<Item = Result<<Self as Transport>::Connection>> + Send;

fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>;
}
```

After the successful bind the caller holds an `Endpoint` as well as a stream of
incoming `Connection`s. Which is a standardised way to connect to new peers and
react to newly connected ones respectively.

``` rust
pub trait Endpoint: Send {
type Connection;

fn connect(&self, info: ConnectInfo) -> Result<Self::Connection>;
fn listen_addrs(&self) -> Vec<SocketAddr>;
}
```

Centerpiece of the whole shebang is the `Connection`. It represents a connected
peer and provides the primitives to get data and send data from a peer. It is
designed with the outlook to support stream based transports down the road.
While being open to enable feature parity with current production installations
based on tendermint-go's `MConn`.

``` rust
pub trait StreamSend {
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

pub trait Connection: Send {
type Error: std::error::Error + Send + Sync + 'static;
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
type StreamSend: StreamSend;

fn advertised_addrs(&self) -> Vec<SocketAddr>;
fn close(&self) -> Result<()>;
fn local_addr(&self) -> SocketAddr;
fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
fn public_key(&self) -> PublicKey;
fn remote_addr(&self) -> SocketAddr;
}
```

### Peer

Given a correct implementation of a `Transport` and its `Connection` newly
established ones will be wrapped with a `Peer`. Which is in charge of setting
up the correct streams on the `Connection` and multiplex messages - incoming
and outgoing alike - efficiently. It's also an attempt to enforce
correct-by-construction constraints on the state machine of the peer. To avoid
misuse or unexpected transitions. The only way to construct is, is from an
existing connection which gives the caller a connected peer. When invoking run
on that one a fully function peer is "returned". Therefore the states look
like: `Connected -> Running -> Stopped`.

``` rust
impl<Conn> Peer<Connected<Conn>>
where
Conn: Connection,
{
pub fn run(self, stream_ids: Vec<StreamId>) -> Result<Peer<Running<Conn>>> {
// ...
}

fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}

impl<Conn> Peer<Running<Conn>>
where
Conn: Connection,
{
pub fn send(&self, message: message::Send) -> Result<()> {
// ...
}

pub fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}
```

While sending messages is done through a method on a running peer, getting hold
of incoming messages can be achieved by draining the `Receiver` part of the
running state.


### Supervisor

The `Supervisor` is the main entry point to the p2p package giving higher-level
components access to a unified stream of peer events and messages as well as
the ability to control peer lifecycle (connect, disconnect, etc.).

``` rust
pub enum Command {
Accept,
Connect(SocketAddr),
Disconnect(node::Id),
Msg(node::Id, message::Send),
}

pub enum Event {
Connected(node::Id, Direction),
Disconnected(node::Id, Report),
Message(node::Id, message::Receive),
Upgraded(node::Id),
UpgradeFailed(node::Id, Report),
}

impl Supervisor {
pub fn run<T>(transport: T) -> Result<Self>
where
T: transport::Transport + Send + 'static,
{
// ...
}

pub fn recv(&self) -> Result<Event> {
// ...
}

pub fn command(&self, cmd: Command) -> Result<()> {
// ...
}
}
```

## Status

Proposed

## Consequences

### Positive

* Unified way to bootstrap and integrate transports
* Potential for connecting different wire transports in the same process
* Rest of the domain is simply concerned with `node::Id`s as identity

### Negative

* Significant set of abstractions need to be satisfied for a new transport
implementation
* Non-stream based transports need to be fitted into this model

### Neutral

## Open Questions

## References

* [rfc: add P2P stream proposal](https://github.com/tendermint/spec/pull/227)
* [P2P Refactor](https://github.com/tendermint/tendermint/issues/2067)
* [p2p: support multiple transports](https://github.com/tendermint/tendermint/issues/5587)
1 change: 1 addition & 0 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@

pub mod error;
pub mod secret_connection;
pub mod transport;
145 changes: 145 additions & 0 deletions p2p/src/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//! Abstractions that describe types which support the physical transport - i.e. connection
//! management - used in the p2p stack.
use std::net::{SocketAddr, ToSocketAddrs};

use eyre::Result;

use tendermint::node;
use tendermint::public_key::PublicKey;

/// Information which resources to bind to and how to identify on the network.
pub struct BindInfo<A>
where
A: ToSocketAddrs,
{
/// List of addresses to be communicated as publicly reachable to other nodes, which in turn
/// can use that to share with third parties.
///
/// TODO(xla): Dependning on where this information is going to be disseminated it might be
/// better placed in a higher-level protocol. What stands in opposition to that is the fact
/// that advertised addresses will be helpful for hole punching and other involved network
/// traversals.
pub advertise_addrs: A,
/// Local address(es) to bind to and accept connections on.
pub bind_addrs: A,
/// Public key of the peer used for identity on the network.
pub public_key: PublicKey,
}

/// Information to establish a connection to a remote peer and validate its identity.
pub struct ConnectInfo<A>
where
A: ToSocketAddrs,
{
/// Known address(es) of the peer.
pub addrs: A,
/// The expected id of the remote peer.
pub id: node::Id,
}

/// Known list of typed streams.
#[derive(Clone, Copy, Hash, Eq, PartialEq)]
pub enum StreamId {
/// Stream to exchange message concerning Peer Exchange.
Pex,
}

/// Envelope to trace the original direction of an established connection.
pub enum Direction<Conn> {
/// A peer that connected to the local node.
Incoming(Conn),
/// A remote peer the local node established a connection to.
Outgoing(Conn),
}

/// Trait that describes the send end of a stream.
pub trait StreamSend {
/// Sends the message to the peer over the open stream. `msg` should be a valid and properly
/// encoded byte array according to the supported messages of the stream.
///
/// # Errors
///
/// * If the underlying I/O operations fail.
/// * If the stream is closed.
/// * If the peer is gone
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

/// Trait which describes the core concept of a connection between two peers established by
/// `[Transport]`.
pub trait Connection: Send {
/// Errors emitted by the connection.
type Error;
/// Read end of a bidirectional stream. Carries a finite stream of framed messages. Decoding is
/// left to the caller and should correspond to the type of stream.
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
/// Send end of a stream.
type StreamSend: StreamSend;

/// Returns the list of advertised addresses known for this connection.
fn advertised_addrs(&self) -> Vec<SocketAddr>;
/// Tears down the connection and releases all attached resources.
///
/// # Errors
///
/// * If release of attached resources failed.
fn close(&self) -> Result<()>;
/// Returns the local address for the connection.
fn local_addr(&self) -> SocketAddr;
/// Opens a new bi-bidirectional stream for the given [`StreamId`].
///
/// # Errors
///
/// * If the stream type is not supported.
/// * If the peer is gone.
/// * If resources necessary for the stream creation aren't available/accessible.
fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
/// Public key of the remote peer.
fn public_key(&self) -> PublicKey;
/// Local address(es) to the endpoint listens on.
fn remote_addr(&self) -> SocketAddr;
}

/// Local handle on a resource which allows connecting to remote peers.
pub trait Endpoint<A>: Send
where
A: ToSocketAddrs,
{
/// Core type that represents a connection between two peers established through the transport.
type Connection;

/// Establishes a new connection to a remote peer.
///
/// # Errors
///
/// * If the remote is not reachable.
/// * If resources necessary for the connection creation aren't available/accessible.
fn connect(&self, info: ConnectInfo<A>) -> Result<Self::Connection>;
/// Local address(es) the endpoint listens on.
fn listen_addrs(&self) -> Vec<SocketAddr>;
}

/// Trait that describes types which support connection management of the p2p stack.
pub trait Transport<A>
where
A: ToSocketAddrs,
{
/// Core type that represents a connection between two peers established through the transport.
type Connection: Connection;
/// Local handle on a resource which allows connecting to remote peers.
type Endpoint: Endpoint<A, Connection = <Self as Transport<A>>::Connection> + Drop;
/// Infinite stream of inbound connections.
type Incoming: Iterator<Item = Result<<Self as Transport<A>>::Connection>> + Send;

/// Consumes the transport to bind the resources in exchange for the `Endpoint` and `Incoming`
/// stream.
///
/// # Errors
///
/// * If resource allocation fails for lack of priviliges or being not available.
fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
}

0 comments on commit 05da904

Please sign in to comment.