Skip to content

Commit

Permalink
feat(benchmarks): add p2p stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Sep 7, 2020
1 parent bd2d62f commit 3367cdd
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 7 deletions.
87 changes: 80 additions & 7 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package graphsync_test
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"math/rand"
"os"
"runtime"
"strings"
Expand All @@ -31,6 +31,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)

Expand All @@ -48,10 +49,82 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
tdm, err := newTempDirMaker(b)
require.NoError(b, err)
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
})
b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
})
}

func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mn := mocknet.New(ctx)
mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
net := tn.StreamNet(ctx, mn)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
instances, err := ig.Instances(1 + b.N)
require.NoError(b, err)
var allCids []cid.Cid
for i := 0; i < numfiles; i++ {
thisCids := df(ctx, b, instances[:1])
allCids = append(allCids, thisCids...)
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)

allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
fetcher := instances[i+1]
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
disconnectOn := rand.Intn(numfiles)
for j := 0; j < numfiles; j++ {
resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)

wg.Add(1)
go func(j int) {
defer wg.Done()
results := 0
for {
select {
case <-ctx.Done():
return
case <-resultChan:
results++
if results == 100 && j == disconnectOn {
mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
time.Sleep(100 * time.Millisecond)
mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
}
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
}
}(j)
}
wg.Wait()
result := runStats{
Time: time.Since(start),
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, result)
cancel()
fetcher.Close()
}
testinstance.Close(instances)
ig.Close()
}
func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -116,10 +189,10 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,

type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid

const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024
const defaultUnixfsChunkSize uint64 = 1 << 10
const defaultUnixfsLinksPerLevel = 1024

func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {
func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {

data := make([]byte, size)
_, err := rand.Read(data)
Expand Down Expand Up @@ -151,11 +224,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
return nd.Cid()
}

func allFilesUniformSize(size uint64) distFunc {
func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
cids = append(cids, c)
}
return cids
Expand Down
40 changes: 40 additions & 0 deletions benchmarks/testnet/peernet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package testnet

import (
"context"

gsnet "github.com/ipfs/go-graphsync/network"

"github.com/libp2p/go-libp2p-core/peer"
tnet "github.com/libp2p/go-libp2p-testing/net"
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

type peernet struct {
mockpeernet.Mocknet
}

// StreamNet is a testnet that uses libp2p's MockNet
func StreamNet(ctx context.Context, net mockpeernet.Mocknet) Network {
return &peernet{net}
}

func (pn *peernet) Adapter(p tnet.Identity) gsnet.GraphSyncNetwork {
client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic(err.Error())
}
pn.Mocknet.LinkAll()
return gsnet.NewFromLibp2pHost(client)
}

func (pn *peernet) HasPeer(p peer.ID) bool {
for _, member := range pn.Mocknet.Peers() {
if p == member {
return true
}
}
return false
}

var _ Network = (*peernet)(nil)

0 comments on commit 3367cdd

Please sign in to comment.