From fbb1b3668aaecf095931859b4c250926f9e32884 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 6 Mar 2020 18:27:36 +0530 Subject: [PATCH] RT connectivity changes --- dht.go | 140 ++++++++++++++++++++++-------------------- dht_bootstrap.go | 110 +++++++++++++++++++++++++-------- dht_bootstrap_test.go | 49 +++++++++++++++ dht_net.go | 8 +-- dht_options.go | 11 +++- dht_test.go | 77 ----------------------- ext_test.go | 8 +-- go.mod | 2 +- go.sum | 3 + handlers_test.go | 2 +- notify_test.go | 35 ++++++++--- subscriber_notifee.go | 26 ++++++-- 12 files changed, 278 insertions(+), 193 deletions(-) create mode 100644 dht_bootstrap_test.go diff --git a/dht.go b/dht.go index 0465249de..e19c4ef48 100644 --- a/dht.go +++ b/dht.go @@ -36,6 +36,7 @@ import ( ) var logger = logging.Logger("dht") +var rtPvLogger = logging.Logger("dht/rt/peer-validation") const BaseConnMgrScore = 5 @@ -96,6 +97,7 @@ type IpfsDHT struct { rtRefreshQueryTimeout time.Duration rtRefreshPeriod time.Duration triggerRtRefresh chan chan<- error + triggerSelfLookup chan chan<- error maxRecordAge time.Duration @@ -127,8 +129,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) if err := cfg.validate(); err != nil { return nil, err } + dht, err := makeDHT(ctx, h, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create DHT, err=%s", err) + } - dht := makeDHT(ctx, h, cfg) dht.autoRefresh = cfg.routingTable.autoRefresh dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout @@ -168,6 +173,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) // handle providers dht.proc.AddChild(dht.providers.Process()) + dht.startSelfLookup() dht.startRefreshing() return dht, nil } @@ -195,18 +201,10 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT return dht } -func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT { - self := kb.ConvertPeerID(h.ID()) - rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore()) - cmgr := h.ConnManager() - - rt.PeerAdded = func(p peer.ID) { - commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p)) - cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen) - } - - rt.PeerRemoved = func(p peer.ID) { - cmgr.UntagPeer(p, "kbucket") +func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { + rt, err := makeRoutingTable(h, cfg) + if err != nil { + return nil, fmt.Errorf("failed to construct routing table,err=%s", err) } protocols := []protocol.ID{cfg.protocolPrefix + kad2} @@ -223,20 +221,21 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT { } dht := &IpfsDHT{ - datastore: cfg.datastore, - self: h.ID(), - peerstore: h.Peerstore(), - host: h, - strmap: make(map[peer.ID]*messageSender), - birth: time.Now(), - rng: rand.New(rand.NewSource(rand.Int63())), - routingTable: rt, - protocols: protocols, - serverProtocols: serverProtocols, - bucketSize: cfg.bucketSize, - alpha: cfg.concurrency, - d: cfg.disjointPaths, - triggerRtRefresh: make(chan chan<- error), + datastore: cfg.datastore, + self: h.ID(), + peerstore: h.Peerstore(), + host: h, + strmap: make(map[peer.ID]*messageSender), + birth: time.Now(), + rng: rand.New(rand.NewSource(rand.Int63())), + routingTable: rt, + protocols: protocols, + serverProtocols: serverProtocols, + bucketSize: cfg.bucketSize, + alpha: cfg.concurrency, + d: cfg.disjointPaths, + triggerRtRefresh: make(chan chan<- error), 
+ triggerSelfLookup: make(chan chan<- error),
 }
 
 // create a DHT proc with the given context
@@ -249,43 +248,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
 
 dht.providers = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
 
- return dht
+ return dht, nil
 }
 
-// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
-// come up with an alternative solution.
-// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
-/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
- writeResp := func(errorChan chan error, err error) {
- select {
- case <-proc.Closing():
- case errorChan <- errChan:
- }
- close(errorChan)
- }
-
- for {
- select {
- case req := <-dht.rtRecoveryChan:
- if dht.routingTable.Size() == 0 {
- logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
- // TODO Call Seeder with default bootstrap peers here once #383 is merged
- if dht.routingTable.Size() > 0 {
- logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
- go writeResp(req.errorChan, nil)
- } else {
- logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
- go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
- }
- } else {
- logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
- go writeResp(req.errorChan, nil)
- }
- case <-proc.Closing():
- return
+func makeRoutingTable(h host.Host, cfg config) (*kb.RoutingTable, error) {
+ self := kb.ConvertPeerID(h.ID())
+ // construct the routing table with a peer validation function
+ pvF := func(c context.Context, p peer.ID) bool {
+ if err := h.Connect(c, peer.AddrInfo{ID: p}); err != nil {
+ rtPvLogger.Errorf("failed to connect to peer %s for validation, err=%s", p, err)
+ return false
 }
+ return true
 }
-}*/
+
+ rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
+ if cfg.routingTable.checkInterval != 0 {
+ rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
+ }
+
+ rt, err := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore(),
+ rtOpts...)
+ cmgr := h.ConnManager()
+
+ rt.PeerAdded = func(p peer.ID) {
+ commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
+ cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
+ }
+ rt.PeerRemoved = func(p peer.ID) {
+ cmgr.UntagPeer(p, "kbucket")
+ }
+
+ return rt, err
+}
 
 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
@@ -398,11 +393,26 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
 return dht.datastore.Put(mkDsKey(key), data)
 }
 
-// Update signals the routingTable to Update its last-seen status
-// on the given peer.
-func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
- logger.Event(ctx, "updatePeer", p)
- dht.routingTable.Update(p)
+// peerFound signals the routing table that we've found a peer that
+// supports the DHT protocol.
+func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
+ logger.Event(ctx, "peerFound", p)
+ dht.routingTable.HandlePeerAlive(p)
+}
+
+// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
+func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
+ logger.Event(ctx, "peerStoppedDHT", p)
+ // A peer that does not support the DHT protocol is dead for us.
+ // There's no point in talking to it anymore until it starts supporting the DHT protocol again.
+ dht.routingTable.HandlePeerDead(p)
+}
+
+// peerDisconnected signals the routing table that a peer is not connected anymore.
+func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
+ logger.Event(ctx, "peerDisconnected", p)
+ dht.routingTable.HandlePeerDisconnect(p)
+
 }
 
 // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index d943d06bf..2ff90b6ae 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -9,6 +9,7 @@ import (
 process "github.com/jbenet/goprocess"
 processctx "github.com/jbenet/goprocess/context"
 "github.com/libp2p/go-libp2p-core/routing"
+ kbucket "github.com/libp2p/go-libp2p-kbucket"
 "github.com/multiformats/go-multiaddr"
 _ "github.com/multiformats/go-multiaddr-dns"
 )
@@ -17,7 +18,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr
 
 // Minimum number of peers in the routing table. If we drop below this and we
 // see a new peer, we trigger a bootstrap round.
-var minRTRefreshThreshold = 4
+var minRTRefreshThreshold = 10
 
 func init() {
 for _, s := range []string{
@@ -35,6 +36,50 @@ func init() {
 }
 }
 
+// startSelfLookup starts a goroutine that listens for requests to trigger a self walk on a dedicated channel
+// and then sends the error status back on the error channel sent along with the request.
+// If multiple callers ask for a self walk at roughly the same time, it performs ONLY one self walk and sends the same error status to all of them.
+func (dht *IpfsDHT) startSelfLookup() error {
+ dht.proc.Go(func(proc process.Process) {
+ ctx := processctx.OnClosingContext(proc)
+ for {
+ var waiting []chan<- error
+ select {
+ case res := <-dht.triggerSelfLookup:
+ if res != nil {
+ waiting = append(waiting, res)
+ }
+ case <-ctx.Done():
+ return
+ }
+
+ // Batch multiple self-lookup requests if they're all waiting at the same time.
+ waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)
+
+ // Do a self walk
+ queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
+ _, err := dht.FindPeer(queryCtx, dht.self)
+ if err == routing.ErrNotFound || err == kbucket.ErrLookupFailure {
+ err = nil
+ } else if err != nil {
+ err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
+ }
+ cancel()
+
+ // Send the error status back to all waiting callers.
+ for _, w := range waiting {
+ w <- err
+ close(w)
+ }
+ if err != nil {
+ logger.Warning(err)
+ }
+ }
+ })
+
+ return nil
+}
+
 // Start the refresh worker.
 func (dht *IpfsDHT) startRefreshing() error {
 // scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
@@ -65,17 +110,7 @@ func (dht *IpfsDHT) startRefreshing() error {
 }
 
 // Batch multiple refresh requests if they're all waiting at the same time.
- collectWaiting:
- for {
- select {
- case res := <-dht.triggerRtRefresh:
- if res != nil {
- waiting = append(waiting, res)
- }
- default:
- break collectWaiting
- }
- }
+ waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)
err := dht.doRefresh(ctx) for _, w := range waiting { @@ -91,11 +126,41 @@ func (dht *IpfsDHT) startRefreshing() error { return nil } +func collectWaitingChannels(source chan chan<- error) []chan<- error { + var waiting []chan<- error + for { + select { + case res := <-source: + if res != nil { + waiting = append(waiting, res) + } + default: + return waiting + } + } +} + func (dht *IpfsDHT) doRefresh(ctx context.Context) error { var merr error - if err := dht.selfWalk(ctx); err != nil { - merr = multierror.Append(merr, err) + + // wait for the self walk result + selfWalkres := make(chan error, 1) + + select { + case dht.triggerSelfLookup <- selfWalkres: + case <-ctx.Done(): + return ctx.Err() } + + select { + case err := <-selfWalkres: + if err != nil { + merr = multierror.Append(merr, err) + } + case <-ctx.Done(): + return ctx.Err() + } + if err := dht.refreshCpls(ctx); err != nil { merr = multierror.Append(merr, err) } @@ -127,6 +192,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error { if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod { continue } + + // do not refresh if bucket is full + if dht.routingTable.IsBucketFull(tcpl.Cpl) { + continue + } + // gen rand peer with the cpl randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl) if err != nil { @@ -153,17 +224,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error { return merr } -// Traverse the DHT toward the self ID -func (dht *IpfsDHT) selfWalk(ctx context.Context) error { - queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout) - defer cancel() - _, err := dht.FindPeer(queryCtx, dht.self) - if err == routing.ErrNotFound { - return nil - } - return fmt.Errorf("failed to query self during routing table refresh: %s", err) -} - // Bootstrap tells the DHT to get into a bootstrapped state satisfying the // IpfsRouter interface. 
// diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go new file mode 100644 index 000000000..a7330c940 --- /dev/null +++ b/dht_bootstrap_test.go @@ -0,0 +1,49 @@ +package dht + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/event" + + kb "github.com/libp2p/go-libp2p-kbucket" + + "github.com/stretchr/testify/require" +) + +func TestSelfWalkOnAddressChange(t *testing.T) { + ctx := context.Background() + // create three DHT instances with auto refresh disabled + d1 := setupDHT(ctx, t, false, DisableAutoRefresh()) + d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) + d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) + + var connectedTo *IpfsDHT + // connect d1 to whoever is "further" + if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <= + kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) { + connect(t, ctx, d1, d3) + connectedTo = d3 + } else { + connect(t, ctx, d1, d2) + connectedTo = d2 + } + + // then connect d2 AND d3 + connect(t, ctx, d2, d3) + + // d1 should have ONLY 1 peer in it's RT + waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second) + require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0]) + + // now emit the address change event + em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{}) + require.NoError(t, err) + require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{})) + waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second) + // it should now have both peers in the RT + ps := d1.routingTable.ListPeers() + require.Contains(t, ps, d2.self) + require.Contains(t, ps, d3.self) +} diff --git a/dht_net.go b/dht_net.go index 2b38169f9..ae0d62866 100644 --- a/dht_net.go +++ b/dht_net.go @@ -174,7 +174,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message ms, err := dht.messageSenderForPeer(ctx, p) if err != nil { if err == msmux.ErrNotSupported { - dht.RoutingTable().Remove(p) + dht.peerStoppedDHT(ctx, p) } stats.Record(ctx, metrics.SentRequests.M(1), @@ -188,7 +188,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message rpmes, err := ms.SendRequest(ctx, pmes) if err != nil { if err == msmux.ErrNotSupported { - dht.RoutingTable().Remove(p) + dht.peerStoppedDHT(ctx, p) } stats.Record(ctx, metrics.SentRequests.M(1), @@ -214,7 +214,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message ms, err := dht.messageSenderForPeer(ctx, p) if err != nil { if err == msmux.ErrNotSupported { - dht.RoutingTable().Remove(p) + dht.peerStoppedDHT(ctx, p) } stats.Record(ctx, metrics.SentMessages.M(1), @@ -225,7 +225,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message if err := ms.SendMessage(ctx, pmes); err != nil { if err == msmux.ErrNotSupported { - dht.RoutingTable().Remove(p) + dht.peerStoppedDHT(ctx, p) } stats.Record(ctx, metrics.SentMessages.M(1), diff --git a/dht_options.go b/dht_options.go index ecbce7496..648cb5f67 100644 --- a/dht_options.go +++ b/dht_options.go @@ -43,6 +43,7 @@ type config struct { refreshPeriod time.Duration autoRefresh bool latencyTolerance time.Duration + checkInterval time.Duration } // internal parameters, not publicly exposed @@ -80,7 +81,7 @@ var defaults = func(o *config) error { o.routingTable.latencyTolerance = time.Minute o.routingTable.refreshQueryTimeout = 10 * time.Second - o.routingTable.refreshPeriod = 1 * time.Hour + o.routingTable.refreshPeriod = 10 * time.Minute o.routingTable.autoRefresh = 
true o.maxRecordAge = time.Hour * 36 @@ -120,6 +121,14 @@ func (c *config) validate() error { return nil } +// RoutingTableCheckInterval is the interval between two runs of the RT cleanup routine. +func RoutingTableCheckInterval(i time.Duration) Option { + return func(c *config) error { + c.routingTable.checkInterval = i + return nil + } +} + // RoutingTableLatencyTolerance sets the maximum acceptable latency for peers // in the routing table's cluster. func RoutingTableLatencyTolerance(latency time.Duration) Option { diff --git a/dht_test.go b/dht_test.go index ec41db4a5..2e8ad83f8 100644 --- a/dht_test.go +++ b/dht_test.go @@ -856,83 +856,6 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) { assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") } -// Check to make sure we re-fill the routing table from connected peers when it -// completely empties. -func TestEmptyTable(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - nDHTs := 50 - dhts := setupDHTS(t, ctx, nDHTs) - defer func() { - for _, dht := range dhts { - dht.Close() - defer dht.host.Close() - } - }() - - t.Logf("dhts are not connected. %d", nDHTs) - for _, dht := range dhts { - rtlen := dht.routingTable.Size() - if rtlen > 0 { - t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen) - } - } - - for i := 1; i < nDHTs; i++ { - connectNoSync(t, ctx, dhts[0], dhts[i]) - } - - // Wait till the routing table stabilizes. - oldSize := dhts[0].routingTable.Size() - for { - time.Sleep(time.Millisecond) - newSize := dhts[0].routingTable.Size() - if oldSize == newSize { - break - } - oldSize = newSize - } - - // remove any one peer from the RT so we don't end up disconnecting all of them if the RT - // already has all peers we are connected to - dhts[0].routingTable.Remove(dhts[0].routingTable.ListPeers()[0]) - - if u.Debug { - printRoutingTables(dhts[:1]) - } - - // Disconnect from all peers that _were_ in the routing table. - routingTablePeers := make(map[peer.ID]bool, nDHTs) - for _, p := range dhts[0].RoutingTable().ListPeers() { - routingTablePeers[p] = true - } - - oldDHTs := dhts[1:] - dhts = dhts[:1] - for _, dht := range oldDHTs { - if routingTablePeers[dht.Host().ID()] { - dhts[0].Host().Network().ClosePeer(dht.host.ID()) - dht.Close() - dht.host.Close() - } else { - dhts = append(dhts, dht) - } - } - - // we should now _re-add_ some peers to the routing table - for i := 0; i < 100; i++ { - if dhts[0].routingTable.Size() > 0 { - return - } - time.Sleep(time.Millisecond) - } - if u.Debug { - printRoutingTables(dhts[:1]) - } - t.Fatal("routing table shouldn't have been empty") -} - func TestPeriodicRefresh(t *testing.T) { if ci.IsRunning() { t.Skip("skipping on CI. 
highly timing dependent") diff --git a/ext_test.go b/ext_test.go index 832609e92..1b9b92372 100644 --- a/ext_test.go +++ b/ext_test.go @@ -40,7 +40,7 @@ func TestHungRequest(t *testing.T) { defer s.Reset() <-ctx.Done() }) - d.Update(ctx, hosts[1].ID()) + d.peerFound(ctx, hosts[1].ID()) ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second) defer cancel1() @@ -214,7 +214,7 @@ func TestNotFound(t *testing.T) { } for _, p := range hosts { - d.Update(ctx, p.ID()) + d.peerFound(ctx, p.ID()) } // Reply with random peers to every message @@ -294,7 +294,7 @@ func TestLessThanKResponses(t *testing.T) { } for i := 1; i < 5; i++ { - d.Update(ctx, hosts[i].ID()) + d.peerFound(ctx, hosts[i].ID()) } // Reply with random peers to every message @@ -363,7 +363,7 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } - d.Update(ctx, hosts[1].ID()) + d.peerFound(ctx, hosts[1].ID()) // It would be nice to be able to just get a value and succeed but then // we'd need to deal with selectors and validators... diff --git a/go.mod b/go.mod index 7bb178364..f849f4205 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a github.com/libp2p/go-libp2p-core v0.3.2-0.20200305051524-d143201d83c2 - github.com/libp2p/go-libp2p-kbucket v0.2.3 + github.com/libp2p/go-libp2p-kbucket v0.3.1 github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-swarm v0.2.2 diff --git a/go.sum b/go.sum index 9a642a88b..db59dafdb 100644 --- a/go.sum +++ b/go.sum @@ -195,6 +195,8 @@ github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM= github.com/libp2p/go-libp2p-kbucket v0.2.3/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA= +github.com/libp2p/go-libp2p-kbucket v0.3.1 h1:aHSdqYBAyExg2xir/VN7B2myIN6yxUuDG0FeyrZpBQE= +github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -401,6 +403,7 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= diff --git a/handlers_test.go b/handlers_test.go index dafa1ee05..7b705d2df 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -89,7 +89,7 @@ func BenchmarkHandleFindPeer(b *testing.B) { 
panic(err) } - d.routingTable.Update(id) + d.peerFound(ctx, id) peers = append(peers, id) a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i)) diff --git a/notify_test.go b/notify_test.go index 20a06f549..3cf30c3f7 100644 --- a/notify_test.go +++ b/notify_test.go @@ -7,14 +7,16 @@ import ( "time" tu "github.com/libp2p/go-libp2p-testing/etc" + + "github.com/stretchr/testify/require" ) func TestNotifieeMultipleConn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d1 := setupDHT(ctx, t, false) - d2 := setupDHT(ctx, t, false) + d1 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond)) + d2 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond)) nn1, err := newSubscriberNotifiee(d1) if err != nil { @@ -36,6 +38,8 @@ func TestNotifieeMultipleConn(t *testing.T) { if !checkRoutingTable(d1, d2) { t.Fatal("no routes") } + + // we are still connected, so the disconnect notification should be a No-op nn1.Disconnected(d1.host.Network(), c12) nn2.Disconnected(d2.host.Network(), c21) @@ -43,6 +47,8 @@ func TestNotifieeMultipleConn(t *testing.T) { t.Fatal("no routes") } + // the connection close should now mark the peer as missing in the RT for both peers + // because of the disconnect notification for _, conn := range d1.host.Network().ConnsToPeer(d2.self) { conn.Close() } @@ -50,20 +56,28 @@ func TestNotifieeMultipleConn(t *testing.T) { conn.Close() } - tu.WaitFor(ctx, func() error { + // close both the hosts so all connection attempts to them by RT Peer validation fail + d1.host.Close() + d2.host.Close() + + // wait context will ensure that the RT cleanup completes + waitCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + require.NoError(t, tu.WaitFor(waitCtx, func() error { if checkRoutingTable(d1, d2) { return fmt.Errorf("should not have routes") } return nil - }) + })) } func TestNotifieeFuzz(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - d1 := setupDHT(ctx, t, false) - d2 := setupDHT(ctx, t, false) + d1 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond)) + d2 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond)) for i := 0; i < 10; i++ { connectNoSync(t, ctx, d1, d2) @@ -71,13 +85,16 @@ func TestNotifieeFuzz(t *testing.T) { conn.Close() } } - tu.WaitFor(ctx, func() error { + + // close both hosts so peer validation reconnect fails + d1.host.Close() + d2.host.Close() + require.NoError(t, tu.WaitFor(ctx, func() error { if checkRoutingTable(d1, d2) { return fmt.Errorf("should not have routes") } return nil - }) - connect(t, ctx, d1, d2) + })) } func checkRoutingTable(a, b *IpfsDHT) bool { diff --git a/subscriber_notifee.go b/subscriber_notifee.go index ce62e906e..aed5ecaff 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -32,6 +32,10 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { // register for event bus protocol ID changes in order to update the routing table new(event.EvtPeerProtocolsUpdated), + + // register for event bus notifications for when our local address/addresses change so we can + // advertise those to the network + new(event.EvtLocalAddressesUpdated), } // register for event bus local routability changes in order to trigger switching between client and server modes @@ -62,7 +66,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { return nil, fmt.Errorf("could not check peerstore for protocol support: 
err: %s", err)
 }
 
 if valid {
- dht.Update(dht.ctx, p)
+ dht.peerFound(dht.ctx, p)
 }
 }
 
@@ -82,6 +86,16 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
 }
 
 switch evt := e.(type) {
+ case event.EvtLocalAddressesUpdated:
+ // When our address changes, we should proactively tell our closest peers about it so
+ // we become discoverable quickly. The Identify protocol will push a signed peer record
+ // with our new address to all peers we are connected to. However, we might not necessarily
+ // be connected to our closest peers, so in the true spirit of Zen, searching for ourselves
+ // in the network really is the best way to forge connections with those that matter.
+ select {
+ case dht.triggerSelfLookup <- nil:
+ default:
+ }
 case event.EvtPeerIdentificationCompleted:
 handlePeerIdentificationCompletedEvent(dht, evt)
 case event.EvtPeerProtocolsUpdated:
@@ -117,7 +131,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentif
 return
 }
 if valid {
- dht.Update(dht.ctx, e.Peer)
+ dht.peerFound(dht.ctx, e.Peer)
 fixLowPeers(dht)
 }
 }
@@ -130,9 +144,9 @@ func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdat
 }
 
 if valid {
- dht.routingTable.Update(e.Peer)
+ dht.peerFound(dht.ctx, e.Peer)
 } else {
- dht.routingTable.Remove(e.Peer)
+ dht.peerStoppedDHT(dht.ctx, e.Peer)
 }
 
 fixLowPeers(dht)
@@ -187,7 +201,7 @@ func fixLowPeers(dht *IpfsDHT) {
 // Don't bother probing, we do that on connect.
 valid, _ := dht.validRTPeer(p)
 if valid {
- dht.Update(dht.Context(), p)
+ dht.peerFound(dht.Context(), p)
 }
 
@@ -218,7 +232,7 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
 return
 }
 
- dht.routingTable.Remove(p)
+ dht.peerDisconnected(dht.ctx, p)
 
 fixLowPeers(dht)
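
For reference, this is roughly how a caller would opt into the routing table cleanup interval introduced by this patch. It is a minimal sketch only, assuming the usual go-libp2p-kad-dht import path and the go-libp2p v0.5.x libp2p.New signature; everything outside New, RoutingTableCheckInterval, and Close is illustrative and not part of this change:

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	// A bare libp2p host; real callers would configure keys, transports, etc.
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// Run the routing table cleanup (which re-validates "missing" peers via the
	// peer validation function wired up in makeRoutingTable) every 2 minutes
	// instead of the kbucket default.
	d, err := dht.New(ctx, h, dht.RoutingTableCheckInterval(2*time.Minute))
	if err != nil {
		panic(err)
	}
	defer d.Close()
}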