Skip to content

Commit

Permalink
feat: add error log when resource manager throttles crawler
Browse files Browse the repository at this point in the history
Since this could leave the Accelerated DHT client in a degraded state,
due to not being able to completely populate its routing table, we
want to signal this to the user until we can modify the client to
degrade more gracefully when hitting resource limits.

Note that this logs only once per crawl, to avoid spamming the user.
  • Loading branch information
guseggert committed May 18, 2022
1 parent 6a75b59 commit b874f6b
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 145 deletions.
17 changes: 10 additions & 7 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ func (m *mockConn) ID() string { return "
func (m *mockConn) Close() error { return nil }
func (m *mockConn) NewStream(context.Context) (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.Stat { return network.Stat{Direction: network.DirOutbound} }
func (m *mockConn) LocalMultiaddr() ma.Multiaddr { return m.local.Addrs[0] }
func (m *mockConn) RemoteMultiaddr() ma.Multiaddr { return m.remote.Addrs[0] }
func (m *mockConn) LocalPeer() peer.ID { return m.local.ID }
func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }
func (m *mockConn) Stat() network.ConnStats {
return network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}}
}
func (m *mockConn) Scope() network.ConnScope { return network.NullScope }
func (m *mockConn) LocalMultiaddr() ma.Multiaddr { return m.local.Addrs[0] }
func (m *mockConn) RemoteMultiaddr() ma.Multiaddr { return m.remote.Addrs[0] }
func (m *mockConn) LocalPeer() peer.ID { return m.local.ID }
func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }

func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
12 changes: 8 additions & 4 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ func TestHungRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mn, err := mocknet.FullMeshLinked(ctx, 2)
mn, err := mocknet.FullMeshLinked(2)
if err != nil {
t.Fatal(err)
}
defer mn.Close()
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
Expand Down Expand Up @@ -231,10 +232,11 @@ func TestNotFound(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 16)
mn, err := mocknet.FullMeshConnected(16)
if err != nil {
t.Fatal(err)
}
defer mn.Close()
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
Expand Down Expand Up @@ -324,10 +326,11 @@ func TestLessThanKResponses(t *testing.T) {
// t.Skip("skipping test because it makes a lot of output")

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 6)
mn, err := mocknet.FullMeshConnected(6)
if err != nil {
t.Fatal(err)
}
defer mn.Close()
hosts := mn.Hosts()

os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
Expand Down Expand Up @@ -397,10 +400,11 @@ func TestMultipleQueries(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
mn, err := mocknet.FullMeshConnected(2)
if err != nil {
t.Fatal(err)
}
defer mn.Close()
hosts := mn.Hosts()
os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
Expand Down
20 changes: 19 additions & 1 deletion fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fullrt
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sync"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
swarm "github.com/libp2p/go-libp2p-swarm"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -51,6 +53,8 @@ var Tracer = otel.Tracer("")

var logger = logging.Logger("fullrtdht")

const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.`

// FullRT is an experimental DHT client that is under development. Expect breaking changes to occur in this client
// until it stabilizes.
type FullRT struct {
Expand Down Expand Up @@ -271,6 +275,7 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
}

start := time.Now()
limitErrOnce := sync.Once{}
dht.crawler.Run(ctx, addrs,
func(p peer.ID, rtPeers []*peer.AddrInfo) {
conns := dht.h.Network().ConnsToPeer(p)
Expand All @@ -296,7 +301,20 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
addrs: addrs,
}
},
func(p peer.ID, err error) {})
func(p peer.ID, err error) {
dialErr, ok := err.(*swarm.DialError)
if ok {
for _, transportErr := range dialErr.DialErrors {
if errors.Is(transportErr.Cause, network.ErrResourceLimitExceeded) {
limitErrOnce.Do(func() { logger.Errorf(rtRefreshLimitsMsg) })
}
}
}
// note that DialError implements Unwrap() which returns the Cause, so this covers that case
if errors.Is(err, network.ErrResourceLimitExceeded) {
limitErrOnce.Do(func() { logger.Errorf(rtRefreshLimitsMsg) })
}
})
dur := time.Since(start)
logger.Infof("crawl took %v", dur)

Expand Down
45 changes: 28 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,50 @@ module github.com/libp2p/go-libp2p-kad-dht
go 1.16

require (
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/gogo/protobuf v1.3.2
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.3.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-cid v0.2.0
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipns v0.1.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipld/go-ipld-prime v0.16.0 // indirect
github.com/jbenet/goprocess v0.1.4
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.16.0
github.com/libp2p/go-libp2p-core v0.11.0
github.com/libp2p/go-libp2p v0.18.0
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.15.1
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-peerstore v0.4.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.8.0
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-xor v0.0.0-20210714161855-5c005aca55db
github.com/libp2p/go-msgio v0.1.0
github.com/libp2p/go-netroute v0.1.6
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.4.0
github.com/libp2p/go-libp2p-swarm v0.10.2
github.com/libp2p/go-libp2p-testing v0.8.0
github.com/libp2p/go-libp2p-xor v0.1.0
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
github.com/multiformats/go-base32 v0.0.4
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/zap v1.19.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898 // indirect
golang.org/x/net v0.0.0-20220517181318-183a9ca12b87 // indirect
golang.org/x/sys v0.0.0-20220517195934-5e4e11fc645e // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
Loading

0 comments on commit b874f6b

Please sign in to comment.