Skip to content

Commit

Permalink
Merge branch 'yahya/gossipsub-router-interface'
Browse files Browse the repository at this point in the history
  • Loading branch information
yhassanzadeh13 committed Nov 10, 2022
2 parents b897e22 + ab8365c commit 1c91995
Show file tree
Hide file tree
Showing 15 changed files with 344 additions and 256 deletions.
2 changes: 1 addition & 1 deletion discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
s = server2
}
disc := &mockDiscoveryClient{h, s}
ps := getGossipsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...)))
ps := getGossipSub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...)))
psubs[i] = ps
topicHandlers[i], _ = ps.Join(topic)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/yhassanzadeh13/go-libp2p-pubsub
module github.com/libp2p/go-libp2p-pubsub

go 1.17

Expand Down
50 changes: 25 additions & 25 deletions gossip_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
)

// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// GossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// peers who don't follow up on IWANT requests after an IHAVE advertisement.
// The tracking of promises is probabilistic to avoid using too much memory.
type gossipTracer struct {
type GossipTracer struct {
sync.Mutex

idGen *msgIDGenerator
Expand All @@ -27,15 +27,15 @@ type gossipTracer struct {
peerPromises map[peer.ID]map[string]struct{}
}

func newGossipTracer() *gossipTracer {
return &gossipTracer{
func newGossipTracer() *GossipTracer {
return &GossipTracer{
idGen: newMsgIdGenerator(),
promises: make(map[string]map[peer.ID]time.Time),
peerPromises: make(map[peer.ID]map[string]struct{}),
}
}

func (gt *gossipTracer) Start(gs *GossipSubRouter) {
func (gt *GossipTracer) Start(gs *GossipSubRouter) {
if gt == nil {
return
}
Expand All @@ -45,7 +45,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) {
}

// track a promise to deliver a message from a list of msgIDs we are requesting
func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
func (gt *GossipTracer) AddPromise(p peer.ID, msgIDs []string) {
if gt == nil {
return
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {

// returns the number of broken promises for each peer who didn't follow up
// on an IWANT request.
func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
func (gt *GossipTracer) GetBrokenPromises() map[peer.ID]int {
if gt == nil {
return nil
}
Expand Down Expand Up @@ -114,9 +114,9 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
return res
}

var _ RawTracer = (*gossipTracer)(nil)
var _ RawTracer = (*GossipTracer)(nil)

func (gt *gossipTracer) fulfillPromise(msg *Message) {
func (gt *GossipTracer) fulfillPromise(msg *Message) {
mid := gt.idGen.ID(msg)

gt.Lock()
Expand All @@ -140,12 +140,12 @@ func (gt *gossipTracer) fulfillPromise(msg *Message) {
}
}

func (gt *gossipTracer) DeliverMessage(msg *Message) {
func (gt *GossipTracer) DeliverMessage(msg *Message) {
// someone delivered a message, fulfill promises for it
gt.fulfillPromise(msg)
}

func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
func (gt *GossipTracer) RejectMessage(msg *Message, reason string) {
// A message got rejected, so we can fulfill promises and let the score penalty apply
// from invalid message delivery.
// We do take exception and apply promise penalty regardless in the following cases, where
Expand All @@ -160,26 +160,26 @@ func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
gt.fulfillPromise(msg)
}

func (gt *gossipTracer) ValidateMessage(msg *Message) {
func (gt *GossipTracer) ValidateMessage(msg *Message) {
// we consider the promise fulfilled as soon as the message begins validation
// if it was a case of signature issue it would have been rejected immediately
// without triggering the Validate trace
gt.fulfillPromise(msg)
}

func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) UndeliverableMessage(msg *Message) {}

func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
func (gt *GossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *GossipTracer) RemovePeer(p peer.ID) {}
func (gt *GossipTracer) Join(topic string) {}
func (gt *GossipTracer) Leave(topic string) {}
func (gt *GossipTracer) Graft(p peer.ID, topic string) {}
func (gt *GossipTracer) Prune(p peer.ID, topic string) {}
func (gt *GossipTracer) DuplicateMessage(msg *Message) {}
func (gt *GossipTracer) RecvRPC(rpc *RPC) {}
func (gt *GossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (gt *GossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *GossipTracer) UndeliverableMessage(msg *Message) {}

func (gt *GossipTracer) ThrottlePeer(p peer.ID) {
gt.Lock()
defer gt.Unlock()

Expand Down
Loading

0 comments on commit 1c91995

Please sign in to comment.