Skip to content

Commit

Permalink
1) seed RT whenever it becomes empty
Browse files Browse the repository at this point in the history
2) seed RT if empty before starting bootstrap, in case 1 hasn't fired
3) pass bootstrap config as option while creating Dht
4) replace all bootstrap functions with a single function
  • Loading branch information
aarshkshah1992 authored and Stebalien committed Oct 11, 2019
1 parent 5da1634 commit f4630f6
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 116 deletions.
99 changes: 84 additions & 15 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -37,10 +38,6 @@ import (

var logger = logging.Logger("dht")

// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -70,6 +67,10 @@ type IpfsDHT struct {
protocols []protocol.ID // DHT protocols

bucketSize int

bootstrapCfg opts.BootstrapConfig

rtRecoveryChan chan *rtRecoveryReq
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -82,6 +83,15 @@ var (
_ routing.ValueStore = (*IpfsDHT)(nil)
)

// rtRecoveryReq is a request handed to the rtRecovery loop over
// rtRecoveryChan, asking it to check the routing table and attempt recovery
// if the table is empty.
type rtRecoveryReq struct {
// id identifies the request in log messages.
id string
// errorChan receives the outcome of the recovery attempt; rtRecovery
// closes it after attempting to deliver the result.
errorChan chan error
}

// mkRtRecoveryReq builds a recovery request tagged with a freshly generated
// UUID string and carrying a one-slot buffered channel for the result.
func mkRtRecoveryReq() *rtRecoveryReq {
	req := &rtRecoveryReq{
		id:        uuid.New().String(),
		errorChan: make(chan error, 1),
	}
	return req
}

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
Expand All @@ -90,6 +100,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.bootstrapCfg = cfg.BootstrapConfig

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand All @@ -103,6 +114,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

// RT recovery proc
rtRecoveryProc := goprocessctx.WithContext(ctx)
rtRecoveryProc.Go(dht.rtRecovery)
dht.proc.AddChild(rtRecoveryProc)

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
Expand Down Expand Up @@ -136,34 +152,87 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

// makeDHT constructs an IpfsDHT for host h without starting any background
// processes.
//
// It creates the routing table (keyed on the host's own peer ID), wires the
// table's PeerAdded/PeerRemoved callbacks to the host's connection manager,
// and creates the unbuffered channel over which routing-table recovery
// requests are submitted. The returned DHT's ctx is the supplied ctx wrapped
// by newContextWithLocalTags.
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
	rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())
	rtRecoveryChan := make(chan *rtRecoveryReq)

	cmgr := h.ConnManager()

	// Tag peers held in the routing table with the connection manager.
	rt.PeerAdded = func(p peer.ID) {
		cmgr.TagPeer(p, "kbucket", 5)
	}

	rt.PeerRemoved = func(p peer.ID) {
		cmgr.UntagPeer(p, "kbucket")
		// The emptiness check and the recovery request run in their own
		// goroutine so the callback itself returns immediately.
		go func(rtRecoveryChan chan *rtRecoveryReq) {
			if rt.Size() == 0 {
				req := mkRtRecoveryReq()
				logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id)
				select {
				case <-ctx.Done():
					return
				case rtRecoveryChan <- req:
					// Request accepted; wait for the outcome unless the
					// context is cancelled first.
					select {
					case <-ctx.Done():
						return
					case <-req.errorChan:
						// TODO Do we need to do anything here ?
					}
				}
			}
		}(rtRecoveryChan)
	}

	// Each field is listed exactly once: duplicate field names in a
	// composite literal are rejected by the compiler.
	dht := &IpfsDHT{
		datastore:      dstore,
		self:           h.ID(),
		peerstore:      h.Peerstore(),
		host:           h,
		strmap:         make(map[peer.ID]*messageSender),
		ctx:            ctx,
		providers:      providers.NewProviderManager(ctx, h.ID(), dstore),
		birth:          time.Now(),
		routingTable:   rt,
		protocols:      protocols,
		bucketSize:     bucketSize,
		rtRecoveryChan: rtRecoveryChan,
	}

	dht.ctx = dht.newContextWithLocalTags(ctx)

	return dht
}

// rtRecovery is the routing-table recovery loop. It serves requests arriving
// on dht.rtRecoveryChan until proc starts closing. For every request it
// reports on the request's errorChan whether the routing table is non-empty
// (nil) or still empty after the recovery step (an error), and then closes
// that channel.
func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
	// reply tries to deliver result on out, gives up if the process is
	// closing, and closes out in either case.
	reply := func(out chan error, result error) {
		select {
		case out <- result:
		case <-proc.Closing():
		}
		close(out)
	}

	for {
		select {
		case <-proc.Closing():
			return
		case req := <-dht.rtRecoveryChan:
			// Nothing to recover when the table already has peers.
			if dht.routingTable.Size() != 0 {
				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
				go reply(req.errorChan, nil)
				continue
			}

			logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
			// TODO Call Seeder with default bootstrap peers here once #383 is merged

			// Re-check the table to decide what to report back.
			if dht.routingTable.Size() > 0 {
				logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
				go reply(req.errorChan, nil)
				continue
			}
			logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
			go reply(req.errorChan, errors.New("RT empty after seed attempt"))
		}
	}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

Expand Down
127 changes: 39 additions & 88 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package dht

import (
"context"
"crypto/rand"
"fmt"
"strings"
"sync"
"time"

u "github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -42,43 +39,36 @@ func init() {
}
}

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
// Every field is a time.Duration; DefaultBootstrapConfig supplies a ready-made
// set of values.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
}

// DefaultBootstrapConfig is the BootstrapConfig that Bootstrap passes to
// BootstrapWithConfig.
var DefaultBootstrapConfig = BootstrapConfig{
	// The bucket period matches the value given in the Kademlia DHT paper.
	BucketPeriod: time.Hour,

	// Bootstrap queries are given ten seconds to run.
	Timeout: 10 * time.Second,

	// With a one-hour bucket period, scanning the routing table every
	// 30 minutes is a reasonable cadence.
	RoutingTableScanInterval: 30 * time.Minute,

	// Query for our own ID once an hour.
	SelfQueryInterval: time.Hour,
}

// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
}
seedRTIfEmpty := func(tag string) {
if dht.routingTable.Size() == 0 {
req := mkRtRecoveryReq()
logger.Warningf("dht bootstrap: %s: RT is empty, will attempt to initiate recovery, reqID=%s", tag, req.id)
select {
case <-ctx.Done():
return
case dht.rtRecoveryChan <- req:
select {
case <-ctx.Done():
return
case <-req.errorChan:
// TODO Should we abort the ONGOING bootstrap attempt if seeder returns an error on the channel ?
}
}
}
}

// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
// we should query for self periodically so we can discover closer peers
go func() {
for {
err := dht.BootstrapSelf(ctx)
seedRTIfEmpty("self walk")
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err)
}
select {
case <-time.After(cfg.SelfQueryInterval):
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
case <-ctx.Done():
return
}
Expand All @@ -88,12 +78,13 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
for {
err := dht.runBootstrap(ctx, cfg)
seedRTIfEmpty("buckets")
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("error bootstrapping: %s", err)
}
select {
case <-time.After(cfg.RoutingTableScanInterval):
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case <-ctx.Done():
return
}
Expand All @@ -102,57 +93,16 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig
return nil
}

// newRandomPeerId produces a peer ID by hashing 32 bytes read from
// crypto/rand.
func newRandomPeerId() peer.ID {
	// SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
	seed := make([]byte, 32)
	rand.Read(seed)
	// TODO: Feed this directly into the multihash instead of hashing it.
	return peer.ID(u.Hash(seed))
}

// walk traverses the DHT toward target and returns whatever FindPeer reports
// for it.
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) {
	// TODO: Extract the query action (traversal logic?) inside FindPeer,
	// don't actually call through the FindPeer machinery, which can return
	// things out of the peer store etc.
	info, err := dht.FindPeer(ctx, target)
	return info, err
}

// randomWalk traverses the DHT toward a randomly generated peer ID. A
// routing.ErrNotFound result is the expected outcome and is reported as
// success; any other error is returned unchanged.
func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
	target := newRandomPeerId()
	found, err := dht.walk(ctx, target)
	if err == routing.ErrNotFound {
		return nil
	}
	if err != nil {
		return err
	}
	// We found a peer from a randomly generated ID. This should be very
	// unlikely.
	logger.Warningf("random walk toward %s actually found peer: %s", target, found)
	return nil
}

// selfWalk traverses the DHT toward this node's own ID. Not finding ourselves
// (routing.ErrNotFound) is treated as success; other errors are returned.
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
	if _, err := dht.walk(ctx, dht.self); err != nil && err != routing.ErrNotFound {
		return err
	}
	return nil
}

// scan the RT and do a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
doQuery := func(n int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
n, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
n, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
defer cancel()
err := f(queryCtx)
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
Expand All @@ -166,7 +116,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
errChan := make(chan error)

for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > cfg.BucketPeriod {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
Expand All @@ -175,7 +125,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.walk(ctx, randPeerInBucket)
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
Expand Down Expand Up @@ -207,19 +157,20 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
}
}

// This is a synchronous bootstrap.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
if err := dht.BootstrapSelf(ctx); err != nil {
// synchronous bootstrap.
func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.runBootstrap(ctx, cfg)
return dht.bootstrapBuckets(ctx)
}
}

func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error {
return dht.randomWalk(ctx)
}

func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error {
return dht.selfWalk(ctx)
// selfWalk looks up this node's own ID via FindPeer. Not finding ourselves
// (routing.ErrNotFound) is treated as success; other errors are returned.
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
	_, err := dht.FindPeer(ctx, dht.self)
	switch err {
	case nil, routing.ErrNotFound:
		return nil
	default:
		return err
	}
}
Loading

0 comments on commit f4630f6

Please sign in to comment.