Skip to content

Commit

Permalink
Update dht_bootstrap.go
Browse files Browse the repository at this point in the history
1) on connecting to a new peer  -> trigger self & bucket bootstrap if RT size goes below thereshold
2) accept formatting & doc suggestions in the review
3) remove RT recovery code for now -> will address in a separate PR once #383 goes in

changes as per review
  • Loading branch information
aarshkshah1992 authored and Stebalien committed Oct 11, 2019
1 parent f4630f6 commit 00fffba
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 91 deletions.
81 changes: 25 additions & 56 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -22,18 +21,18 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/metrics"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/providers"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
base32 "github.com/whyrusleeping/base32"
"github.com/whyrusleeping/base32"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -70,7 +69,7 @@ type IpfsDHT struct {

bootstrapCfg opts.BootstrapConfig

rtRecoveryChan chan *rtRecoveryReq
triggerBootstrap chan struct{}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -83,15 +82,6 @@ var (
_ routing.ValueStore = (*IpfsDHT)(nil)
)

type rtRecoveryReq struct {
id string
errorChan chan error
}

func mkRtRecoveryReq() *rtRecoveryReq {
return &rtRecoveryReq{uuid.New().String(), make(chan error, 1)}
}

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
Expand All @@ -114,11 +104,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

// RT recovery proc
rtRecoveryProc := goprocessctx.WithContext(ctx)
rtRecoveryProc.Go(dht.rtRecovery)
dht.proc.AddChild(rtRecoveryProc)

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
Expand Down Expand Up @@ -152,8 +137,6 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())
rtRecoveryChan := make(chan *rtRecoveryReq)

cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -162,46 +145,32 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p

rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
go func(rtRecoveryChan chan *rtRecoveryReq) {
if rt.Size() == 0 {
req := mkRtRecoveryReq()
logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id)
select {
case <-ctx.Done():
return
case rtRecoveryChan <- req:
select {
case <-ctx.Done():
return
case <-req.errorChan:
// TODO Do we need to do anything here ?
}
}
}
}(rtRecoveryChan)
}

dht := &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
rtRecoveryChan: rtRecoveryChan,
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerBootstrap: make(chan struct{}),
}

dht.ctx = dht.newContextWithLocalTags(ctx)

return dht
}

func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
Expand Down Expand Up @@ -231,7 +200,7 @@ func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
return
}
}
}
}*/

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
Expand Down
70 changes: 35 additions & 35 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

var DefaultBootstrapPeers []multiaddr.Multiaddr

var minRTBootstrapThreshold = 4

func init() {
for _, s := range []string{
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
Expand All @@ -39,33 +41,27 @@ func init() {
}
}

// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
seedRTIfEmpty := func(tag string) {
if dht.routingTable.Size() == 0 {
req := mkRtRecoveryReq()
logger.Warningf("dht bootstrap: %s: RT is empty, will attempt to initiate recovery, reqID=%s", tag, req.id)
select {
case <-ctx.Done():
return
case dht.rtRecoveryChan <- req:
select {
case <-ctx.Done():
return
case <-req.errorChan:
// TODO Should we abort the ONGOING bootstrap attempt if seeder returns an error on the channel ?
}
}
triggerBootstrapFnc := func() {
logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
dht.routingTable.Size(), minRTBootstrapThreshold)

if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
}

if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
}
}

// we should query for self periodically so we can discover closer peers
go func() {
for {
seedRTIfEmpty("self walk")
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err)
logger.Warningf("self walk: error: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
Expand All @@ -78,29 +74,31 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
for {
seedRTIfEmpty("buckets")
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("error bootstrapping: %s", err)
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case <-dht.triggerBootstrap:
triggerBootstrapFnc()
case <-ctx.Done():
return
}
}
}()

return nil
}

//scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
doQuery := func(n int, target string, f func(context.Context) error) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
n, target, dht.routingTable.Size())
bucketId, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
n, target, dht.routingTable.Size())
bucketId, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
defer cancel()
Expand Down Expand Up @@ -145,16 +143,27 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
close(errChan)
}()

// accumulate errors from all go-routines
// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n"))
return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
}
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
}
return err
}

// synchronous bootstrap.
Expand All @@ -165,12 +174,3 @@ func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
return dht.bootstrapBuckets(ctx)
}
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
_, err := dht.FindPeer(ctx, dht.self)
if err == routing.ErrNotFound {
return nil
}
return err
}
49 changes: 49 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,55 @@ func TestBootstrap(t *testing.T) {
}
}

func TestBootstrapBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
dhtC := setupDHT(ctx, t, false)

defer func() {
dhtA.Close()
dhtA.host.Close()

dhtB.Close()
dhtB.host.Close()

dhtC.Close()
dhtC.host.Close()
}()

connect(t, ctx, dhtA, dhtB)
connect(t, ctx, dhtB, dhtC)

// we ONLY init bootstrap on A
dhtA.Bootstrap(ctx)
// and wait for one round to complete i.e. A should be connected to both B & C
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)

// now we create two new peers
dhtD := setupDHT(ctx, t, false)
dhtE := setupDHT(ctx, t, false)

// connect them to each other
connect(t, ctx, dhtD, dhtE)
defer func() {
dhtD.Close()
dhtD.host.Close()

dhtE.Close()
dhtE.host.Close()
}()

// and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap.
// since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens,
// it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected)
connect(t, ctx, dhtA, dhtD)

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

func TestPeriodicBootstrap(t *testing.T) {
if ci.IsRunning() {
t.Skip("skipping on CI. highly timing dependent")
Expand Down
14 changes: 14 additions & 0 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
}
}
}
return
}
Expand Down Expand Up @@ -71,7 +78,14 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
}
}
}
}

Expand Down

0 comments on commit 00fffba

Please sign in to comment.