RT connectivity changes
aarshkshah1992 authored and Stebalien committed Mar 10, 2020
1 parent 7ada018 commit fbb1b36
Showing 12 changed files with 278 additions and 193 deletions.
140 changes: 75 additions & 65 deletions dht.go
@@ -36,6 +36,7 @@ import (
)

var logger = logging.Logger("dht")
var rtPvLogger = logging.Logger("dht/rt/peer-validation")

const BaseConnMgrScore = 5

@@ -96,6 +97,7 @@ type IpfsDHT struct {
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

maxRecordAge time.Duration

@@ -127,8 +129,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
if err := cfg.validate(); err != nil {
return nil, err
}
dht, err := makeDHT(ctx, h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}

dht := makeDHT(ctx, h, cfg)
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
@@ -168,6 +173,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.providers.Process())

dht.startSelfLookup()
dht.startRefreshing()
return dht, nil
}
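
The constructor now returns an error because building the routing table can fail. A minimal usage sketch, assuming a libp2p host created with default options; the variable names and panic-based error handling are illustrative, not part of this commit:

package main

import (
    "context"

    libp2p "github.com/libp2p/go-libp2p"
    dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
    ctx := context.Background()

    h, err := libp2p.New(ctx)
    if err != nil {
        panic(err)
    }

    // New can now fail, e.g. if the routing table cannot be constructed.
    d, err := dht.New(ctx, h)
    if err != nil {
        panic(err)
    }
    defer d.Close()

    // Kick off the initial refresh / bootstrap.
    if err := d.Bootstrap(ctx); err != nil {
        panic(err)
    }
}
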
@@ -195,18 +201,10 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore())
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
}

rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
rt, err := makeRoutingTable(h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}

protocols := []protocol.ID{cfg.protocolPrefix + kad2}
@@ -223,20 +221,21 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
}

// create a DHT proc with the given context
@@ -249,43 +248,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {

dht.providers = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)

return dht
return dht, nil
}

// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
case errorChan <- errChan:
}
close(errorChan)
}
for {
select {
case req := <-dht.rtRecoveryChan:
if dht.routingTable.Size() == 0 {
logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
// TODO Call Seeder with default bootstrap peers here once #383 is merged
if dht.routingTable.Size() > 0 {
logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
go writeResp(req.errorChan, nil)
} else {
logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
}
} else {
logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
go writeResp(req.errorChan, nil)
}
case <-proc.Closing():
return
}
}
}*/

func makeRoutingTable(h host.Host, cfg config) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(h.ID())
// construct the routing table with a peer validation function
pvF := func(c context.Context, p peer.ID) bool {
if err := h.Connect(c, peer.AddrInfo{ID: p}); err != nil {
rtPvLogger.Errorf("failed to connect to peer %s for validation, err=%s", p, err)
return false
}
return true
}

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
if cfg.routingTable.checkInterval != 0 {
rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
}

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, h.Peerstore(),
rtOpts...)
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
}

return rt, err
}
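
kbucket runs the validation function from its periodic cleanup (whose cadence is set with TableCleanupInterval above) to check that peers are still dialable before keeping or re-adding them. A standalone sketch of a function with the same shape, adding an explicit per-check timeout; the helper name and timeout are assumptions for illustration:

package dht

import (
    "context"
    "time"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/peer"
)

// connectivityValidator returns a function with the signature kbucket's
// PeerValidationFnc expects: it reports whether the host can (re)connect to
// the peer within the given timeout. The name and timeout are illustrative.
func connectivityValidator(h host.Host, timeout time.Duration) func(context.Context, peer.ID) bool {
    return func(ctx context.Context, p peer.ID) bool {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        // An AddrInfo with no addresses makes the host fall back to the
        // addresses it already has for p in the peerstore.
        return h.Connect(ctx, peer.AddrInfo{ID: p}) == nil
    }
}
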

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
@@ -398,11 +393,26 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

// Update signals the routingTable to Update its last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
logger.Event(ctx, "updatePeer", p)
dht.routingTable.Update(p)
// peerFound signals the routingTable that we've found a peer that
// supports the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerFound", p)
dht.routingTable.HandlePeerAlive(p)
}

// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerStoppedDHT", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
dht.routingTable.HandlePeerDead(p)
}

// peerDisconnected signals the routing table that a peer is not connected anymore.
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)

}
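
These three hooks replace the old Update method and are meant to be driven by connection and protocol events; the wiring lives elsewhere in the package and is not part of this hunk. A sketch of the kind of glue a caller inside the package might use; the helper names are assumptions, not the commit's actual notifee code:

package dht

import (
    "github.com/libp2p/go-libp2p-core/network"
    "github.com/libp2p/go-libp2p-core/peer"
)

// onConnectednessChanged is an illustrative helper: only when the last
// connection to p is gone do we tell the routing table it disconnected.
func (dht *IpfsDHT) onConnectednessChanged(p peer.ID) {
    if dht.host.Network().Connectedness(p) != network.Connected {
        dht.peerDisconnected(dht.ctx, p)
    }
}

// onProtocolConfirmed is an illustrative helper for the identify path: call it
// once we learn whether p currently speaks one of our DHT protocols.
func (dht *IpfsDHT) onProtocolConfirmed(p peer.ID, speaksDHT bool) {
    if speaksDHT {
        dht.peerFound(dht.ctx, p)
        return
    }
    dht.peerStoppedDHT(dht.ctx, p)
}
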

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
110 changes: 85 additions & 25 deletions dht_bootstrap.go
@@ -9,6 +9,7 @@ import (
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)
@@ -17,7 +18,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 4
var minRTRefreshThreshold = 10
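
The threshold itself is only a package-level knob; the code that consults it when new peers show up lives elsewhere in the package and is not shown in this diff. A sketch of that pattern, using the triggerRtRefresh channel added in dht.go; the helper name is an assumption:

// maybeTriggerRefresh is an illustrative helper: if the routing table has
// shrunk below the threshold, nudge the refresh worker. Sending nil means the
// caller does not wait for a result, and the non-blocking send avoids piling
// up triggers while a refresh is already pending.
func (dht *IpfsDHT) maybeTriggerRefresh() {
    if dht.routingTable.Size() < minRTRefreshThreshold {
        select {
        case dht.triggerRtRefresh <- nil:
        default:
        }
    }
}
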

func init() {
for _, s := range []string{
@@ -35,6 +36,50 @@ func init() {
}
}

// startSelfLookup starts a goroutine that listens on a dedicated channel for requests to trigger a self walk
// and then sends the resulting error status back on the error channel supplied with each request.
// If multiple callers ask for a self walk at (roughly) the same time, only ONE self walk is performed and the same error status is sent to all of them.
func (dht *IpfsDHT) startSelfLookup() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
for {
var waiting []chan<- error
select {
case res := <-dht.triggerSelfLookup:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}

// batch multiple self-lookup requests if they're all waiting at the same time.
waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)

// Do a self walk
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound || err == kbucket.ErrLookupFailure {
err = nil
} else if err != nil {
err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
}
cancel()

// send back the error status
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warning(err)
}
}
})

return nil
}
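
From the caller's side, requesting a self walk means handing the worker a result channel and waiting on it; doRefresh further down in this file does exactly that. A compact sketch of the pattern with an assumed helper name:

// requestSelfWalk is an illustrative wrapper: it asks the self-lookup worker
// to run and blocks until the walk finishes or ctx is done. The buffer of 1
// keeps the worker's send from blocking if the caller gives up early.
func (dht *IpfsDHT) requestSelfWalk(ctx context.Context) error {
    res := make(chan error, 1)

    select {
    case dht.triggerSelfLookup <- res:
    case <-ctx.Done():
        return ctx.Err()
    }

    select {
    case err := <-res:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}
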

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
@@ -65,17 +110,7 @@ func (dht *IpfsDHT) startRefreshing() error {
}

// Batch multiple refresh requests if they're all waiting at the same time.
collectWaiting:
for {
select {
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
default:
break collectWaiting
}
}
waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)

err := dht.doRefresh(ctx)
for _, w := range waiting {
@@ -91,11 +126,41 @@ func (dht *IpfsDHT) startRefreshing() error {
return nil
}

func collectWaitingChannels(source chan chan<- error) []chan<- error {
var waiting []chan<- error
for {
select {
case res := <-source:
if res != nil {
waiting = append(waiting, res)
}
default:
return waiting
}
}
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)

// wait for the self walk result
selfWalkres := make(chan error, 1)

select {
case dht.triggerSelfLookup <- selfWalkres:
case <-ctx.Done():
return ctx.Err()
}

select {
case err := <-selfWalkres:
if err != nil {
merr = multierror.Append(merr, err)
}
case <-ctx.Done():
return ctx.Err()
}

if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
@@ -127,6 +192,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}

// do not refresh if bucket is full
if dht.routingTable.IsBucketFull(tcpl.Cpl) {
continue
}

// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
@@ -153,17 +224,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
return merr
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
}
return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//