Commit c7212db

Merge remote-tracking branch 'upstream/master' into dp/chore/test-raw_swarm

* upstream/master:
  Add substrate to the list of projects using libp2p (libp2p#595)
  Remove spaces before semicolons (libp2p#591)
dvdplm committed Oct 30, 2018
2 parents 6c9deaa + 7c8d8b5 commit c7212db
Showing 16 changed files with 65 additions and 64 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -22,3 +22,4 @@ libp2p = { git = "https://github.com/libp2p/rust-libp2p" }
(open a pull request if you want your project to be added here)

- https://github.com/paritytech/polkadot
+- https://github.com/paritytech/substrate
28 changes: 14 additions & 14 deletions core/src/nodes/collection.rs
@@ -193,8 +193,8 @@ impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutE
let ret_value = if let Some(former_task_id) = former_task_id {
self.parent.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we remove the \
-corresponding entry from self.nodes ; therefore all elements in \
-self.nodes are valid tasks in the HandledNodesTasks ; qed")
+corresponding entry from self.nodes; therefore all elements in \
+self.nodes are valid tasks in the HandledNodesTasks; qed")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
@@ -241,10 +241,10 @@ impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEve
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
self.parent.inner.task(self.id)
-.expect("we create the CollectionReachEvent with a valid task id ; the \
+.expect("we create the CollectionReachEvent with a valid task id; the \
CollectionReachEvent mutably borrows the collection, therefore nothing \
-can delete this task during the lifetime of the CollectionReachEvent ; \
-therefore the task is still valid when we delete it ; qed")
+can delete this task during the lifetime of the CollectionReachEvent; \
+therefore the task is still valid when we delete it; qed")
.close();
}
}
@@ -308,9 +308,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
entry.remove();
self.inner.task(id.0)
.expect("whenever we receive a TaskClosed event or interrupt a task, we \
-remove the corresponding entry from self.tasks ; therefore all \
+remove the corresponding entry from self.tasks; therefore all \
elements in self.tasks are valid tasks in the \
-HandledNodesTasks ; qed")
+HandledNodesTasks; qed")
.close();

Ok(())
@@ -397,7 +397,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(Some(TaskState::Pending), _, _) => {
-// TODO: this variant shouldn't happen ; prove this
+// TODO: this variant shouldn't happen; prove this
panic!()
},
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
@@ -420,9 +420,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(None, _, _) => {
-panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
+panic!("self.tasks is always kept in sync with the tasks in self.inner; \
when we add a task in self.inner we add a corresponding entry in \
-self.tasks, and remove the entry only when the task is closed ; \
+self.tasks, and remove the entry only when the task is closed; \
qed")
},
}
@@ -440,9 +440,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
-received a corresponding NodeReached event from that same task ; \
+received a corresponding NodeReached event from that same task; \
when we receive a NodeReached event, we ensure that the entry in \
-self.tasks is switched to the Connected state ; qed"),
+self.tasks is switched to the Connected state; qed"),
};

Async::Ready(CollectionEvent::NodeEvent {
@@ -477,8 +477,8 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
let old_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(old_task_id, Some(self.inner.id()));
} else {
-panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \
-nodes always matched a Connected entry in tasks ; qed");
+panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
+nodes always matched a Connected entry in tasks; qed");
};

self.inner.close();
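Most of the strings touched in collection.rs follow the crate's informal proof convention: every `.expect()` or `panic!()` carries a short argument for why the failure case is unreachable, closed with `qed`. A minimal sketch of the pattern, using hypothetical types rather than the real `HandledNodesTasks` API:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the tasks collection in collection.rs.
struct Tasks {
    tasks: HashMap<usize, String>,
}

impl Tasks {
    fn close(&mut self, id: usize) {
        // The expect message is an informal proof that the lookup cannot
        // fail, terminated by `qed` -- the style this commit reformats.
        self.tasks
            .remove(&id)
            .expect("we insert an entry whenever a task is spawned and remove \
                     it only here, when the task closes; therefore every id \
                     handed out stays valid until closed; qed");
    }
}
```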
2 changes: 1 addition & 1 deletion core/src/nodes/handled_node.rs
@@ -25,7 +25,7 @@ use std::io::Error as IoError;

/// Handler for the substreams of a node.
// TODO: right now it is possible for a node handler to be built, then shut down right after if we
-// realize we dialed the wrong peer for example ; this could be surprising and should either
+// realize we dialed the wrong peer for example; this could be surprising and should either
// be documented or changed (favouring the "documented" right now)
pub trait NodeHandler {
/// Custom event that can be received from the outside.
40 changes: 20 additions & 20 deletions core/src/nodes/raw_swarm.rs
@@ -481,7 +481,7 @@ where
if actual_peer_id == expected_peer_id {
Ok((actual_peer_id, muxer))
} else {
-let msg = format!("public key mismatch ; expected = {:?} ; obtained = {:?}",
+let msg = format!("public key mismatch; expected = {:?}; obtained = {:?}",
expected_peer_id, actual_peer_id);
Err(IoError::new(IoErrorKind::Other, msg))
}
@@ -573,10 +573,10 @@ where
}) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
-opened and remove only when a connection is closed ; the \
+opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
-messages ; qed");
+messages; qed");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeError {
@@ -588,10 +588,10 @@ where
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \
-opened and remove only when a connection is closed ; the \
+opened and remove only when a connection is closed; the \
underlying API is guaranteed to always deliver a connection \
closed message after it has been opened, and no two closed \
-messages ; qed");
+messages; qed");
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
@@ -607,15 +607,15 @@ }
}

if let Some(interrupt) = action.interrupt {
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
self.active_nodes
.interrupt(interrupt)
-.expect("interrupt is guaranteed to be gathered from `out_reach_attempts` ;
+.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
we insert in out_reach_attempts only when we call \
active_nodes.add_reach_attempt, and we remove only when we call \
-interrupt or when a reach attempt succeeds or errors ; therefore the \
+interrupt or when a reach attempt succeeds or errors; therefore the \
out_reach_attempts should always be in sync with the actual \
-attempts ; qed");
+attempts; qed");
}

return Async::Ready(out_event);
@@ -688,9 +688,9 @@ where
if outcome == CollectionNodeAccept::ReplacedExisting {
let closed_endpoint = closed_endpoint
.expect("We insert into connected_points whenever a connection is opened and \
-remove only when a connection is closed ; the underlying API is \
+remove only when a connection is closed; the underlying API is \
guaranteed to always deliver a connection closed message after it has \
-been opened, and no two closed messages ; qed");
+been opened, and no two closed messages; qed");
return (action, RawSwarmEvent::Replaced {
peer_id,
endpoint: opened_endpoint,
@@ -726,9 +726,9 @@ where
if outcome == CollectionNodeAccept::ReplacedExisting {
let closed_endpoint = closed_endpoint
.expect("We insert into connected_points whenever a connection is opened and \
-remove only when a connection is closed ; the underlying API is guaranteed \
+remove only when a connection is closed; the underlying API is guaranteed \
to always deliver a connection closed message after it has been opened, \
-and no two closed messages ; qed");
+and no two closed messages; qed");
return (Default::default(), RawSwarmEvent::Replaced {
peer_id,
endpoint: opened_endpoint,
@@ -740,7 +740,7 @@ }
}

// We didn't find any entry in neither the outgoing connections not ingoing connections.
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
panic!("The API of collection guarantees that the id sent back in NodeReached (which is where \
we call handle_node_reached) is one that was passed to add_reach_attempt. Whenever we \
call add_reach_attempt, we also insert at the same time an entry either in \
@@ -817,7 +817,7 @@ where TTrans: Transport
}

// The id was neither in the outbound list nor the inbound list.
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
panic!("The API of collection guarantees that the id sent back in ReachError events \
(which is where we call handle_reach_error) is one that was passed to \
add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
@@ -999,7 +999,7 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
/// Closes the connection to this node.
///
/// No `NodeClosed` message will be generated for this node.
-// TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
+// TODO: consider returning a `PeerNotConnected`; however this makes all the borrows things
// much more annoying to deal with
pub fn close(self) {
self.connected_points.remove(&self.peer_id);
@@ -1011,9 +1011,9 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
pub fn endpoint(&self) -> &ConnectedPoint {
self.connected_points.get(&self.peer_id)
.expect("We insert into connected_points whenever a connection is opened and remove \
-only when a connection is closed ; the underlying API is guaranteed to always \
+only when a connection is closed; the underlying API is guaranteed to always \
deliver a connection closed message after it has been opened, and no two \
-closed messages ; qed")
+closed messages; qed")
}

/// Sends an event to the node.
@@ -1031,13 +1031,13 @@ pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> {

impl<'a, TInEvent, TOutEvent, THandler> PeerPendingConnect<'a, TInEvent, TOutEvent, THandler> {
/// Interrupt this connection attempt.
-// TODO: consider returning a PeerNotConnected ; however that is really pain in terms of
+// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
// borrows
#[inline]
pub fn interrupt(self) {
let attempt = self.attempt.remove();
if let Err(_) = self.active_nodes.interrupt(attempt.id) {
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
panic!("We retreived this attempt.id from out_reach_attempts. We insert in \
out_reach_attempts only at the same time as we call add_reach_attempt. \
Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \
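The first raw_swarm.rs hunk reformats the error message produced when a dialed node presents an unexpected identity. A minimal sketch of that check, with the real `PeerId` type replaced by a plain `&str` for illustration:

```rust
use std::io::{Error as IoError, ErrorKind as IoErrorKind};

// Hypothetical simplification: peer ids are plain strings here, not the
// real libp2p `PeerId` type.
fn check_peer_id(expected: &str, actual: &str) -> Result<(), IoError> {
    if actual == expected {
        Ok(())
    } else {
        // Same shape as the hunk above: report both ids, then fail the dial.
        let msg = format!("public key mismatch; expected = {:?}; obtained = {:?}",
                          expected, actual);
        Err(IoError::new(IoErrorKind::Other, msg))
    }
}
```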
4 changes: 2 additions & 2 deletions muxers/mplex/src/lib.rs
@@ -329,7 +329,7 @@ where C: AsyncRead + AsyncWrite
let mut inner = self.inner.lock();

if inner.opened_substreams.len() >= inner.config.max_substreams {
-debug!("Refused substream ; reached maximum number of substreams {}", inner.config.max_substreams);
+debug!("Refused substream; reached maximum number of substreams {}", inner.config.max_substreams);
return Err(IoError::new(IoErrorKind::ConnectionRefused,
"exceeded maximum number of open substreams"));
}
@@ -460,7 +460,7 @@ where C: AsyncRead + AsyncWrite
Ok(Async::Ready(Some(data))) => substream.current_data = data,
Ok(Async::Ready(None)) => return Ok(Async::Ready(0)),
Ok(Async::NotReady) => {
-// There was no data packet in the buffer about this substream ; maybe it's
+// There was no data packet in the buffer about this substream; maybe it's
// because it has been closed.
if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
return Ok(Async::NotReady)
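The first mplex hunk sits inside the guard that refuses new substreams once the configured cap is reached. A stripped-down sketch of that guard, with the connection state reduced to the two hypothetical counters the check consults:

```rust
use std::io::{Error as IoError, ErrorKind as IoErrorKind};

// Hypothetical reduction of the mplex state to plain counters.
fn open_substream(opened: usize, max_substreams: usize) -> Result<(), IoError> {
    if opened >= max_substreams {
        // Mirrors the refusal path above: the caller gets ConnectionRefused
        // instead of an ever-growing substream table.
        return Err(IoError::new(
            IoErrorKind::ConnectionRefused,
            "exceeded maximum number of open substreams",
        ));
    }
    Ok(())
}
```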
10 changes: 5 additions & 5 deletions protocols/floodsub/src/lib.rs
@@ -345,7 +345,7 @@ impl FloodSubController {
let topics = topics.into_iter();

if log_enabled!(Level::Debug) {
-debug!("Queuing sub/unsub message ; sub = {:?} ; unsub = {:?}",
+debug!("Queuing sub/unsub message; sub = {:?}; unsub = {:?}",
topics.clone().filter(|t| t.1)
.map(|t| t.0.hash().clone().into_string())
.collect::<Vec<_>>(),
@@ -389,7 +389,7 @@ impl FloodSubController {
{
let topics = topics.into_iter().collect::<Vec<_>>();

-debug!("Queueing publish message ; topics = {:?} ; data_len = {:?}",
+debug!("Queueing publish message; topics = {:?}; data_len = {:?}",
topics.iter().map(|t| t.hash().clone().into_string()).collect::<Vec<_>>(),
data.len());

@@ -554,7 +554,7 @@ fn handle_packet_received(
let mut input = match protobuf::parse_from_bytes::<rpc_proto::RPC>(&bytes) {
Ok(msg) => msg,
Err(err) => {
-debug!("Failed to parse protobuf message ; err = {:?}", err);
+debug!("Failed to parse protobuf message; err = {:?}", err);
return Err(err.into());
}
};
@@ -588,7 +588,7 @@ fn handle_packet_received(
.lock()
.insert(hash((from.clone(), publish.take_seqno())))
{
-trace!("Skipping message because we had already received it ; payload = {} bytes",
+trace!("Skipping message because we had already received it; payload = {} bytes",
publish.get_data().len());
continue;
}
@@ -609,7 +609,7 @@ fn handle_packet_received(
.map(|h| TopicHash::from_raw(h))
.collect::<Vec<_>>();

-trace!("Processing message for topics {:?} ; payload = {} bytes",
+trace!("Processing message for topics {:?}; payload = {} bytes",
topics,
publish.get_data().len());

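The hunk at -588 is part of floodsub's duplicate suppression: each (source, seqno) pair is hashed into a shared set, and a message is skipped whenever the insert reports the pair was already present. A small sketch of that idea with hypothetical key types:

```rust
use std::collections::HashSet;

/// Hypothetical message key: (source peer, sequence number).
type MessageKey = (String, u64);

// Returns true when the message is new and should be processed;
// `HashSet::insert` returns false for keys already present, which is
// exactly the "skip" branch in the hunk above.
fn first_sighting(seen: &mut HashSet<MessageKey>, from: String, seqno: u64) -> bool {
    seen.insert((from, seqno))
}
```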
4 changes: 2 additions & 2 deletions protocols/identify/src/protocol.rs
@@ -87,7 +87,7 @@ where

let bytes = message
.write_to_bytes()
-.expect("writing protobuf failed ; should never happen");
+.expect("writing protobuf failed; should never happen");

let future = self.inner.send(bytes).map(|_| ());
Box::new(future) as Box<_>
@@ -142,7 +142,7 @@ where
let (info, observed_addr) = match parse_proto_msg(msg) {
Ok(v) => v,
Err(err) => {
-debug!("Failed to parse protobuf message ; error = {:?}", err);
+debug!("Failed to parse protobuf message; error = {:?}", err);
return Err(err.into());
}
};
8 changes: 4 additions & 4 deletions protocols/kad/src/high_level.rs
@@ -192,7 +192,7 @@ where F: FnMut(&PeerId) -> Fut + Send + 'a,
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();

-// TODO: this 2 is magic here ; it is the length of the hash of the multihash
+// TODO: this 2 is magic here; it is the length of the hash of the multihash
let bits_diff = bucket_num + 1;
if bits_diff > 8 * (my_id_len - 2) {
return Err(());
@@ -232,7 +232,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
-debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
+debug!("Start query for {:?}; num results = {}", searched_key, num_results);

// State of the current iterative process.
struct State<'a, F> {
@@ -322,7 +322,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
to_contact
};

-debug!("New query round ; {} queries in progress ; contacting {} new peers",
+debug!("New query round; {} queries in progress; contacting {} new peers",
state.current_attempts_fut.len(),
to_contact.len());

@@ -449,7 +449,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,

} else {
if !local_nearest_node_updated {
-trace!("Loop didn't update closer node ; jumping to step 2");
+trace!("Loop didn't update closer node; jumping to step 2");
state.stage = Stage::SecondStep;
}
}
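The "magic 2" in the first kad hunk is the length of the multihash prefix: a peer id is a multihash laid out as hash-code byte, digest-length byte, then the digest itself, so only `my_id_len - 2` bytes of digest are available for bucket arithmetic (assuming the two varints fit in one byte each, which holds for the common hash codes). A hypothetical one-liner making that explicit:

```rust
// Assumes the multihash prefix (hash code + digest length) occupies two
// bytes, mirroring the `8 * (my_id_len - 2)` bound in gen_random_id.
fn digest_bits(peer_id_bytes: &[u8]) -> usize {
    8 * (peer_id_bytes.len() - 2)
}
```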