diff --git a/limit_streams_when_hanging_handlers_test.go b/limit_streams_when_hanging_handlers_test.go
new file mode 100644
index 0000000000..594cf6616a
--- /dev/null
+++ b/limit_streams_when_hanging_handlers_test.go
@@ -0,0 +1,89 @@
+package libp2p_test
+
+import (
+	"context"
+	"io"
+	"sync"
+	"testing"
+
+	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peerstore"
+	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
+	"github.com/multiformats/go-multiaddr"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
+	// Cap the receiver's inbound streams; the test then verifies that a
+	// stream past that limit cannot be established while handlers hang.
+	var partial rcmgr.PartialLimitConfig
+	const limitsToTry = 10
+	partial.System.StreamsInbound = limitsToTry
+	mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(partial.Build(rcmgr.InfiniteLimits)))
+	require.NoError(t, err)
+
+	maddr, err := multiaddr.NewMultiaddr("/ip4/0.0.0.0/udp/0/quic-v1")
+	require.NoError(t, err)
+
+	receiver, err := libp2p.New(
+		libp2p.ResourceManager(mgr),
+		libp2p.ListenAddrs(maddr),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() { receiver.Close() })
+
+	// Released in t.Cleanup so every hanging handler unblocks at test end.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	t.Cleanup(wg.Done)
+
+	const pid = "/test"
+	receiver.SetStreamHandler(pid, func(s network.Stream) {
+		defer s.Close()
+
+		s.Write([]byte{42}) // send one byte in case the receiver is optimizing RTTs and return from NewStream even tho it's gonna fail.
+
+		wg.Wait() // simulate heavy computation or very long IO without touching the stream
+	})
+
+	// Open exactly limitsToTry streams; each must succeed and hang in the handler.
+	for i := limitsToTry; i != 0; i-- {
+		mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
+		require.NoError(t, err)
+
+		sender, err := libp2p.New(libp2p.ResourceManager(mgr))
+		require.NoError(t, err)
+		t.Cleanup(func() { sender.Close() })
+
+		sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)
+
+		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
+		require.NoError(t, err)
+
+		var b [1]byte
+		_, err = io.ReadFull(s, b[:])
+		require.NoError(t, err)
+
+		sender.Close()
+	}
+
+	// One more stream past the limit must fail at NewStream or at the first read.
+	for i := 1; i != 0; i-- {
+		mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
+		require.NoError(t, err)
+
+		sender, err := libp2p.New(libp2p.ResourceManager(mgr))
+		require.NoError(t, err)
+		t.Cleanup(func() { sender.Close() })
+
+		sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)
+
+		s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
+		if err != nil {
+			continue // good
+		}
+
+		var b [1]byte
+		_, err = io.ReadFull(s, b[:])
+		if err != nil {
+			continue
+		}
+		t.Fatal("we were able to create more handlers than the stream limit allowed")
+	}
+}
diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go
index 6d6dcf65a2..0cac6e4fa3 100644
--- a/p2p/net/swarm/dial_worker.go
+++ b/p2p/net/swarm/dial_worker.go
@@ -120,5 +120,7 @@ func (w *dialWorker) loop() {
 	startTime := w.cl.Now()
 	// dialTimer is the dialTimer used to trigger dials
 	dialTimer := w.cl.InstantTimer(startTime.Add(math.MaxInt64))
+	defer dialTimer.Stop()
+	timerRunning := true
 	// scheduleNextDial updates timer for triggering the next dial
 	scheduleNextDial := func() {