Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests for nodes/listeners.rs #541

Merged
merged 20 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
target
Cargo.lock
.idea/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The content of gitignore should be specific to the project => https://stackoverflow.com/questions/7335420/global-git-ignore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was a mistake. Reverting.

CMakeLists.txt
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Right now everything including the crate organization is very much Work in Progr

## Documentation

This repository includes a facade crate named `libp2p`, which reexports the rest of the repository.
This repository includes a façade crate named `libp2p`, which reexports the rest of the repository.

For documentation, you are encouraged to clone this repository or add `libp2p` as a dependency in
your Cargo.toml and run `cargo doc`.
Expand Down
16 changes: 8 additions & 8 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ pub trait NodeHandler<TSubstream> {
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);

/// Indicates the handler that the inbound part of the muxer has been closed, and that
/// Indicates to the handler that the inbound part of the muxer has been closed, and that
/// therefore no more inbound substream will be produced.
fn inject_inbound_closed(&mut self);

/// Indicates the handler that an outbound substream failed to open because the outbound
/// Indicates to the handler that an outbound substream failed to open because the outbound
/// part of the muxer has been closed.
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);

/// Indicates the handler that the multiaddr future has resolved.
/// Indicates to the handler that the multiaddr future has resolved.
fn inject_multiaddr(&mut self, multiaddr: Result<Multiaddr, IoError>);

/// Injects an event coming from the outside in the handler.
/// Injects an event coming from the outside into the handler.
fn inject_event(&mut self, event: Self::InEvent);

/// Indicates the node that it should shut down. After that, it is expected that `poll()`
/// Indicates that the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(None)` as soon as possible.
///
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
Expand All @@ -78,7 +78,7 @@ pub enum NodeHandlerEndpoint<TOutboundOpenInfo> {
Listener,
}

/// Event produces by a handler.
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
Expand All @@ -88,7 +88,7 @@ pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
Custom(TCustom),
}

/// Event produces by a handler.
/// Event produced by a handler.
impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
/// If this is `OutboundSubstreamRequest`, maps the content to something else.
#[inline]
Expand Down Expand Up @@ -173,7 +173,7 @@ where
self.node.is_none()
}

/// Indicates the handled node that it should shut down. After calling this method, the
/// Indicates to the handled node that it should shut down. After calling this method, the
/// `Stream` will end in the not-so-distant future.
///
/// After this method returns, `is_shutting_down()` should return true.
Expand Down
199 changes: 198 additions & 1 deletion core/src/nodes/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ where
}

/// A single active listener.
#[derive(Debug)]
struct Listener<TTrans>
where
TTrans: Transport,
Expand Down Expand Up @@ -161,7 +162,7 @@ where
}
}

// We register the current task to be waken up if a new listener is added.
// We register the current task to be woken up if a new listener is added.
Async::NotReady
}
}
Expand Down Expand Up @@ -220,9 +221,48 @@ where
#[cfg(test)]
mod tests {
extern crate libp2p_tcp_transport;

use super::*;
use transport;
use tokio::runtime::current_thread::Runtime;
use std::io;
use futures::{future::{self}, stream};
use tests::dummy_transport::{DummyTransport, ListenerState};

// Test helper that lets us poke in innards of individual `Listener`s and
// set things up for the tests.
impl ListenersStream<DummyTransport> {
fn set_listener_state(&mut self, idx: usize, state: ListenerState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer a freestanding method that accepts a &mut ListenersStream<DummyTransport>.
We don't want to hide method implementations behind #[cfg(test)].

let mut l = self.listeners.remove(idx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing and re-inserting? Why not just let l = &mut self.listeners[idx];?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly to satisfy the compiler (got a error[E0507]: cannot move out of indexed content otherwise) but when I wrote it I was trying to understand the re-ordering behaviour of the code so I thought I'd make it behave "realistically". I think it is necessary though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let l = &mut self.listeners[idx]; should work.

l.listener =
match state {
ListenerState::Error => {
let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() );
Box::new(stream)
}
ListenerState::Ok(async) => {
match async {
Async::NotReady => {
let stream = stream::poll_fn(|| future::empty().poll() );
Box::new(stream)
}
Async::Ready(Some(_)) => {
let addr = l.address.clone();
let stream = stream::poll_fn(|| future::ok(Some(1)).poll() )
.map(move |stream| future::ok( (stream, future::ok(addr.clone())) ));
Box::new(stream)

}
Async::Ready(None) => {
let stream = stream::poll_fn(|| future::ok(None).poll() );
Box::new(stream)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the ListenerState::Ok branch of this match not be doing the same as in DummyTransport::listen_on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely should, fixing.

};
self.listeners.insert(idx, l);
}
}

#[test]
fn incoming_event() {
Expand Down Expand Up @@ -251,4 +291,161 @@ mod tests {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(future).unwrap();
}

#[test]
fn listener_stream_returns_transport() {
let t = DummyTransport::new();
let ls = ListenersStream::new(t);
assert_eq!(ls.transport(), &t);
}

#[test]
fn listener_stream_can_iterate_over_listeners() {
let t = DummyTransport::new();
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.1/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
let expected_addrs = vec![addr1.to_string(), addr2.to_string()];

let mut ls = ListenersStream::new(t);
ls.listen_on(addr1).expect("listen_on failed");
ls.listen_on(addr2).expect("listen_on failed");

let listener_addrs = ls.listeners().map(|ma| ma.to_string() ).collect::<Vec<String>>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically shouldn't have to convert to string, but why not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trouble was that the iterator doesn't impl Debug (and PartialEq) so I did this to move ahead. Should I try to improve on this or not worth it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not worth it since it's just a test.

assert_eq!(listener_addrs, expected_addrs);
}

#[test]
fn listener_stream_poll_without_listeners_is_not_ready() {
let t = DummyTransport::new();
let mut ls = ListenersStream::new(t);
assert_matches!(ls.poll(), Async::NotReady);
}

#[test]
fn listener_stream_poll_with_listeners_that_arent_ready_is_not_ready() {
let t = DummyTransport::new();
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
ls.set_listener_state(0, ListenerState::Ok(Async::NotReady));
assert_matches!(ls.poll(), Async::NotReady);
assert_eq!(ls.listeners.len(), 1); // listener is still there
}

#[test]
fn listener_stream_poll_with_ready_listeners_is_ready() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr1).expect("listen_on failed");
ls.listen_on(addr2).expect("listen_on failed");

assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/4321");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_matches!(tup, (1, _))
});
})
});
// TODO: When several listeners are continuously Async::Ready –
// admittetdly a corner case – the last one is processed first and then
// put back *last* on the pile. This means that at the next poll() it
// will get polled again and if it always has data to yield, it will
// effectively block all other listeners from being "heard". One way
// around this is to switch to using a `VecDeque` to keep the listeners
// collection, and instead of pushing the processed item to the end of
// the list, stick it on top so that it'll be processed *last* instead
// during the next poll. This might also get us a performance win as
// even in the normal case, the most recently polled listener is more
// unlikely to have anything to yield than the others so we might avoid
// a few unneeded poll calls.

// Make the second listener return NotReady so we get the first listener next poll()
ls.set_listener_state(1, ListenerState::Ok(Async::NotReady));
assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_matches!(tup, (1, _))
});
})
});
assert_eq!(ls.listeners.len(), 2);
}

#[test]
fn listener_stream_poll_with_closed_listener_emits_closed_event() {
let t = DummyTransport::new();
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
ls.set_listener_state(0, ListenerState::Ok(Async::Ready(None)));
assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Closed{..})
});
assert_eq!(ls.listeners.len(), 0); // it's gone
}

#[test]
fn listener_stream_poll_with_erroring_listener_emits_closed_event() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
ls.set_listener_state(0, ListenerState::Error); // simulate an error on the socket
assert_matches!(ls.poll(), Async::Ready(Some(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Closed{..})
});
assert_eq!(ls.listeners.len(), 0); // it's gone
}

#[test]
fn listener_stream_poll_chatty_listeners_may_drown_others() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(t);
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}

// polling processes listeners in reverse order
// Only the last listener ever gets processed
for _n in 0..10 {
assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.3/tcp/1233")
})
}
// Make last listener NotReady so now only the third listener is processed
ls.set_listener_state(3, ListenerState::Ok(Async::NotReady));
for _n in 0..10 {
assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/1232")
})
}
}

#[test]
fn listener_stream_poll_processes_listeners_as_expected_if_they_are_not_yielding_continuously() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(t);
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}
// If the listeners do not yield items continuously (the normal case) we
// process them in the expected, reverse, order.
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(Some(ListenersEvent::Incoming{listen_addr, ..})) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/123{}", n, n));
});
// kick the last listener (current) to NotReady state
ls.set_listener_state(3, ListenerState::Ok(Async::NotReady));
}
}
}
94 changes: 94 additions & 0 deletions core/src/tests/dummy_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! `DummyTransport` is a `Transport` used in tests. It implements a bare-bones
//! version of the trait along with a way to setup the transport listeners with
//! an initial state to facilitate testing.

use futures::prelude::*;
use futures::{future::{self, FutureResult}, stream};
use {Multiaddr, Transport};
use std::io;


#[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) enum ListenerState {
Ok(Async<Option<usize>>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation should be 4 spaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so it's spaces on libp2p, interesting! :)

Is it ok if I add an .editorconfig to the project?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment about the usize here could be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here was just to put "something" in there, preferably non-void. I see why it could be surprising to the reader; what's a better choice? I did add a comment though?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I assumed it would be a dummy, in which case I would have suggested () instead. However one can write tests which check the item index of a listener (hence my other suggestion (stream::iter_ok(n..)). Consequently I would write a comment stating that the usize value indexes items produced by the listener.

Error
}

#[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) struct DummyTransport {
listener_state: ListenerState,
}
impl DummyTransport {
pub(crate) fn new() -> Self { DummyTransport{ listener_state: ListenerState::Ok(Async::NotReady) }}
pub(crate) fn set_initial_listener_state(&mut self, state: ListenerState) {
self.listener_state = state;
}
}
impl Transport for DummyTransport {
type Output = usize;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>;
type MultiaddrFuture = FutureResult<Multiaddr, io::Error>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized
{
let addr2 = addr.clone();
match self.listener_state {
ListenerState::Ok(async) => {
let tupelize = move |stream| future::ok( (stream, future::ok(addr.clone())) );
Ok(match async {
Async::NotReady => {
let stream = stream::poll_fn(|| future::empty().poll() )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let stream = stream::poll_fn(|| Ok(Async::NotReady))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

As you can tell I struggled quite a bit to get the type-soup right here. Great suggestions, much appreciated.

.map(tupelize);
(Box::new(stream), addr2)
},
Async::Ready(Some(_)) => {
let stream = stream::poll_fn(|| future::ok(Some(1usize)).poll() )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let stream = stream::repeat(1)?

I think it would be more useful even to enumerate the events instead of constantly yielding 1, so how about:

Async::Ready(Some(n)) => {
    let stream = stream::iter_ok(n..)
    ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it, much clearer.

.map(tupelize);
(Box::new(stream), addr2)
},
Async::Ready(None) => {
let stream = stream::poll_fn(|| future::ok(None).poll() )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let stream = stream::empty()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is indeed a lot better. :)

.map(tupelize);
(Box::new(stream), addr2)
},
})
}
ListenerState::Error => Err( (self, addr2) )
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This } looks misaligned.


fn dial(self, _addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized
{
unimplemented!();
}

fn nat_traversal(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
unimplemented!();
}
}
2 changes: 2 additions & 0 deletions core/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

#[cfg(test)]
pub(crate) mod dummy_muxer;
#[cfg(test)]
pub(crate) mod dummy_transport;