Skip to content

Commit

Permalink
Add protocol to report external address view.
Browse files Browse the repository at this point in the history
Address part of libp2p#443.
  • Loading branch information
twittner committed Oct 17, 2018
1 parent 840663e commit cba7101
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ members = [
"protocols/identify",
"protocols/kad",
"protocols/ping",
"protocols/observed",
"transports/relay",
"protocols/secio",
"muxers/mplex",
Expand Down
7 changes: 1 addition & 6 deletions core/src/upgrade/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,5 @@ pub trait ConnectionUpgrade<C> {
///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output.
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
) -> Self::Future;
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future;
}
17 changes: 17 additions & 0 deletions protocols/observed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "libp2p-observed-address"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"

[dependencies]
bytes = "0.4"
futures = "0.1"
libp2p-core = { path = "../../core" }
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2.1", features = ["codec"] }

[dev-dependencies]
tokio-current-thread = "0.1"
tokio-tcp = "0.1"
159 changes: 159 additions & 0 deletions protocols/observed/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Connection upgrade to allow retrieving the externally visible address (as dialer) or
//! to report the externally visible address (as listener).
extern crate bytes;
extern crate futures;
extern crate libp2p_core;
extern crate tokio_codec;
extern crate tokio_io;
extern crate unsigned_varint;

use bytes::Bytes;
use futures::{future, prelude::*};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
use std::{io, iter};
use tokio_codec::{FramedRead, FramedWrite};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;

/// The output, this connection upgrade produces.
pub enum Output<C> {
/// As `Dialer`, we get our own externally observed address.
Address(Multiaddr),
/// As `Listener`, we return a sender which allows reporting the observed
/// address the client.
Sender(Sender<C>)
}

/// `Sender` allows reporting back the observed address to the remote endpoint.
pub struct Sender<C> {
io: FramedWrite<C, UviBytes>
}

impl<C: AsyncWrite> Sender<C> {
/// Send address `a` to remote as the observed address.
pub fn send_address(self, a: &Multiaddr) -> impl Future<Item=C, Error=io::Error> {
self.io.send(Bytes::from(a.to_bytes())).map(|io| io.into_inner())
}
}

/// The connection upgrade type to retrieve or report externally visible addresses.
pub struct Observed {}

impl Observed {
pub fn new() -> Self {
Observed {}
}
}

impl<C> ConnectionUpgrade<C> for Observed
where
C: AsyncRead + AsyncWrite + Send + 'static
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = Output<C>;
type Future = Box<dyn Future<Item=Self::Output, Error=io::Error> + Send>;

fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/paritytech/observed-address/0.1.0"), ()))
}

fn upgrade(self, conn: C, _: (), role: Endpoint) -> Self::Future {
match role {
Endpoint::Dialer => {
let io = FramedRead::new(conn, UviBytes::default());
let future = io.into_future()
.map_err(|(e, _): (io::Error, FramedRead<C, UviBytes>)| e)
.and_then(move |(bytes, _)| {
if let Some(b) = bytes {
let ma = Multiaddr::from_bytes(b.to_vec())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(Output::Address(ma))
} else {
Err(io::ErrorKind::InvalidData.into())
}
});
Box::new(future)
}
Endpoint::Listener => {
let io = FramedWrite::new(conn, UviBytes::default());
Box::new(future::ok(Output::Sender(Sender { io })))
}
}
}
}

#[cfg(test)]
mod tests {
extern crate tokio_current_thread;
extern crate tokio_tcp;

use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
use self::tokio_current_thread::spawn;
use self::tokio_tcp::{TcpListener, TcpStream};
use super::*;

#[test]
fn observed_address() {
let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let server_addr = server.local_addr().unwrap();

let observed_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();
let observed_addr2 = observed_addr1.clone();

let server = server.incoming()
.into_future()
.map_err(|(e, _)| e.into())
.and_then(move |(conn, _)| {
Observed::new().upgrade(conn.unwrap(), (), Endpoint::Listener)
})
.and_then(move |output| {
match output {
Output::Sender(s) => s.send_address(&observed_addr1),
Output::Address(_) => unreachable!()
}
});

let client = TcpStream::connect(&server_addr)
.map_err(|e| e.into())
.and_then(|conn| {
Observed::new().upgrade(conn, (), Endpoint::Dialer)
})
.map(move |output| {
match output {
Output::Address(addr) => {
eprintln!("{} {}", addr, observed_addr2);
assert_eq!(addr, observed_addr2)
}
_ => unreachable!()
}
});

tokio_current_thread::block_on_all(future::lazy(move || {
spawn(server.map_err(|e| panic!("server error: {}", e)).map(|_| ()));
client.map_err(|e| panic!("client error: {}", e))
}))
.unwrap();
}
}

0 comments on commit cba7101

Please sign in to comment.