From c93f753018af3a63c07e38c09672e0e1e8f1a385 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 4 May 2023 05:47:11 +0100 Subject: [PATCH] feat: replace `ProtocolName` with `AsRef` Previously, a protocol could be any sequence of bytes as long as it started with `/`. Now, we directly parse a protocol as `String` which enforces it to be valid UTF8. To notify users of this change, we delete the `ProtocolName` trait. The new requirement is that users need to provide a type that implements `AsRef`. We also add a `StreamProtocol` newtype in `libp2p-swarm` which provides an easy way for users to ensure their protocol strings are compliant. The newtype enforces that protocol strings start with `/`. `StreamProtocol` also implements `AsRef`, meaning users can directly use it in their upgrades. `multistream-select` by itself only changes marginally with this patch. The only thing we enforce in the type-system is that protocols must implement `AsRef`. Resolves: #2831. Pull-Request: #3746. --- core/CHANGELOG.md | 7 + core/src/either.rs | 17 +- core/src/lib.rs | 2 +- core/src/upgrade.rs | 49 +----- core/src/upgrade/apply.rs | 104 ++---------- core/src/upgrade/denied.rs | 2 +- core/src/upgrade/either.rs | 16 +- core/src/upgrade/pending.rs | 8 +- core/src/upgrade/ready.rs | 8 +- core/src/upgrade/select.rs | 19 +-- examples/file-sharing/src/network.rs | 29 ++-- libp2p/CHANGELOG.md | 8 + libp2p/src/lib.rs | 1 + misc/metrics/src/identify.rs | 11 +- misc/multistream-select/src/dialer_select.rs | 6 +- misc/multistream-select/src/lib.rs | 2 +- .../multistream-select/src/listener_select.rs | 13 +- misc/multistream-select/src/protocol.rs | 31 +++- .../multistream-select/tests/dialer_select.rs | 8 +- muxers/mplex/src/config.rs | 8 +- muxers/mplex/src/lib.rs | 2 +- muxers/yamux/src/lib.rs | 8 +- protocols/autonat/src/behaviour.rs | 5 +- protocols/autonat/src/protocol.rs | 28 +--- protocols/dcutr/src/protocol.rs | 4 +- protocols/dcutr/src/protocol/inbound.rs | 4 +- protocols/dcutr/src/protocol/outbound.rs | 4 +- protocols/floodsub/src/protocol.rs | 7 +- protocols/gossipsub/src/behaviour.rs | 6 +- protocols/gossipsub/src/behaviour/tests.rs | 4 +- protocols/gossipsub/src/config.rs | 158 +++++++++++------- protocols/gossipsub/src/handler.rs | 1 + protocols/gossipsub/src/protocol.rs | 124 +++++--------- protocols/identify/src/behaviour.rs | 14 +- protocols/identify/src/handler.rs | 4 +- protocols/identify/src/protocol.rs | 36 ++-- protocols/kad/src/behaviour.rs | 10 +- protocols/kad/src/protocol_priv.rs | 15 +- protocols/perf/src/client/handler.rs | 4 +- protocols/perf/src/lib.rs | 4 +- protocols/perf/src/server/handler.rs | 4 +- protocols/ping/src/handler.rs | 10 +- protocols/ping/src/protocol.rs | 3 +- protocols/relay/src/protocol.rs | 7 +- protocols/relay/src/protocol/inbound_hop.rs | 4 +- protocols/relay/src/protocol/inbound_stop.rs | 4 +- protocols/relay/src/protocol/outbound_hop.rs | 4 +- protocols/relay/src/protocol/outbound_stop.rs | 4 +- protocols/relay/src/v2.rs | 12 -- protocols/rendezvous/src/handler.rs | 3 +- protocols/rendezvous/src/substream_handler.rs | 11 +- protocols/request-response/src/codec.rs | 4 +- protocols/request-response/src/lib.rs | 2 +- protocols/request-response/tests/ping.rs | 40 ++--- swarm/CHANGELOG.md | 5 + swarm/src/handler/multi.rs | 10 +- swarm/src/lib.rs | 18 +- swarm/src/stream_protocol.rs | 104 ++++++++++++ swarm/src/upgrade.rs | 6 +- transports/deflate/src/lib.rs | 4 +- transports/deflate/tests/test.rs | 4 +- transports/noise/src/lib.rs | 4 +- transports/noise/src/protocol/x25519.rs | 12 +- transports/noise/src/protocol/x25519_spec.rs | 12 +- transports/noise/tests/smoke.rs | 4 +- transports/plaintext/src/lib.rs | 8 +- transports/plaintext/tests/smoke.rs | 4 +- transports/tls/src/upgrade.rs | 4 +- 68 files changed, 540 insertions(+), 562 deletions(-) create mode 100644 swarm/src/stream_protocol.rs diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index f0d7579d17d..d3a5ac63f6e 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -9,6 +9,13 @@ [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3867]: https://github.com/libp2p/rust-libp2p/pull/3867 +- Enforce protocol names to be valid UTF8 strings as required by the [spec]. + We delete the `ProtocolName` trait and replace it with a requirement for `AsRef`. + See [PR 3746] + +[spec]: https://github.com/libp2p/specs/blob/master/connections/README.md#multistream-select +[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 + ## 0.39.2 - Deprecate `upgrade::from_fn` without replacement as it is not used within `rust-libp2p`. diff --git a/core/src/either.rs b/core/src/either.rs index 8049717a878..32e09ca691c 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -22,7 +22,7 @@ use crate::muxing::StreamMuxerEvent; use crate::{ muxing::StreamMuxer, transport::{ListenerId, Transport, TransportError, TransportEvent}, - Multiaddr, ProtocolName, + Multiaddr, }; use either::Either; use futures::prelude::*; @@ -115,21 +115,6 @@ where } } -#[derive(Debug, Clone)] -pub enum EitherName { - A(A), - B(B), -} - -impl ProtocolName for EitherName { - fn protocol_name(&self) -> &[u8] { - match self { - EitherName::A(a) => a.protocol_name(), - EitherName::B(b) => b.protocol_name(), - } - } -} - impl Transport for Either where B: Transport, diff --git a/core/src/lib.rs b/core/src/lib.rs index ba531c919f2..c40e64c5d8b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -127,7 +127,7 @@ pub use peer_record::PeerRecord; pub use signed_envelope::SignedEnvelope; pub use translation::address_translation; pub use transport::Transport; -pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, UpgradeInfo}; +pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, UpgradeInfo}; #[derive(Debug, thiserror::Error)] pub struct DecodeError(String); diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index b53ba5f1d5e..3d0b752f4b8 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -80,58 +80,11 @@ pub use self::{ pub use crate::Negotiated; pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version}; -/// Types serving as protocol names. -/// -/// # Context -/// -/// In situations where we provide a list of protocols that we support, -/// the elements of that list are required to implement the [`ProtocolName`] trait. -/// -/// Libp2p will call [`ProtocolName::protocol_name`] on each element of that list, and transmit the -/// returned value on the network. If the remote accepts a given protocol, the element -/// serves as the return value of the function that performed the negotiation. -/// -/// # Example -/// -/// ``` -/// use libp2p_core::ProtocolName; -/// -/// enum MyProtocolName { -/// Version1, -/// Version2, -/// Version3, -/// } -/// -/// impl ProtocolName for MyProtocolName { -/// fn protocol_name(&self) -> &[u8] { -/// match *self { -/// MyProtocolName::Version1 => b"/myproto/1.0", -/// MyProtocolName::Version2 => b"/myproto/2.0", -/// MyProtocolName::Version3 => b"/myproto/3.0", -/// } -/// } -/// } -/// ``` -/// -pub trait ProtocolName { - /// The protocol name as bytes. Transmitted on the network. - /// - /// **Note:** Valid protocol names must start with `/` and - /// not exceed 140 bytes in length. - fn protocol_name(&self) -> &[u8]; -} - -impl> ProtocolName for T { - fn protocol_name(&self) -> &[u8] { - self.as_ref() - } -} - /// Common trait for upgrades that can be applied on inbound substreams, outbound substreams, /// or both. pub trait UpgradeInfo { /// Opaque type representing a negotiable protocol. - type Info: ProtocolName + Clone; + type Info: AsRef + Clone; /// Iterator returned by `protocol_info`. type InfoIter: IntoIterator; diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 0b4b4d6b992..1dca0be4ad5 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -18,16 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; use crate::{connection::ConnectedPoint, Negotiated}; use futures::{future::Either, prelude::*}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; -use std::{iter, mem, pin::Pin, task::Context, task::Poll}; +use std::{mem, pin::Pin, task::Context, task::Poll}; pub(crate) use multistream_select::Version; -use smallvec::SmallVec; -use std::fmt; // TODO: Still needed? /// Applies an upgrade to the inbound and outbound direction of a connection or substream. @@ -55,14 +53,9 @@ where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { - let iter = up - .protocol_info() - .into_iter() - .map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::listener_select_proto(conn, iter); InboundUpgradeApply { inner: InboundUpgradeApplyState::Init { - future, + future: multistream_select::listener_select_proto(conn, up.protocol_info().into_iter()), upgrade: up, }, } @@ -74,14 +67,9 @@ where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { - let iter = up - .protocol_info() - .into_iter() - .map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::dialer_select_proto(conn, iter, v); OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { - future, + future: multistream_select::dialer_select_proto(conn, up.protocol_info(), v), upgrade: up, }, } @@ -96,18 +84,19 @@ where inner: InboundUpgradeApplyState, } +#[allow(clippy::large_enum_variant)] enum InboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { Init { - future: ListenerSelectFuture>, + future: ListenerSelectFuture, upgrade: U, }, Upgrade { future: Pin>, - name: SmallVec<[u8; 32]>, + name: String, }, Undefined, } @@ -140,10 +129,9 @@ where return Poll::Pending; } }; - let name = SmallVec::from_slice(info.protocol_name()); self.inner = InboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_inbound(io, info.0)), - name, + future: Box::pin(upgrade.upgrade_inbound(io, info.clone())), + name: info.as_ref().to_owned(), }; } InboundUpgradeApplyState::Upgrade { mut future, name } => { @@ -153,14 +141,11 @@ where return Poll::Pending; } Poll::Ready(Ok(x)) => { - log::trace!("Upgraded inbound stream to {}", DisplayProtocolName(name)); + log::trace!("Upgraded inbound stream to {name}"); return Poll::Ready(Ok(x)); } Poll::Ready(Err(e)) => { - debug!( - "Failed to upgrade inbound stream to {}", - DisplayProtocolName(name) - ); + debug!("Failed to upgrade inbound stream to {name}"); return Poll::Ready(Err(UpgradeError::Apply(e))); } } @@ -188,12 +173,12 @@ where U: OutboundUpgrade>, { Init { - future: DialerSelectFuture::IntoIter>>, + future: DialerSelectFuture::IntoIter>, upgrade: U, }, Upgrade { future: Pin>, - name: SmallVec<[u8; 32]>, + name: String, }, Undefined, } @@ -226,10 +211,9 @@ where return Poll::Pending; } }; - let name = SmallVec::from_slice(info.protocol_name()); self.inner = OutboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_outbound(connection, info.0)), - name, + future: Box::pin(upgrade.upgrade_outbound(connection, info.clone())), + name: info.as_ref().to_owned(), }; } OutboundUpgradeApplyState::Upgrade { mut future, name } => { @@ -239,17 +223,11 @@ where return Poll::Pending; } Poll::Ready(Ok(x)) => { - log::trace!( - "Upgraded outbound stream to {}", - DisplayProtocolName(name) - ); + log::trace!("Upgraded outbound stream to {name}",); return Poll::Ready(Ok(x)); } Poll::Ready(Err(e)) => { - debug!( - "Failed to upgrade outbound stream to {}", - DisplayProtocolName(name) - ); + debug!("Failed to upgrade outbound stream to {name}",); return Poll::Ready(Err(UpgradeError::Apply(e))); } } @@ -261,51 +239,3 @@ where } } } - -type NameWrapIter = iter::Map::Item) -> NameWrap<::Item>>; - -/// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`. -#[derive(Clone)] -struct NameWrap(N); - -impl AsRef<[u8]> for NameWrap { - fn as_ref(&self) -> &[u8] { - self.0.protocol_name() - } -} - -/// Wrapper for printing a [`ProtocolName`] that is expected to be mostly ASCII -pub(crate) struct DisplayProtocolName(pub(crate) N); - -impl fmt::Display for DisplayProtocolName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use fmt::Write; - for byte in self.0.protocol_name() { - if (b' '..=b'~').contains(byte) { - f.write_char(char::from(*byte))?; - } else { - write!(f, "<{byte:02X}>")?; - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn display_protocol_name() { - assert_eq!(DisplayProtocolName(b"/hello/1.0").to_string(), "/hello/1.0"); - assert_eq!(DisplayProtocolName("/hellö/").to_string(), "/hell/"); - assert_eq!( - DisplayProtocolName((0u8..=255).collect::>()).to_string(), - (0..32) - .map(|c| format!("<{c:02X}>")) - .chain((32..127).map(|c| format!("{}", char::from_u32(c).unwrap()))) - .chain((127..256).map(|c| format!("<{c:02X}>"))) - .collect::() - ); - } -} diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 93438e0bea8..353a184822d 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -29,7 +29,7 @@ use void::Void; pub struct DeniedUpgrade; impl UpgradeInfo for DeniedUpgrade { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Empty; fn protocol_info(&self) -> Self::InfoIter { diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 0bda47ca9d0..db62f8d6558 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - either::{EitherFuture, EitherName}, + either::EitherFuture, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, }; use either::Either; @@ -31,7 +31,7 @@ where A: UpgradeInfo, B: UpgradeInfo, { - type Info = EitherName; + type Info = Either; type InfoIter = Either< Map<::IntoIter, fn(A::Info) -> Self::Info>, Map<::IntoIter, fn(B::Info) -> Self::Info>, @@ -39,8 +39,8 @@ where fn protocol_info(&self) -> Self::InfoIter { match self { - Either::Left(a) => Either::Left(a.protocol_info().into_iter().map(EitherName::A)), - Either::Right(b) => Either::Right(b.protocol_info().into_iter().map(EitherName::B)), + Either::Left(a) => Either::Left(a.protocol_info().into_iter().map(Either::Left)), + Either::Right(b) => Either::Right(b.protocol_info().into_iter().map(Either::Right)), } } } @@ -56,10 +56,10 @@ where fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { match (self, info) { - (Either::Left(a), EitherName::A(info)) => { + (Either::Left(a), Either::Left(info)) => { EitherFuture::First(a.upgrade_inbound(sock, info)) } - (Either::Right(b), EitherName::B(info)) => { + (Either::Right(b), Either::Right(info)) => { EitherFuture::Second(b.upgrade_inbound(sock, info)) } _ => panic!("Invalid invocation of EitherUpgrade::upgrade_inbound"), @@ -78,10 +78,10 @@ where fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { match (self, info) { - (Either::Left(a), EitherName::A(info)) => { + (Either::Left(a), Either::Left(info)) => { EitherFuture::First(a.upgrade_outbound(sock, info)) } - (Either::Right(b), EitherName::B(info)) => { + (Either::Right(b), Either::Right(info)) => { EitherFuture::Second(b.upgrade_outbound(sock, info)) } _ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound"), diff --git a/core/src/upgrade/pending.rs b/core/src/upgrade/pending.rs index 15d3c31df48..6931e20bfdc 100644 --- a/core/src/upgrade/pending.rs +++ b/core/src/upgrade/pending.rs @@ -19,7 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; use std::iter; use void::Void; @@ -39,7 +39,7 @@ impl

PendingUpgrade

{ impl

UpgradeInfo for PendingUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Info = P; type InfoIter = iter::Once

; @@ -51,7 +51,7 @@ where impl InboundUpgrade for PendingUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Output = Void; type Error = Void; @@ -64,7 +64,7 @@ where impl OutboundUpgrade for PendingUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Output = Void; type Error = Void; diff --git a/core/src/upgrade/ready.rs b/core/src/upgrade/ready.rs index 16a9b2867f4..323f1f73f32 100644 --- a/core/src/upgrade/ready.rs +++ b/core/src/upgrade/ready.rs @@ -19,7 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; use std::iter; use void::Void; @@ -38,7 +38,7 @@ impl

ReadyUpgrade

{ impl

UpgradeInfo for ReadyUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Info = P; type InfoIter = iter::Once

; @@ -50,7 +50,7 @@ where impl InboundUpgrade for ReadyUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Output = C; type Error = Void; @@ -63,7 +63,7 @@ where impl OutboundUpgrade for ReadyUpgrade

where - P: ProtocolName + Clone, + P: AsRef + Clone, { type Output = C; type Error = Void; diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index c823d791cf7..19b8b7a93f7 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -19,10 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::either::EitherFuture; -use crate::{ - either::EitherName, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, -}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use either::Either; use futures::future; use std::iter::{Chain, Map}; @@ -48,7 +45,7 @@ where A: UpgradeInfo, B: UpgradeInfo, { - type Info = EitherName; + type Info = Either; type InfoIter = Chain< Map<::IntoIter, fn(A::Info) -> Self::Info>, Map<::IntoIter, fn(B::Info) -> Self::Info>, @@ -59,12 +56,12 @@ where .0 .protocol_info() .into_iter() - .map(EitherName::A as fn(A::Info) -> _); + .map(Either::Left as fn(A::Info) -> _); let b = self .1 .protocol_info() .into_iter() - .map(EitherName::B as fn(B::Info) -> _); + .map(Either::Right as fn(B::Info) -> _); a.chain(b) } @@ -81,8 +78,8 @@ where fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { match info { - EitherName::A(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)), - EitherName::B(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)), + Either::Left(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)), + Either::Right(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)), } } } @@ -98,8 +95,8 @@ where fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { match info { - EitherName::A(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)), - EitherName::B(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)), + Either::Left(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)), + Either::Right(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)), } } } diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 64fad09774e..0e83998fc75 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -6,7 +6,7 @@ use futures::prelude::*; use libp2p::{ core::{ - upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}, + upgrade::{read_length_prefixed, write_length_prefixed}, Multiaddr, }, identity, @@ -21,6 +21,7 @@ use libp2p::{ }; use libp2p::core::upgrade::Version; +use libp2p::StreamProtocol; use std::collections::{hash_map, HashMap, HashSet}; use std::error::Error; use std::iter; @@ -61,7 +62,10 @@ pub(crate) async fn new( kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), request_response: request_response::Behaviour::new( FileExchangeCodec(), - iter::once((FileExchangeProtocol(), ProtocolSupport::Full)), + iter::once(( + StreamProtocol::new("/file-exchange/1"), + ProtocolSupport::Full, + )), Default::default(), ), }, @@ -468,31 +472,20 @@ pub(crate) enum Event { // Simple file exchange protocol -#[derive(Debug, Clone)] -struct FileExchangeProtocol(); #[derive(Clone)] struct FileExchangeCodec(); #[derive(Debug, Clone, PartialEq, Eq)] struct FileRequest(String); #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct FileResponse(Vec); -impl ProtocolName for FileExchangeProtocol { - fn protocol_name(&self) -> &[u8] { - "/file-exchange/1".as_bytes() - } -} #[async_trait] impl request_response::Codec for FileExchangeCodec { - type Protocol = FileExchangeProtocol; + type Protocol = StreamProtocol; type Request = FileRequest; type Response = FileResponse; - async fn read_request( - &mut self, - _: &FileExchangeProtocol, - io: &mut T, - ) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -507,7 +500,7 @@ impl request_response::Codec for FileExchangeCodec { async fn read_response( &mut self, - _: &FileExchangeProtocol, + _: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -524,7 +517,7 @@ impl request_response::Codec for FileExchangeCodec { async fn write_request( &mut self, - _: &FileExchangeProtocol, + _: &StreamProtocol, io: &mut T, FileRequest(data): FileRequest, ) -> io::Result<()> @@ -539,7 +532,7 @@ impl request_response::Codec for FileExchangeCodec { async fn write_response( &mut self, - _: &FileExchangeProtocol, + _: &StreamProtocol, io: &mut T, FileResponse(data): FileResponse, ) -> io::Result<()> diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 5581fe7921d..bc47bcb0827 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -3,6 +3,14 @@ - Raise MSRV to 1.65. See [PR 3715]. +- Protocol names are now required to be valid UTF8 strings. + We delete the `ProtocolName` trait from `libp2p::core` and replace it with a requirement for `AsRef`. + At the same time, we introduce `StreamProtocol`, a newtype in `libp2p::swarm`. + This newtype enforces additional variants like a leading forward-slash. + We encourage users to use `StreamProtocol` when implementing `UpgradeInfo`. + See [PR 3746]. + +[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 ## 0.51.3 diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 47db4ac4134..3a8c09a068b 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -177,6 +177,7 @@ pub use self::swarm::Swarm; pub use self::transport_ext::TransportExt; pub use libp2p_identity as identity; pub use libp2p_identity::PeerId; +pub use libp2p_swarm::StreamProtocol; /// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p: /// diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 91f7d78722d..81e2ace6279 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -20,6 +20,7 @@ use crate::protocol_stack; use libp2p_identity::PeerId; +use libp2p_swarm::StreamProtocol; use prometheus_client::encoding::{EncodeLabelSet, EncodeMetric, MetricEncoder}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -134,11 +135,11 @@ impl super::Recorder for Metrics { } libp2p_identify::Event::Received { peer_id, info, .. } => { { - let mut protocols: Vec = info + let mut protocols = info .protocols .iter() .filter(|p| { - let allowed_protocols: &[&[u8]] = &[ + let allowed_protocols: &[StreamProtocol] = &[ #[cfg(feature = "dcutr")] libp2p_dcutr::PROTOCOL_NAME, // #[cfg(feature = "gossipsub")] @@ -156,10 +157,10 @@ impl super::Recorder for Metrics { libp2p_relay::HOP_PROTOCOL_NAME, ]; - allowed_protocols.contains(&p.as_bytes()) + allowed_protocols.contains(p) }) - .cloned() - .collect(); + .map(|p| p.to_string()) + .collect::>(); // Signal via an additional label value that one or more // protocols of the remote peer have not been recognized. diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 893c86f8867..af9f79d876a 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -52,7 +52,7 @@ pub fn dialer_select_proto( where R: AsyncRead + AsyncWrite, I: IntoIterator, - I::Item: AsRef<[u8]>, + I::Item: AsRef, { let protocols = protocols.into_iter().peekable(); DialerSelectFuture { @@ -88,7 +88,7 @@ where // It also makes the implementation considerably easier to write. R: AsyncRead + AsyncWrite + Unpin, I: Iterator, - I::Item: AsRef<[u8]>, + I::Item: AsRef, { type Output = Result<(I::Item, Negotiated), NegotiationError>; @@ -187,7 +187,7 @@ where Message::NotAvailable => { log::debug!( "Dialer: Received rejection of protocol: {}", - String::from_utf8_lossy(protocol.as_ref()) + protocol.as_ref() ); let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; *this.state = State::SendProtocol { io, protocol } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index b2393137386..ec62023a861 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -78,7 +78,7 @@ //! async_std::task::block_on(async move { //! let socket = TcpStream::connect("127.0.0.1:10333").await.unwrap(); //! -//! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; +//! let protos = vec!["/echo/1.0.0", "/echo/2.5.0"]; //! let (protocol, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); //! //! println!("Negotiated protocol: {:?}", protocol); diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index aa433e40c4d..5386114fab8 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -45,7 +45,7 @@ pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFutu where R: AsyncRead + AsyncWrite, I: IntoIterator, - I::Item: AsRef<[u8]>, + I::Item: AsRef, { let protocols = protocols .into_iter() @@ -54,7 +54,7 @@ where Err(e) => { log::warn!( "Listener: Ignoring invalid protocol: {} due to {}", - String::from_utf8_lossy(n.as_ref()), + n.as_ref(), e ); None @@ -113,7 +113,7 @@ where // The Unpin bound here is required because we produce a `Negotiated` as the output. // It also makes the implementation considerably easier to write. R: AsyncRead + AsyncWrite + Unpin, - N: AsRef<[u8]> + Clone, + N: AsRef + Clone, { type Output = Result<(N, Negotiated), NegotiationError>; @@ -231,10 +231,7 @@ where log::debug!("Listener: confirming protocol: {}", p); Message::Protocol(p.clone()) } else { - log::debug!( - "Listener: rejecting protocol: {}", - String::from_utf8_lossy(p.as_ref()) - ); + log::debug!("Listener: rejecting protocol: {}", p.as_ref()); Message::NotAvailable }; @@ -292,7 +289,7 @@ where Some(protocol) => { log::debug!( "Listener: sent confirmed protocol: {}", - String::from_utf8_lossy(protocol.as_ref()) + protocol.as_ref() ); let io = Negotiated::completed(io.into_inner()); return Poll::Ready(Ok((protocol, io))); diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index a560d116e53..be2f3122da0 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -68,9 +68,9 @@ impl From for HeaderLine { /// A protocol (name) exchanged during protocol negotiation. #[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct Protocol(Bytes); -impl AsRef<[u8]> for Protocol { - fn as_ref(&self) -> &[u8] { +pub(crate) struct Protocol(String); +impl AsRef for Protocol { + fn as_ref(&self) -> &str { self.0.as_ref() } } @@ -82,7 +82,10 @@ impl TryFrom for Protocol { if !value.as_ref().starts_with(b"/") { return Err(ProtocolError::InvalidProtocol); } - Ok(Protocol(value)) + let protocol_as_string = + String::from_utf8(value.to_vec()).map_err(|_| ProtocolError::InvalidProtocol)?; + + Ok(Protocol(protocol_as_string)) } } @@ -94,9 +97,21 @@ impl TryFrom<&[u8]> for Protocol { } } +impl TryFrom<&str> for Protocol { + type Error = ProtocolError; + + fn try_from(value: &str) -> Result { + if !value.starts_with('/') { + return Err(ProtocolError::InvalidProtocol); + } + + Ok(Protocol(value.to_owned())) + } +} + impl fmt::Display for Protocol { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", String::from_utf8_lossy(&self.0)) + write!(f, "{}", self.0) } } @@ -130,7 +145,7 @@ impl Message { Ok(()) } Message::Protocol(p) => { - let len = p.0.as_ref().len() + 1; // + 1 for \n + let len = p.as_ref().len() + 1; // + 1 for \n dest.reserve(len); dest.put(p.0.as_ref()); dest.put_u8(b'\n'); @@ -145,7 +160,7 @@ impl Message { let mut buf = uvi::encode::usize_buffer(); let mut encoded = Vec::with_capacity(ps.len()); for p in ps { - encoded.extend(uvi::encode::usize(p.0.as_ref().len() + 1, &mut buf)); // +1 for '\n' + encoded.extend(uvi::encode::usize(p.as_ref().len() + 1, &mut buf)); // +1 for '\n' encoded.extend_from_slice(p.0.as_ref()); encoded.push(b'\n') } @@ -464,7 +479,7 @@ mod tests { .filter(|&c| c.is_ascii_alphanumeric()) .take(n) .collect(); - Protocol(Bytes::from(format!("/{p}"))) + Protocol(format!("/{p}")) } } diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index 378c8c15909..f080730b939 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -32,9 +32,9 @@ fn select_proto_basic() { let server = async_std::task::spawn(async move { let connec = listener.accept().await.unwrap().0; - let protos = vec![b"/proto1", b"/proto2"]; + let protos = vec!["/proto1", "/proto2"]; let (proto, mut io) = listener_select_proto(connec, protos).await.unwrap(); - assert_eq!(proto, b"/proto2"); + assert_eq!(proto, "/proto2"); let mut out = vec![0; 32]; let n = io.read(&mut out).await.unwrap(); @@ -47,11 +47,11 @@ fn select_proto_basic() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); - let protos = vec![b"/proto3", b"/proto2"]; + let protos = vec!["/proto3", "/proto2"]; let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), version) .await .unwrap(); - assert_eq!(proto, b"/proto2"); + assert_eq!(proto, "/proto2"); io.write_all(b"ping").await.unwrap(); io.flush().await.unwrap(); diff --git a/muxers/mplex/src/config.rs b/muxers/mplex/src/config.rs index 587a4230480..3bf5e703a18 100644 --- a/muxers/mplex/src/config.rs +++ b/muxers/mplex/src/config.rs @@ -21,7 +21,7 @@ use crate::codec::MAX_FRAME_SIZE; use std::cmp; -pub(crate) const DEFAULT_MPLEX_PROTOCOL_NAME: &[u8] = b"/mplex/6.7.0"; +pub(crate) const DEFAULT_MPLEX_PROTOCOL_NAME: &str = "/mplex/6.7.0"; /// Configuration for the multiplexer. #[derive(Debug, Clone)] @@ -36,7 +36,7 @@ pub struct MplexConfig { /// (max 1MByte, as per the Mplex spec). pub(crate) split_send_size: usize, /// Protocol name, defaults to b"/mplex/6.7.0" - pub(crate) protocol_name: &'static [u8], + pub(crate) protocol_name: &'static str, } impl MplexConfig { @@ -94,9 +94,9 @@ impl MplexConfig { /// ```rust /// use libp2p_mplex::MplexConfig; /// let mut muxer_config = MplexConfig::new(); - /// muxer_config.set_protocol_name(b"/mplex/6.7.0"); + /// muxer_config.set_protocol_name("/mplex/6.7.0"); /// ``` - pub fn set_protocol_name(&mut self, protocol_name: &'static [u8]) -> &mut Self { + pub fn set_protocol_name(&mut self, protocol_name: &'static str) -> &mut Self { self.protocol_name = protocol_name; self } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index e1b1eb08cd9..fa36fecfefb 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -37,7 +37,7 @@ use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; impl UpgradeInfo for MplexConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 197fefb69c7..64ae8de436a 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -321,20 +321,20 @@ impl Default for Config { } impl UpgradeInfo for Config { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/yamux/1.0.0") + iter::once("/yamux/1.0.0") } } impl UpgradeInfo for LocalConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/yamux/1.0.0") + iter::once("/yamux/1.0.0") } } diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index b0d07f7a37b..c46d8044989 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -21,7 +21,8 @@ mod as_client; mod as_server; -use crate::protocol::{AutoNatCodec, AutoNatProtocol, DialRequest, DialResponse, ResponseError}; +use crate::protocol::{AutoNatCodec, DialRequest, DialResponse, ResponseError}; +use crate::DEFAULT_PROTOCOL_NAME; use as_client::AsClient; pub use as_client::{OutboundProbeError, OutboundProbeEvent}; use as_server::AsServer; @@ -218,7 +219,7 @@ pub struct Behaviour { impl Behaviour { pub fn new(local_peer_id: PeerId, config: Config) -> Self { - let protocols = iter::once((AutoNatProtocol, ProtocolSupport::Full)); + let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full)); let mut cfg = request_response::Config::default(); cfg.set_request_timeout(config.timeout); let inner = request_response::Behaviour::new(AutoNatCodec, protocols, cfg); diff --git a/protocols/autonat/src/protocol.rs b/protocols/autonat/src/protocol.rs index 82e448cda24..a63fd8cdf4d 100644 --- a/protocols/autonat/src/protocol.rs +++ b/protocols/autonat/src/protocol.rs @@ -23,36 +23,24 @@ use async_trait::async_trait; use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use libp2p_core::{upgrade, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_request_response::{self as request_response, ProtocolName}; +use libp2p_request_response::{self as request_response}; +use libp2p_swarm::StreamProtocol; use quick_protobuf::{BytesReader, Writer}; use std::{convert::TryFrom, io}; -#[derive(Clone, Debug)] -pub struct AutoNatProtocol; - /// The protocol name used for negotiating with multistream-select. -pub const DEFAULT_PROTOCOL_NAME: &[u8] = b"/libp2p/autonat/1.0.0"; - -impl ProtocolName for AutoNatProtocol { - fn protocol_name(&self) -> &[u8] { - DEFAULT_PROTOCOL_NAME - } -} +pub const DEFAULT_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/libp2p/autonat/1.0.0"); #[derive(Clone)] pub struct AutoNatCodec; #[async_trait] impl request_response::Codec for AutoNatCodec { - type Protocol = AutoNatProtocol; + type Protocol = StreamProtocol; type Request = DialRequest; type Response = DialResponse; - async fn read_request( - &mut self, - _: &AutoNatProtocol, - io: &mut T, - ) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Send + Unpin, { @@ -63,7 +51,7 @@ impl request_response::Codec for AutoNatCodec { async fn read_response( &mut self, - _: &AutoNatProtocol, + _: &StreamProtocol, io: &mut T, ) -> io::Result where @@ -76,7 +64,7 @@ impl request_response::Codec for AutoNatCodec { async fn write_request( &mut self, - _: &AutoNatProtocol, + _: &StreamProtocol, io: &mut T, data: Self::Request, ) -> io::Result<()> @@ -89,7 +77,7 @@ impl request_response::Codec for AutoNatCodec { async fn write_response( &mut self, - _: &AutoNatProtocol, + _: &StreamProtocol, io: &mut T, data: Self::Response, ) -> io::Result<()> diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs index 4da255fc1d9..81a2badd9a9 100644 --- a/protocols/dcutr/src/protocol.rs +++ b/protocols/dcutr/src/protocol.rs @@ -18,8 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_swarm::StreamProtocol; + pub(crate) mod inbound; pub(crate) mod outbound; -pub const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/libp2p/dcutr"); const MAX_MESSAGE_SIZE_BYTES: usize = 4096; diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index df4c3c63d34..83fa926a550 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -22,7 +22,7 @@ use crate::proto; use asynchronous_codec::Framed; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -30,7 +30,7 @@ use thiserror::Error; pub struct Upgrade {} impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 29509b16c34..00b16e20617 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -24,7 +24,7 @@ use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; use instant::Instant; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -34,7 +34,7 @@ pub struct Upgrade { } impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 7ac584bd2de..ebd3d8b3bc8 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -28,11 +28,12 @@ use futures::{ use futures::{SinkExt, StreamExt}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_identity::PeerId; +use libp2p_swarm::StreamProtocol; use std::{io, iter, pin::Pin}; const MAX_MESSAGE_LEN_BYTES: usize = 2048; -const PROTOCOL_NAME: &[u8] = b"/floodsub/1.0.0"; +const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0"); /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] @@ -46,7 +47,7 @@ impl FloodsubProtocol { } impl UpgradeInfo for FloodsubProtocol { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { @@ -133,7 +134,7 @@ pub struct FloodsubRpc { } impl UpgradeInfo for FloodsubRpc { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 6a92210e725..c2d6b8da3fe 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -52,7 +52,7 @@ use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; -use crate::protocol::{ProtocolConfig, SIGNING_PREFIX}; +use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; use crate::time_cache::{DuplicateCache, TimeCache}; use crate::topic::{Hasher, Topic, TopicHash}; @@ -3317,7 +3317,7 @@ where _: &Multiaddr, ) -> Result, ConnectionDenied> { Ok(Handler::new( - ProtocolConfig::new(&self.config), + self.config.protocol_config(), self.config.idle_timeout(), )) } @@ -3330,7 +3330,7 @@ where _: Endpoint, ) -> Result, ConnectionDenied> { Ok(Handler::new( - ProtocolConfig::new(&self.config), + self.config.protocol_config(), self.config.idle_timeout(), )) } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 8c6052bd6b6..29262e9c8f6 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -21,6 +21,7 @@ // Collection of tests for the gossipsub network behaviour use super::*; +use crate::protocol::ProtocolConfig; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; @@ -271,8 +272,7 @@ where for connection_id in peer_connections.connections.clone() { active_connections = active_connections.checked_sub(1).unwrap(); - let dummy_handler = - Handler::new(ProtocolConfig::new(&Config::default()), Duration::ZERO); + let dummy_handler = Handler::new(ProtocolConfig::default(), Duration::ZERO); gs.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id: *peer_id, diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index aa488b4ea29..a5d31071538 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -22,9 +22,11 @@ use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; -use libp2p_identity::PeerId; +use crate::protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL}; +use crate::types::{FastMessageId, Message, MessageId, PeerKind, RawMessage}; -use crate::types::{FastMessageId, Message, MessageId, RawMessage}; +use libp2p_identity::PeerId; +use libp2p_swarm::StreamProtocol; /// The types of message validation that can be employed by gossipsub. #[derive(Debug, Clone)] @@ -59,8 +61,7 @@ pub enum Version { /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] pub struct Config { - protocol_id: Cow<'static, str>, - custom_id_version: Option, + protocol: ProtocolConfig, history_length: usize, history_gossip: usize, mesh_n: usize, @@ -73,11 +74,9 @@ pub struct Config { heartbeat_interval: Duration, fanout_ttl: Duration, check_explicit_peers_ticks: u64, - max_transmit_size: usize, idle_timeout: Duration, duplicate_cache_time: Duration, validate_messages: bool, - validation_mode: ValidationMode, message_id_fn: Arc MessageId + Send + Sync + 'static>, fast_message_id_fn: Option FastMessageId + Send + Sync + 'static>>, allow_self_origin: bool, @@ -96,27 +95,12 @@ pub struct Config { max_ihave_length: usize, max_ihave_messages: usize, iwant_followup_time: Duration, - support_floodsub: bool, published_message_ids_cache_time: Duration, } impl Config { - // All the getters - - /// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form - /// `//`, but can optionally be changed to a literal form by providing some Version as custom_id_version. - /// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id. - /// - /// Calling [`ConfigBuilder::protocol_id_prefix`] will set a new prefix and retain the prefix logic. - /// Calling [`ConfigBuilder::protocol_id`] will set a custom `protocol_id` and disable the prefix logic. - /// - /// The default prefix is `meshsub`, giving the supported protocol ids: `/meshsub/1.1.0` and `/meshsub/1.0.0`, negotiated in that order. - pub fn protocol_id(&self) -> &Cow<'static, str> { - &self.protocol_id - } - - pub fn custom_id_version(&self) -> &Option { - &self.custom_id_version + pub(crate) fn protocol_config(&self) -> ProtocolConfig { + self.protocol.clone() } // Overlay network parameters. @@ -196,7 +180,7 @@ impl Config { /// must be large enough to transmit the desired peer information on pruning. It must be at /// least 100 bytes. Default is 65536 bytes. pub fn max_transmit_size(&self) -> usize { - self.max_transmit_size + self.protocol.max_transmit_size } /// The time a connection is maintained to a peer without being in the mesh and without @@ -226,7 +210,7 @@ impl Config { /// Determines the level of validation used when receiving messages. See [`ValidationMode`] /// for the available types. The default is ValidationMode::Strict. pub fn validation_mode(&self) -> &ValidationMode { - &self.validation_mode + &self.protocol.validation_mode } /// A user-defined function allowing the user to specify the message id of a gossipsub message. @@ -381,7 +365,7 @@ impl Config { /// Enable support for flooodsub peers. Default false. pub fn support_floodsub(&self) -> bool { - self.support_floodsub + self.protocol.protocol_ids.contains(&FLOODSUB_PROTOCOL) } /// Published message ids time cache duration. The default is 10 seconds. @@ -402,14 +386,14 @@ impl Default for Config { /// The builder struct for constructing a gossipsub configuration. pub struct ConfigBuilder { config: Config, + invalid_protocol: bool, // This is a bit of a hack to only expose one error to the user. } impl Default for ConfigBuilder { fn default() -> Self { ConfigBuilder { config: Config { - protocol_id: Cow::Borrowed("meshsub"), - custom_id_version: None, + protocol: ProtocolConfig::default(), history_length: 5, history_gossip: 3, mesh_n: 6, @@ -422,11 +406,9 @@ impl Default for ConfigBuilder { heartbeat_interval: Duration::from_secs(1), fanout_ttl: Duration::from_secs(60), check_explicit_peers_ticks: 300, - max_transmit_size: 65536, idle_timeout: Duration::from_secs(120), duplicate_cache_time: Duration::from_secs(60), validate_messages: false, - validation_mode: ValidationMode::Strict, message_id_fn: Arc::new(|message| { // default message id is: source + sequence number // NOTE: If either the peer_id or source is not provided, we set to 0; @@ -458,27 +440,51 @@ impl Default for ConfigBuilder { max_ihave_length: 5000, max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), - support_floodsub: false, published_message_ids_cache_time: Duration::from_secs(10), }, + invalid_protocol: false, } } } impl From for ConfigBuilder { fn from(config: Config) -> Self { - ConfigBuilder { config } + ConfigBuilder { + config, + invalid_protocol: false, + } } } impl ConfigBuilder { - /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`). + /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.1.0` and `/meshsub/1.0.0`). pub fn protocol_id_prefix( &mut self, protocol_id_prefix: impl Into>, ) -> &mut Self { - self.config.custom_id_version = None; - self.config.protocol_id = protocol_id_prefix.into(); + let cow = protocol_id_prefix.into(); + + match ( + StreamProtocol::try_from_owned(format!("{}/1.1.0", cow)), + StreamProtocol::try_from_owned(format!("{}/1.0.0", cow)), + ) { + (Ok(p1), Ok(p2)) => { + self.config.protocol.protocol_ids = vec![ + ProtocolId { + protocol: p1, + kind: PeerKind::Gossipsubv1_1, + }, + ProtocolId { + protocol: p2, + kind: PeerKind::Gossipsub, + }, + ] + } + _ => { + self.invalid_protocol = true; + } + } + self } @@ -488,8 +494,23 @@ impl ConfigBuilder { protocol_id: impl Into>, custom_id_version: Version, ) -> &mut Self { - self.config.custom_id_version = Some(custom_id_version); - self.config.protocol_id = protocol_id.into(); + let cow = protocol_id.into(); + + match StreamProtocol::try_from_owned(cow.to_string()) { + Ok(protocol) => { + self.config.protocol.protocol_ids = vec![ProtocolId { + protocol, + kind: match custom_id_version { + Version::V1_1 => PeerKind::Gossipsubv1_1, + Version::V1_0 => PeerKind::Gossipsub, + }, + }] + } + _ => { + self.invalid_protocol = true; + } + } + self } @@ -576,7 +597,7 @@ impl ConfigBuilder { /// The maximum byte size for each gossip (default is 2048 bytes). pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self { - self.config.max_transmit_size = max_transmit_size; + self.config.protocol.max_transmit_size = max_transmit_size; self } @@ -609,7 +630,7 @@ impl ConfigBuilder { /// Determines the level of validation used when receiving messages. See [`ValidationMode`] /// for the available types. The default is ValidationMode::Strict. pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self { - self.config.validation_mode = validation_mode; + self.config.protocol.validation_mode = validation_mode; self } @@ -787,7 +808,16 @@ impl ConfigBuilder { /// Enable support for flooodsub peers. pub fn support_floodsub(&mut self) -> &mut Self { - self.config.support_floodsub = true; + if self + .config + .protocol + .protocol_ids + .contains(&FLOODSUB_PROTOCOL) + { + return self; + } + + self.config.protocol.protocol_ids.push(FLOODSUB_PROTOCOL); self } @@ -804,7 +834,7 @@ impl ConfigBuilder { pub fn build(&self) -> Result { // check all constraints on config - if self.config.max_transmit_size < 100 { + if self.config.protocol.max_transmit_size < 100 { return Err("The maximum transmission size must be greater than 100 to permit basic control messages"); } @@ -833,6 +863,10 @@ impl ConfigBuilder { return Err("The unsubscribe_backoff parameter should be positive."); } + if self.invalid_protocol { + return Err("The provided protocol is invalid, it must start with a forward-slash"); + } + Ok(self.config.clone()) } } @@ -840,8 +874,7 @@ impl ConfigBuilder { impl std::fmt::Debug for Config { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut builder = f.debug_struct("GossipsubConfig"); - let _ = builder.field("protocol_id", &self.protocol_id); - let _ = builder.field("custom_id_version", &self.custom_id_version); + let _ = builder.field("protocol", &self.protocol); let _ = builder.field("history_length", &self.history_length); let _ = builder.field("history_gossip", &self.history_gossip); let _ = builder.field("mesh_n", &self.mesh_n); @@ -853,11 +886,9 @@ impl std::fmt::Debug for Config { let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay); let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); let _ = builder.field("fanout_ttl", &self.fanout_ttl); - let _ = builder.field("max_transmit_size", &self.max_transmit_size); let _ = builder.field("idle_timeout", &self.idle_timeout); let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time); let _ = builder.field("validate_messages", &self.validate_messages); - let _ = builder.field("validation_mode", &self.validation_mode); let _ = builder.field("allow_self_origin", &self.allow_self_origin); let _ = builder.field("do_px", &self.do_px); let _ = builder.field("prune_peers", &self.prune_peers); @@ -872,7 +903,6 @@ impl std::fmt::Debug for Config { let _ = builder.field("max_ihave_length", &self.max_ihave_length); let _ = builder.field("max_ihave_messages", &self.max_ihave_messages); let _ = builder.field("iwant_followup_time", &self.iwant_followup_time); - let _ = builder.field("support_floodsub", &self.support_floodsub); let _ = builder.field( "published_message_ids_cache_time", &self.published_message_ids_cache_time, @@ -884,11 +914,11 @@ impl std::fmt::Debug for Config { #[cfg(test)] mod test { use super::*; - use crate::protocol::ProtocolConfig; use crate::topic::IdentityHash; use crate::types::PeerKind; use crate::Topic; use libp2p_core::UpgradeInfo; + use libp2p_swarm::StreamProtocol; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -944,38 +974,42 @@ mod test { #[test] fn create_config_with_protocol_id_prefix() { - let protocol_config = ProtocolConfig::new( - &ConfigBuilder::default() - .protocol_id_prefix("purple") - .build() - .unwrap(), - ); + let protocol_config = ConfigBuilder::default() + .protocol_id_prefix("/purple") + .build() + .unwrap() + .protocol_config(); let protocol_ids = protocol_config.protocol_info(); assert_eq!(protocol_ids.len(), 2); - assert_eq!(protocol_ids[0].protocol_id, b"/purple/1.1.0".to_vec()); + assert_eq!( + protocol_ids[0].protocol, + StreamProtocol::new("/purple/1.1.0") + ); assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsubv1_1); - assert_eq!(protocol_ids[1].protocol_id, b"/purple/1.0.0".to_vec()); + assert_eq!( + protocol_ids[1].protocol, + StreamProtocol::new("/purple/1.0.0") + ); assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub); } #[test] fn create_config_with_custom_protocol_id() { - let protocol_config = ProtocolConfig::new( - &ConfigBuilder::default() - .protocol_id("purple", Version::V1_0) - .build() - .unwrap(), - ); + let protocol_config = ConfigBuilder::default() + .protocol_id("/purple", Version::V1_0) + .build() + .unwrap() + .protocol_config(); let protocol_ids = protocol_config.protocol_info(); assert_eq!(protocol_ids.len(), 1); - assert_eq!(protocol_ids[0].protocol_id, b"purple".to_vec()); + assert_eq!(protocol_ids[0].protocol, "/purple"); assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub); } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 609bb81a306..9e93a53b279 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -79,6 +79,7 @@ pub enum HandlerIn { /// creation loops. const MAX_SUBSTREAM_ATTEMPTS: usize = 5; +#[allow(clippy::large_enum_variant)] pub enum Handler { Enabled(EnabledHandler), Disabled(DisabledHandler), diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index cde091c7b56..48c1e4b2d15 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -18,21 +18,22 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::{ValidationMode, Version}; +use crate::config::ValidationMode; use crate::handler::HandlerEvent; +use crate::rpc_proto::proto; use crate::topic::TopicHash; use crate::types::{ ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, }; use crate::ValidationError; -use crate::{rpc_proto::proto, Config}; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; use futures::future; use futures::prelude::*; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_identity::{PeerId, PublicKey}; +use libp2p_swarm::StreamProtocol; use log::{debug, warn}; use quick_protobuf::Writer; use std::pin::Pin; @@ -41,94 +42,52 @@ use void::Void; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.1.0"), + kind: PeerKind::Gossipsubv1_1, +}; +pub(crate) const GOSSIPSUB_1_0_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.0.0"), + kind: PeerKind::Gossipsub, +}; +pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/floodsub/1.0.0"), + kind: PeerKind::Floodsub, +}; + /// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol. #[derive(Debug, Clone)] pub struct ProtocolConfig { /// The Gossipsub protocol id to listen on. - protocol_ids: Vec, + pub(crate) protocol_ids: Vec, /// The maximum transmit size for a packet. - max_transmit_size: usize, + pub(crate) max_transmit_size: usize, /// Determines the level of validation to be done on incoming messages. - validation_mode: ValidationMode, + pub(crate) validation_mode: ValidationMode, } -impl ProtocolConfig { - /// Builds a new [`ProtocolConfig`]. - /// - /// Sets the maximum gossip transmission size. - pub fn new(gossipsub_config: &Config) -> ProtocolConfig { - let mut protocol_ids = match gossipsub_config.custom_id_version() { - Some(v) => match v { - Version::V1_0 => vec![ProtocolId::new( - gossipsub_config.protocol_id(), - PeerKind::Gossipsub, - false, - )], - Version::V1_1 => vec![ProtocolId::new( - gossipsub_config.protocol_id(), - PeerKind::Gossipsubv1_1, - false, - )], - }, - None => { - vec![ - ProtocolId::new( - gossipsub_config.protocol_id(), - PeerKind::Gossipsubv1_1, - true, - ), - ProtocolId::new(gossipsub_config.protocol_id(), PeerKind::Gossipsub, true), - ] - } - }; - - // add floodsub support if enabled. - if gossipsub_config.support_floodsub() { - protocol_ids.push(ProtocolId::new("", PeerKind::Floodsub, false)); - } - - ProtocolConfig { - protocol_ids, - max_transmit_size: gossipsub_config.max_transmit_size(), - validation_mode: gossipsub_config.validation_mode().clone(), +impl Default for ProtocolConfig { + fn default() -> Self { + Self { + max_transmit_size: 65536, + validation_mode: ValidationMode::Strict, + protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL], } } } /// The protocol ID -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct ProtocolId { /// The RPC message type/name. - pub protocol_id: Vec, + pub protocol: StreamProtocol, /// The type of protocol we support pub kind: PeerKind, } -/// An RPC protocol ID. -impl ProtocolId { - pub fn new(id: &str, kind: PeerKind, prefix: bool) -> Self { - let protocol_id = match kind { - PeerKind::Gossipsubv1_1 => match prefix { - true => format!("/{}/{}", id, "1.1.0"), - false => id.to_string(), - }, - PeerKind::Gossipsub => match prefix { - true => format!("/{}/{}", id, "1.0.0"), - false => id.to_string(), - }, - PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"), - // NOTE: This is used for informing the behaviour of unsupported peers. We do not - // advertise this variant. - PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"), - } - .into_bytes(); - ProtocolId { protocol_id, kind } - } -} - -impl ProtocolName for ProtocolId { - fn protocol_name(&self) -> &[u8] { - &self.protocol_id +impl AsRef for ProtocolId { + fn as_ref(&self) -> &str { + self.protocol.as_ref() } } @@ -554,8 +513,8 @@ impl Decoder for GossipsubCodec { mod tests { use super::*; use crate::config::Config; - use crate::IdentTopic as Topic; use crate::{Behaviour, ConfigBuilder}; + use crate::{IdentTopic as Topic, Version}; use libp2p_core::identity::Keypair; use quickcheck::*; @@ -654,21 +613,14 @@ mod tests { #[test] fn support_floodsub_with_custom_protocol() { - let gossipsub_config = ConfigBuilder::default() + let protocol_config = ConfigBuilder::default() .protocol_id("/foosub", Version::V1_1) .support_floodsub() .build() - .unwrap(); - - let protocol_config = ProtocolConfig::new(&gossipsub_config); - - assert_eq!( - String::from_utf8_lossy(&protocol_config.protocol_ids[0].protocol_id), - "/foosub" - ); - assert_eq!( - String::from_utf8_lossy(&protocol_config.protocol_ids[1].protocol_id), - "/floodsub/1.0.0" - ); + .unwrap() + .protocol_config(); + + assert_eq!(protocol_config.protocol_ids[0].protocol, "/foosub"); + assert_eq!(protocol_config.protocol_ids[1].protocol, "/floodsub/1.0.0"); } } diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 6dcea9c1df3..2f410fec0df 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -26,7 +26,8 @@ use libp2p_identity::PublicKey; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, ExternalAddresses, - ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, ToSwarm, + ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, + THandlerInEvent, ToSwarm, }; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; use lru::LruCache; @@ -498,12 +499,14 @@ pub enum Event { }, } -fn supported_protocols(params: &impl PollParameters) -> Vec { +fn supported_protocols(params: &impl PollParameters) -> Vec { // The protocol names can be bytes, but the identify protocol except UTF-8 strings. // There's not much we can do to solve this conflict except strip non-UTF-8 characters. params .supported_protocols() - .map(|p| String::from_utf8_lossy(&p).to_string()) + .filter_map(|p| { + StreamProtocol::try_from_owned(String::from_utf8_lossy(&p).to_string()).ok() + }) .collect() } @@ -569,10 +572,7 @@ mod tests { use libp2p_tcp as tcp; use std::time::Duration; - fn transport() -> ( - identity::PublicKey, - transport::Boxed<(PeerId, StreamMuxerBox)>, - ) { + fn transport() -> (PublicKey, transport::Boxed<(PeerId, StreamMuxerBox)>) { let id_keys = identity::Keypair::generate_ed25519(); let pubkey = id_keys.public(); let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index c0bd9d928eb..f95cef424b9 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -35,7 +35,7 @@ use libp2p_swarm::handler::{ }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + NegotiatedSubstream, StreamProtocol, SubstreamProtocol, }; use log::warn; use smallvec::SmallVec; @@ -92,7 +92,7 @@ pub struct InEvent { pub listen_addrs: Vec, /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. - pub supported_protocols: Vec, + pub supported_protocols: Vec, /// The protocol w.r.t. the information requested. pub protocol: Protocol, diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 1a10b591278..a32c3725039 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -28,7 +28,7 @@ use libp2p_core::{ }; use libp2p_identity as identity; use libp2p_identity::PublicKey; -use libp2p_swarm::ConnectionId; +use libp2p_swarm::{ConnectionId, StreamProtocol}; use log::{debug, trace}; use std::convert::TryFrom; use std::{io, iter, pin::Pin}; @@ -37,9 +37,9 @@ use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; -pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0"); -pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; +pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0"); /// The type of the Substream protocol. #[derive(Debug, PartialEq, Eq)] @@ -84,13 +84,13 @@ pub struct Info { /// The addresses that the peer is listening on. pub listen_addrs: Vec, /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`. - pub protocols: Vec, + pub protocols: Vec, /// Address observed by or for the remote. pub observed_addr: Multiaddr, } impl UpgradeInfo for Identify { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { @@ -122,7 +122,7 @@ where } impl UpgradeInfo for Push { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { @@ -177,7 +177,7 @@ where publicKey: Some(pubkey_bytes), listenAddrs: listen_addrs, observedAddr: Some(info.observed_addr.to_vec()), - protocols: info.protocols, + protocols: info.protocols.into_iter().map(|p| p.to_string()).collect(), }; let mut framed_io = FramedWrite::new( @@ -249,7 +249,17 @@ impl TryFrom for Info { protocol_version: msg.protocolVersion.unwrap_or_default(), agent_version: msg.agentVersion.unwrap_or_default(), listen_addrs, - protocols: msg.protocols, + protocols: msg + .protocols + .into_iter() + .filter_map(|p| match StreamProtocol::try_from_owned(p) { + Ok(p) => Some(p), + Err(e) => { + debug!("Received invalid protocol from peer: {e}"); + None + } + }) + .collect(), observed_addr, }; @@ -328,7 +338,10 @@ mod tests { "/ip4/80.81.82.83/tcp/500".parse().unwrap(), "/ip6/::1/udp/1000".parse().unwrap(), ], - protocols: vec!["proto1".to_string(), "proto2".to_string()], + protocols: vec![ + StreamProtocol::new("/proto1"), + StreamProtocol::new("/proto2"), + ], observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), }, ) @@ -357,10 +370,7 @@ mod tests { "/ip6/::1/udp/1000".parse().unwrap() ] ); - assert_eq!( - info.protocols, - &["proto1".to_string(), "proto2".to_string()] - ); + assert_eq!(info.protocols, &["/proto1", "/proto2"]); bg_task.await; }); diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index b34c5e428f7..ed161d7a6ce 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -47,8 +47,8 @@ use libp2p_swarm::behaviour::{ use libp2p_swarm::{ dial_opts::{self, DialOpts}, ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses, - NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, - ToSwarm, + NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -56,8 +56,8 @@ use std::collections::{BTreeMap, HashSet, VecDeque}; use std::fmt; use std::num::NonZeroUsize; use std::task::{Context, Poll}; +use std::time::Duration; use std::vec; -use std::{borrow::Cow, time::Duration}; use thiserror::Error; pub use crate::query::QueryStats; @@ -228,7 +228,7 @@ impl KademliaConfig { /// More than one protocol name can be supplied. In this case the node will /// be able to talk to other nodes supporting any of the provided names. /// Multiple names must be used with caution to avoid network partitioning. - pub fn set_protocol_names(&mut self, names: Vec>) -> &mut Self { + pub fn set_protocol_names(&mut self, names: Vec) -> &mut Self { self.protocol_config.set_protocol_names(names); self } @@ -411,7 +411,7 @@ where } /// Get the protocol name of this kademlia instance. - pub fn protocol_names(&self) -> &[Cow<'static, [u8]>] { + pub fn protocol_names(&self) -> &[StreamProtocol] { self.protocol_config.protocol_names() } diff --git a/protocols/kad/src/protocol_priv.rs b/protocols/kad/src/protocol_priv.rs index 3f7f9d99443..1801ce935fd 100644 --- a/protocols/kad/src/protocol_priv.rs +++ b/protocols/kad/src/protocol_priv.rs @@ -36,13 +36,14 @@ use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; +use libp2p_swarm::StreamProtocol; use quick_protobuf::{BytesReader, Writer}; -use std::{borrow::Cow, convert::TryFrom, time::Duration}; +use std::{convert::TryFrom, time::Duration}; use std::{io, iter}; use unsigned_varint::codec; /// The protocol name used for negotiating with multistream-select. -pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0"; +pub const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); /// The default maximum size for a varint length-delimited packet. pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024; @@ -139,20 +140,20 @@ impl From for proto::Peer { // `OutboundUpgrade` to be just a single message #[derive(Debug, Clone)] pub struct KademliaProtocolConfig { - protocol_names: Vec>, + protocol_names: Vec, /// Maximum allowed size of a packet. max_packet_size: usize, } impl KademliaProtocolConfig { /// Returns the configured protocol name. - pub fn protocol_names(&self) -> &[Cow<'static, [u8]>] { + pub fn protocol_names(&self) -> &[StreamProtocol] { &self.protocol_names } /// Modifies the protocol names used on the wire. Can be used to create incompatibilities /// between networks on purpose. - pub fn set_protocol_names(&mut self, names: Vec>) { + pub fn set_protocol_names(&mut self, names: Vec) { self.protocol_names = names; } @@ -165,14 +166,14 @@ impl KademliaProtocolConfig { impl Default for KademliaProtocolConfig { fn default() -> Self { KademliaProtocolConfig { - protocol_names: iter::once(Cow::Borrowed(DEFAULT_PROTO_NAME)).collect(), + protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(), max_packet_size: DEFAULT_MAX_PACKET_SIZE, } } } impl UpgradeInfo for KademliaProtocolConfig { - type Info = Cow<'static, [u8]>; + type Info = StreamProtocol; type InfoIter = std::vec::IntoIter; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index d8e3854a91d..e0c9f44c886 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -31,7 +31,7 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, StreamProtocol, SubstreamProtocol, }; use void::Void; @@ -90,7 +90,7 @@ impl ConnectionHandler for Handler { type OutEvent = Event; type Error = Void; type InboundProtocol = DeniedUpgrade; - type OutboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs index 19bb956a1d2..aeb91ff2412 100644 --- a/protocols/perf/src/lib.rs +++ b/protocols/perf/src/lib.rs @@ -24,8 +24,10 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +use libp2p_swarm::StreamProtocol; + pub mod client; mod protocol; pub mod server; -pub const PROTOCOL_NAME: &[u8; 11] = b"/perf/1.0.0"; +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0"); diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 2946b6d4a4c..95e93dd171a 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -30,7 +30,7 @@ use libp2p_swarm::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, }, - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, StreamProtocol, SubstreamProtocol, }; use log::error; @@ -67,7 +67,7 @@ impl ConnectionHandler for Handler { type InEvent = Void; type OutEvent = Event; type Error = Void; - type InboundProtocol = ReadyUpgrade<&'static [u8]>; + type InboundProtocol = ReadyUpgrade; type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; type InboundOpenInfo = (); diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 2703b274c77..1f6ca00dec7 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -29,7 +29,7 @@ use libp2p_swarm::handler::{ }; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + NegotiatedSubstream, StreamProtocol, SubstreamProtocol, }; use std::collections::VecDeque; use std::{ @@ -257,12 +257,12 @@ impl ConnectionHandler for Handler { type InEvent = Void; type OutEvent = crate::Result; type Error = Failure; - type InboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundProtocol = ReadyUpgrade<&'static [u8]>; + type InboundProtocol = ReadyUpgrade; + type OutboundProtocol = ReadyUpgrade; type OutboundOpenInfo = (); type InboundOpenInfo = (); - fn listen_protocol(&self) -> SubstreamProtocol, ()> { + fn listen_protocol(&self) -> SubstreamProtocol, ()> { SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) } @@ -279,7 +279,7 @@ impl ConnectionHandler for Handler { fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, (), crate::Result, Self::Error>> + ) -> Poll, (), crate::Result, Self::Error>> { match self.state { State::Inactive { reported: true } => { diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 032b4aa0a0a..34f816522d9 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -20,10 +20,11 @@ use futures::prelude::*; use instant::Instant; +use libp2p_swarm::StreamProtocol; use rand::{distributions, prelude::*}; use std::{io, time::Duration}; -pub const PROTOCOL_NAME: &[u8] = b"/ipfs/ping/1.0.0"; +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0"); /// The `Ping` protocol upgrade. /// diff --git a/protocols/relay/src/protocol.rs b/protocols/relay/src/protocol.rs index 2c471d36dd9..4376f64cc0b 100644 --- a/protocols/relay/src/protocol.rs +++ b/protocols/relay/src/protocol.rs @@ -19,14 +19,17 @@ // DEALINGS IN THE SOFTWARE. use crate::proto; +use libp2p_swarm::StreamProtocol; use std::time::Duration; pub(crate) mod inbound_hop; pub(crate) mod inbound_stop; pub(crate) mod outbound_hop; pub(crate) mod outbound_stop; -pub const HOP_PROTOCOL_NAME: &[u8; 31] = b"/libp2p/circuit/relay/0.2.0/hop"; -pub const STOP_PROTOCOL_NAME: &[u8; 32] = b"/libp2p/circuit/relay/0.2.0/stop"; +pub const HOP_PROTOCOL_NAME: StreamProtocol = + StreamProtocol::new("/libp2p/circuit/relay/0.2.0/hop"); +pub const STOP_PROTOCOL_NAME: StreamProtocol = + StreamProtocol::new("/libp2p/circuit/relay/0.2.0/stop"); const MAX_MESSAGE_SIZE: usize = 4096; diff --git a/protocols/relay/src/protocol/inbound_hop.rs b/protocols/relay/src/protocol/inbound_hop.rs index 485c3ef76bd..1af258fc25b 100644 --- a/protocols/relay/src/protocol/inbound_hop.rs +++ b/protocols/relay/src/protocol/inbound_hop.rs @@ -26,7 +26,7 @@ use futures::{future::BoxFuture, prelude::*}; use instant::{Duration, SystemTime}; use libp2p_core::{upgrade, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::convert::TryInto; use std::iter; use thiserror::Error; @@ -38,7 +38,7 @@ pub struct Upgrade { } impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/relay/src/protocol/inbound_stop.rs b/protocols/relay/src/protocol/inbound_stop.rs index 67e4cdd5323..bfffb6a1e9c 100644 --- a/protocols/relay/src/protocol/inbound_stop.rs +++ b/protocols/relay/src/protocol/inbound_stop.rs @@ -25,14 +25,14 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::upgrade; use libp2p_identity::PeerId; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::iter; use thiserror::Error; pub struct Upgrade {} impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/relay/src/protocol/outbound_hop.rs b/protocols/relay/src/protocol/outbound_hop.rs index 323d0be3b9a..07d09157404 100644 --- a/protocols/relay/src/protocol/outbound_hop.rs +++ b/protocols/relay/src/protocol/outbound_hop.rs @@ -27,7 +27,7 @@ use futures_timer::Delay; use instant::{Duration, SystemTime}; use libp2p_core::{upgrade, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -38,7 +38,7 @@ pub enum Upgrade { } impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/relay/src/protocol/outbound_stop.rs b/protocols/relay/src/protocol/outbound_stop.rs index 9b028bc5b66..782808acc57 100644 --- a/protocols/relay/src/protocol/outbound_stop.rs +++ b/protocols/relay/src/protocol/outbound_stop.rs @@ -25,7 +25,7 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::upgrade; use libp2p_identity::PeerId; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; use std::convert::TryInto; use std::iter; use std::time::Duration; @@ -38,7 +38,7 @@ pub struct Upgrade { } impl upgrade::UpgradeInfo for Upgrade { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index baaf148f2b5..ab222062eec 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -154,18 +154,6 @@ pub mod protocol { note = "Use libp2p_relay::outbound::stop::FatalUpgradeError instead." )] pub type OutboundStopFatalUpgradeError = crate::outbound::stop::FatalUpgradeError; - - #[deprecated( - since = "0.15.0", - note = "Use libp2p_relay::HOP_PROTOCOL_NAME instead." - )] - pub const HOP_PROTOCOL_NAME: &[u8; 31] = crate::HOP_PROTOCOL_NAME; - - #[deprecated( - since = "0.15.0", - note = "Use libp2p_relay::STOP_PROTOCOL_NAME instead." - )] - pub const STOP_PROTOCOL_NAME: &[u8; 32] = crate::STOP_PROTOCOL_NAME; } #[deprecated( diff --git a/protocols/rendezvous/src/handler.rs b/protocols/rendezvous/src/handler.rs index f69748a400b..ccf765c9c65 100644 --- a/protocols/rendezvous/src/handler.rs +++ b/protocols/rendezvous/src/handler.rs @@ -20,9 +20,10 @@ use crate::codec; use crate::codec::Message; +use libp2p_swarm::StreamProtocol; use void::Void; -const PROTOCOL_IDENT: &[u8] = b"/rendezvous/1.0.0"; +const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0"); pub(crate) mod inbound; pub(crate) mod outbound; diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index f5f79451b09..d2a1651cd52 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -31,7 +31,8 @@ use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol, + SubstreamProtocol, }; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -127,21 +128,21 @@ impl fmt::Display for OutboundSubstreamId { } pub struct PassthroughProtocol { - ident: Option<&'static [u8]>, + ident: Option, } impl PassthroughProtocol { - pub fn new(ident: &'static [u8]) -> Self { + pub fn new(ident: StreamProtocol) -> Self { Self { ident: Some(ident) } } } impl UpgradeInfo for PassthroughProtocol { - type Info = &'static [u8]; + type Info = StreamProtocol; type InfoIter = std::option::IntoIter; fn protocol_info(&self) -> Self::InfoIter { - self.ident.into_iter() + self.ident.clone().into_iter() } } diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index 71cfb79a27a..d26b729acae 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -18,8 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub use libp2p_core::ProtocolName; - use async_trait::async_trait; use futures::prelude::*; use std::io; @@ -30,7 +28,7 @@ use std::io; #[async_trait] pub trait Codec { /// The type of protocol(s) or protocol versions being negotiated. - type Protocol: ProtocolName + Send + Clone; + type Protocol: AsRef + Send + Clone; /// The type of inbound and outbound requests. type Request: Send; /// The type of inbound and outbound responses. diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 14854ddea77..f187ab8c48b 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -61,7 +61,7 @@ mod codec; mod handler; -pub use codec::{Codec, ProtocolName}; +pub use codec::Codec; pub use handler::ProtocolSupport; use crate::handler::protocol::RequestProtocol; diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index c69f771dbe0..48860b5887f 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -22,14 +22,11 @@ use async_trait::async_trait; use futures::{prelude::*, AsyncWriteExt}; -use libp2p_core::{ - upgrade::{read_length_prefixed, write_length_prefixed}, - ProtocolName, -}; +use libp2p_core::upgrade::{read_length_prefixed, write_length_prefixed}; use libp2p_identity::PeerId; use libp2p_request_response as request_response; use libp2p_request_response::ProtocolSupport; -use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use rand::{self, Rng}; use std::{io, iter}; @@ -40,7 +37,10 @@ async fn is_response_outbound() { let ping = Ping("ping".to_string().into_bytes()); let offline_peer = PeerId::random(); - let protocols = iter::once((PingProtocol(), request_response::ProtocolSupport::Full)); + let protocols = iter::once(( + StreamProtocol::new("/ping/1"), + request_response::ProtocolSupport::Full, + )); let cfg = request_response::Config::default(); let mut swarm1 = @@ -83,7 +83,7 @@ async fn ping_protocol() { let ping = Ping("ping".to_string().into_bytes()); let pong = Pong("pong".to_string().into_bytes()); - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { @@ -174,7 +174,7 @@ async fn ping_protocol() { async fn emits_inbound_connection_closed_failure() { let ping = Ping("ping".to_string().into_bytes()); - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { @@ -237,7 +237,7 @@ async fn emits_inbound_connection_closed_failure() { async fn emits_inbound_connection_closed_if_channel_is_dropped() { let ping = Ping("ping".to_string().into_bytes()); - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { @@ -286,8 +286,6 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { // Simple Ping-Pong Protocol -#[derive(Debug, Clone)] -struct PingProtocol(); #[derive(Clone)] struct PingCodec(); #[derive(Debug, Clone, PartialEq, Eq)] @@ -295,19 +293,13 @@ struct Ping(Vec); #[derive(Debug, Clone, PartialEq, Eq)] struct Pong(Vec); -impl ProtocolName for PingProtocol { - fn protocol_name(&self) -> &[u8] { - "/ping/1".as_bytes() - } -} - #[async_trait] impl libp2p_request_response::Codec for PingCodec { - type Protocol = PingProtocol; + type Protocol = StreamProtocol; type Request = Ping; type Response = Pong; - async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + async fn read_request(&mut self, _: &StreamProtocol, io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -320,7 +312,11 @@ impl libp2p_request_response::Codec for PingCodec { Ok(Ping(vec)) } - async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + async fn read_response( + &mut self, + _: &StreamProtocol, + io: &mut T, + ) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -335,7 +331,7 @@ impl libp2p_request_response::Codec for PingCodec { async fn write_request( &mut self, - _: &PingProtocol, + _: &StreamProtocol, io: &mut T, Ping(data): Ping, ) -> io::Result<()> @@ -350,7 +346,7 @@ impl libp2p_request_response::Codec for PingCodec { async fn write_response( &mut self, - _: &PingProtocol, + _: &StreamProtocol, io: &mut T, Pong(data): Pong, ) -> io::Result<()> diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 3509d60e662..9e3d5e1b340 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -3,6 +3,11 @@ - Raise MSRV to 1.65. See [PR 3715]. +- Introduce `StreamProtocol` type. + This type enforces invariants on protocol names, such as leading forward slashes and correct UTF8 encoding. + See [PR 3746]. + +[PR 3746]: https://github.com/libp2p/rust-libp2p/pull/3746 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 ## 0.42.2 diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 146a2a96895..e14c75376cc 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -31,7 +31,7 @@ use crate::handler::{ use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::NegotiatedSubstream; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError}; +use libp2p_core::upgrade::{NegotiationError, ProtocolError, UpgradeError}; use libp2p_core::ConnectedPoint; use libp2p_identity::PeerId; use rand::Rng; @@ -463,9 +463,9 @@ where #[derive(Debug, Clone)] pub struct IndexedProtoName(usize, H); -impl ProtocolName for IndexedProtoName { - fn protocol_name(&self) -> &[u8] { - self.1.protocol_name() +impl> AsRef for IndexedProtoName { + fn as_ref(&self) -> &str { + self.1.as_ref() } } @@ -586,7 +586,7 @@ where let mut set = HashSet::new(); for infos in iter { for i in infos.protocol_info() { - let v = Vec::from(i.protocol_name()); + let v = Vec::from(i.as_ref()); if set.contains(&v) { return Err(DuplicateProtonameError(v)); } else { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 4ac5a59c089..9c87389334e 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -56,7 +56,9 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod connection; +mod executor; mod registry; +mod stream_protocol; #[cfg(test)] mod test; mod upgrade; @@ -64,7 +66,6 @@ mod upgrade; pub mod behaviour; pub mod dial_opts; pub mod dummy; -mod executor; pub mod handler; pub mod keep_alive; @@ -130,6 +131,7 @@ pub use handler::{ #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; pub use registry::{AddAddressResult, AddressRecord, AddressScore}; +pub use stream_protocol::{InvalidProtocol, StreamProtocol}; use crate::handler::UpgradeInfoSend; use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; @@ -142,11 +144,11 @@ use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ connection::ConnectedPoint, - multiaddr::Protocol, + multiaddr, multihash::Multihash, muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, - Endpoint, Multiaddr, Negotiated, ProtocolName, Transport, + Endpoint, Multiaddr, Negotiated, Transport, }; use libp2p_identity::PeerId; use registry::{AddressIntoIter, Addresses}; @@ -886,7 +888,7 @@ where .listen_protocol() .upgrade() .protocol_info() - .map(|p| p.protocol_name().to_owned()) + .map(|p| p.as_ref().as_bytes().to_vec()) .collect(); let other_established_connection_ids = self .pool @@ -2015,13 +2017,13 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result return Ok(addr), }; - if let Some(Protocol::P2p(hash)) = addr.iter().last() { + if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() { if &hash != peer.as_ref() { return Err(addr); } Ok(addr) } else { - Ok(addr.with(Protocol::P2p(peer.into()))) + Ok(addr.with(multiaddr::Protocol::P2p(peer.into()))) } } @@ -2728,7 +2730,7 @@ mod tests { })); let other_id = PeerId::random(); - let other_addr = address.with(Protocol::P2p(other_id.into())); + let other_addr = address.with(multiaddr::Protocol::P2p(other_id.into())); swarm2.dial(other_addr.clone()).unwrap(); @@ -2876,7 +2878,7 @@ mod tests { let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::>(); let expected_addresses = addresses .into_iter() - .map(|addr| addr.with(Protocol::P2p(target.into()))) + .map(|addr| addr.with(multiaddr::Protocol::P2p(target.into()))) .collect::>(); assert_eq!(expected_addresses, failed_addresses); diff --git a/swarm/src/stream_protocol.rs b/swarm/src/stream_protocol.rs new file mode 100644 index 00000000000..bce0ec51279 --- /dev/null +++ b/swarm/src/stream_protocol.rs @@ -0,0 +1,104 @@ +use either::Either; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +/// Identifies a protocol for a stream. +/// +/// libp2p nodes use stream protocols to negotiate what to do with a newly opened stream. +/// Stream protocols are string-based and must start with a forward slash: `/`. +#[derive(Debug, Clone, Eq)] +pub struct StreamProtocol { + inner: Either<&'static str, Arc>, +} + +impl StreamProtocol { + /// Construct a new protocol from a static string slice. + /// + /// # Panics + /// + /// This function panics if the protocol does not start with a forward slash: `/`. + pub const fn new(s: &'static str) -> Self { + match s.as_bytes() { + [b'/', ..] => {} + _ => panic!("Protocols should start with a /"), + } + + StreamProtocol { + inner: Either::Left(s), + } + } + + /// Attempt to construct a protocol from an owned string. + /// + /// This function will fail if the protocol does not start with a forward slash: `/`. + /// Where possible, you should use [`StreamProtocol::new`] instead to avoid allocations. + pub fn try_from_owned(protocol: String) -> Result { + if !protocol.starts_with('/') { + return Err(InvalidProtocol::missing_forward_slash()); + } + + Ok(StreamProtocol { + inner: Either::Right(Arc::from(protocol)), // FIXME: Can we somehow reuse the allocation from the owned string? + }) + } +} + +impl AsRef for StreamProtocol { + fn as_ref(&self) -> &str { + either::for_both!(&self.inner, s => s) + } +} + +impl fmt::Display for StreamProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl PartialEq<&str> for StreamProtocol { + fn eq(&self, other: &&str) -> bool { + self.as_ref() == *other + } +} + +impl PartialEq for &str { + fn eq(&self, other: &StreamProtocol) -> bool { + *self == other.as_ref() + } +} + +impl PartialEq for StreamProtocol { + fn eq(&self, other: &Self) -> bool { + self.as_ref() == other.as_ref() + } +} + +impl Hash for StreamProtocol { + fn hash(&self, state: &mut H) { + self.as_ref().hash(state) + } +} + +#[derive(Debug)] +pub struct InvalidProtocol { + // private field to prevent construction outside of this module + _private: (), +} + +impl InvalidProtocol { + pub(crate) fn missing_forward_slash() -> Self { + InvalidProtocol { _private: () } + } +} + +impl fmt::Display for InvalidProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "invalid protocol: string does not start with a forward slash" + ) + } +} + +impl std::error::Error for InvalidProtocol {} diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index e544ac3f27f..b584dfae9fd 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -30,7 +30,7 @@ use libp2p_core::upgrade; /// [`UpgradeInfo`](upgrade::UpgradeInfo). pub trait UpgradeInfoSend: Send + 'static { /// Equivalent to [`UpgradeInfo::Info`](upgrade::UpgradeInfo::Info). - type Info: upgrade::ProtocolName + Clone + Send + 'static; + type Info: AsRef + Clone + Send + 'static; /// Equivalent to [`UpgradeInfo::InfoIter`](upgrade::UpgradeInfo::InfoIter). type InfoIter: Iterator + Send + 'static; @@ -72,7 +72,7 @@ pub trait OutboundUpgradeSend: UpgradeInfoSend { impl OutboundUpgradeSend for T where T: upgrade::OutboundUpgrade + UpgradeInfoSend, - TInfo: upgrade::ProtocolName + Clone + Send + 'static, + TInfo: AsRef + Clone + Send + 'static, T::Output: Send + 'static, T::Error: Send + 'static, T::Future: Send + 'static, @@ -106,7 +106,7 @@ pub trait InboundUpgradeSend: UpgradeInfoSend { impl InboundUpgradeSend for T where T: upgrade::InboundUpgrade + UpgradeInfoSend, - TInfo: upgrade::ProtocolName + Clone + Send + 'static, + TInfo: AsRef + Clone + Send + 'static, T::Output: Send + 'static, T::Error: Send + 'static, T::Future: Send + 'static, diff --git a/transports/deflate/src/lib.rs b/transports/deflate/src/lib.rs index 29be83462b9..0d83713888e 100644 --- a/transports/deflate/src/lib.rs +++ b/transports/deflate/src/lib.rs @@ -38,11 +38,11 @@ impl Default for DeflateConfig { } impl UpgradeInfo for DeflateConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/deflate/1.0.0") + iter::once("/deflate/1.0.0") } } diff --git a/transports/deflate/tests/test.rs b/transports/deflate/tests/test.rs index b5fbe39ad53..504888a7eca 100644 --- a/transports/deflate/tests/test.rs +++ b/transports/deflate/tests/test.rs @@ -50,7 +50,7 @@ async fn run(message1: Vec) { let client_task = async move { let mut client = DeflateConfig::default() - .upgrade_outbound(client, b"") + .upgrade_outbound(client, "") .await .unwrap(); @@ -64,7 +64,7 @@ async fn run(message1: Vec) { let server_task = async move { let mut server = DeflateConfig::default() - .upgrade_outbound(server, b"") + .upgrade_outbound(server, "") .await .unwrap(); diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index 4c296e9a111..8e180482780 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -166,11 +166,11 @@ impl Config { } impl UpgradeInfo for Config { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise") + std::iter::once("/noise") } } diff --git a/transports/noise/src/protocol/x25519.rs b/transports/noise/src/protocol/x25519.rs index cc1586feb0d..4a572945ef8 100644 --- a/transports/noise/src/protocol/x25519.rs +++ b/transports/noise/src/protocol/x25519.rs @@ -72,29 +72,29 @@ impl Zeroize for X25519 { } impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise/ix/25519/chachapoly/sha256/0.1.0") + std::iter::once("/noise/ix/25519/chachapoly/sha256/0.1.0") } } impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise/xx/25519/chachapoly/sha256/0.1.0") + std::iter::once("/noise/xx/25519/chachapoly/sha256/0.1.0") } } impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise/ik/25519/chachapoly/sha256/0.1.0") + std::iter::once("/noise/ik/25519/chachapoly/sha256/0.1.0") } } diff --git a/transports/noise/src/protocol/x25519_spec.rs b/transports/noise/src/protocol/x25519_spec.rs index 621532ad52d..7f7a5a9c4e7 100644 --- a/transports/noise/src/protocol/x25519_spec.rs +++ b/transports/noise/src/protocol/x25519_spec.rs @@ -85,33 +85,33 @@ impl From> for Keypair { } impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise") + std::iter::once("/noise") } } /// **Note**: This is not currentlyy a standardised upgrade. impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise/ix/25519/chachapoly/sha256/0.1.0") + std::iter::once("/noise/ix/25519/chachapoly/sha256/0.1.0") } } /// **Note**: This is not currently a standardised upgrade. impl UpgradeInfo for NoiseConfig { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/noise/ik/25519/chachapoly/sha256/0.1.0") + std::iter::once("/noise/ik/25519/chachapoly/sha256/0.1.0") } } diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index b7ce90267e1..b862a944dfd 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -55,10 +55,10 @@ fn xx() { ) = futures::future::try_join( noise::Config::new(&server_id) .unwrap() - .upgrade_inbound(server, b""), + .upgrade_inbound(server, ""), noise::Config::new(&client_id) .unwrap() - .upgrade_outbound(client, b""), + .upgrade_outbound(client, ""), ) .await .unwrap(); diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index fe4aba91a88..64aea0b82a6 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -78,11 +78,11 @@ mod proto { pub struct PlainText1Config; impl UpgradeInfo for PlainText1Config { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/plaintext/1.0.0") + iter::once("/plaintext/1.0.0") } } @@ -114,11 +114,11 @@ pub struct PlainText2Config { } impl UpgradeInfo for PlainText2Config { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/plaintext/2.0.0") + iter::once("/plaintext/2.0.0") } } diff --git a/transports/plaintext/tests/smoke.rs b/transports/plaintext/tests/smoke.rs index b63bf2529ee..7147ed56686 100644 --- a/transports/plaintext/tests/smoke.rs +++ b/transports/plaintext/tests/smoke.rs @@ -49,11 +49,11 @@ fn variable_msg_length() { PlainText2Config { local_public_key: server_id_public, } - .upgrade_inbound(server, b""), + .upgrade_inbound(server, ""), PlainText2Config { local_public_key: client_id_public, } - .upgrade_inbound(client, b""), + .upgrade_inbound(client, ""), ) .await .unwrap(); diff --git a/transports/tls/src/upgrade.rs b/transports/tls/src/upgrade.rs index d1c9a6cefb4..bf64ce61505 100644 --- a/transports/tls/src/upgrade.rs +++ b/transports/tls/src/upgrade.rs @@ -59,11 +59,11 @@ impl Config { } impl UpgradeInfo for Config { - type Info = &'static [u8]; + type Info = &'static str; type InfoIter = std::iter::Once; fn protocol_info(&self) -> Self::InfoIter { - std::iter::once(b"/tls/1.0.0") + std::iter::once("/tls/1.0.0") } }