From dcf917fe5f7f5f513ed8724cd19c1c36425f86b9 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 25 Mar 2020 16:09:16 +0200 Subject: [PATCH 1/5] feat: allow sent messages seen as subscribed minor feature to allow mimicing the behaviour expected by ipfs api tests. --- protocols/floodsub/src/layer.rs | 19 ++++++++++++++----- protocols/floodsub/src/lib.rs | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 1c837b2d0f0..6ebf91d8a61 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -20,6 +20,7 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::Topic; +use crate::FloodsubOptions; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; @@ -43,8 +44,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - /// Peer id of the local node. Used for the source of the messages that we publish. - local_peer_id: PeerId, + options: FloodsubOptions, /// List of peers to send messages to. target_peers: FnvHashSet, @@ -64,11 +64,16 @@ pub struct Floodsub { } impl Floodsub { - /// Creates a `Floodsub`. + /// Creates a `Floodsub` with default options. pub fn new(local_peer_id: PeerId) -> Self { + Self::from_options(FloodsubOptions::new(local_peer_id)) + } + + /// Creates a `Floodsub` with the given options. + pub fn from_options(options: FloodsubOptions) -> Self { Floodsub { events: VecDeque::new(), - local_peer_id, + options, target_peers: FnvHashSet::default(), connected_peers: HashMap::new(), subscribed_topics: SmallVec::new(), @@ -190,7 +195,7 @@ impl Floodsub { fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { - source: self.local_peer_id.clone(), + source: self.options.local_peer_id.clone(), data: data.into(), // If the sequence numbers are predictable, then an attacker could flood the network // with packets with the predetermined sequence numbers and absorb our legitimate @@ -202,6 +207,10 @@ impl Floodsub { let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)); if self_subscribed { self.received.add(&message); + if self.options.subscribe_local_messages { + self.events.push_back( + NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone()))); + } } // Don't publish the message if we have to check subscriptions // and we're not subscribed ourselves to any of the topics. diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index cc9e840af79..3e3f328a6a4 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -21,6 +21,8 @@ //! Implements the floodsub protocol, see also the: //! [spec](https://github.com/libp2p/specs/tree/master/pubsub). +use libp2p_core::PeerId; + pub mod protocol; mod layer; @@ -33,3 +35,22 @@ mod rpc_proto { pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::Topic; + +/// Configuration options for the Floodsub protocol. +pub struct FloodsubOptions { + /// Peer id of the local node. Used for the source of the messages that we publish. + pub local_peer_id: PeerId, + + /// `true` if messages published by local node should be propagated as messages received from + /// the network, `false` by default. + pub subscribe_local_messages: bool, +} + +impl FloodsubOptions { + pub fn new(local_peer_id: PeerId) -> Self { + Self { + local_peer_id, + subscribe_local_messages: false + } + } +} From 6d5166773bb02c3aa2ca86dc2ee3ccfb46e30bbd Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 1 Apr 2020 17:28:37 +0300 Subject: [PATCH 2/5] refactor: rename per review comments --- protocols/floodsub/src/layer.rs | 16 ++++++++-------- protocols/floodsub/src/lib.rs | 4 ++-- protocols/floodsub/src/protocol.rs | 14 +++++++------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 6ebf91d8a61..c7411fb1b82 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -18,9 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; +use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::Topic; -use crate::FloodsubOptions; +use crate::FloodsubConfig; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; @@ -44,7 +44,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - options: FloodsubOptions, + options: FloodsubConfig, /// List of peers to send messages to. target_peers: FnvHashSet, @@ -64,13 +64,13 @@ pub struct Floodsub { } impl Floodsub { - /// Creates a `Floodsub` with default options. + /// Creates a `Floodsub` with default configuration. pub fn new(local_peer_id: PeerId) -> Self { - Self::from_options(FloodsubOptions::new(local_peer_id)) + Self::from_config(FloodsubConfig::new(local_peer_id)) } - /// Creates a `Floodsub` with the given options. - pub fn from_options(options: FloodsubOptions) -> Self { + /// Creates a `Floodsub` with the given configuration. + pub fn from_config(options: FloodsubConfig) -> Self { Floodsub { events: VecDeque::new(), options, @@ -237,7 +237,7 @@ impl Floodsub { } impl NetworkBehaviour for Floodsub { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 3e3f328a6a4..8e7014bedaa 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -37,7 +37,7 @@ pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::Topic; /// Configuration options for the Floodsub protocol. -pub struct FloodsubOptions { +pub struct FloodsubConfig { /// Peer id of the local node. Used for the source of the messages that we publish. pub local_peer_id: PeerId, @@ -46,7 +46,7 @@ pub struct FloodsubOptions { pub subscribe_local_messages: bool, } -impl FloodsubOptions { +impl FloodsubConfig { pub fn new(local_peer_id: PeerId) -> Self { Self { local_peer_id, diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 4df3975eddb..046c72d856b 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] -pub struct FloodsubConfig {} +pub struct FloodsubProtocol {} -impl FloodsubConfig { - /// Builds a new `FloodsubConfig`. - pub fn new() -> FloodsubConfig { - FloodsubConfig {} +impl FloodsubProtocol { + /// Builds a new `FloodsubProtocol`. + pub fn new() -> FloodsubProtocol { + FloodsubProtocol {} } } -impl UpgradeInfo for FloodsubConfig { +impl UpgradeInfo for FloodsubProtocol { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig { } } -impl InboundUpgrade for FloodsubConfig +impl InboundUpgrade for FloodsubProtocol where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { From d3698103a7b823b3410ddc5581c1bdd1ff00335a Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 1 Apr 2020 17:34:17 +0300 Subject: [PATCH 3/5] refactor: rename Floodsub::options to config --- protocols/floodsub/src/layer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index c7411fb1b82..4bb6aa08cbf 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -44,7 +44,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - options: FloodsubConfig, + config: FloodsubConfig, /// List of peers to send messages to. target_peers: FnvHashSet, @@ -70,10 +70,10 @@ impl Floodsub { } /// Creates a `Floodsub` with the given configuration. - pub fn from_config(options: FloodsubConfig) -> Self { + pub fn from_config(config: FloodsubConfig) -> Self { Floodsub { events: VecDeque::new(), - options, + config, target_peers: FnvHashSet::default(), connected_peers: HashMap::new(), subscribed_topics: SmallVec::new(), @@ -195,7 +195,7 @@ impl Floodsub { fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { - source: self.options.local_peer_id.clone(), + source: self.config.local_peer_id.clone(), data: data.into(), // If the sequence numbers are predictable, then an attacker could flood the network // with packets with the predetermined sequence numbers and absorb our legitimate @@ -207,7 +207,7 @@ impl Floodsub { let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)); if self_subscribed { self.received.add(&message); - if self.options.subscribe_local_messages { + if self.config.subscribe_local_messages { self.events.push_back( NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone()))); } From 87c90be31217510923e3fb11b1ca1bc0910ff5ef Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 1 Apr 2020 17:44:42 +0300 Subject: [PATCH 4/5] chore: update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05e66be6ba6..22bf35acfb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Version ??? +- `libp2p-floodsub`: Allow sent messages seen as subscribed + [PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520) # Version 0.17.0 (2020-04-02) From 44537b668c0cceef17c11b6d5e79beba160d9093 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 6 Apr 2020 14:04:59 +0300 Subject: [PATCH 5/5] Update CHANGELOG.md Co-Authored-By: Max Inden --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22bf35acfb9..53459981bb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Version ??? -- `libp2p-floodsub`: Allow sent messages seen as subscribed +- `libp2p-floodsub`: Allow sent messages seen as subscribed. [PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520) # Version 0.17.0 (2020-04-02)