
fix: races in tests #279

Merged · 1 commit · Mar 6, 2020
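The races this PR fixes all trace back to shared mutable state: `shortTerm` was a package-level variable that `TestTaggingUseful` rewrote while the engine's `scoreWorker` goroutine was still reading it through its ticker, and the `donthavetimeoutmgr` tests read their `timedOutKs` slice without the lock the timeout callback used. The engine-side remedy, shown in the diffs below, is to make `shortTerm` a constant and inject a per-instance `peerSampleInterval` through the constructor, so nothing is mutated after the workers start. A minimal sketch of that dependency-injection pattern, with simplified names standing in for the real engine API:

```go
package main

import (
	"fmt"
	"time"
)

// Racy shape (what the old tests relied on): a package-level knob,
//
//	var sampleInterval = 10 * time.Second
//
// that a test overwrites while a worker goroutine is reading it.
// Fixed shape: the interval is injected at construction time, so each
// test gets its own engine-local value and nothing is written afterwards.
type engine struct {
	sampleInterval time.Duration
}

func newEngine(sampleInterval time.Duration) *engine {
	return &engine{sampleInterval: sampleInterval}
}

func (e *engine) scoreWorker(stop <-chan struct{}) {
	// The ticker only ever reads engine-local state, so a test that
	// constructs its engine with a short interval cannot race this read.
	ticker := time.NewTicker(e.sampleInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// sample peer usefulness here
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	e := newEngine(2 * time.Millisecond) // a test-sized interval, set before the worker starts
	go e.scoreWorker(stop)
	time.Sleep(10 * time.Millisecond)
	close(stop)
	fmt.Println("scoreWorker stopped")
}
```

This is the same shape as the `newTestEngine` change below: tests that care about timing pass a millisecond-scale `peerSampleInterval`, and everything else passes the `shortTerm` constant.
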
2 changes: 1 addition & 1 deletion bitswap_test.go
@@ -245,7 +245,7 @@ func TestLargeSwarm(t *testing.T) {
if detectrace.WithRace() {
// when running with the race detector, 500 instances launches
// well over 8k goroutines. This hits a race detector limit.
numInstances = 50
numInstances = 20
} else if travis.IsRunning() {
numInstances = 200
} else {
24 changes: 13 additions & 11 deletions internal/decision/engine.go
@@ -76,6 +76,10 @@ const (
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05

// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second

// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
@@ -96,14 +100,6 @@ const (
blockstoreWorkerCount = 128
)

var (
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
//
// this is only a variable to make testing easier.
shortTerm = 10 * time.Second
)

// Envelope contains a message for a Peer.
type Envelope struct {
// Peer is the intended recipient.
@@ -161,18 +157,23 @@ type Engine struct {
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int

// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration

sendDontHaves bool

self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock)
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, maxReplaceSize int) *Engine {
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, peerSampleInterval time.Duration) *Engine {

e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
@@ -181,6 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
peerSampleInterval: peerSampleInterval,
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
@@ -236,7 +238,7 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(ctx context.Context) {
ticker := time.NewTicker(shortTerm)
ticker := time.NewTicker(e.peerSampleInterval)
defer ticker.Stop()

type update struct {
42 changes: 20 additions & 22 deletions internal/decision/engine_test.go
@@ -91,10 +91,10 @@ type engineSet struct {
Blockstore blockstore.Blockstore
}

func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0)
e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
@@ -108,8 +108,8 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func TestConsistentAccounting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sender := newTestEngine(ctx, "Ernie")
receiver := newTestEngine(ctx, "Bert")
sender := newTestEngine(ctx, "Ernie", shortTerm)
receiver := newTestEngine(ctx, "Bert", shortTerm)

// Send messages from Ernie to Bert
for i := 0; i < 1000; i++ {
@@ -143,8 +143,8 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)

m := message.New(true)

@@ -181,7 +181,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
@@ -509,7 +509,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
@@ -665,7 +665,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
@@ -850,7 +850,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
@@ -875,7 +875,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -919,7 +919,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -981,8 +981,8 @@ func TestSendDontHave(t *testing.T) {
func TestTaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)

keys := []string{"a", "b", "c", "d", "e"}
for _, letter := range keys {
@@ -1007,13 +1007,11 @@ func TestTaggingPeers(t *testing.T) {
}

func TestTaggingUseful(t *testing.T) {
oldShortTerm := shortTerm
shortTerm = 2 * time.Millisecond
defer func() { shortTerm = oldShortTerm }()
peerSampleInterval := 2 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
me := newTestEngine(ctx, "engine")
me := newTestEngine(ctx, "engine", peerSampleInterval)
friend := peer.ID("friend")

block := blocks.NewBlock([]byte("foobar"))
@@ -1025,21 +1023,21 @@ func TestTaggingUseful(t *testing.T) {
t.Fatal("Peers should be untagged but weren't")
}
me.Engine.MessageSent(friend, msg)
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
t.Fatal("Peers should be tagged but weren't")
}
time.Sleep(shortTerm * 8)
time.Sleep(peerSampleInterval * 8)
}

if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 20)
time.Sleep(peerSampleInterval * 30)
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("peers should finally be untagged")
}
51 changes: 30 additions & 21 deletions internal/messagequeue/donthavetimeoutmgr_test.go
@@ -50,9 +50,24 @@ type timeoutRecorder struct {
func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) {
tr.lk.Lock()
defer tr.lk.Unlock()

tr.timedOutKs = append(tr.timedOutKs, tks...)
}

func (tr *timeoutRecorder) timedOutCount() int {
tr.lk.Lock()
defer tr.lk.Unlock()

return len(tr.timedOutKs)
}

func (tr *timeoutRecorder) clear() {
tr.lk.Lock()
defer tr.lk.Unlock()

tr.timedOutKs = nil
}

func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
firstks := testutil.GenerateCids(2)
secondks := append(firstks, testutil.GenerateCids(3)...)
@@ -75,7 +90,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)

// At this stage no keys should have timed out
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

@@ -86,20 +101,20 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// At this stage first set of keys should have timed out
if len(tr.timedOutKs) != len(firstks) {
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
}

// Clear the recorded timed out keys
tr.timedOutKs = nil
tr.clear()

// Sleep until the second set of keys should have timed out
time.Sleep(expectedTimeout)

// At this stage all keys should have timed out. The second set included
// the first set of keys, but they were added before the first set timed
// out, so only the remaining keys should have been added.
if len(tr.timedOutKs) != len(secondks)-len(firstks) {
if tr.timedOutCount() != len(secondks)-len(firstks) {
t.Fatal("expected second set of keys to timeout")
}
}
@@ -130,7 +145,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
time.Sleep(expectedTimeout)

// At this stage all non-cancelled keys should have timed out
if len(tr.timedOutKs) != len(ks)-cancelCount {
if tr.timedOutCount() != len(ks)-cancelCount {
t.Fatal("expected timeout")
}
}
@@ -167,15 +182,15 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// At this stage only the key that was never cancelled should have timed out
if len(tr.timedOutKs) != 1 {
if tr.timedOutCount() != 1 {
t.Fatal("expected one key to timeout")
}

// Wait till after added back key should time out
time.Sleep(latency)

// At this stage the key that was added back should also have timed out
if len(tr.timedOutKs) != 2 {
if tr.timedOutCount() != 2 {
t.Fatal("expected added back key to timeout")
}
}
@@ -202,7 +217,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
time.Sleep(latency + 5*time.Millisecond)

// At this stage all keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
@@ -229,15 +244,15 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)

// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

// Sleep until after the expected timeout
time.Sleep(10 * time.Millisecond)

// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
@@ -263,15 +278,15 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
time.Sleep(defaultTimeout - 5*time.Millisecond)

// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}

// Sleep until after the default timeout
time.Sleep(10 * time.Millisecond)

// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
@@ -281,17 +296,11 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
latency := time.Millisecond * 10
latMultiplier := 1
expProcessTime := time.Duration(0)
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency}

var lk sync.Mutex
var timedOutKs []cid.Cid
onTimeout := func(tks []cid.Cid) {
lk.Lock()
defer lk.Unlock()
timedOutKs = append(timedOutKs, tks...)
}
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()

@@ -308,7 +317,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// Manager was shut down so timeout should not have fired
if len(timedOutKs) != 0 {
if tr.timedOutCount() != 0 {
t.Fatal("expected no timeout after shutdown")
}
}
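
The `timeoutRecorder` additions at the top of this file are the other half of the fix: `onTimeout` already appended under `tr.lk`, but the tests read `len(tr.timedOutKs)` and reset the slice with no lock, which `go test -race` reports as concurrent slice access. Routing every read and reset through the same mutex, as `timedOutCount` and `clear` now do, closes that race. A self-contained sketch of the pattern (`recorder` and its fields are illustrative names, not the real manager's types):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// recorder mirrors the timeoutRecorder shape above: the callback goroutine
// appends under the mutex, and the test side reads and resets under the
// same mutex instead of touching the slice directly.
type recorder struct {
	lk   sync.Mutex
	keys []string
}

func (r *recorder) onTimeout(ks []string) {
	r.lk.Lock()
	defer r.lk.Unlock()
	r.keys = append(r.keys, ks...)
}

func (r *recorder) count() int {
	r.lk.Lock()
	defer r.lk.Unlock()
	return len(r.keys)
}

func (r *recorder) clear() {
	r.lk.Lock()
	defer r.lk.Unlock()
	r.keys = nil
}

func main() {
	r := &recorder{}
	go r.onTimeout([]string{"a", "b"}) // stands in for the timeout manager firing the callback
	time.Sleep(5 * time.Millisecond)
	fmt.Println("timed out:", r.count()) // reads only through the locked accessor
	r.clear()
}
```

Because both goroutines synchronize on the same mutex for every access, the race detector has nothing to report for this slice.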