Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dp/chore/test-cor…
Browse files Browse the repository at this point in the history
…e-handled_node_tasks

* upstream/master:
  Add a IdentifyTransport (libp2p#569)
  Tests for HandledNode (libp2p#546)
  Some minor fixes reported by clippy (libp2p#600)
  Add a PeriodicIdentification protocol handler (libp2p#579)
  Add ProtocolsHandler trait (libp2p#573)
  Use paritytech/rust-secp256k1 (libp2p#598)
  Use websocket 0.21.0 (libp2p#597)
  Reexport multihash from the facade (libp2p#587)
  Add substrate to the list of projects using libp2p (libp2p#595)
  Remove spaces before semicolons (libp2p#591)
  • Loading branch information
dvdplm committed Nov 2, 2018
2 parents c9dcf38 + 4225d26 commit b214ede
Show file tree
Hide file tree
Showing 29 changed files with 1,215 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ secio-secp256k1 = ["libp2p-secio/secp256k1"]
bytes = "0.4"
futures = "0.1"
multiaddr = { path = "./misc/multiaddr" }
multihash = { path = "./misc/multihash" }
libp2p-mplex = { path = "./muxers/mplex" }
libp2p-identify = { path = "./protocols/identify" }
libp2p-kad = { path = "./protocols/kad" }
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ libp2p = { git = "https://github.com/libp2p/rust-libp2p" }
(open a pull request if you want your project to be added here)

- https://github.com/paritytech/polkadot
- https://github.com/paritytech/substrate
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rw-stream-sink = { path = "../misc/rw-stream-sink" }
smallvec = "0.6"
tokio-executor = "0.1.4"
tokio-io = "0.1"
tokio-timer = "0.2"
void = "1"

[dev-dependencies]
Expand Down
3 changes: 1 addition & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ extern crate rw_stream_sink;
extern crate smallvec;
extern crate tokio_executor;
extern crate tokio_io;
extern crate tokio_timer;
extern crate void;

#[cfg(test)]
Expand All @@ -193,8 +194,6 @@ extern crate tokio;
#[cfg(test)]
extern crate tokio_codec;
#[cfg(test)]
extern crate tokio_timer;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;
#[cfg(test)]
Expand Down
32 changes: 16 additions & 16 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutE
let ret_value = if let Some(former_task_id) = former_task_id {
self.parent.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we remove the \
corresponding entry from self.nodes ; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks ; qed")
corresponding entry from self.nodes; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks; qed")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
Expand Down Expand Up @@ -237,10 +237,10 @@ impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEve
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
self.parent.inner.task(self.id)
.expect("we create the CollectionReachEvent with a valid task id ; the \
.expect("we create the CollectionReachEvent with a valid task id; the \
CollectionReachEvent mutably borrows the collection, therefore nothing \
can delete this task during the lifetime of the CollectionReachEvent ; \
therefore the task is still valid when we delete it ; qed")
can delete this task during the lifetime of the CollectionReachEvent; \
therefore the task is still valid when we delete it; qed")
.close();
}
}
Expand Down Expand Up @@ -297,16 +297,16 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
Entry::Vacant(_) => Err(()),
Entry::Occupied(entry) => {
match entry.get() {
&TaskState::Connected(_) => return Err(()),
&TaskState::Pending => (),
TaskState::Connected(_) => return Err(()),
TaskState::Pending => (),
};

entry.remove();
self.inner.task(id.0)
.expect("whenever we receive a TaskClosed event or interrupt a task, we \
remove the corresponding entry from self.tasks ; therefore all \
remove the corresponding entry from self.tasks; therefore all \
elements in self.tasks are valid tasks in the \
HandledNodesTasks ; qed")
HandledNodesTasks; qed")
.close();

Ok(())
Expand Down Expand Up @@ -381,7 +381,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(Some(TaskState::Pending), _, _) => {
// TODO: this variant shouldn't happen ; prove this
// TODO: this variant shouldn't happen; prove this
panic!()
},
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
Expand All @@ -402,9 +402,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(None, _, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
panic!("self.tasks is always kept in sync with the tasks in self.inner; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed ; \
self.tasks, and remove the entry only when the task is closed; \
qed")
},
}
Expand All @@ -420,9 +420,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task ; \
received a corresponding NodeReached event from that same task; \
when we receive a NodeReached event, we ensure that the entry in \
self.tasks is switched to the Connected state ; qed"),
self.tasks is switched to the Connected state; qed"),
};

Async::Ready(CollectionEvent::NodeEvent {
Expand Down Expand Up @@ -457,8 +457,8 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
let old_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(old_task_id, Some(self.inner.id()));
} else {
panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \
nodes always matched a Connected entry in tasks ; qed");
panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
nodes always matched a Connected entry in tasks; qed");
};

self.inner.close();
Expand Down
57 changes: 36 additions & 21 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::io::Error as IoError;

/// Handler for the substreams of a node.
// TODO: right now it is possible for a node handler to be built, then shut down right after if we
// realize we dialed the wrong peer for example ; this could be surprising and should either
// realize we dialed the wrong peer for example; this could be surprising and should either
// be documented or changed (favouring the "documented" right now)
pub trait NodeHandler {
/// Custom event that can be received from the outside.
Expand All @@ -41,6 +41,12 @@ pub trait NodeHandler {
/// Sends a new substream to the handler.
///
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
///
/// # Panic
///
/// Implementations are allowed to panic in the case of dialing if the `user_data` in
/// `endpoint` doesn't correspond to what was returned earlier when polling, or is used
/// multiple times.
fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);

/// Indicates to the handler that the inbound part of the muxer has been closed, and that
Expand All @@ -49,6 +55,11 @@ pub trait NodeHandler {

/// Indicates to the handler that an outbound substream failed to open because the outbound
/// part of the muxer has been closed.
///
/// # Panic
///
/// Implementations are allowed to panic if `user_data` doesn't correspond to what was returned
/// earlier when polling, or is used multiple times.
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);

/// Injects an event coming from the outside into the handler.
Expand Down Expand Up @@ -165,6 +176,16 @@ where
}
}

/// Returns a reference to the `NodeHandler`
pub fn handler(&self) -> &THandler{
&self.handler
}

/// Returns a mutable reference to the `NodeHandler`
pub fn handler_mut(&mut self) -> &mut THandler{
&mut self.handler
}

/// Injects an event to the handler.
#[inline]
pub fn inject_event(&mut self, event: THandler::InEvent) {
Expand Down Expand Up @@ -366,10 +387,6 @@ mod tests {
}
}

fn did_see_event(handled_node: &mut TestHandledNode, event: &InEvent) -> bool {
handled_node.handler.events.contains(event)
}

// Set the state of the `Handler` after `inject_outbound_closed` is called
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
handled_node.handler.next_outbound_state = Some(next_state);
Expand Down Expand Up @@ -447,7 +464,7 @@ mod tests {

let event = InEvent::Custom("banana");
handled.inject_event(event.clone());
assert!(did_see_event(&mut handled, &event));
assert_eq!(handled.handler().events, vec![event]);
}

#[test]
Expand Down Expand Up @@ -549,9 +566,7 @@ mod tests {
.handled_node();

assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
assert_matches!(event, OutEvent::Custom(s) => {
assert_eq!(s, "pineapple");
});
assert_matches!(event, OutEvent::Custom("pineapple"))
});
}

Expand All @@ -569,7 +584,7 @@ mod tests {
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))))
);
handled.poll().expect("poll works");
assert_eq!(handled.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -586,8 +601,8 @@ mod tests {
// closed, `inbound_finished` is set to true.
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
// `inject_inbound_close`, but that's irrelevant here)
// - back in `poll()` we call `handler.poll()` which does nothing
// because `HandlerState` is `NotReady`: loop continues
// - back in `poll()` we call `handler.poll()` which does nothing because
// `HandlerState` is `NotReady`: loop continues
// - polls the node again which now skips the inbound block because
// `inbound_finished` is true.
// - Now `poll_outbound()` is called which returns `Async::Ready(None)`
Expand All @@ -606,7 +621,7 @@ mod tests {
// – …and causes the Handler to yield Async::Ready(None)
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
assert_eq!(handled.handler.events, vec![
assert_eq!(handled.handler().events, vec![
InEvent::InboundClosed, InEvent::OutboundClosed
]);
}
Expand All @@ -618,9 +633,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::InboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::InboundClosed]);
}

#[test]
Expand All @@ -632,9 +647,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -646,9 +661,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(Some(1))]);
assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]);
}

#[test]
Expand All @@ -659,8 +674,8 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(None)]);
assert_eq!(h.handler().events, vec![InEvent::Substream(None)]);
}
}
8 changes: 3 additions & 5 deletions core/src/nodes/handled_node_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<TInEvent, TOutEvent, THandler> HandledNodesTasks<TInEvent, TOutEvent, THand
/// Returns `None` if the task id is invalid.
#[inline]
pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> {
match self.tasks.entry(id.clone()) {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(Task { inner }),
Entry::Vacant(_) => None,
}
Expand Down Expand Up @@ -401,8 +401,7 @@ where
println!("[NodeTask, poll] NodeTaskInner::Future: AsyncReady(Some) – injecting event");
node.inject_event(event);
}
if let Err(e) = self.events_tx.unbounded_send((event, self.id)) {
println!("[NodeTask, poll] NodeTaskInner::Future: AsyncReady(Some) – Error sending NodeReached={:?}", e);
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
self.inner = NodeTaskInner::Node(node);
Expand Down Expand Up @@ -462,8 +461,7 @@ where
Ok(Async::Ready(Some(event))) => {
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(Some) –– sending a NodeEvent on events_tx");
let event = InToExtMessage::NodeEvent(event);
if let Err(e) = self.events_tx.unbounded_send((event, self.id)) {
println!("[NodeTask, poll] NodeTaskInner::Node; polled node; Async::Ready(Some); error sending on events_tx channel={:?}. Shutting down the node.", e);
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ pub mod handled_node;
pub mod handled_node_tasks;
pub mod listeners;
pub mod node;
pub mod protocols_handler;
pub mod raw_swarm;

pub use self::node::Substream;
pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint};
pub use self::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent};
pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent};
Loading

0 comments on commit b214ede

Please sign in to comment.