From 69da11bbb4f0515b979f982de59e29f42f45a11e Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 9 Apr 2022 12:33:03 +0100 Subject: [PATCH] add a test that leaves streams open --- suites/mux/muxer_suite.go | 55 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/suites/mux/muxer_suite.go b/suites/mux/muxer_suite.go index f991d2b..9a14e96 100644 --- a/suites/mux/muxer_suite.go +++ b/suites/mux/muxer_suite.go @@ -545,6 +545,60 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) { } } +func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) { + a, b := tcpPipe(t) + + const numStreams = 10 + const dataLen = 50 * 1024 + + scopea := &peerScope{} + muxa, err := tr.NewConn(a, true, scopea) + checkErr(t, err) + + scopeb := &peerScope{} + muxb, err := tr.NewConn(b, false, scopeb) + checkErr(t, err) + + var wg sync.WaitGroup + wg.Add(1 + numStreams) + go func() { + defer wg.Done() + for i := 0; i < numStreams; i++ { + stra, err := muxa.OpenStream(context.Background()) + checkErr(t, err) + go func() { + defer wg.Done() + _, err = stra.Write(randBuf(dataLen)) + checkErr(t, err) + // do NOT close or reset the stream + }() + } + }() + + wg.Add(1 + numStreams) + go func() { + defer wg.Done() + for i := 0; i < numStreams; i++ { + str, err := muxb.AcceptStream() + checkErr(t, err) + go func() { + defer wg.Done() + _, err = io.ReadFull(str, make([]byte, dataLen)) + checkErr(t, err) + }() + } + }() + + // Now we have a bunch of open streams. + // Make sure that their memory is returned when we close the connection. + wg.Wait() + + muxa.Close() + scopea.Check(t) + muxb.Close() + scopeb.Check(t) +} + func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) { SubtestStress(t, Options{ tr: tr, @@ -623,6 +677,7 @@ var subtests = []TransportTest{ SubtestStress1Conn100Stream100Msg10MB, SubtestStreamOpenStress, SubtestStreamReset, + SubtestStreamLeftOpen, } // SubtestAll runs all the stream multiplexer tests against the target