Skip to content

Commit

Permalink
fix: leaking go routines
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel authored and Jorropo committed Jun 15, 2023
1 parent 0f65ba1 commit 6a1c92e
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 82 deletions.
43 changes: 33 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type IpfsDHT struct {

// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration
// number of concurrent lookupCheck operations
lookupCheckCapacity int
lookupChecksLk sync.Mutex

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
Expand Down Expand Up @@ -296,6 +299,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
beta: cfg.Resiliency,
lookupCheckCapacity: cfg.LookupCheckConcurrency,
queryPeerFilter: cfg.QueryPeerFilter,
routingTablePeerFilter: cfg.RoutingTable.PeerFilter,
rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter,
Expand Down Expand Up @@ -658,8 +662,8 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
// if the peer is already in the routing table or the appropriate bucket is
// already full, don't try to add the new peer.ID
if dht.routingTable.Find(p) != "" || !dht.routingTable.UsefulPeer(p) {
// already full, don't try to add the new peer.ID
if !dht.routingTable.UsefulNewPeer(p) {
return
}

Expand All @@ -669,17 +673,36 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {

livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
// check if the maximal number of concurrent lookup checks is reached
dht.lookupChecksLk.Lock()
if dht.lookupCheckCapacity == 0 {
dht.lookupChecksLk.Unlock()
// drop the new peer.ID if the maximal number of concurrent lookup
// checks is reached
return
}
dht.lookupCheckCapacity--
dht.lookupChecksLk.Unlock()

go func() {
livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
err := dht.lookupCheck(livelinessCtx, p)

dht.lookupChecksLk.Lock()
dht.lookupCheckCapacity++
dht.lookupChecksLk.Unlock()

if err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}()
}
}

Expand Down
5 changes: 0 additions & 5 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT. A new go routine is required
// because we can't block the stream handler until the remote peer answers
// our query.
go dht.peerFound(dht.ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
zap.Int32("type", int32(req.GetType())),
Expand Down
9 changes: 9 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func Resiliency(beta int) Option {
}
}

// LookupInterval configures maximal number of go routines that can be used to
// perform a lookup check operation, before adding a new node to the routing table.
func LookupCheckConcurrency(n int) Option {
return func(c *dhtcfg.Config) error {
c.LookupCheckConcurrency = n
return nil
}
}

// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
Expand Down
34 changes: 17 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-log v1.0.5
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.27.5
github.com/libp2p/go-libp2p-kbucket v0.6.1
github.com/libp2p/go-libp2p v0.27.6
github.com/libp2p/go-libp2p-kbucket v0.6.2
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.7.0
github.com/libp2p/go-libp2p-testing v0.12.0
Expand All @@ -37,7 +37,7 @@ require (
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand All @@ -55,28 +55,28 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b // indirect
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huin/goupnp v1.1.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipld/go-ipld-prime v0.20.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/libp2p/go-reuseport v0.3.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.53 // indirect
github.com/miekg/dns v1.1.54 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand All @@ -86,37 +86,37 @@ require (
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.9.2 // indirect
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/quic-go/quic-go v0.33.0 // indirect
github.com/quic-go/webtransport-go v0.5.2 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/tools v0.9.1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
Loading

0 comments on commit 6a1c92e

Please sign in to comment.