diff --git a/dht.go b/dht.go
index 973b7de19..471ae4e7b 100644
--- a/dht.go
+++ b/dht.go
@@ -8,7 +8,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/google/uuid"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -22,18 +21,18 @@ import (
 	"github.com/libp2p/go-libp2p-kad-dht/metrics"
 	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
 	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
-	providers "github.com/libp2p/go-libp2p-kad-dht/providers"
+	"github.com/libp2p/go-libp2p-kad-dht/providers"
 
-	proto "github.com/gogo/protobuf/proto"
-	cid "github.com/ipfs/go-cid"
+	"github.com/gogo/protobuf/proto"
+	"github.com/ipfs/go-cid"
 	ds "github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log"
-	goprocess "github.com/jbenet/goprocess"
-	goprocessctx "github.com/jbenet/goprocess/context"
+	"github.com/jbenet/goprocess"
+	"github.com/jbenet/goprocess/context"
 	kb "github.com/libp2p/go-libp2p-kbucket"
-	record "github.com/libp2p/go-libp2p-record"
+	"github.com/libp2p/go-libp2p-record"
 	recpb "github.com/libp2p/go-libp2p-record/pb"
-	base32 "github.com/whyrusleeping/base32"
+	"github.com/whyrusleeping/base32"
 )
 
 var logger = logging.Logger("dht")
@@ -70,7 +69,7 @@ type IpfsDHT struct {
 
 	bootstrapCfg opts.BootstrapConfig
 
-	rtRecoveryChan chan *rtRecoveryReq
+	triggerBootstrap chan struct{}
 }
 
 // Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -83,15 +82,6 @@ var (
 	_ routing.ValueStore = (*IpfsDHT)(nil)
 )
 
-type rtRecoveryReq struct {
-	id        string
-	errorChan chan error
-}
-
-func mkRtRecoveryReq() *rtRecoveryReq {
-	return &rtRecoveryReq{uuid.New().String(), make(chan error, 1)}
-}
-
 // New creates a new DHT with the specified host and options.
 func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
 	var cfg opts.Options
@@ -114,11 +104,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 	dht.proc.AddChild(dht.providers.Process())
 	dht.Validator = cfg.Validator
 
-	// RT recovery proc
-	rtRecoveryProc := goprocessctx.WithContext(ctx)
-	rtRecoveryProc.Go(dht.rtRecovery)
-	dht.proc.AddChild(rtRecoveryProc)
-
 	if !cfg.Client {
 		for _, p := range cfg.Protocols {
 			h.SetStreamHandler(p, dht.handleNewStream)
@@ -152,8 +137,6 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
 func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
 	rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())
 
-	rtRecoveryChan := make(chan *rtRecoveryReq)
-
 	cmgr := h.ConnManager()
 
 	rt.PeerAdded = func(p peer.ID) {
@@ -162,38 +145,21 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 
 	rt.PeerRemoved = func(p peer.ID) {
 		cmgr.UntagPeer(p, "kbucket")
-		go func(rtRecoveryChan chan *rtRecoveryReq) {
-			if rt.Size() == 0 {
-				req := mkRtRecoveryReq()
-				logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id)
-				select {
-				case <-ctx.Done():
-					return
-				case rtRecoveryChan <- req:
-					select {
-					case <-ctx.Done():
-						return
-					case <-req.errorChan:
-						// TODO Do we need to do anything here ?
-					}
-				}
-			}
-		}(rtRecoveryChan)
 	}
 
 	dht := &IpfsDHT{
-		datastore:      dstore,
-		self:           h.ID(),
-		peerstore:      h.Peerstore(),
-		host:           h,
-		strmap:         make(map[peer.ID]*messageSender),
-		ctx:            ctx,
-		providers:      providers.NewProviderManager(ctx, h.ID(), dstore),
-		birth:          time.Now(),
-		routingTable:   rt,
-		protocols:      protocols,
-		bucketSize:     bucketSize,
-		rtRecoveryChan: rtRecoveryChan,
+		datastore:        dstore,
+		self:             h.ID(),
+		peerstore:        h.Peerstore(),
+		host:             h,
+		strmap:           make(map[peer.ID]*messageSender),
+		ctx:              ctx,
+		providers:        providers.NewProviderManager(ctx, h.ID(), dstore),
+		birth:            time.Now(),
+		routingTable:     rt,
+		protocols:        protocols,
+		bucketSize:       bucketSize,
+		triggerBootstrap: make(chan struct{}),
 	}
 
 	dht.ctx = dht.newContextWithLocalTags(ctx)
@@ -201,7 +167,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 	return dht
 }
 
-func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
+// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
+// come up with an alternative solution.
+// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
+/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
 	writeResp := func(errorChan chan error, err error) {
 		select {
 		case <-proc.Closing():
@@ -231,7 +200,7 @@ func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
 			return
 		}
 	}
-}
+}*/
 
 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index 5ecfa860c..6e40c597a 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -15,6 +15,8 @@ import (
 
 var DefaultBootstrapPeers []multiaddr.Multiaddr
 
+var minRTBootstrapThreshold = 4
+
 func init() {
	for _, s := range []string{
 		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
@@ -39,33 +41,27 @@ func init() {
 	}
 }
 
-// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
+// Bootstrap runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
 func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
-	seedRTIfEmpty := func(tag string) {
-		if dht.routingTable.Size() == 0 {
-			req := mkRtRecoveryReq()
-			logger.Warningf("dht bootstrap: %s: RT is empty, will attempt to initiate recovery, reqID=%s", tag, req.id)
-			select {
-			case <-ctx.Done():
-				return
-			case dht.rtRecoveryChan <- req:
-				select {
-				case <-ctx.Done():
-					return
-				case <-req.errorChan:
-					// TODO Should we abort the ONGOING bootstrap attempt if seeder returns an error on the channel ?
-				}
-			}
+	triggerBootstrapFnc := func() {
+		logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
+			dht.routingTable.Size(), minRTBootstrapThreshold)
+
+		if err := dht.selfWalk(ctx); err != nil {
+			logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
+		}
+
+		if err := dht.bootstrapBuckets(ctx); err != nil {
+			logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
 		}
 	}
 
 	// we should query for self periodically so we can discover closer peers
 	go func() {
 		for {
-			seedRTIfEmpty("self walk")
 			err := dht.selfWalk(ctx)
 			if err != nil {
-				logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err)
+				logger.Warningf("self walk: error: %s", err)
 			}
 			select {
 			case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
@@ -78,29 +74,31 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
 	// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
 	go func() {
 		for {
-			seedRTIfEmpty("buckets")
 			err := dht.bootstrapBuckets(ctx)
 			if err != nil {
-				logger.Warningf("error bootstrapping: %s", err)
+				logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
 			}
 			select {
 			case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
+			case <-dht.triggerBootstrap:
+				triggerBootstrapFnc()
 			case <-ctx.Done():
 				return
 			}
 		}
 	}()
+
 	return nil
 }
 
-//scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period
+// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
 func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
-	doQuery := func(n int, target string, f func(context.Context) error) error {
+	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
 		logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
-			n, target, dht.routingTable.Size())
+			bucketId, target, dht.routingTable.Size())
 		defer func() {
 			logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
-				n, target, dht.routingTable.Size())
+				bucketId, target, dht.routingTable.Size())
 		}()
 		queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
 		defer cancel()
@@ -145,7 +143,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
 		close(errChan)
 	}()
 
-	// accumulate errors from all go-routines
+	// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
 	var errStrings []string
 	for err := range errChan {
 		errStrings = append(errStrings, err.Error())
@@ -153,8 +151,19 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
 	if len(errStrings) == 0 {
 		return nil
 	} else {
-		return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n"))
+		return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
+	}
+}
+
+// Traverse the DHT toward the self ID
+func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
+	queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
+	defer cancel()
+	_, err := dht.FindPeer(queryCtx, dht.self)
+	if err == routing.ErrNotFound {
+		return nil
 	}
+	return err
 }
 
 // synchronous bootstrap.
@@ -165,12 +174,3 @@ func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
 		return dht.bootstrapBuckets(ctx)
 	}
 }
-
-// Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
-	_, err := dht.FindPeer(ctx, dht.self)
-	if err == routing.ErrNotFound {
-		return nil
-	}
-	return err
-}
diff --git a/dht_test.go b/dht_test.go
index 18dc04afc..bba86c84e 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -688,6 +688,55 @@ func TestBootstrap(t *testing.T) {
 	}
 }
 
+func TestBootstrapBelowMinRTThreshold(t *testing.T) {
+	ctx := context.Background()
+	dhtA := setupDHT(ctx, t, false)
+	dhtB := setupDHT(ctx, t, false)
+	dhtC := setupDHT(ctx, t, false)
+
+	defer func() {
+		dhtA.Close()
+		dhtA.host.Close()
+
+		dhtB.Close()
+		dhtB.host.Close()
+
+		dhtC.Close()
+		dhtC.host.Close()
+	}()
+
+	connect(t, ctx, dhtA, dhtB)
+	connect(t, ctx, dhtB, dhtC)
+
+	// we ONLY init bootstrap on A
+	dhtA.Bootstrap(ctx)
+	// and wait for one round to complete i.e. A should be connected to both B & C
+	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)
+
+	// now we create two new peers
+	dhtD := setupDHT(ctx, t, false)
+	dhtE := setupDHT(ctx, t, false)
+
+	// connect them to each other
+	connect(t, ctx, dhtD, dhtE)
+	defer func() {
+		dhtD.Close()
+		dhtD.host.Close()
+
+		dhtE.Close()
+		dhtE.host.Close()
+	}()
+
+	// and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap.
+	// since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens,
+	// it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected)
+	connect(t, ctx, dhtA, dhtD)
+
+	// and because of the above bootstrap, A also discovers E !
+	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
+	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
+}
+
 func TestPeriodicBootstrap(t *testing.T) {
 	if ci.IsRunning() {
 		t.Skip("skipping on CI. highly timing dependent")
diff --git a/notif.go b/notif.go
index 3af758492..556cadd5f 100644
--- a/notif.go
+++ b/notif.go
@@ -32,7 +32,14 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
 	dht.plk.Lock()
 	defer dht.plk.Unlock()
 	if dht.host.Network().Connectedness(p) == network.Connected {
+		bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
 		dht.Update(dht.Context(), p)
+		if bootstrap {
+			select {
+			case dht.triggerBootstrap <- struct{}{}:
+			default:
+			}
+		}
 	}
 	return
 }
@@ -71,7 +78,14 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
 	dht.plk.Lock()
 	defer dht.plk.Unlock()
 	if dht.host.Network().Connectedness(p) == network.Connected {
+		bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
 		dht.Update(dht.Context(), p)
+		if bootstrap {
+			select {
+			case dht.triggerBootstrap <- struct{}{}:
+			default:
+			}
+		}
 	}
 }
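For reviewers who want to see the new control flow in isolation, the sketch below is illustrative only (hypothetical names, not part of this patch). It shows the pattern the diff relies on: the connection notification does a best-effort, non-blocking send on a trigger channel via select/default, and the bootstrap loop folds that trigger into its periodic timer, so notification handlers never block and duplicate triggers are simply dropped.

package main

import (
	"fmt"
	"time"
)

// node stands in for IpfsDHT: peers approximates routingTable.Size(),
// and triggerBootstrap has the same shape as dht.triggerBootstrap.
type node struct {
	peers            int
	triggerBootstrap chan struct{}
}

// onConnected mirrors the notif.go change: sample the threshold before the update,
// then fire the trigger without ever blocking the notification path.
func (n *node) onConnected() {
	needBootstrap := n.peers <= 4 // stand-in for minRTBootstrapThreshold
	n.peers++                     // stand-in for dht.Update(...)
	if needBootstrap {
		select {
		case n.triggerBootstrap <- struct{}{}:
		default:
			// a bootstrap is already pending; drop the duplicate trigger
		}
	}
}

// run mirrors the Bootstrap loop: wake up on the scan interval or on an explicit trigger.
func (n *node) run(stop <-chan struct{}) {
	for {
		select {
		case <-time.After(50 * time.Millisecond): // RoutingTableScanInterval stand-in
			fmt.Println("periodic bootstrap")
		case <-n.triggerBootstrap:
			fmt.Println("triggered bootstrap: routing table below threshold")
		case <-stop:
			return
		}
	}
}

func main() {
	n := &node{triggerBootstrap: make(chan struct{})}
	stop := make(chan struct{})
	go n.run(stop)

	time.Sleep(10 * time.Millisecond) // give the loop a moment to start
	n.onConnected()                   // below the threshold, so this fires an immediate bootstrap
	time.Sleep(100 * time.Millisecond)
	close(stop)
}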