diff --git a/dht.go b/dht.go index 7c6fb8185..548d683fa 100644 --- a/dht.go +++ b/dht.go @@ -42,9 +42,23 @@ var ( baseLogger = logger.Desugar() ) -// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag. -// It is added with the common prefix length between two peer IDs. -const BaseConnMgrScore = 5 +const ( + // BaseConnMgrScore is the base of the score set on the connection + // manager "kbucket" tag. It is added with the common prefix length + // between two peer IDs. + baseConnMgrScore = 5 + + // UsefulConnMgrScore is given to peers that are among the first peers + // to respond to a query. + // + // This score is given to peers the first time they're useful and lasts + // until we disconnect from the peer. + usefulConnMgrScore = 20 + + // UsefulConnMgrProtectedBuckets is the number of buckets where useful + // peers are _protected_, instead of just given the useful score. + usefulConnMgrProtectedBuckets = 2 +) type mode int @@ -58,11 +72,17 @@ const ( kad2 protocol.ID = "/kad/2.0.0" ) +const ( + dhtUsefulTag = "dht-useful" + kbucketTag = "kbucket" +) + // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { - host host.Host // the network services we need - self peer.ID // Local peer (yourself) + host host.Host // the network services we need + self peer.ID // Local peer (yourself) + selfKey kb.ID peerstore peerstore.Peerstore // Peer Registry datastore ds.Datastore // Local data @@ -250,6 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { dht := &IpfsDHT{ datastore: cfg.datastore, self: h.ID(), + selfKey: kb.ConvertPeerID(h.ID()), peerstore: h.Peerstore(), host: h, strmap: make(map[peer.ID]*messageSender), @@ -337,17 +358,22 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr } func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { - self := kb.ConvertPeerID(dht.host.ID()) - - rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) + rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { - commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p)) - cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen) + // We tag our closest peers with higher and higher scores so we + // stay connected to our nearest neighbors. + // + // We _also_ (elsewhere) protect useful peers in the furthest + // buckets (our "core" routing nodes) and give high scores to + // all other useful peers. + commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p)) + cmgr.TagPeer(p, kbucketTag, baseConnMgrScore+commonPrefixLen) } rt.PeerRemoved = func(p peer.ID) { - cmgr.UntagPeer(p, "kbucket") + cmgr.Unprotect(p, dhtUsefulTag) + cmgr.UntagPeer(p, kbucketTag) // try to fix the RT dht.fixRTIfNeeded() diff --git a/query.go b/query.go index 43eb808ec..918813389 100644 --- a/query.go +++ b/query.go @@ -174,7 +174,24 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn } func (q *query) recordPeerIsValuable(p peer.ID) { - q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) + if !q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) { + // not in routing table + return + } + + // Protect useful peers, when they're actually useful. This will last + // through disconnects. However, we'll still evict them if they keep + // disconnecting from us. + // + // Restrict to buckets 0, 1 (75% of requests, max 40 peers), so we don't + // protect _too_ many peers. + commonPrefixLen := kb.CommonPrefixLen(q.dht.selfKey, kb.ConvertPeerID(p)) + cmgr := q.dht.host.ConnManager() + if commonPrefixLen < usefulConnMgrProtectedBuckets { + cmgr.Protect(p, dhtUsefulTag) + } else { + cmgr.TagPeer(p, dhtUsefulTag, usefulConnMgrScore) + } } func (q *query) recordValuablePeers() {