Skip to content

Commit

Permalink
check that memory allocations return to 0
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 9, 2022
1 parent 8480508 commit 6b2a3f9
Showing 1 changed file with 76 additions and 15 deletions.
91 changes: 76 additions & 15 deletions suites/mux/muxer_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-testing/ci"

"github.com/stretchr/testify/require"
Expand All @@ -43,6 +44,42 @@ func getFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}

// peerScope is a minimal test implementation of network.PeerScope that
// tracks the net amount of reserved memory, so tests can verify that
// every reservation made by a muxer is eventually released.
type peerScope struct {
	mx     sync.Mutex
	memory int // total bytes currently reserved; guarded by mx
}

// ReserveMemory records a reservation of size bytes. It never fails;
// the priority argument is ignored by this test scope.
func (p *peerScope) ReserveMemory(size int, _ uint8) error {
	p.mx.Lock()
	p.memory += size
	p.mx.Unlock()
	return nil
}

// ReleaseMemory releases size bytes of previously reserved memory.
// It panics if more is released than was reserved, since that points
// at an accounting bug in the muxer under test.
func (p *peerScope) ReleaseMemory(size int) {
	p.mx.Lock()
	defer p.mx.Unlock()
	if p.memory < size {
		panic(fmt.Sprintf("tried to release too much memory: %d (current: %d)", size, p.memory))
	}
	p.memory -= size
}

// Check asserts that all memory reserved through this scope has been
// released again, i.e. the outstanding reservation is zero.
func (p *peerScope) Check(t *testing.T) {
	p.mx.Lock()
	reserved := p.memory
	p.mx.Unlock()
	require.Zero(t, reserved, "expected all reserved memory to have been released")
}

// Stat returns an empty ScopeStat; the muxer tests never inspect stats.
func (p *peerScope) Stat() network.ScopeStat { return network.ScopeStat{} }

// BeginSpan returns a nil span; no muxer under test is expected to use
// spans. NOTE(review): a caller that did use the returned span would
// hit a nil ResourceScopeSpan — confirm no muxer relies on it.
func (p *peerScope) BeginSpan() (network.ResourceScopeSpan, error) { return nil, nil }

// Peer is not expected to be called by the test suite and panics if it is.
func (p *peerScope) Peer() peer.ID { panic("implement me") }

// Compile-time check that peerScope satisfies network.PeerScope.
var _ network.PeerScope = &peerScope{}

type Options struct {
tr network.Multiplexer
connNum int
Expand Down Expand Up @@ -70,6 +107,7 @@ func checkErr(t *testing.T, err error) {
}

func log(s string, v ...interface{}) {
return
if testing.Verbose() && !ci.IsRunning() {
fmt.Fprintf(os.Stderr, "> "+s+"\n", v...)
}
Expand Down Expand Up @@ -141,9 +179,13 @@ func SubtestSimpleWrite(t *testing.T, tr network.Multiplexer) {
defer nc1.Close()

log("wrapping conn")
c1, err := tr.NewConn(nc1, false, nil)
scope := &peerScope{}
c1, err := tr.NewConn(nc1, false, scope)
checkErr(t, err)
defer c1.Close()
defer func() {
c1.Close()
scope.Check(t)
}()

// serve the outgoing conn, because some muxers assume
// that we _always_ call serve. (this is an error?)
Expand Down Expand Up @@ -253,7 +295,8 @@ func SubtestStress(t *testing.T, opt Options) {
return
}

c, err := opt.tr.NewConn(nc, false, nil)
scope := &peerScope{}
c, err := opt.tr.NewConn(nc, false, scope)
if err != nil {
t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
return
Expand Down Expand Up @@ -282,6 +325,7 @@ func SubtestStress(t *testing.T, opt Options) {
}
wg.Wait()
c.Close()
scope.Check(t)
}

openConnsAndRW := func() {
Expand Down Expand Up @@ -375,10 +419,15 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
}
}()

muxb, err := tr.NewConn(b, false, nil)
scope := &peerScope{}
muxb, err := tr.NewConn(b, false, scope)
if err != nil {
t.Fatal(err)
}
defer func() {
muxb.Close()
scope.Check(t)
}()

time.Sleep(time.Millisecond * 50)

Expand All @@ -391,7 +440,9 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
if err != nil {
break
}
wg.Add(1)
go func() {
defer wg.Done()
str.Close()
select {
case recv <- struct{}{}:
Expand All @@ -415,6 +466,8 @@ func SubtestStreamOpenStress(t *testing.T, tr network.Multiplexer) {
t.Fatal("timed out receiving streams")
}
}

wg.Wait()
}

func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
Expand All @@ -428,7 +481,8 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
wg.Add(1)
go func() {
defer wg.Done()
muxa, err := tr.NewConn(a, true, nil)
scope := &peerScope{}
muxa, err := tr.NewConn(a, true, scope)
if err != nil {
t.Error(err)
return
Expand All @@ -444,14 +498,19 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
if err != network.ErrReset {
t.Error("should have been stream reset")
}

s.Close()
scope.Check(t)
}()

muxb, err := tr.NewConn(b, false, nil)
scope := &peerScope{}
muxb, err := tr.NewConn(b, false, scope)
if err != nil {
t.Fatal(err)
}
defer func() {
muxb.Close()
scope.Check(t)
}()

str, err := muxb.AcceptStream()
checkErr(t, err)
Expand All @@ -464,16 +523,18 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
func SubtestWriteAfterClose(t *testing.T, tr network.Multiplexer) {
a, b := tcpPipe(t)

muxa, err := tr.NewConn(a, true, nil)
scopea := &peerScope{}
muxa, err := tr.NewConn(a, true, scopea)
checkErr(t, err)

muxb, err := tr.NewConn(b, false, nil)
scopeb := &peerScope{}
muxb, err := tr.NewConn(b, false, scopeb)
checkErr(t, err)

err = muxa.Close()
checkErr(t, err)
err = muxb.Close()
checkErr(t, err)
checkErr(t, muxa.Close())
scopea.Check(t)
checkErr(t, muxb.Close())
scopeb.Check(t)

// make sure the underlying net.Conn was closed
if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
Expand Down Expand Up @@ -567,8 +628,8 @@ var subtests = []TransportTest{
// SubtestAll runs all the stream multiplexer tests against the target
// transport.
func SubtestAll(t *testing.T, tr network.Multiplexer) {
for name, f := range Subtests {
t.Run(name, func(t *testing.T) {
for _, f := range subtests {
t.Run(getFunctionName(f), func(t *testing.T) {
f(t, tr)
})
}
Expand Down

0 comments on commit 6b2a3f9

Please sign in to comment.