Skip to content

Commit

Permalink
swarm: fix timer Leak in the dial loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Nov 6, 2023
1 parent 4e2a16d commit 9c48e8e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
89 changes: 89 additions & 0 deletions limit_streams_when_hanging_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package libp2p_test

import (
"context"
"io"
"sync"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
var partial rcmgr.PartialLimitConfig
const limitsToTry = 10
partial.System.StreamsInbound = limitsToTry
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(partial.Build(rcmgr.InfiniteLimits)))
require.NoError(t, err)

maddr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/udp/0/quic-v1")
require.NoError(t, err)

receiver, err := libp2p.New(
libp2p.ResourceManager(mgr),
libp2p.ListenAddrs(maddr),
)
require.NoError(t, err)
t.Cleanup(func() { receiver.Close() })

var wg sync.WaitGroup
wg.Add(1)
t.Cleanup(wg.Done)

const pid = "/test"
receiver.SetStreamHandler(pid, func(s network.Stream) {
defer s.Close()

s.Write([]byte{42}) // send one byte in case the receiver is optimizing RTTs and return from NewStream even tho it's gonna fail.

wg.Wait() // simulate heavy computation or very long IO without touching the stream
})

for i := limitsToTry; i != 0; i-- {
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
require.NoError(t, err)

var b [1]byte
_, err = io.ReadFull(s, b[:])
require.NoError(t, err)

sender.Close()
}

for i := 1; i != 0; i-- {
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
if err != nil {
continue // good
}

var b [1]byte
_, err = io.ReadFull(s, b[:])
if err != nil {
continue
}
t.Fatal("we were able to create more handlers than the stream limit allowed")
}
}
2 changes: 2 additions & 0 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (w *dialWorker) loop() {
startTime := w.cl.Now()
// dialTimer is the dialTimer used to trigger dials
dialTimer := w.cl.InstantTimer(startTime.Add(math.MaxInt64))
defer dialTimer.Stop()

timerRunning := true
// scheduleNextDial updates timer for triggering the next dial
scheduleNextDial := func() {
Expand Down

0 comments on commit 9c48e8e

Please sign in to comment.