Merge pull request #634 from libp2p/fix/protect-useful-peers
fix: protect useful peers in low buckets
Stebalien authored May 25, 2020
2 parents c38060f + 6c1d38b commit 3d294c7
Showing 2 changed files with 55 additions and 12 deletions.
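For context: the fix leans on two different connection-manager primitives. A scored tag only biases trimming (when the manager is over its high-water mark, the lowest-scoring connections are closed first), while protection exempts a peer from trimming entirely until it is explicitly unprotected. A minimal sketch of that distinction, with method names taken from the call sites in the diff below and signatures paraphrased from go-libp2p-core's connmgr package (this is an illustrative local interface, not the authoritative one):

// Sketch of the connection-manager surface this commit relies on.
package sketch

import "github.com/libp2p/go-libp2p-core/peer"

type connManager interface {
	// TagPeer attaches a named integer score to a peer. When connections
	// must be trimmed, peers with the lowest total score go first, so a
	// higher score means "keep me around longer".
	TagPeer(p peer.ID, tag string, score int)
	UntagPeer(p peer.ID, tag string)

	// Protect exempts the peer from trimming altogether until Unprotect
	// has been called for every tag that protected it. Unprotect reports
	// whether some other tag still protects the peer.
	Protect(p peer.ID, tag string)
	Unprotect(p peer.ID, tag string) (protected bool)
}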
48 changes: 37 additions & 11 deletions dht.go
@@ -42,9 +42,23 @@ var (
baseLogger = logger.Desugar()
)

// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// It is added with the common prefix length between two peer IDs.
const BaseConnMgrScore = 5
const (
// BaseConnMgrScore is the base of the score set on the connection
// manager "kbucket" tag. It is added with the common prefix length
// between two peer IDs.
baseConnMgrScore = 5

// UsefulConnMgrScore is given to peers that are among the first peers
// to respond to a query.
//
// This score is given to peers the first time they're useful and lasts
// until we disconnect from the peer.
usefulConnMgrScore = 20

// UsefulConnMgrProtectedBuckets is the number of buckets where useful
// peers are _protected_, instead of just given the useful score.
usefulConnMgrProtectedBuckets = 2
)

type mode int

@@ -58,11 +72,17 @@ const (
kad2 protocol.ID = "/kad/2.0.0"
)

const (
dhtUsefulTag = "dht-useful"
kbucketTag = "kbucket"
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
selfKey kb.ID
peerstore peerstore.Peerstore // Peer Registry

datastore ds.Datastore // Local data
@@ -250,6 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
@@ -337,17 +358,22 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
// We tag our closest peers with higher and higher scores so we
// stay connected to our nearest neighbors.
//
// We _also_ (elsewhere) protect useful peers in the furthest
// buckets (our "core" routing nodes) and give high scores to
// all other useful peers.
commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
cmgr.TagPeer(p, kbucketTag, baseConnMgrScore+commonPrefixLen)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
cmgr.Unprotect(p, dhtUsefulTag)
cmgr.UntagPeer(p, kbucketTag)

// try to fix the RT
dht.fixRTIfNeeded()
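The PeerAdded hook above scores routing-table members by proximity: the common prefix length (CPL) between our ID and the peer's is added to the base score, so the deepest buckets (our nearest neighbors) get the stickiest connections, and PeerRemoved now also clears any dht-useful protection. A quick illustration of the resulting "kbucket" tag values, reusing the constant from this diff (the example CPLs are arbitrary):

package main

import "fmt"

// baseConnMgrScore mirrors the constant introduced in dht.go above.
const baseConnMgrScore = 5

// kbucketScore reproduces the PeerAdded computation: base score plus the
// common prefix length between the two peer IDs.
func kbucketScore(commonPrefixLen int) int {
	return baseConnMgrScore + commonPrefixLen
}

func main() {
	// Bucket 0 holds the "far" half of the keyspace; deeper buckets hold
	// ever-closer neighbors and therefore earn higher scores.
	for _, cpl := range []int{0, 1, 4, 16} {
		fmt.Printf("cpl=%2d -> kbucket tag score %d\n", cpl, kbucketScore(cpl))
	}
}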
19 changes: 18 additions & 1 deletion query.go
@@ -174,7 +174,24 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
}

func (q *query) recordPeerIsValuable(p peer.ID) {
q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
if !q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) {
// not in routing table
return
}

// Protect useful peers, when they're actually useful. This will last
// through disconnects. However, we'll still evict them if they keep
// disconnecting from us.
//
// Restrict to buckets 0, 1 (75% of requests, max 40 peers), so we don't
// protect _too_ many peers.
commonPrefixLen := kb.CommonPrefixLen(q.dht.selfKey, kb.ConvertPeerID(p))
cmgr := q.dht.host.ConnManager()
if commonPrefixLen < usefulConnMgrProtectedBuckets {
cmgr.Protect(p, dhtUsefulTag)
} else {
cmgr.TagPeer(p, dhtUsefulTag, usefulConnMgrScore)
}
}

func (q *query) recordValuablePeers() {
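The "75% of requests, max 40 peers" figure in the recordPeerIsValuable comment follows from the bucket layout rather than anything measured in this change. Assuming uniformly random lookup targets and a bucket size of 20 (the 20 is implied by the "max 40 peers" comment, not stated elsewhere in this diff), a back-of-the-envelope check:

package main

import "fmt"

func main() {
	const (
		usefulConnMgrProtectedBuckets = 2  // from dht.go above
		bucketSize                    = 20 // assumed default k-bucket size
	)

	// A random target shares exactly i leading bits with our ID with
	// probability 1/2^(i+1): bucket 0 serves 1/2 of lookups, bucket 1
	// serves 1/4, so the two protected buckets cover about 75%.
	coverage := 0.0
	for i := 0; i < usefulConnMgrProtectedBuckets; i++ {
		coverage += 1.0 / float64(uint(1)<<(i+1))
	}

	fmt.Printf("lookups answered from protected buckets: ~%.0f%%\n", coverage*100)
	fmt.Printf("maximum peers ever protected: %d\n", usefulConnMgrProtectedBuckets*bucketSize)
}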
