add tests for memory management #52

Merged
merged 2 commits into from
Apr 10, 2022
Merged
141 changes: 128 additions & 13 deletions suites/mux/muxer_suite.go
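The diff below threads a mock network.PeerScope through the muxer test suite so each subtest can assert that the muxer released every byte it reserved. For orientation, the suite is consumed from a muxer implementation's own tests roughly like this (a sketch; the yamux import and the tmux alias are assumptions, not part of this diff):

	import (
		"testing"

		tmux "github.com/libp2p/go-libp2p-testing/suites/mux"
		yamux "github.com/libp2p/go-libp2p-yamux"
	)

	// Runs every subtest in this suite, including the new
	// memory-management checks, against the yamux multiplexer.
	func TestYamuxSuite(t *testing.T) {
		tmux.SubtestAll(t, yamux.DefaultTransport)
	}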
@@ -18,6 +18,7 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-testing/ci"

"github.com/stretchr/testify/require"
@@ -43,6 +44,42 @@ func getFunctionName(i interface{}) string {
	return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}

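// peerScope is a mock network.PeerScope: it tallies the bytes the muxer
// reserves and releases so each subtest can check that the books balance.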
type peerScope struct {
	mx     sync.Mutex
	memory int
}

func (p *peerScope) ReserveMemory(size int, _ uint8) error {
	fmt.Println("reserving", size)
	p.mx.Lock()
	p.memory += size
	p.mx.Unlock()
	return nil
}

func (p *peerScope) ReleaseMemory(size int) {
	fmt.Println("releasing", size)
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.memory < size {
		panic(fmt.Sprintf("tried to release too much memory: %d (current: %d)", size, p.memory))
	}
	p.memory -= size
}

// Check checks that we don't have any more reserved memory.
func (p *peerScope) Check(t *testing.T) {
	p.mx.Lock()
	defer p.mx.Unlock()
	require.Zero(t, p.memory, "expected all reserved memory to have been released")
}

func (p *peerScope) Stat() network.ScopeStat                       { return network.ScopeStat{} }
func (p *peerScope) BeginSpan() (network.ResourceScopeSpan, error) { return nil, nil }
func (p *peerScope) Peer() peer.ID                                 { panic("implement me") }

var _ network.PeerScope = &peerScope{}
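// For context, the contract these tests pin down: a resource-aware muxer
// pairs every ReserveMemory call on the peer scope it was given with a
// matching ReleaseMemory by the time the connection is torn down, so the
// mock's counter must return to zero. A rough sketch of the pattern on the
// muxer side (the function name and buffer size are hypothetical):
//
//	func reserveRecvBuffer(scope network.PeerScope) (release func(), err error) {
//		const bufSize = 64 * 1024 // hypothetical per-stream receive buffer
//		if err := scope.ReserveMemory(bufSize, 128); err != nil {
//			return nil, err // resource manager applied back-pressure
//		}
//		return func() { scope.ReleaseMemory(bufSize) }, nil
//	}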

type Options struct {
	tr      network.Multiplexer
	connNum int
@@ -141,9 +178,13 @@ func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) {
	defer nc1.Close()

	log("wrapping conn")
	scope := &peerScope{}
	c1, err := tr.NewConn(nc1, false, scope)
	checkErr(t, err)
	defer func() {
		// Close the conn before checking: only once it is closed must the
		// muxer have released everything it reserved against the scope.
		c1.Close()
		scope.Check(t)
	}()

	// serve the outgoing conn, because some muxers assume
	// that we _always_ call serve. (this is an error?)
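	// Here "serving" just means accepting inbound streams so the peer can
	// make progress. A hypothetical sketch of such a loop:
	//
	//	go func() {
	//		for {
	//			str, err := c1.AcceptStream()
	//			if err != nil {
	//				return // conn closed
	//			}
	//			go handleStream(str) // hypothetical handler
	//		}
	//	}()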
@@ -253,7 +294,8 @@ func SubtestStress(t *testing.T, opt Options) {
			return
		}

		scope := &peerScope{}
		c, err := opt.tr.NewConn(nc, false, scope)
		if err != nil {
			t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
			return
@@ -282,6 +324,7 @@ func SubtestStress(t *testing.T, opt Options) {
		}
		wg.Wait()
		c.Close()
		scope.Check(t)
	}

	openConnsAndRW := func() {
@@ -375,10 +418,15 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
		}
	}()

	scope := &peerScope{}
	muxb, err := tr.NewConn(b, false, scope)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		muxb.Close()
		scope.Check(t)
	}()

	time.Sleep(time.Millisecond * 50)

@@ -391,7 +439,9 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
			if err != nil {
				break
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				str.Close()
				select {
				case recv <- struct{}{}:
@@ -415,6 +465,8 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
t.Fatal("timed out receiving streams")
}
}

wg.Wait()
}

func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
@@ -428,7 +480,8 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		scope := &peerScope{}
		muxa, err := tr.NewConn(a, true, scope)
		if err != nil {
			t.Error(err)
			return
		}
@@ -444,14 +497,19 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
		if err != network.ErrReset {
			t.Error("should have been stream reset")
		}

		s.Close()
		scope.Check(t)
	}()

	scope := &peerScope{}
	muxb, err := tr.NewConn(b, false, scope)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		muxb.Close()
		scope.Check(t)
	}()

	str, err := muxb.AcceptStream()
	checkErr(t, err)
@@ -464,16 +522,18 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
	a, b := tcpPipe(t)

	scopea := &peerScope{}
	muxa, err := tr.NewConn(a, true, scopea)
	checkErr(t, err)

	scopeb := &peerScope{}
	muxb, err := tr.NewConn(b, false, scopeb)
	checkErr(t, err)

	checkErr(t, muxa.Close())
	scopea.Check(t)
	checkErr(t, muxb.Close())
	scopeb.Check(t)

	// make sure the underlying net.Conn was closed
	if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
@@ -484,6 +544,60 @@ func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
	}
}

func SubtestStreamLeftOpen(t *testing.T, tr network.Multiplexer) {
	a, b := tcpPipe(t)

	const numStreams = 10
	const dataLen = 50 * 1024

	scopea := &peerScope{}
	muxa, err := tr.NewConn(a, true, scopea)
	checkErr(t, err)

	scopeb := &peerScope{}
	muxb, err := tr.NewConn(b, false, scopeb)
	checkErr(t, err)

	var wg sync.WaitGroup
	wg.Add(1 + numStreams)
	go func() {
		defer wg.Done()
		for i := 0; i < numStreams; i++ {
			stra, err := muxa.OpenStream(context.Background())
			checkErr(t, err)
			go func() {
				defer wg.Done()
				_, err = stra.Write(randBuf(dataLen))
				checkErr(t, err)
				// do NOT close or reset the stream
			}()
		}
	}()

	wg.Add(1 + numStreams)
	go func() {
		defer wg.Done()
		for i := 0; i < numStreams; i++ {
			str, err := muxb.AcceptStream()
			checkErr(t, err)
			go func() {
				defer wg.Done()
				_, err = io.ReadFull(str, make([]byte, dataLen))
				checkErr(t, err)
			}()
		}
	}()

	// Now we have a bunch of open streams.
	// Make sure that their memory is returned when we close the connection.
	wg.Wait()

	muxa.Close()
	scopea.Check(t)
	muxb.Close()
	scopeb.Check(t)
}

func SubtestStress1Conn1Stream1Msg(t *testing.T, tr network.Multiplexer) {
	SubtestStress(t, Options{
		tr: tr,
@@ -562,6 +676,7 @@ var subtests = []TransportTest{
	SubtestStress1Conn100Stream100Msg10MB,
	SubtestStreamOpenStress,
	SubtestStreamReset,
	SubtestStreamLeftOpen,
}

// SubtestAll runs all the stream multiplexer tests against the target