Skip to content

Commit

Permalink
fix: protect useful peers in low buckets
Browse files Browse the repository at this point in the history
That way, we never disconnect from the core of the network.

We don't protect our closest peers as:

* We'll reconnect to them frequently when we query for ourself.
* We use the core nodes most frequently.
  • Loading branch information
Stebalien committed May 13, 2020
1 parent 9be7169 commit 7c89d12
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
22 changes: 14 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,17 @@ const (
kad2 protocol.ID = "/kad/2.0.0"
)

const (
dhtUsefulTag = "dht-useful"
kbucketTag = "kbucket"
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
selfKey kb.ID
peerstore peerstore.Peerstore // Peer Registry

datastore ds.Datastore // Local data
Expand Down Expand Up @@ -249,6 +255,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
Expand Down Expand Up @@ -298,18 +305,17 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))

self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
cmgr.TagPeer(p, kbucketTag, BaseConnMgrScore+commonPrefixLen)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
cmgr.Unprotect(p, dhtUsefulTag)
cmgr.UntagPeer(p, kbucketTag)

// try to fix the RT
dht.fixRTIfNeeded()
Expand Down
16 changes: 15 additions & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,21 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
}

func (q *query) recordPeerIsValuable(p peer.ID) {
q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
if !q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) {
// not in routing table
return
}

// Protect useful peers, when they're actually useful. This will last
// through disconnects. However, we'll still evict them if they keep
// disconnecting from us.
//
// Restrict to buckets 0, 1 (75% of requests, max 40 peers), so we don't
// protect _too_ many peers.
commonPrefixLen := kb.CommonPrefixLen(q.dht.selfKey, kb.ConvertPeerID(p))
if commonPrefixLen < 2 {
q.dht.host.ConnManager().Protect(p, dhtUsefulTag)
}
}

func (q *query) recordValuablePeers() {
Expand Down

0 comments on commit 7c89d12

Please sign in to comment.