Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[On ice] Gossipsub: an extensible baseline pubsub protocol, based on randomized topic meshes and gossip #767

Closed
wants to merge 88 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
efe1f23
Copy floodsub to gossipsub, update rpc.proto, rpc_proto, and protocol…
jamesray1 Dec 11, 2018
d2540b3
Move gossipsub to the floodsub crate, modify the crate and rename to …
jamesray1 Dec 11, 2018
3d810d0
Update Cargo.toml in root of workspace: floodsub -> pubsub
jamesray1 Dec 11, 2018
fc86fbe
Fix errors with imports, copy files to gossipsub, add more doc comments
jamesray1 Dec 11, 2018
1082c88
pubsub -> gossipsub, start re-adding floodsub
jamesray1 Dec 17, 2018
f172211
re-add floodsub
jamesray1 Dec 17, 2018
17fa2bf
Modifications to handler, has errors due to only partially changing t…
jamesray1 Dec 17, 2018
10599a8
Add a break
jamesray1 Dec 17, 2018
e49021e
Remove duplication of Floodsub stuff, I think you can import and use …
jamesray1 Dec 18, 2018
2eb2ec2
Fix merge conflict with root Cargo.toml
jamesray1 Dec 18, 2018
40b5c7f
GossipsubHandler -> RawGossipsubHandler and add a GossipsubHandler wi…
jamesray1 Dec 18, 2018
882fd72
Rename Floodsub* to RawGossipsub*, doc comment for GossipsubHandler
jamesray1 Dec 18, 2018
0c727c6
Add more types for ControlMessage, some are still TODO.
jamesray1 Dec 18, 2018
5f421f8
Add apparent changes, though none show in diff, can't checkout with p…
jamesray1 Dec 19, 2018
8f57777
Add remaining control messages.
jamesray1 Dec 19, 2018
291b417
Start layer: Gossipsub struct, new fn, skeletal code for control mess…
jamesray1 Dec 19, 2018
996f8f2
Add extra Gossipsub fields, including ones the same as in Floodsub, p…
jamesray1 Dec 20, 2018
6155234
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into g…
jamesray1 Dec 21, 2018
434e509
change ControlMessage to an enum, add types to the variants of Messag…
jamesray1 Dec 21, 2018
b8f9be0
Re-add protocol.rs and handler.rs
jamesray1 Dec 26, 2018
eb661b1
Re-implement Coder for GossipsubCodec, UpgradeInfo for GossipsubConfi…
jamesray1 Dec 26, 2018
53cdbb9
Add a hash to GossipsubMessage and rename it to Message, use MessageH…
jamesray1 Dec 26, 2018
a8ef658
Add impls for MessageID, use newly added Impl From<MessageID>> for Me…
jamesray1 Dec 26, 2018
e3b0463
Add doc comments for and make fields public.
jamesray1 Dec 26, 2018
e9747c8
Fixes for ProtocolsHandlerSelect import and usage for GossipsubHandle…
jamesray1 Dec 26, 2018
9d392dc
Fixes: types, imports, line breaks, new for Gossipsub, MCache and Mes…
jamesray1 Dec 27, 2018
8b4ba21
Remove extra Cargo.toml files caused by conflict
jamesray1 Dec 27, 2018
ae518df
Rewrite new and build for MsgHashBuilder
jamesray1 Dec 27, 2018
ae2b8f7
Fixes: compiler errors, MsgID -> MsgId, add control to Gossipsub publ…
jamesray1 Dec 27, 2018
f1e857d
Fix more compiler errors
jamesray1 Dec 27, 2018
f7e8cb6
Fixes: compiler errors, adaptations from Floodsub, add to NetworkBeha…
jamesray1 Dec 27, 2018
18a45a0
Fix MsgId by constructing from the seq_no and source
jamesray1 Dec 28, 2018
9eeaafc
Minor fixes to commented out code, newlines at EOF, line breaks
jamesray1 Dec 28, 2018
6c0d720
Fix for control loop in encode fn for Encoder impl for GossipsubCodec
jamesray1 Dec 28, 2018
cf0d1a6
Add a TopicRep and TopicId, GossipsubCodec -> GossqqipsubRpcCodec, ad…
jamesray1 Dec 31, 2018
313ef7f
Remove derive Hash on Gmessage
jamesray1 Jan 2, 2019
683b274
Add notes for https://github.com/stepancheg/rust-protobuf/issues/211 …
jamesray1 Jan 2, 2019
5ef3658
Revert floodsub::topic back to as it is now.
jamesray1 Jan 4, 2019
d2cf099
Add an enum MessageRep to rpc.proto and use instead of
jamesray1 Jan 4, 2019
a58b9e3
Rebuild rpc_proto.rs
jamesray1 Jan 4, 2019
e27d947
Fix compiler errors: impl IntoIterator for TopicMap,
jamesray1 Jan 7, 2019
ca69096
Fix compiler errors e.g. use TopicHash -> TopicRep in
jamesray1 Jan 7, 2019
a802a10
Break up long lines, comment out unused lines in
jamesray1 Jan 9, 2019
47d7e51
Add doc comments for constants.
jamesray1 Jan 9, 2019
74be5f7
Fix compiler errors: use a GOutEvents enum as the
jamesray1 Jan 10, 2019
2ec0354
Fix more compiler errors. Line breaks. Remove comment on
jamesray1 Jan 14, 2019
9b95df4
Rename source field in GMessage to from.
jamesray1 Jan 15, 2019
37e6bfe
rm unused imports, add impl PartialEq and Eq for Topic
jamesray1 Jan 15, 2019
25e951a
Fix compiler errors: pass by reference,
jamesray1 Jan 15, 2019
ad4bbfb
Add graft_many and graft methods, uses added GError
jamesray1 Jan 16, 2019
d78525c
Use custom_error for errors for brevity, adds a
jamesray1 Jan 17, 2019
ca5e630
Changes that were supposed to be included in the
jamesray1 Jan 17, 2019
9c86970
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into g…
jamesray1 Jan 17, 2019
293632a
Bump version to 0.2.0 to match with upstream changes.
jamesray1 Jan 17, 2019
b5665ce
Add another bump in the root workspace.
jamesray1 Jan 17, 2019
e202708
* Bump libp2p version to 0.2.2 (didn't update via
jamesray1 Jan 17, 2019
b39666d
* Line breaks.
jamesray1 Jan 22, 2019
c62c17c
Fix compiler errors:
jamesray1 Jan 22, 2019
21c86a7
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into g…
jamesray1 Jan 22, 2019
54e3fb8
Fix errors by adding edition = "2018" to Cargo.toml.
jamesray1 Jan 22, 2019
4ad8861
Add a seen cache to MCache using LruCache, and fix
jamesray1 Jan 23, 2019
c26b069
Add fields to Gossipsub.
jamesray1 Jan 23, 2019
0e18348
* Use Result<T> type with GError.
jamesray1 Jan 23, 2019
45aad89
* Use a TopicHash for publish and
jamesray1 Jan 25, 2019
d87e6b2
More work on graft_peers_many and GraftErrors.
jamesray1 Jan 28, 2019
c65095d
Fix use of graft methods that use graft_peers_many.
jamesray1 Jan 28, 2019
4816d45
Comment out unused GraftErrors methodsx
jamesray1 Jan 28, 2019
5915058
Add work on graft methods, fixing some compiler errors
jamesray1 Jan 29, 2019
65d1469
Add impl AsRef<PeerId> for PeerId {
jamesray1 Jan 29, 2019
0c21545
* Fix compiler errors: visibility,
jamesray1 Jan 29, 2019
c5bcedd
Fix another error.
jamesray1 Jan 29, 2019
dc100c4
Attempts to fix another error.
jamesray1 Jan 29, 2019
8e665d4
Fix more errors: types, ownership,
jamesray1 Jan 29, 2019
35a12aa
Fix compiler errors in kad/behaviour.rs due to adding AsRef<PeerId>.
jamesray1 Jan 30, 2019
e018484
Rearrange check if topic is not in the
jamesray1 Jan 30, 2019
cf36cb6
* Modify get_peer_from_topic
jamesray1 Jan 30, 2019
9f5b9cb
* "Complete" prune methods
jamesray1 Jan 30, 2019
1094f8f
Add doc comments.
jamesray1 Jan 31, 2019
456b103
Clean up commented out code.
jamesray1 Jan 31, 2019
51e87bc
* Add notifying peer that they have
jamesray1 Jan 31, 2019
beecfcc
Fix error due to doc comment
jamesray1 Jan 31, 2019
05ec0c6
Fix some errors.
jamesray1 Feb 4, 2019
3f15748
Fix errors.
jamesray1 Feb 4, 2019
aa26a99
Fix errors.
jamesray1 Feb 4, 2019
f2e430b
Fix errors.
jamesray1 Feb 4, 2019
8d9592c
Notify with prune message.
jamesray1 Feb 4, 2019
cf079f8
Add minor changes
jamesray1 Feb 7, 2019
6e1e053
Add config
jamesray1 Feb 22, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ libp2p-mplex = { version = "0.2.0", path = "./muxers/mplex" }
libp2p-identify = { version = "0.2.0", path = "./protocols/identify" }
libp2p-kad = { version = "0.2.0", path = "./protocols/kad" }
libp2p-floodsub = { version = "0.2.0", path = "./protocols/floodsub" }
libp2p-gossipsub = { version = "0.2.0", path = "./protocols/gossipsub" }
libp2p-ping = { version = "0.2.0", path = "./protocols/ping" }
libp2p-plaintext = { version = "0.2.0", path = "./protocols/plaintext" }
libp2p-ratelimit = { version = "0.2.0", path = "./transports/ratelimit" }
Expand Down Expand Up @@ -64,6 +65,7 @@ members = [
"muxers/mplex",
"muxers/yamux",
"protocols/floodsub",
"protocols/gossipsub",
"protocols/identify",
"protocols/kad",
"protocols/observed",
Expand Down
5 changes: 5 additions & 0 deletions core/src/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ impl Into<multihash::Multihash> for PeerId {
}
}

impl AsRef<PeerId> for PeerId {
fn as_ref(&self) -> &PeerId {
&self
}
}
quick_error! {
#[derive(Debug)]
pub enum ParseError {
Expand Down
25 changes: 16 additions & 9 deletions protocols/floodsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,36 +182,43 @@ where
let mut substream = self.substreams.swap_remove(n);
loop {
substream = match substream {
SubstreamState::WaitingInput(mut substream) => match substream.poll() {
SubstreamState::WaitingInput(mut substream)
=> match substream.poll() {
Ok(Async::Ready(Some(message))) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message)));
}
Ok(Async::Ready(None)) => SubstreamState::Closing(substream),
return Ok(Async::Ready(
ProtocolsHandlerEvent::Custom(message)));
},
Ok(Async::Ready(None)) => SubstreamState::Closing(
substream),
Ok(Async::NotReady) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::NotReady);
}
},
Err(_) => SubstreamState::Closing(substream),
},
SubstreamState::PendingSend(mut substream, message) => {
match substream.start_send(message)? {
AsyncSink::Ready => SubstreamState::PendingFlush(substream),
AsyncSink::Ready => SubstreamState::PendingFlush(
substream),
AsyncSink::NotReady(message) => {
self.substreams
.push(SubstreamState::PendingSend(substream, message));
.push(SubstreamState::PendingSend(
substream, message));
return Ok(Async::NotReady);
}
}
}
SubstreamState::PendingFlush(mut substream) => {
match substream.poll_complete()? {
Async::Ready(()) => SubstreamState::Closing(substream),
Async::Ready(()) => SubstreamState::Closing(
substream),
Async::NotReady => {
self.substreams
.push(SubstreamState::PendingFlush(substream));
.push(SubstreamState::PendingFlush(
substream));
return Ok(Async::NotReady);
}
}
Expand Down
18 changes: 11 additions & 7 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,26 @@ impl<TSubstream> Floodsub<TSubstream> {
/// Publishes a message to the network.
///
/// > **Note**: Doesn't do anything if we're not subscribed to the topic.
pub fn publish(&mut self, topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) {
pub fn publish(&mut self, topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>) {
self.publish_many(iter::once(topic), data)
}

/// Publishes a message with multiple topics to the network.
///
/// > **Note**: Doesn't do anything if we're not subscribed to any of the topics.
pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<TopicHash>>, data: impl Into<Vec<u8>>) {
/// > **Note**: Doesn't do anything if we're not subscribed to any of the
/// > topics.
pub fn publish_many(&mut self, topics: impl IntoIterator<Item = impl
Into<TopicHash>>, data: impl Into<Vec<u8>>) {
let message = FloodsubMessage {
source: self.local_peer_id.clone(),
data: data.into(),
// If the sequence numbers are predictable, then an attacker could flood the network
// with packets with the predetermined sequence numbers and absorb our legitimate
// messages. We therefore use a random number.
// If the sequence numbers are predictable, then an attacker could
// flood the network with packets with the predetermined sequence
// numbers and absorb our legitimate messages. We therefore use a
// random number.
sequence_number: rand::random::<[u8; 20]>().to_vec(),
topics: topic.into_iter().map(|t| t.into().clone()).collect(),
topics: topics.into_iter().map(|t| t.into().clone()).collect(),
};

// Don't publish the message if we're not subscribed ourselves to any of the topics.
Expand Down
8 changes: 5 additions & 3 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ where

#[inline]
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() }))
future::ok(Framed::new(socket, FloodsubCodec {
length_prefix: Default::default() }))
}
}

Expand Down Expand Up @@ -122,8 +123,9 @@ impl Encoder for FloodsubCodec {
proto
.write_length_delimited_to_writer(&mut dst.by_ref().writer())
.expect(
"there is no situation in which the protobuf message can be invalid, and \
writing to a BytesMut never fails as we reserved enough space beforehand",
"there is no situation in which the protobuf message can be \
invalid, and writing to a BytesMut never fails as we reserved \
enough space beforehand",
);
Ok(())
}
Expand Down
26 changes: 26 additions & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "libp2p-gossipsub"
description = "Gossipsub for libp2p."
version = "0.2.0"
authors = ["James Ray <[email protected]>"]
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking", "pubsub", "gossipsub"]
categories = ["network-programming", "asynchronous"]

[dependencies]
bs58 = "0.2.0"
bytes = "0.4"
chrono = "0.4"
cuckoofilter = "0.3.2"
custom_error = "1.3.0"
fnv = "1.0"
futures = "0.1"
libp2p-core = { path = "../../core" }
libp2p-floodsub = { path = "../floodsub" }
lru_time_cache = "0.8.1"
protobuf = "2.0.2"
rand = "0.6"
smallvec = "0.6.5"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2.1", features = ["codec"] }
10 changes: 10 additions & 0 deletions protocols/gossipsub/gen_rpc_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh

# Similar to regen_structs_proto.sh but leaves installation of any
# dependencies up to the user.
# TODO: write a script that will install dependencies across platforms.
# Basic tip: run `chmod +x gen_rpc_proto.sh && ./gen_rpc_proto.sh`

protoc --rust_out . rpc.proto
sudo chown $USER:$USER *.rs
mv -f rpc.rs ./src/rpc_proto.rs
13 changes: 13 additions & 0 deletions protocols/gossipsub/regen_structs_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh

# This script regenerates the `src/rpc_proto.rs` file from `rpc.proto`.

docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \
apt-get update; \
apt-get install -y protobuf-compiler; \
cargo install --version 2.0.2 protobuf-codegen; \
protoc --rust_out . rpc.proto"

sudo chown $USER:$USER *.rs

mv -f rpc.rs ./src/rpc_proto.rs
97 changes: 97 additions & 0 deletions protocols/gossipsub/rpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// From https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto,
// should be the same unless that has been changed.

syntax = "proto2";

// import "google/protobuf/descriptor.proto";

// extend google.protobuf.EnumValueOptions {
// optional string type= 51234;
// optional string code= 51235;
// }

package pubsub.pb;

message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;

message SubOpts {
optional bool subscribe = 1; // subscribe or unsubcribe
optional string topic_hash = 2;
}

optional ControlMessage control = 3;
}

message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topic_hashes = 4;
// TODO: use
// optional bytes signature = 5;
// optional bytes key = 6;
}

message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}

// Using an ID doesn't seem to be worth it compared to using a hash.
// While this may not be strictly necessary as a messageID and a messageHash
// are both strings, it would be useful to differentiate between them in the
// protobuf file, e.g. due to the accessors and for clarity.
// enum MessageRep {
// ID = 0 [(type) = "ID", (code) = 'I'];
// HASH = 1 [(type) = "HSH", (code) = 'H'];
// }

message ControlIHave {
optional string topic_hash = 1;
repeated string message_hashes = 2;
}

message ControlIWant {
repeated string message_hashes = 1;
}

message ControlGraft {
optional string topic_hash = 1;
}

message ControlPrune {
optional string topic_hash = 1;
}

message TopicDescriptor {
optional string name = 1;
// TODO: use
// optional AuthOpts auth = 2;
// optional EncOpts enc = 3;

// message AuthOpts {
// optional AuthMode mode = 1;
// repeated bytes keys = 2; // root keys to trust

// enum AuthMode {
// NONE = 0; // no authentication, anyone can publish
// KEY = 1; // only messages signed by keys in the topic descriptor are accepted
// WOT = 2; // web of trust, certificates can allow publisher set to grow
// }
// }

// message EncOpts {
// optional EncMode mode = 1;
// repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted)

// enum EncMode {
// NONE = 0; // no encryption, anyone can read
// SHAREDKEY = 1; // messages are encrypted with shared key
// WOT = 2; // web of trust, certificates can allow publisher set to grow
// }
// }
}
31 changes: 31 additions & 0 deletions protocols/gossipsub/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Overlay parameters

/// The target number of peers in the mesh to gossip to and from.
pub const TARGET_MESH_DEGREE: u32 = 6;
/// Low water mark for the mesh degree, any lower and it could take longer to
/// find messages.
pub const LOW_WM_MESH_DEGREE: u32 = 4;
/// High water mark for the mesh degree, any higher and it could be too
/// much for bandwidth (particularly for low-end devices).
pub const HIGH_WM_MESH_DEGREE: u32 = 12;

// Gossip parameters
/// Length of gossip history
pub const GOSSIP_HIST_LEN: u32 = 5;

/// We get message IDs from up to (but not including) this index in the
/// `MCache's` history window.
pub const HISTORY_GOSSIP: u32 = 3;

/// The length of the total message history.
pub const MSG_HIST_LEN: u32 = 120;
/// The duration of messages in the seen cache, in seconds.
pub const SEEN_MSGS_CACHE: u32 = 120;

/// In milliseconds
pub const HEARTBEAT_INITIAL_DELAY: u32 = 100;
/// The interval at which to run the heartbeat procedure, in seconds
pub const HEARTBEAT_INTERVAL: u32 = 1;

/// The time to live for fanout in seconds.
pub const FANOUT_TTL: u32 = 60;
Loading