Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow sent messages seen as subscribed #1520

Merged
merged 6 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Version ???

- `libp2p-floodsub`: Allow sent messages seen as subscribed.
[PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520)

# Version 0.17.0 (2020-04-02)

Expand Down
23 changes: 16 additions & 7 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::Topic;
use crate::FloodsubConfig;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
Expand All @@ -43,8 +44,7 @@ pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,

/// Peer id of the local node. Used for the source of the messages that we publish.
local_peer_id: PeerId,
config: FloodsubConfig,

/// List of peers to send messages to.
target_peers: FnvHashSet<PeerId>,
Expand All @@ -64,11 +64,16 @@ pub struct Floodsub {
}

impl Floodsub {
/// Creates a `Floodsub`.
/// Creates a `Floodsub` with default configuration.
pub fn new(local_peer_id: PeerId) -> Self {
Self::from_config(FloodsubConfig::new(local_peer_id))
}

/// Creates a `Floodsub` with the given configuration.
pub fn from_config(config: FloodsubConfig) -> Self {
Floodsub {
events: VecDeque::new(),
local_peer_id,
config,
target_peers: FnvHashSet::default(),
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
Expand Down Expand Up @@ -190,7 +195,7 @@ impl Floodsub {

fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
let message = FloodsubMessage {
source: self.local_peer_id.clone(),
source: self.config.local_peer_id.clone(),
data: data.into(),
// If the sequence numbers are predictable, then an attacker could flood the network
// with packets with the predetermined sequence numbers and absorb our legitimate
Expand All @@ -202,6 +207,10 @@ impl Floodsub {
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
if self_subscribed {
self.received.add(&message);
if self.config.subscribe_local_messages {
self.events.push_back(
NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
}
}
// Don't publish the message if we have to check subscriptions
// and we're not subscribed ourselves to any of the topics.
Expand All @@ -228,7 +237,7 @@ impl Floodsub {
}

impl NetworkBehaviour for Floodsub {
type ProtocolsHandler = OneShotHandler<FloodsubConfig, FloodsubRpc, InnerMessage>;
type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
Expand Down
21 changes: 21 additions & 0 deletions protocols/floodsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
//! Implements the floodsub protocol, see also the:
//! [spec](https://github.com/libp2p/specs/tree/master/pubsub).

use libp2p_core::PeerId;

pub mod protocol;

mod layer;
Expand All @@ -33,3 +35,22 @@ mod rpc_proto {
pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::Topic;

/// Configuration options for the Floodsub protocol.
pub struct FloodsubConfig {
/// Peer id of the local node. Used for the source of the messages that we publish.
pub local_peer_id: PeerId,

/// `true` if messages published by local node should be propagated as messages received from
/// the network, `false` by default.
pub subscribe_local_messages: bool,
}

impl FloodsubConfig {
pub fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
subscribe_local_messages: false
}
}
}
14 changes: 7 additions & 7 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}};

/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)]
pub struct FloodsubConfig {}
pub struct FloodsubProtocol {}

impl FloodsubConfig {
/// Builds a new `FloodsubConfig`.
pub fn new() -> FloodsubConfig {
FloodsubConfig {}
impl FloodsubProtocol {
/// Builds a new `FloodsubProtocol`.
pub fn new() -> FloodsubProtocol {
FloodsubProtocol {}
}
}

impl UpgradeInfo for FloodsubConfig {
impl UpgradeInfo for FloodsubProtocol {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;

Expand All @@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig {
}
}

impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig
impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
Expand Down