From 05da904806d0523e7b26e437da841bb56123ec38 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 15 Jun 2021 13:29:27 +0200 Subject: [PATCH] p2p: Propose transport agnostic abstractions (#904) Describes broadly a new set of abstractions to support a transport agnostic p2p stack. Bundled together are the types and traits in `p2p::transport`. Other parts like the `Supervisor` will land in follow-up change-sets. Closes #692 Signed-off-by: Alexander Simmerl --- ...009-transport-agnostic-peer-abstraction.md | 193 ++++++++++++++++++ p2p/src/lib.rs | 1 + p2p/src/transport.rs | 145 +++++++++++++ 3 files changed, 339 insertions(+) create mode 100644 docs/architecture/adr-009-transport-agnostic-peer-abstraction.md create mode 100644 p2p/src/transport.rs diff --git a/docs/architecture/adr-009-transport-agnostic-peer-abstraction.md b/docs/architecture/adr-009-transport-agnostic-peer-abstraction.md new file mode 100644 index 000000000..792b22061 --- /dev/null +++ b/docs/architecture/adr-009-transport-agnostic-peer-abstraction.md @@ -0,0 +1,193 @@ +# ADR 009: Transport agnostic Peer abstraction + +## Changelog +* 2021-02-05: drafted + +## Context + +With the opportunity to design and implement the peer-to-peer stack from +scratch in the context of the Tendermint implementation in Rust, a lot of the +learnings of the shortcomings of the original Go implementation can be used to +prevent certain mistakes. Namely two: + +* Leakage of physical concerns into the core domain +* Flexibility to adopt different wire protocols for transport of messages + +For that, the first set of newly introduced concepts will attempt to be generic +over which transport is used to connect and converse with other peers. Given +strongly tailored abstract interfaces, concrete implementations will be easy to +spin up and plug into the machinery which lifts bytes from the wire into the +core domain and transports messages into the rest of the system. + +## Decision + +### Transport + +Wraping the design is the `Transport`. Modelled with the properties of +a physical network endpoint in mind, which can be bound and stopped. It should +strongly correspond to the acquisition and lifecycle management of network +resources on the system. + +``` rust +pub trait Transport { + type Connection: Connection; + type Endpoint: Endpoint::Connection>; + type Incoming: Iterator::Connection>> + Send; + + fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>; +} +``` + +After the successful bind the caller holds an `Endpoint` as well as a stream of +incoming `Connection`s. Which is a standardised way to connect to new peers and +react to newly connected ones respectively. + +``` rust +pub trait Endpoint: Send { + type Connection; + + fn connect(&self, info: ConnectInfo) -> Result; + fn listen_addrs(&self) -> Vec; +} +``` + +Centerpiece of the whole shebang is the `Connection`. It represents a connected +peer and provides the primitives to get data and send data from a peer. It is +designed with the outlook to support stream based transports down the road. +While being open to enable feature parity with current production installations +based on tendermint-go's `MConn`. + +``` rust +pub trait StreamSend { + fn send>(msg: B) -> Result<()>; +} + +pub trait Connection: Send { + type Error: std::error::Error + Send + Sync + 'static; + type StreamRead: Iterator>> + Send; + type StreamSend: StreamSend; + + fn advertised_addrs(&self) -> Vec; + fn close(&self) -> Result<()>; + fn local_addr(&self) -> SocketAddr; + fn open_bidirectional( + &self, + stream_id: StreamId, + ) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>; + fn public_key(&self) -> PublicKey; + fn remote_addr(&self) -> SocketAddr; +} +``` + +### Peer + +Given a correct implementation of a `Transport` and its `Connection` newly +established ones will be wrapped with a `Peer`. Which is in charge of setting +up the correct streams on the `Connection` and multiplex messages - incoming +and outgoing alike - efficiently. It's also an attempt to enforce +correct-by-construction constraints on the state machine of the peer. To avoid +misuse or unexpected transitions. The only way to construct is, is from an +existing connection which gives the caller a connected peer. When invoking run +on that one a fully function peer is "returned". Therefore the states look +like: `Connected -> Running -> Stopped`. + +``` rust +impl Peer> +where + Conn: Connection, +{ + pub fn run(self, stream_ids: Vec) -> Result>> { + // ... + } + + fn stop(self) -> Result> { + // ... + } +} + +impl Peer> +where + Conn: Connection, +{ + pub fn send(&self, message: message::Send) -> Result<()> { + // ... + } + + pub fn stop(self) -> Result> { + // ... + } +} +``` + +While sending messages is done through a method on a running peer, getting hold +of incoming messages can be achieved by draining the `Receiver` part of the +running state. + + +### Supervisor + +The `Supervisor` is the main entry point to the p2p package giving higher-level +components access to a unified stream of peer events and messages as well as +the ability to control peer lifecycle (connect, disconnect, etc.). + +``` rust +pub enum Command { + Accept, + Connect(SocketAddr), + Disconnect(node::Id), + Msg(node::Id, message::Send), +} + +pub enum Event { + Connected(node::Id, Direction), + Disconnected(node::Id, Report), + Message(node::Id, message::Receive), + Upgraded(node::Id), + UpgradeFailed(node::Id, Report), +} + +impl Supervisor { + pub fn run(transport: T) -> Result + where + T: transport::Transport + Send + 'static, + { + // ... + } + + pub fn recv(&self) -> Result { + // ... + } + + pub fn command(&self, cmd: Command) -> Result<()> { + // ... + } +} +``` + +## Status + +Proposed + +## Consequences + +### Positive + +* Unified way to bootstrap and integrate transports +* Potential for connecting different wire transports in the same process +* Rest of the domain is simply concerned with `node::Id`s as identity + +### Negative + +* Significant set of abstractions need to be satisfied for a new transport + implementation +* Non-stream based transports need to be fitted into this model + +### Neutral + +## Open Questions + +## References + +* [rfc: add P2P stream proposal](https://github.com/tendermint/spec/pull/227) +* [P2P Refactor](https://github.com/tendermint/tendermint/issues/2067) +* [p2p: support multiple transports](https://github.com/tendermint/tendermint/issues/5587) diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index ef95cbccc..2b1fd8426 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -27,3 +27,4 @@ pub mod error; pub mod secret_connection; +pub mod transport; diff --git a/p2p/src/transport.rs b/p2p/src/transport.rs new file mode 100644 index 000000000..9f731ae9b --- /dev/null +++ b/p2p/src/transport.rs @@ -0,0 +1,145 @@ +//! Abstractions that describe types which support the physical transport - i.e. connection +//! management - used in the p2p stack. + +use std::net::{SocketAddr, ToSocketAddrs}; + +use eyre::Result; + +use tendermint::node; +use tendermint::public_key::PublicKey; + +/// Information which resources to bind to and how to identify on the network. +pub struct BindInfo +where + A: ToSocketAddrs, +{ + /// List of addresses to be communicated as publicly reachable to other nodes, which in turn + /// can use that to share with third parties. + /// + /// TODO(xla): Dependning on where this information is going to be disseminated it might be + /// better placed in a higher-level protocol. What stands in opposition to that is the fact + /// that advertised addresses will be helpful for hole punching and other involved network + /// traversals. + pub advertise_addrs: A, + /// Local address(es) to bind to and accept connections on. + pub bind_addrs: A, + /// Public key of the peer used for identity on the network. + pub public_key: PublicKey, +} + +/// Information to establish a connection to a remote peer and validate its identity. +pub struct ConnectInfo +where + A: ToSocketAddrs, +{ + /// Known address(es) of the peer. + pub addrs: A, + /// The expected id of the remote peer. + pub id: node::Id, +} + +/// Known list of typed streams. +#[derive(Clone, Copy, Hash, Eq, PartialEq)] +pub enum StreamId { + /// Stream to exchange message concerning Peer Exchange. + Pex, +} + +/// Envelope to trace the original direction of an established connection. +pub enum Direction { + /// A peer that connected to the local node. + Incoming(Conn), + /// A remote peer the local node established a connection to. + Outgoing(Conn), +} + +/// Trait that describes the send end of a stream. +pub trait StreamSend { + /// Sends the message to the peer over the open stream. `msg` should be a valid and properly + /// encoded byte array according to the supported messages of the stream. + /// + /// # Errors + /// + /// * If the underlying I/O operations fail. + /// * If the stream is closed. + /// * If the peer is gone + fn send>(msg: B) -> Result<()>; +} + +/// Trait which describes the core concept of a connection between two peers established by +/// `[Transport]`. +pub trait Connection: Send { + /// Errors emitted by the connection. + type Error; + /// Read end of a bidirectional stream. Carries a finite stream of framed messages. Decoding is + /// left to the caller and should correspond to the type of stream. + type StreamRead: Iterator>> + Send; + /// Send end of a stream. + type StreamSend: StreamSend; + + /// Returns the list of advertised addresses known for this connection. + fn advertised_addrs(&self) -> Vec; + /// Tears down the connection and releases all attached resources. + /// + /// # Errors + /// + /// * If release of attached resources failed. + fn close(&self) -> Result<()>; + /// Returns the local address for the connection. + fn local_addr(&self) -> SocketAddr; + /// Opens a new bi-bidirectional stream for the given [`StreamId`]. + /// + /// # Errors + /// + /// * If the stream type is not supported. + /// * If the peer is gone. + /// * If resources necessary for the stream creation aren't available/accessible. + fn open_bidirectional( + &self, + stream_id: StreamId, + ) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>; + /// Public key of the remote peer. + fn public_key(&self) -> PublicKey; + /// Local address(es) to the endpoint listens on. + fn remote_addr(&self) -> SocketAddr; +} + +/// Local handle on a resource which allows connecting to remote peers. +pub trait Endpoint: Send +where + A: ToSocketAddrs, +{ + /// Core type that represents a connection between two peers established through the transport. + type Connection; + + /// Establishes a new connection to a remote peer. + /// + /// # Errors + /// + /// * If the remote is not reachable. + /// * If resources necessary for the connection creation aren't available/accessible. + fn connect(&self, info: ConnectInfo) -> Result; + /// Local address(es) the endpoint listens on. + fn listen_addrs(&self) -> Vec; +} + +/// Trait that describes types which support connection management of the p2p stack. +pub trait Transport +where + A: ToSocketAddrs, +{ + /// Core type that represents a connection between two peers established through the transport. + type Connection: Connection; + /// Local handle on a resource which allows connecting to remote peers. + type Endpoint: Endpoint>::Connection> + Drop; + /// Infinite stream of inbound connections. + type Incoming: Iterator>::Connection>> + Send; + + /// Consumes the transport to bind the resources in exchange for the `Endpoint` and `Incoming` + /// stream. + /// + /// # Errors + /// + /// * If resource allocation fails for lack of priviliges or being not available. + fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>; +}