From c8ff5acfb9eafded3223b6442ae5bea13213fbb8 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Sat, 9 Apr 2022 12:18:45 +0100
Subject: [PATCH 1/2] check that memory allocations return to 0

---
 suites/mux/muxer_suite.go | 86 +++++++++++++++++++++++++++++++++------
 1 file changed, 73 insertions(+), 13 deletions(-)

diff --git a/suites/mux/muxer_suite.go b/suites/mux/muxer_suite.go
index 2362a83..b0cb0d8 100644
--- a/suites/mux/muxer_suite.go
+++ b/suites/mux/muxer_suite.go
@@ -18,6 +18,7 @@ import (
     "time"

     "github.com/libp2p/go-libp2p-core/network"
+    "github.com/libp2p/go-libp2p-core/peer"

     "github.com/libp2p/go-libp2p-testing/ci"
     "github.com/stretchr/testify/require"
@@ -43,6 +44,42 @@ func getFunctionName(i interface{}) string {
     return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
 }

+type peerScope struct {
+    mx     sync.Mutex
+    memory int
+}
+
+func (p *peerScope) ReserveMemory(size int, _ uint8) error {
+    fmt.Println("reserving", size)
+    p.mx.Lock()
+    p.memory += size
+    p.mx.Unlock()
+    return nil
+}
+
+func (p *peerScope) ReleaseMemory(size int) {
+    fmt.Println("releasing", size)
+    p.mx.Lock()
+    defer p.mx.Unlock()
+    if p.memory < size {
+        panic(fmt.Sprintf("tried to release too much memory: %d (current: %d)", size, p.memory))
+    }
+    p.memory -= size
+}
+
+// Check checks that we don't have any more reserved memory.
+func (p *peerScope) Check(t *testing.T) {
+    p.mx.Lock()
+    defer p.mx.Unlock()
+    require.Zero(t, p.memory, "expected all reserved memory to have been released")
+}
+
+func (p *peerScope) Stat() network.ScopeStat                       { return network.ScopeStat{} }
+func (p *peerScope) BeginSpan() (network.ResourceScopeSpan, error) { return nil, nil }
+func (p *peerScope) Peer() peer.ID                                 { panic("implement me") }
+
+var _ network.PeerScope = &peerScope{}
+
 type Options struct {
     tr      network.Multiplexer
     connNum int
@@ -141,9 +178,13 @@ func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) {
     defer nc1.Close()

     log("wrapping conn")
-    c1, err := tr.NewConn(nc1, false, nil)
+    scope := &peerScope{}
+    c1, err := tr.NewConn(nc1, false, scope)
     checkErr(t, err)
-    defer c1.Close()
+    defer func() {
+        c1.Close()
+        scope.Check(t)
+    }()

     // serve the outgoing conn, because some muxers assume
     // that we _always_ call serve. (this is an error?)
@@ -253,7 +294,8 @@ func SubtestStress(t *testing.T, opt Options) {
             return
         }

-        c, err := opt.tr.NewConn(nc, false, nil)
+        scope := &peerScope{}
+        c, err := opt.tr.NewConn(nc, false, scope)
         if err != nil {
             t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
             return
@@ -282,6 +324,7 @@ func SubtestStress(t *testing.T, opt Options) {
         }
         wg.Wait()
         c.Close()
+        scope.Check(t)
     }

     openConnsAndRW := func() {
@@ -375,10 +418,15 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
         }
     }()

-    muxb, err := tr.NewConn(b, false, nil)
+    scope := &peerScope{}
+    muxb, err := tr.NewConn(b, false, scope)
     if err != nil {
         t.Fatal(err)
     }
+    defer func() {
+        muxb.Close()
+        scope.Check(t)
+    }()

     time.Sleep(time.Millisecond * 50)

@@ -391,7 +439,9 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
         if err != nil {
             break
         }
+        wg.Add(1)
         go func() {
+            defer wg.Done()
             str.Close()
             select {
             case recv <- struct{}{}:
@@ -415,6 +465,8 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
             t.Fatal("timed out receiving streams")
         }
     }
+
+    wg.Wait()
 }

 func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
@@ -428,7 +480,8 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
     wg.Add(1)
     go func() {
         defer wg.Done()
-        muxa, err := tr.NewConn(a, true, nil)
+        scope := &peerScope{}
+        muxa, err := tr.NewConn(a, true, scope)
         if err != nil {
             t.Error(err)
             return
@@ -444,14 +497,19 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
         if err != network.ErrReset {
             t.Error("should have been stream reset")
         }
-        s.Close()
+        scope.Check(t)
     }()

-    muxb, err := tr.NewConn(b, false, nil)
+    scope := &peerScope{}
+    muxb, err := tr.NewConn(b, false, scope)
     if err != nil {
         t.Fatal(err)
     }
+    defer func() {
+        muxb.Close()
+        scope.Check(t)
+    }()

     str, err := muxb.AcceptStream()
     checkErr(t, err)
@@ -464,16 +522,18 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
     a, b := tcpPipe(t)

-    muxa, err := tr.NewConn(a, true, nil)
+    scopea := &peerScope{}
+    muxa, err := tr.NewConn(a, true, scopea)
     checkErr(t, err)

-    muxb, err := tr.NewConn(b, false, nil)
+    scopeb := &peerScope{}
+    muxb, err := tr.NewConn(b, false, scopeb)
     checkErr(t, err)

-    err = muxa.Close()
-    checkErr(t, err)
-    err = muxb.Close()
-    checkErr(t, err)
+    checkErr(t, muxa.Close())
+    scopea.Check(t)
+    checkErr(t, muxb.Close())
+    scopeb.Check(t)

     // make sure the underlying net.Conn was closed
     if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {

From a180f6eb5ebaef0569ab3f67b5c13afab497c3d8 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Sat, 9 Apr 2022 12:33:03 +0100
Subject: [PATCH 2/2] add a test that leaves streams open

---
 suites/mux/muxer_suite.go | 55 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/suites/mux/muxer_suite.go b/suites/mux/muxer_suite.go
index b0cb0d8..e28e433 100644
--- a/suites/mux/muxer_suite.go
+++ b/suites/mux/muxer_suite.go
@@ -544,6 +544,60 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
     }
 }

+func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) {
+    a, b := tcpPipe(t)
+
+    const numStreams = 10
+    const dataLen = 50 * 1024
+
+    scopea := &peerScope{}
+    muxa, err := tr.NewConn(a, true, scopea)
+    checkErr(t, err)
+
+    scopeb := &peerScope{}
+    muxb, err := tr.NewConn(b, false, scopeb)
+    checkErr(t, err)
+
+    var wg sync.WaitGroup
+    wg.Add(1 + numStreams)
+    go func() {
+        defer wg.Done()
+        for i := 0; i < numStreams; i++ {
+            stra, err := muxa.OpenStream(context.Background())
+            checkErr(t, err)
+            go func() {
+                defer wg.Done()
+                _, err = stra.Write(randBuf(dataLen))
+                checkErr(t, err)
+                // do NOT close or reset the stream
+            }()
+        }
+    }()
+
+    wg.Add(1 + numStreams)
+    go func() {
+        defer wg.Done()
+        for i := 0; i < numStreams; i++ {
+            str, err := muxb.AcceptStream()
+            checkErr(t, err)
+            go func() {
+                defer wg.Done()
+                _, err = io.ReadFull(str, make([]byte, dataLen))
+                checkErr(t, err)
+            }()
+        }
+    }()
+
+    // Now we have a bunch of open streams.
+    // Make sure that their memory is returned when we close the connection.
+    wg.Wait()
+
+    muxa.Close()
+    scopea.Check(t)
+    muxb.Close()
+    scopeb.Check(t)
+}
+
 func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) {
     SubtestStress(t, Options{
         tr: tr,
@@ -622,6 +676,7 @@ var subtests = []TransportTest{
     SubtestStress1Conn100Stream100Msg10MB,
     SubtestStreamOpenStress,
     SubtestStreamReset,
+    SubtestStreamLeftOpen,
 }

 // SubtestAll runs all the stream multiplexer tests against the target
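
Not part of the patch series above, just an illustration of the contract these tests pin down: the scope passed to Multiplexer.NewConn is a network.PeerScope, and a muxer is expected to reserve memory from it before buffering stream data and to release exactly as much once the stream or connection goes away, which is what peerScope.Check verifies drops back to zero. The sketch below shows that muxer-side accounting under those assumptions; the bufferedStream type and its growBuffer/release methods are hypothetical names, only ReserveMemory and ReleaseMemory come from the network.PeerScope interface the suite's stub implements.

package example

import (
    "github.com/libp2p/go-libp2p-core/network"
)

// bufferedStream is a hypothetical stream type inside a muxer; it keeps a
// handle to the peer scope that was passed to Multiplexer.NewConn.
type bufferedStream struct {
    scope network.PeerScope
    buf   []byte
}

// growBuffer reserves memory from the peer scope before growing the receive
// buffer, so the reservation is visible to the suite's peerScope stub.
func (s *bufferedStream) growBuffer(n int) error {
    // The second argument is a priority hint; the suite's stub ignores it.
    if err := s.scope.ReserveMemory(n, 128); err != nil {
        return err // over the limit: back-pressure or reset the stream
    }
    s.buf = append(s.buf, make([]byte, n)...)
    return nil
}

// release returns the buffer's memory to the scope. By the time the stream or
// connection is closed, total released must equal total reserved, otherwise
// peerScope.Check reports leftover memory and the test fails.
func (s *bufferedStream) release() {
    s.scope.ReleaseMemory(len(s.buf))
    s.buf = nil
}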