Skip to content

Commit

Permalink
[identify] Implement /ipfs/id/push/1.0.0 alongside some refactoring. (#…
Browse files Browse the repository at this point in the history
…1999)

* Implement /ipfs/id/push/1.0.0 alongside some refactoring.

  * Implement /ipfs/id/push/1.0.0, i.e. the ability to actively
    push information of the local peer to specific remotes.
  * Make the initial delay as well as the recurring delay
    for the periodic identification requests configurable,
    introducing `IdentifyConfig`.

* Fix test.

* Fix example.

* Update protocols/identify/src/identify.rs

Co-authored-by: Max Inden <[email protected]>

* Update protocols/identify/src/identify.rs

Co-authored-by: Max Inden <[email protected]>

* Update versions and changelogs.

Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
romanb and mxinden authored Mar 18, 2021
1 parent 24b3e09 commit 5a45f93
Show file tree
Hide file tree
Showing 10 changed files with 510 additions and 197 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@

# `libp2p` facade crate

## Version 0.37.0 [unreleased]

- Update `libp2p-identify`.

## Version 0.36.0 [2021-03-17]

- Consolidate top-level utility functions for constructing development
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.36.0"
version = "0.37.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -67,7 +67,7 @@ lazy_static = "1.2"
libp2p-core = { version = "0.28.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true }
libp2p-identify = { version = "0.29.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true }
Expand Down
7 changes: 3 additions & 4 deletions examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p::{
either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version,
},
gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity},
identify::{Identify, IdentifyEvent},
identify::{Identify, IdentifyConfig, IdentifyEvent},
identity,
multiaddr::Protocol,
noise,
Expand Down Expand Up @@ -245,11 +245,10 @@ fn main() -> Result<(), Box<dyn Error>> {
gossipsub_config,
)
.expect("Valid configuration"),
identify: Identify::new(
identify: Identify::new(IdentifyConfig::new(
"/ipfs/0.1.0".into(),
"rust-ipfs-example".into(),
local_key.public(),
),
)),
ping: Ping::new(PingConfig::new()),
};

Expand Down
6 changes: 6 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.29.0 [unreleased]

- Implement the `/ipfs/id/push/1.0.0` protocol.
cf. https://github.com/libp2p/specs/tree/master/identify#identifypush
[PR 1999](https://github.com/libp2p/rust-libp2p/pull/1999)

# 0.28.0 [2021-03-17]

- Update `libp2p-swarm`.
Expand Down
3 changes: 2 additions & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-identify"
edition = "2018"
description = "Nodes identifcation protocol for libp2p"
version = "0.28.0"
version = "0.29.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -20,6 +20,7 @@ wasm-timer = "0.2"

[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.8"
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../transports/noise" }
libp2p-tcp = { path = "../../transports/tcp" }
Expand Down
128 changes: 89 additions & 39 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use crate::protocol::{
IdentifyProtocol,
IdentifyPushProtocol,
IdentifyInfo,
InboundPush,
OutboundPush,
ReplySubstream
};
use futures::prelude::*;
use libp2p_core::either::{
EitherError,
EitherOutput,
};
use libp2p_core::upgrade::{
EitherUpgrade,
InboundUpgrade,
OutboundUpgrade,
ReadOneError
SelectUpgrade,
UpgradeError,
};
use libp2p_swarm::{
NegotiatedSubstream,
Expand All @@ -34,89 +47,119 @@ use libp2p_swarm::{
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use std::{pin::Pin, task::Context, task::Poll, time::Duration};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;

/// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
/// After an identification succeeded, wait this long before the next time.
const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
/// After we failed to identify the remote, try again after the given delay.
const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler {
/// Configuration for the protocol.
config: IdentifyProtocolConfig,

/// Pending events to yield.
events: SmallVec<[IdentifyHandlerEvent; 4]>,
events: SmallVec<[ProtocolsHandlerEvent<
EitherUpgrade<IdentifyProtocol, IdentifyPushProtocol<OutboundPush>>,
(),
IdentifyHandlerEvent,
io::Error,
>; 4]>,

/// Future that fires when we need to identify the node again.
next_id: Delay,

/// Whether the handler should keep the connection alive.
keep_alive: KeepAlive,

/// The interval of `next_id`, i.e. the recurrent delay.
interval: Duration,
}

/// Event produced by the `IdentifyHandler`.
#[derive(Debug)]
pub enum IdentifyHandlerEvent {
/// We obtained identification information from the remote
Identified(RemoteInfo),
/// We obtained identification information from the remote.
Identified(IdentifyInfo),
/// We received a request for identification.
Identify(ReplySubstream<NegotiatedSubstream>),
/// Failed to identify the remote.
IdentificationError(ProtocolsHandlerUpgrErr<ReadOneError>),
IdentificationError(ProtocolsHandlerUpgrErr<io::Error>),
}

/// Identifying information of the local node that is pushed to a remote.
#[derive(Debug)]
pub struct IdentifyPush(pub IdentifyInfo);

impl IdentifyHandler {
/// Creates a new `IdentifyHandler`.
pub fn new() -> Self {
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
IdentifyHandler {
config: IdentifyProtocolConfig,
events: SmallVec::new(),
next_id: Delay::new(DELAY_TO_FIRST_ID),
next_id: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
interval,
}
}
}

impl ProtocolsHandler for IdentifyHandler {
type InEvent = ();
type InEvent = IdentifyPush;
type OutEvent = IdentifyHandlerEvent;
type Error = ReadOneError;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = IdentifyProtocolConfig;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<IdentifyProtocol, IdentifyPushProtocol<InboundPush>>;
type OutboundProtocol = EitherUpgrade<IdentifyProtocol, IdentifyPushProtocol<OutboundPush>>;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(self.config.clone(), ())
SubstreamProtocol::new(
SelectUpgrade::new(
IdentifyProtocol,
IdentifyPushProtocol::inbound(),
), ())
}

fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo
output: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo
) {
self.events.push(IdentifyHandlerEvent::Identify(protocol))
match output {
EitherOutput::First(substream) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identify(substream)))
}
EitherOutput::Second(info) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(info)))
}
}
}

fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
output: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
self.events.push(IdentifyHandlerEvent::Identified(protocol));
self.keep_alive = KeepAlive::No;
match output {
EitherOutput::First(remote_info) => {
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::Identified(remote_info)));
self.keep_alive = KeepAlive::No;
}
EitherOutput::Second(()) => {}
}
}

fn inject_event(&mut self, _: Self::InEvent) {}
fn inject_event(&mut self, IdentifyPush(push): Self::InEvent) {
self.events.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(
IdentifyPushProtocol::outbound(push)), ())
});
}

fn inject_dial_upgrade_error(
&mut self,
Expand All @@ -125,9 +168,16 @@ impl ProtocolsHandler for IdentifyHandler {
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error
>
) {
self.events.push(IdentifyHandlerEvent::IdentificationError(err));
let err = err.map_upgrade_err(|e| match e {
UpgradeError::Select(e) => UpgradeError::Select(e),
UpgradeError::Apply(EitherError::A(ioe)) => UpgradeError::Apply(ioe),
UpgradeError::Apply(EitherError::B(ioe)) => UpgradeError::Apply(ioe),
});
self.events.push(
ProtocolsHandlerEvent::Custom(
IdentifyHandlerEvent::IdentificationError(err)));
self.keep_alive = KeepAlive::No;
self.next_id.reset(TRY_AGAIN_ON_ERR);
self.next_id.reset(self.interval);
}

fn connection_keep_alive(&self) -> KeepAlive {
Expand All @@ -143,18 +193,18 @@ impl ProtocolsHandler for IdentifyHandler {
>,
> {
if !self.events.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
return Poll::Ready(
self.events.remove(0),
));
);
}

// Poll the future that fires when we need to identify the node again.
match Future::poll(Pin::new(&mut self.next_id), cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
self.next_id.reset(DELAY_TO_NEXT_ID);
self.next_id.reset(self.interval);
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.config.clone(), ())
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ())
};
Poll::Ready(ev)
}
Expand Down
Loading

0 comments on commit 5a45f93

Please sign in to comment.