From ca205c7f9b6061932087b3b061be405b8074ef5a Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 2 Mar 2020 16:44:27 -0500
Subject: [PATCH 1/7] feat: track timeouts per peer

---
 bitswap.go                                    |  33 +-
 internal/messagequeue/messagequeue.go         |  27 +-
 internal/messagequeue/messagequeue_test.go    |  25 +-
 internal/peermanager/peermanager.go           |  11 +
 .../peertimeoutmanager/peertimeoutmanager.go  | 201 +++++++++++
 .../peertimeoutmanager_test.go                | 322 ++++++++++++++++++
 internal/timer/timer.go                       |  13 +
 7 files changed, 601 insertions(+), 31 deletions(-)
 create mode 100644 internal/peertimeoutmanager/peertimeoutmanager.go
 create mode 100644 internal/peertimeoutmanager/peertimeoutmanager_test.go
 create mode 100644 internal/timer/timer.go

diff --git a/bitswap.go b/bitswap.go
index 1b59dcd0..9c2e6cdd 100644
--- a/bitswap.go
+++ b/bitswap.go
@@ -17,6 +17,7 @@ import (
 	bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
 	notifications "github.com/ipfs/go-bitswap/internal/notifications"
 	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
+	bsptm "github.com/ipfs/go-bitswap/internal/peertimeoutmanager"
 	bspqm "github.com/ipfs/go-bitswap/internal/providerquerymanager"
 	bssession "github.com/ipfs/go-bitswap/internal/session"
 	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
@@ -130,13 +131,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 		// Simulate a DONT_HAVE message arriving to the WantManager
 		wm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
 	}
+
+	var pm *bspm.PeerManager
+	// onPeerResponseTimeout is triggered when a peer fails to respond to any
+	// request for a long time
+	onPeerResponseTimeout := func(peers []peer.ID) {
+		// Tell the peer manager that the peer timed out
+		pm.OnTimeout(peers)
+	}
+	ptm := bsptm.New(ctx, onPeerResponseTimeout)
 	peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
-		return bsmq.New(ctx, p, network, onDontHaveTimeout)
+		return bsmq.New(ctx, p, network, onDontHaveTimeout, ptm.RequestSent)
 	}
 
 	sim := bssim.New()
 	bpm := bsbpm.New()
-	pm := bspm.New(ctx, peerQueueFactory, network.Self())
+	pm = bspm.New(ctx, peerQueueFactory, network.Self())
 	wm = bswm.New(ctx, pm, sim, bpm)
 	pqm := bspqm.New(ctx, network)
 
@@ -167,6 +177,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 		provideKeys: make(chan cid.Cid, provideKeysBufferSize),
 		wm:          wm,
 		pm:          pm,
+		ptm:         ptm,
 		pqm:         pqm,
 		sm:          sm,
 		sim:         sim,
@@ -209,7 +220,8 @@ type Bitswap struct {
 	// the wantlist tracks global wants for bitswap
 	wm *bswm.WantManager
 
-	pm *bspm.PeerManager
+	pm  *bspm.PeerManager
+	ptm *bsptm.PeerTimeoutManager
 
 	// the provider query manager manages requests to find providers
 	pqm *bspqm.ProviderQueryManager
@@ -396,14 +408,21 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 	bs.counters.messagesRecvd++
 	bs.counterLk.Unlock()
 
+	iblocks := incoming.Blocks()
+	haves := incoming.Haves()
+	dontHaves := incoming.DontHaves()
+	receivedResponse := len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0
+	if receivedResponse {
+		// Tell the peer timeout manager that a response was received
+		bs.ptm.ResponseReceived(p)
+	}
+
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
 	bs.engine.MessageReceived(ctx, p, incoming)
 	// TODO: this is bad, and could be easily abused.
 	// Should only track *useful* messages in ledger
 
-	iblocks := incoming.Blocks()
-
 	if len(iblocks) > 0 {
 		bs.updateReceiveCounters(iblocks)
 		for _, b := range iblocks {
@@ -411,9 +430,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 		}
 	}
 
-	haves := incoming.Haves()
-	dontHaves := incoming.DontHaves()
-	if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
+	if receivedResponse {
 		// Process blocks
 		err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
 		if err != nil {
diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go
index 8e251889..7aaf1cbc 100644
--- a/internal/messagequeue/messagequeue.go
+++ b/internal/messagequeue/messagequeue.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	bstimer "github.com/ipfs/go-bitswap/internal/timer"
 	bsmsg "github.com/ipfs/go-bitswap/message"
 	pb "github.com/ipfs/go-bitswap/message/pb"
 	bsnet "github.com/ipfs/go-bitswap/network"
@@ -57,6 +58,7 @@ type MessageQueue struct {
 	dhTimeoutMgr     DontHaveTimeoutManager
 	maxMessageSize   int
 	sendErrorBackoff time.Duration
+	onRequestSent    OnRequestSent
 
 	outgoingWork chan time.Time
 	done         chan struct{}
@@ -131,6 +133,8 @@ func (pc *peerConn) Latency() time.Duration {
 // older version of Bitswap that doesn't support DONT_HAVE messages.
 type OnDontHaveTimeout func(peer.ID, []cid.Cid)
 
+type OnRequestSent func(peer.ID)
+
 // DontHaveTimeoutManager pings a peer to estimate latency so it can set a reasonable
 // upper bound on when to consider a DONT_HAVE request as timed out (when connected to
 // a peer that doesn't support DONT_HAVE messages)
@@ -147,17 +151,18 @@ type DontHaveTimeoutManager interface {
 }
 
 // New creates a new MessageQueue.
-func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
+func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, onRequestSent OnRequestSent) *MessageQueue {
 	onTimeout := func(ks []cid.Cid) {
 		onDontHaveTimeout(p, ks)
 	}
 	dhTimeoutMgr := newDontHaveTimeoutMgr(ctx, newPeerConnection(p, network), onTimeout)
-	return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr)
+	return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr, onRequestSent)
 }
 
 // This constructor is used by the tests
 func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
-	maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
+	maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager,
+	onRequestSent OnRequestSent) *MessageQueue {
 
 	mq := &MessageQueue{
 		ctx: ctx,
@@ -172,6 +177,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
 		done:                make(chan struct{}),
 		rebroadcastInterval: defaultRebroadcastInterval,
 		sendErrorBackoff:    sendErrorBackoff,
+		onRequestSent:       onRequestSent,
 		priority:            maxPriority,
 	}
 
@@ -286,11 +292,8 @@ func (mq *MessageQueue) runQueue() {
 	// Create a timer for debouncing scheduled work.
 	scheduleWork := time.NewTimer(0)
-	if !scheduleWork.Stop() {
-		// Need to drain the timer if Stop() returns false
-		// See: https://golang.org/pkg/time/#Timer.Stop
-		<-scheduleWork.C
-	}
+	defer scheduleWork.Stop()
+	bstimer.StopTimer(scheduleWork)
 
 	var workScheduled time.Time
 	for {
@@ -304,9 +307,8 @@ func (mq *MessageQueue) runQueue() {
 			// track delay.
 			if workScheduled.IsZero() {
 				workScheduled = when
-			} else if !scheduleWork.Stop() {
-				// Need to drain the timer if Stop() returns false
-				<-scheduleWork.C
+			} else {
+				bstimer.StopTimer(scheduleWork)
 			}
 
 			// If we have too many updates and/or we've waited too
@@ -413,6 +415,9 @@ func (mq *MessageQueue) sendMessage() {
 
 	mq.simulateDontHaveWithTimeout(message)
 
+	// Signal that a request was sent
+	mq.onRequestSent(mq.p)
+
 	// If the message was too big and only a subset of wants could be
 	// sent, schedule sending the rest of the wants in the next
 	// iteration of the event loop.
diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go
index 0f7cba8a..43731f1a 100644
--- a/internal/messagequeue/messagequeue_test.go
+++ b/internal/messagequeue/messagequeue_test.go
@@ -124,6 +124,7 @@ func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; ret
 func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave }
 
 func mockTimeoutCb(peer.ID, []cid.Cid) {}
+func mockOnRQSent(peer.ID)             {}
 
 func collectMessages(ctx context.Context,
 	t *testing.T,
@@ -159,7 +160,7 @@ func TestStartupAndShutdown(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	bcstwh := testutil.GenerateCids(10)
 
 	messageQueue.Startup()
@@ -201,7 +202,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	wantHaves := testutil.GenerateCids(10)
 	wantBlocks := testutil.GenerateCids(10)
 
@@ -224,7 +225,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	wantHaves := testutil.GenerateCids(10)
 	wantBlocks := testutil.GenerateCids(10)
 
@@ -247,7 +248,7 @@ func TestSendingMessagesPriority(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	wantHaves1 := testutil.GenerateCids(5)
 	wantHaves2 := testutil.GenerateCids(5)
 	wantHaves := append(wantHaves1, wantHaves2...)
@@ -316,7 +317,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	wantHaves := testutil.GenerateCids(2)
 	wantBlocks := testutil.GenerateCids(2)
 
@@ -350,7 +351,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	cancels := testutil.GenerateCids(3)
 
 	messageQueue.Startup()
@@ -383,7 +384,7 @@ func TestWantlistRebroadcast(t *testing.T) {
 	fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	bcstwh := testutil.GenerateCids(10)
 	wantHaves := testutil.GenerateCids(10)
 	wantBlocks := testutil.GenerateCids(10)
@@ -485,7 +486,7 @@ func TestSendingLargeMessages(t *testing.T) {
 	wantBlocks := testutil.GenerateCids(10)
 	entrySize := 44
 	maxMsgSize := entrySize * 3 // 3 wants
-	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm)
+	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm, mockOnRQSent)
 
 	messageQueue.Startup()
 	messageQueue.AddWants(wantBlocks, []cid.Cid{})
@@ -512,7 +513,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
 	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
 	peerID := testutil.GeneratePeers(1)[0]
 
-	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
+	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
 	messageQueue.Startup()
 
 	// If the remote peer doesn't support HAVE / DONT_HAVE messages
@@ -569,7 +570,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
 	peerID := testutil.GeneratePeers(1)[0]
 
 	dhtm := &fakeDontHaveTimeoutMgr{}
-	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
+	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm, mockOnRQSent)
 	messageQueue.Startup()
 
 	wbs := testutil.GenerateCids(10)
@@ -602,7 +603,7 @@ func TestResendAfterError(t *testing.T) {
 	dhtm := &fakeDontHaveTimeoutMgr{}
 	peerID := testutil.GeneratePeers(1)[0]
 	sendErrBackoff := 5 * time.Millisecond
-	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
+	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm, mockOnRQSent)
 	wantBlocks := testutil.GenerateCids(10)
 	wantHaves := testutil.GenerateCids(10)
 
@@ -641,7 +642,7 @@ func TestResendAfterMaxRetries(t *testing.T) {
 	dhtm := &fakeDontHaveTimeoutMgr{}
 	peerID := testutil.GeneratePeers(1)[0]
 	sendErrBackoff := 2 * time.Millisecond
-	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
+	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm, mockOnRQSent)
 	wantBlocks := testutil.GenerateCids(10)
 	wantHaves := testutil.GenerateCids(10)
 	wantBlocks2 := testutil.GenerateCids(10)
diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go
index ab73fd96..03df1b48 100644
--- a/internal/peermanager/peermanager.go
+++ b/internal/peermanager/peermanager.go
@@ -129,6 +129,17 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
 	pm.pwm.RemovePeer(p)
 }
 
+// OnTimeout is called when one or more peers time out.
+func (pm *PeerManager) OnTimeout(peers []peer.ID) {
+	pm.pqLk.Lock()
+	defer pm.pqLk.Unlock()
+
+	for _, p := range peers {
+		// Inform the sessions that the peer is not available
+		pm.signalAvailability(p, false)
+	}
+}
+
 // BroadcastWantHaves broadcasts want-haves to all peers (used by the session
 // to discover seeds).
 // For each peer it filters out want-haves that have previously been sent to
diff --git a/internal/peertimeoutmanager/peertimeoutmanager.go b/internal/peertimeoutmanager/peertimeoutmanager.go
new file mode 100644
index 00000000..6c660c48
--- /dev/null
+++ b/internal/peertimeoutmanager/peertimeoutmanager.go
@@ -0,0 +1,201 @@
+package peertimeoutmanager
+
+import (
+	"context"
+	"time"
+
+	peer "github.com/libp2p/go-libp2p-core/peer"
+
+	bstimer "github.com/ipfs/go-bitswap/internal/timer"
+)
+
+const (
+	// The time to allow a peer to respond before assuming it is unresponsive.
+	peerResponseTimeout = 10 * time.Second
+
+	// We keep track of requests in a map, and the order of requests in an
+	// array. As responses come in, items are deleted from the map and set to
+	// inactive in the array. If the array and map get too far out of sync
+	// we pause to filter the array. requestGCCount is the maximum allowed
+	// difference between map and array length before forcing a filtering.
+	requestGCCount = 256
+)
+
+type request struct {
+	// The peer the request was sent to
+	p peer.ID
+	// The time at which the request was sent
+	sent time.Time
+	// active is true until a response is received
+	active bool
+}
+
+// FnPeerResponseTimeout is called when one or more peers time out
+type FnPeerResponseTimeout func([]peer.ID)
+
+// PeerTimeoutManager keeps track of requests and responses to peers.
+// It reports a timeout if there is no response from a peer to any request
+// for some period of time.
+type PeerTimeoutManager struct {
+	ctx                   context.Context
+	peerResponseTimeout   time.Duration
+	onPeerResponseTimeout FnPeerResponseTimeout
+	// queue of requests that are sent
+	requestSent chan request
+	// queue of responding peers
+	responseReceived chan peer.ID
+}
+
+// New creates a new PeerTimeoutManager.
+func New(ctx context.Context, onTimeout FnPeerResponseTimeout) *PeerTimeoutManager {
+	return newPeerTimeoutManager(ctx, onTimeout, peerResponseTimeout)
+}
+
+// Used by the tests
+func newPeerTimeoutManager(ctx context.Context, onTimeout FnPeerResponseTimeout, timeout time.Duration) *PeerTimeoutManager {
+	ptm := &PeerTimeoutManager{
+		ctx:                   ctx,
+		peerResponseTimeout:   timeout,
+		onPeerResponseTimeout: onTimeout,
+		requestSent:           make(chan request, 128),
+		responseReceived:      make(chan peer.ID, 128),
+	}
+
+	go ptm.run()
+
+	return ptm
+}
+
+// RequestSent is called when a request is sent to a peer
+func (ptm *PeerTimeoutManager) RequestSent(p peer.ID) {
+	select {
+	case ptm.requestSent <- request{p: p, sent: time.Now(), active: true}:
+	case <-ptm.ctx.Done():
+	}
+}
+
+// ResponseReceived is called when a response is received from a peer
+func (ptm *PeerTimeoutManager) ResponseReceived(p peer.ID) {
+	select {
+	case ptm.responseReceived <- p:
+	case <-ptm.ctx.Done():
+	}
+}
+
+// The main run loop for the manager
+func (ptm *PeerTimeoutManager) run() {
+	// Create a timer to detect when a peer doesn't respond for too long
+	responseTimer := time.NewTimer(0)
+	defer responseTimer.Stop()
+	bstimer.StopTimer(responseTimer)
+
+	// Keep track of requests sent to a peer, and the order of requests
+	requestsSent := make(map[peer.ID]*request)
+	var inOrder []*request
+	for {
+		select {
+		case rq := <-ptm.requestSent:
+			// Check if there was an earlier request sent to the peer
+			if _, ok := requestsSent[rq.p]; !ok {
+				// This is the first request sent to the peer
+				requestsSent[rq.p] = &rq
+				inOrder = append(inOrder, &rq)
+
+				// If this is the first request sent to any peer, set a timer
+				// to expire on timeout
+				if len(requestsSent) == 1 {
+					responseTimer.Reset(ptm.peerResponseTimeout)
+				}
+			}
+
+		case from := <-ptm.responseReceived:
+			// A response was received so mark any request to the peer as no
+			// longer active
+			if rq, ok := requestsSent[from]; ok {
+				rq.active = false
+				delete(requestsSent, from)
+
+				// If a lot of responses have been received, clean up the array
+				// that keeps track of response order
+				if len(inOrder)-len(requestsSent) > requestGCCount {
+					inOrder = removeInactive(inOrder)
+				}
+			}
+
+		case <-responseTimer.C:
+			// The timeout has expired, so remove expired requests from the
+			// queue
+			expired := make([]peer.ID, 0, len(requestsSent))
+			for len(inOrder) > 0 {
+				rq := inOrder[0]
+
+				// If the request is still active
+				if rq.active {
+					// The queue is in order from earliest to latest, so if we
+					// didn't find an expired entry we can stop iterating
+					if time.Since(rq.sent) < ptm.peerResponseTimeout {
+						break
+					}
+
+					// Add the peer to the expired list
+					expired = append(expired, rq.p)
+					// Remove the request from the requestsSent map
+					delete(requestsSent, rq.p)
+				}
+
+				// Remove expired or inactive requests from the queue
+				inOrder = inOrder[1:]
+			}
+
+			// Fire the timeout event for peers that have timed out
+			if len(expired) > 0 {
+				go ptm.fireTimeout(expired)
+			}
+
+			// If there are some active requests, and the manager hasn't
+			// shut down
+			if len(requestsSent) > 0 && ptm.ctx.Err() == nil {
+				// Schedule the next check for the moment when the oldest
+				// active request will time out
+				oldestStart := inOrder[0].sent
+				until := time.Until(oldestStart.Add(ptm.peerResponseTimeout))
+				responseTimer.Reset(until)
+			}
+
+		case <-ptm.ctx.Done():
+			return
+		}
+	}
+}
+
+func removeInactive(rqs []*request) []*request {
+	to := 0
+	for from := 0; from < len(rqs); from++ {
+		if rqs[from].active {
+			rqs[to] = rqs[from]
+			to++
+		}
+	}
+
+	// Ensure the Garbage Collector can clean up the underlying
+	// array elements by setting them to nil
+	for i := to; i < len(rqs); i++ {
+		rqs[i] = nil
+	}
+
+	// Truncate the slice
+	rqs = rqs[:to]
+
+	return rqs
+}
+
+// fireTimeout calls the callback with the timed out peers
+func (ptm *PeerTimeoutManager) fireTimeout(peers []peer.ID) {
+	// Make sure the peer timeout manager has not been shut down
+	if ptm.ctx.Err() != nil {
+		return
+	}
+
+	// Fire the timeout
+	ptm.onPeerResponseTimeout(peers)
+}
diff --git a/internal/peertimeoutmanager/peertimeoutmanager_test.go b/internal/peertimeoutmanager/peertimeoutmanager_test.go
new file mode 100644
index 00000000..27cf0622
--- /dev/null
+++ b/internal/peertimeoutmanager/peertimeoutmanager_test.go
@@ -0,0 +1,322 @@
+package peertimeoutmanager
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/ipfs/go-bitswap/internal/testutil"
+	peer "github.com/libp2p/go-libp2p-core/peer"
+)
+
+func TestPeerTimeoutManagerNoTimeout(t *testing.T) {
+	ctx := context.Background()
+	p := testutil.GeneratePeers(1)[0]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Response received within timeout
+	ptm.RequestSent(p)
+	time.Sleep(time.Millisecond)
+	ptm.ResponseReceived(p)
+
+	<-tctx.Done()
+
+	if len(timedOut) > 0 {
+		t.Fatal("Expected request not to time out")
+	}
+}
+
+func TestPeerTimeoutManagerWithTimeout(t *testing.T) {
+	ctx := context.Background()
+	p := testutil.GeneratePeers(1)[0]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// No response received within timeout
+	ptm.RequestSent(p)
+
+	<-tctx.Done()
+
+	if len(timedOut) == 0 {
+		t.Fatal("Expected request to time out")
+	}
+}
+
+func TestPeerTimeoutManagerMultiRequestWithTimeout(t *testing.T) {
+	ctx := context.Background()
+	p := testutil.GeneratePeers(1)[0]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 15*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Five requests sent every 2ms
+	for i := 0; i < 5; i++ {
+		ptm.RequestSent(p)
+		time.Sleep(2 * time.Millisecond)
+	}
+	// Response received after 10ms (timeout is 5ms)
+	ptm.ResponseReceived(p)
+
+	<-tctx.Done()
+
+	if len(timedOut) == 0 {
+		t.Fatal("Expected request to time out")
+	}
+}
+
+func TestPeerTimeoutManagerMultiRequestResponseWithTimeout(t *testing.T) {
+	ctx := context.Background()
+	p := testutil.GeneratePeers(1)[0]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Response received within timeout
+	ptm.RequestSent(p)
+	time.Sleep(time.Millisecond)
+	ptm.ResponseReceived(p)
+
+	time.Sleep(time.Millisecond)
+
+	// Another request sent but no response before timeout
+	ptm.RequestSent(p)
+
+	<-tctx.Done()
+
+	if len(timedOut) == 0 {
+		t.Fatal("Expected request to time out")
+	}
+}
+
+func TestPeerTimeoutManagerMultiRequestResponseNoTimeout(t *testing.T) {
+	ctx := context.Background()
+	p := testutil.GeneratePeers(1)[0]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Several requests and responses sent, all within timeout individually
+	// but combined time more than timeout
+	ptm.RequestSent(p)
+	time.Sleep(time.Millisecond) // +1 = 1
+	ptm.ResponseReceived(p)
+
+	time.Sleep(2 * time.Millisecond) // +2 = 3
+
+	ptm.RequestSent(p)
+	time.Sleep(time.Millisecond) // +1 = 4
+	ptm.ResponseReceived(p)
+
+	time.Sleep(2 * time.Millisecond) // +2 = 6
+
+	ptm.RequestSent(p)
+	time.Sleep(time.Millisecond) // +1 = 7
+	ptm.ResponseReceived(p)
+
+	<-tctx.Done()
+
+	if len(timedOut) > 0 {
+		t.Fatal("Expected request not to time out")
+	}
+}
+
+func TestPeerTimeoutManagerWithSomePeersTimeout(t *testing.T) {
+	ctx := context.Background()
+	peers := testutil.GeneratePeers(2)
+	p1 := peers[0]
+	p2 := peers[1]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Send request to p1 and p2 but only receive response from p1
+	ptm.RequestSent(p1)
+	ptm.RequestSent(p2)
+	time.Sleep(time.Millisecond)
+	ptm.ResponseReceived(p1)
+
+	<-tctx.Done()
+
+	if len(timedOut) != 1 {
+		t.Fatal("Expected one peer request to time out")
+	}
+}
+
+func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
+	ctx := context.Background()
+	peers := testutil.GeneratePeers(2)
+	p1 := peers[0]
+	p2 := peers[1]
+
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+
+	// Send request to p1 and p2 and get no response
+	ptm.RequestSent(p1)
+	ptm.RequestSent(p2)
+
+	<-tctx.Done()
+
+	if len(timedOut) != 2 {
+		t.Fatal("Expected all peer requests to time out")
+	}
+}
+
+// Launch a lot of requests and responses with some randomness and ensure they
+// all time out correctly
+func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
+	ctx := context.Background()
+
+	// Make sure we launch enough that the PeerTimeoutManager's internal
+	// GC mechanism kicks in
+	peers := testutil.GeneratePeers(requestGCCount * 3)
+
+	var lk sync.Mutex
+	var timedOut []peer.ID
+	onTimeout := func(peers []peer.ID) {
+		lk.Lock()
+		defer lk.Unlock()
+		timedOut = append(timedOut, peers...)
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	ptm := newPeerTimeoutManager(tctx, onTimeout, 50*time.Millisecond)
+
+	// Make batches of 5 peers
+	expTimeout := int32(0)
+	for i := 0; i < len(peers); i += 5 {
+		end := i + 5
+		if end > len(peers) {
+			end = len(peers)
+		}
+		batch := peers[i:end]
+
+		// Launch all batches concurrently
+		go func() {
+			// Add some random delay
+			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
+
+			// Send all the requests
+			var wgrq sync.WaitGroup
+			for _, p := range batch {
+				p := p
+				wgrq.Add(1)
+				go func() {
+					defer wgrq.Done()
+					ptm.RequestSent(p)
+				}()
+			}
+			wgrq.Wait()
+
+			// Sleep a little
+			time.Sleep(5 * time.Millisecond)
+
+			// Send responses for half of the requests (the rest should time
+			// out)
+			var wgrs sync.WaitGroup
+			for i, p := range batch {
+				if i%2 == 0 {
+					p := p
+					wgrs.Add(1)
+					go func() {
+						defer wgrs.Done()
+						time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
+						ptm.ResponseReceived(p)
+					}()
+				} else {
+					atomic.AddInt32(&expTimeout, 1)
+				}
+			}
+			wgrs.Wait()
+		}()
+	}
+
+	<-tctx.Done()
+
+	if len(timedOut) != int(expTimeout) {
+		t.Fatal("Expected only some peer requests to time out", len(timedOut), expTimeout)
+	}
+}
+
+func TestRemoveInactive(t *testing.T) {
+	doTest := func(count int, active ...bool) {
+		var rqs []*request
+		for _, a := range active {
+			rqs = append(rqs, &request{active: a})
+		}
+
+		after := removeInactive(rqs)
+		if len(after) != count {
+			t.Fatal(fmt.Sprintf("Expected %d requests after filter, got %d", count, len(after)))
+		}
+		for _, rq := range after {
+			if !rq.active {
+				t.Fatal("Expected all remaining requests to be active")
+			}
+		}
+	}
+
+	doTest(0)
+	doTest(0, false)
+	doTest(1, true)
+	doTest(1, false, true)
+	doTest(1, true, false)
+	doTest(2, true, true)
+	doTest(0, false, false)
+	doTest(2, true, false, true)
+	doTest(1, false, true, false)
+	doTest(3, true, false, true, false, true)
+	doTest(2, false, true, false, true, false)
+	doTest(4, true, true, false, true, true)
+	doTest(3, false, false, true, true, true)
+}
diff --git a/internal/timer/timer.go b/internal/timer/timer.go
new file mode 100644
index 00000000..52936564
--- /dev/null
+++ b/internal/timer/timer.go
@@ -0,0 +1,13 @@
+package timer
+
+import (
+	"time"
+)
+
+func StopTimer(t *time.Timer) {
+	if !t.Stop() {
+		// Need to drain the timer if Stop() returns false
+		// See: https://golang.org/pkg/time/#Timer.Stop
+		<-t.C
+	}
+}

From ad97437c15f3382850fdc92654b78797570c546d Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Tue, 3 Mar 2020 09:35:20 -0500
Subject: [PATCH 2/7] docs: add warning about StopTimer()

---
 internal/timer/timer.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/timer/timer.go b/internal/timer/timer.go
index 52936564..0c060fee 100644
--- a/internal/timer/timer.go
+++ b/internal/timer/timer.go
@@ -4,6 +4,9 @@ import (
 	"time"
 )
 
+// StopTimer stops the timer and drains its channel.
+// Note that this should only be called if the channel hasn't already been
+// drained, or it may block indefinitely.
 func StopTimer(t *time.Timer) {
 	if !t.Stop() {
 		// Need to drain the timer if Stop() returns false

From a388fedd72c30615a8c20348c50f1bbc11ad621a Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 9 Mar 2020 09:28:04 -0400
Subject: [PATCH 3/7] fix: log from warn to info

---
 internal/session/session.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/session/session.go b/internal/session/session.go
index 45cd825f..a1f88e82 100644
--- a/internal/session/session.go
+++ b/internal/session/session.go
@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
 		// Search for providers who have the first want in the list.
 		// Typically if the provider has the first block they will have
 		// the rest of the blocks also.
-		log.Warnf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants))
+		log.Infof("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants))
 		s.findMorePeers(ctx, wants[0])
 	}
 	s.resetIdleTick()

From 565164cae9c6f22933b228546b21f4bd70ebc376 Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 9 Mar 2020 09:42:40 -0400
Subject: [PATCH 4/7] fix: races in tests

---
 .../peertimeoutmanager_test.go | 115 +++++++-----------
 1 file changed, 46 insertions(+), 69 deletions(-)

diff --git a/internal/peertimeoutmanager/peertimeoutmanager_test.go b/internal/peertimeoutmanager/peertimeoutmanager_test.go
index 27cf0622..b0a55f11 100644
--- a/internal/peertimeoutmanager/peertimeoutmanager_test.go
+++ b/internal/peertimeoutmanager/peertimeoutmanager_test.go
@@ -13,18 +13,32 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
+type timeoutRecorder struct {
+	lk       sync.Mutex
+	timedOut []peer.ID
+}
+
+func (tr *timeoutRecorder) onTimeout(peers []peer.ID) {
+	tr.lk.Lock()
+	defer tr.lk.Unlock()
+
+	tr.timedOut = append(tr.timedOut, peers...)
+}
+
+func (tr *timeoutRecorder) timedOutCount() int {
+	tr.lk.Lock()
+	defer tr.lk.Unlock()
+
+	return len(tr.timedOut)
+}
+
 func TestPeerTimeoutManagerNoTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Response received within timeout
 	ptm.RequestSent(p)
@@ -33,7 +47,7 @@ func TestPeerTimeoutManagerNoTimeout(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) > 0 {
+	if tr.timedOutCount() > 0 {
 		t.Fatal("Expected request not to time out")
 	}
 }
@@ -41,22 +55,17 @@ func TestPeerTimeoutManagerNoTimeout(t *testing.T) {
 func TestPeerTimeoutManagerWithTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// No response received within timeout
 	ptm.RequestSent(p)
 
 	<-tctx.Done()
 
-	if len(timedOut) == 0 {
+	if tr.timedOutCount() == 0 {
 		t.Fatal("Expected request to time out")
 	}
 }
@@ -64,15 +73,10 @@ func TestPeerTimeoutManagerWithTimeout(t *testing.T) {
 func TestPeerTimeoutManagerMultiRequestWithTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 15*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Five requests sent every 2ms
 	for i := 0; i < 5; i++ {
@@ -84,7 +88,7 @@ func TestPeerTimeoutManagerMultiRequestWithTimeout(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) == 0 {
+	if tr.timedOutCount() == 0 {
 		t.Fatal("Expected request to time out")
 	}
 }
@@ -92,15 +96,10 @@ func TestPeerTimeoutManagerMultiRequestWithTimeout(t *testing.T) {
 func TestPeerTimeoutManagerMultiRequestResponseWithTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Response received within timeout
 	ptm.RequestSent(p)
@@ -114,7 +113,7 @@ func TestPeerTimeoutManagerMultiRequestResponseWithTimeout(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) == 0 {
+	if tr.timedOutCount() == 0 {
 		t.Fatal("Expected request to time out")
 	}
 }
@@ -122,15 +121,10 @@ func TestPeerTimeoutManagerMultiRequestResponseWithTimeout(t *testing.T) {
 func TestPeerTimeoutManagerMultiRequestResponseNoTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Several requests and responses sent, all within timeout individually
 	// but combined time more than timeout
@@ -152,7 +146,7 @@ func TestPeerTimeoutManagerMultiRequestResponseNoTimeout(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) > 0 {
+	if tr.timedOutCount() > 0 {
 		t.Fatal("Expected request not to time out")
 	}
 }
@@ -162,26 +156,21 @@ func TestPeerTimeoutManagerMultiRequestResponseNoTimeout(t *testing.T) {
 	peers := testutil.GeneratePeers(2)
 	p1 := peers[0]
 	p2 := peers[1]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Send request to p1 and p2 but only receive response from p1
 	ptm.RequestSent(p1)
 	ptm.RequestSent(p2)
-	time.Sleep(time.Millisecond)
+	time.Sleep(2 * time.Millisecond)
 	ptm.ResponseReceived(p1)
 
 	<-tctx.Done()
 
-	if len(timedOut) != 1 {
-		t.Fatal("Expected one peer request to time out")
+	if tr.timedOutCount() != 1 {
+		t.Fatal("Expected one peer request to time out, got", tr.timedOutCount())
 	}
 }
 
@@ -190,15 +179,10 @@ func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
 	peers := testutil.GeneratePeers(2)
 	p1 := peers[0]
 	p2 := peers[1]
-
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		timedOut = append(timedOut, peers...)
-	}
-
+	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// Send request to p1 and p2 and get no response
 	ptm.RequestSent(p1)
@@ -206,7 +190,7 @@ func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) != 2 {
+	if tr.timedOutCount() != 2 {
 		t.Fatal("Expected all peer requests to time out")
 	}
 }
@@ -215,22 +199,15 @@ func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
 // all time out correctly
 func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 	ctx := context.Background()
+	tr := timeoutRecorder{}
 
 	// Make sure we launch enough that the PeerTimeoutManager's internal
 	// GC mechanism kicks in
 	peers := testutil.GeneratePeers(requestGCCount * 3)
 
-	var lk sync.Mutex
-	var timedOut []peer.ID
-	onTimeout := func(peers []peer.ID) {
-		lk.Lock()
-		defer lk.Unlock()
-		timedOut = append(timedOut, peers...)
-	}
-
 	tctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, onTimeout, 50*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 50*time.Millisecond)
 
 	// Make batches of 5 peers
 	expTimeout := int32(0)
@@ -283,8 +260,8 @@ func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 
 	<-tctx.Done()
 
-	if len(timedOut) != int(expTimeout) {
-		t.Fatal("Expected only some peer requests to time out", len(timedOut), expTimeout)
+	if tr.timedOutCount() != int(expTimeout) {
+		t.Fatal("Expected only some peer requests to time out", tr.timedOutCount(), expTimeout)
 	}
 }

From 805ea89c99d6709658c0f6502909b7a425a2e498 Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 9 Mar 2020 09:46:30 -0400
Subject: [PATCH 5/7] fix: flaky test timing

---
 .../peertimeoutmanager_test.go | 118 ++++++++++--------
 1 file changed, 65 insertions(+), 53 deletions(-)

diff --git a/internal/peertimeoutmanager/peertimeoutmanager_test.go b/internal/peertimeoutmanager/peertimeoutmanager_test.go
index b0a55f11..26c3156d 100644
--- a/internal/peertimeoutmanager/peertimeoutmanager_test.go
+++ b/internal/peertimeoutmanager/peertimeoutmanager_test.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -14,8 +13,9 @@ import (
 )
 
 type timeoutRecorder struct {
-	lk       sync.Mutex
-	timedOut []peer.ID
+	lk         sync.Mutex
+	timedOut   []peer.ID
+	timedOutCh chan peer.ID
 }
 
 func (tr *timeoutRecorder) onTimeout(peers []peer.ID) {
@@ -23,6 +23,11 @@ func (tr *timeoutRecorder) onTimeout(peers []peer.ID) {
 	defer tr.lk.Unlock()
 
 	tr.timedOut = append(tr.timedOut, peers...)
+	if tr.timedOutCh != nil {
+		for _, p := range peers {
+			tr.timedOutCh <- p
+		}
+	}
 }
 
 func (tr *timeoutRecorder) timedOutCount() int {
@@ -36,13 +41,13 @@ func TestPeerTimeoutManagerNoTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// Response received within timeout
 	ptm.RequestSent(p)
-	time.Sleep(time.Millisecond)
+	time.Sleep(5 * time.Millisecond)
 	ptm.ResponseReceived(p)
 
 	<-tctx.Done()
@@ -56,9 +61,9 @@ func TestPeerTimeoutManagerWithTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// No response received within timeout
 	ptm.RequestSent(p)
@@ -97,16 +102,16 @@ func TestPeerTimeoutManagerMultiRequestResponseWithTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// Response received within timeout
 	ptm.RequestSent(p)
-	time.Sleep(time.Millisecond)
+	time.Sleep(3 * time.Millisecond)
 	ptm.ResponseReceived(p)
 
-	time.Sleep(time.Millisecond)
+	time.Sleep(3 * time.Millisecond)
 
 	// Another request sent but no response before timeout
 	ptm.RequestSent(p)
@@ -122,27 +127,19 @@ func TestPeerTimeoutManagerMultiRequestResponseNoTimeout(t *testing.T) {
 	ctx := context.Background()
 	p := testutil.GeneratePeers(1)[0]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// Several requests and responses sent, all within timeout individually
 	// but combined time more than timeout
-	ptm.RequestSent(p)
-	time.Sleep(time.Millisecond) // +1 = 1
-	ptm.ResponseReceived(p)
-
-	time.Sleep(2 * time.Millisecond) // +2 = 3
-
-	ptm.RequestSent(p)
-	time.Sleep(time.Millisecond) // +1 = 4
-	ptm.ResponseReceived(p)
-
-	time.Sleep(2 * time.Millisecond) // +2 = 6
+	for i := 0; i < 7; i++ {
+		ptm.RequestSent(p)
+		time.Sleep(time.Millisecond) // +1ms
+		ptm.ResponseReceived(p)
 
-	ptm.RequestSent(p)
-	time.Sleep(time.Millisecond) // +1 = 7
-	ptm.ResponseReceived(p)
+		time.Sleep(time.Millisecond) // +1ms
+	}
 
 	<-tctx.Done()
 
@@ -157,14 +154,14 @@ func TestPeerTimeoutManagerWithSomePeersTimeout(t *testing.T) {
 	p1 := peers[0]
 	p2 := peers[1]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// Send request to p1 and p2 but only receive response from p1
 	ptm.RequestSent(p1)
 	ptm.RequestSent(p2)
-	time.Sleep(2 * time.Millisecond)
+	time.Sleep(5 * time.Millisecond)
 	ptm.ResponseReceived(p1)
 
 	<-tctx.Done()
@@ -180,9 +177,9 @@ func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
 	p1 := peers[0]
 	p2 := peers[1]
 	tr := timeoutRecorder{}
-	tctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
 
 	// Send request to p1 and p2 and get no response
 	ptm.RequestSent(p1)
@@ -199,18 +196,23 @@ func TestPeerTimeoutManagerWithAllPeersTimeout(t *testing.T) {
 // all time out correctly
 func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 	ctx := context.Background()
-	tr := timeoutRecorder{}
+	tctx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
 
-	// Make sure we launch enough that the PeerTimeoutManager's internal
-	// GC mechanism kicks in
-	peers := testutil.GeneratePeers(requestGCCount * 3)
+	// Make sure we launch enough requests that the PeerTimeoutManager's
+	// internal GC mechanism kicks in
+	peerCount := requestGCCount * 3
+	timedOutCh := make(chan peer.ID, peerCount*2)
+	tr := timeoutRecorder{timedOutCh: timedOutCh}
 
-	tctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
-	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 50*time.Millisecond)
+	peers := testutil.GeneratePeers(peerCount)
+
+	ptm := newPeerTimeoutManager(ctx, tr.onTimeout, 100*time.Millisecond)
 
 	// Make batches of 5 peers
-	expTimeout := int32(0)
+	var lk sync.Mutex
+	expTimeout := 0
+	var allrqs sync.WaitGroup
 	for i := 0; i < len(peers); i += 5 {
 		end := i + 5
 		if end > len(peers) {
 			end = len(peers)
 		}
 		batch := peers[i:end]
 
 		// Launch all batches concurrently
 		go func() {
 			// Add some random delay
@@ -224,44 +226,54 @@ func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
 
 			// Send all the requests
-			var wgrq sync.WaitGroup
+			var grq sync.WaitGroup
 			for _, p := range batch {
 				p := p
-				wgrq.Add(1)
+				grq.Add(1)
 				go func() {
-					defer wgrq.Done()
+					defer grq.Done()
 					ptm.RequestSent(p)
 				}()
 			}
-			wgrq.Wait()
+			grq.Wait()
 
 			// Sleep a little
 			time.Sleep(5 * time.Millisecond)
 
 			// Send responses for half of the requests (the rest should time
 			// out)
-			var wgrs sync.WaitGroup
 			for i, p := range batch {
 				if i%2 == 0 {
 					p := p
-					wgrs.Add(1)
+					allrqs.Add(1)
 					go func() {
-						defer wgrs.Done()
+						defer allrqs.Done()
 						time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
 						ptm.ResponseReceived(p)
 					}()
 				} else {
-					atomic.AddInt32(&expTimeout, 1)
+					lk.Lock()
+					expTimeout++
+					lk.Unlock()
 				}
 			}
-			wgrs.Wait()
 		}()
 	}
 
-	<-tctx.Done()
+	// Wait for the expected number of responses to time out
+	timedOutCount := 0
+	for {
+		select {
+		case <-timedOutCh:
+			timedOutCount++
 
-	if tr.timedOutCount() != int(expTimeout) {
-		t.Fatal("Expected only some peer requests to time out", tr.timedOutCount(), expTimeout)
+			if timedOutCount == expTimeout {
+				return
+			}
+		case <-tctx.Done():
+			// The test timed out before we got the expected number of timeouts
+			t.Fatal(fmt.Sprintf("Expected %d peer requests to time out, but %d timed out", expTimeout, tr.timedOutCount()))
+		}
 	}
 }

From eec41b76afe37255392ba75f51a072aed6e1be23 Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 9 Mar 2020 11:41:09 -0400
Subject: [PATCH 6/7] fix: test race

---
 internal/peertimeoutmanager/peertimeoutmanager_test.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/internal/peertimeoutmanager/peertimeoutmanager_test.go b/internal/peertimeoutmanager/peertimeoutmanager_test.go
index 26c3156d..8435cdb8 100644
--- a/internal/peertimeoutmanager/peertimeoutmanager_test.go
+++ b/internal/peertimeoutmanager/peertimeoutmanager_test.go
@@ -243,11 +243,13 @@ func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 			// Send responses for half of the requests (the rest should time
 			// out)
 			for i, p := range batch {
+				allrqs.Add(1)
+
 				if i%2 == 0 {
 					p := p
-					allrqs.Add(1)
 					go func() {
 						defer allrqs.Done()
+
 						time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
 						ptm.ResponseReceived(p)
 					}()
 				} else {
 					lk.Lock()
 					expTimeout++
 					lk.Unlock()
+
+					allrqs.Done()
 				}
 			}
 		}()

From e53420b9cfb9bf44c1d9b8d5160d25da1c70d62a Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 9 Mar 2020 11:48:01 -0400
Subject: [PATCH 7/7] fix: TestPeerTimeoutManagerWithTimeout

---
 internal/peertimeoutmanager/peertimeoutmanager_test.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/internal/peertimeoutmanager/peertimeoutmanager_test.go b/internal/peertimeoutmanager/peertimeoutmanager_test.go
index 8435cdb8..93c5d86a 100644
--- a/internal/peertimeoutmanager/peertimeoutmanager_test.go
+++ b/internal/peertimeoutmanager/peertimeoutmanager_test.go
@@ -63,7 +63,7 @@ func TestPeerTimeoutManagerWithTimeout(t *testing.T) {
 	tr := timeoutRecorder{}
 	tctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 	defer cancel()
-	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 10*time.Millisecond)
+	ptm := newPeerTimeoutManager(tctx, tr.onTimeout, 5*time.Millisecond)
 
 	// No response received within timeout
 	ptm.RequestSent(p)
@@ -271,7 +271,10 @@ func TestPeerTimeoutManagerWithManyRequests(t *testing.T) {
 		case <-timedOutCh:
 			timedOutCount++
 
-			if timedOutCount == expTimeout {
+			lk.Lock()
+			exp := expTimeout
+			lk.Unlock()
+			if timedOutCount == exp {
 				return
 			}
 		case <-tctx.Done():
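
A minimal usage sketch of the PeerTimeoutManager introduced in patch 1 (illustrative only, not part of the series). Because internal/peertimeoutmanager is an internal package it cannot actually be imported from outside the go-bitswap module, and the peer IDs below are hypothetical literals (peer.ID is a string type); the sketch only exercises the API the patches add: New, RequestSent and ResponseReceived.

package main

import (
	"context"
	"fmt"
	"time"

	bsptm "github.com/ipfs/go-bitswap/internal/peertimeoutmanager"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The callback receives each batch of peers that stopped responding.
	ptm := bsptm.New(ctx, func(peers []peer.ID) {
		fmt.Println("timed out:", peers)
	})

	// Hypothetical peer IDs for illustration.
	responsive := peer.ID("peer-a")
	silent := peer.ID("peer-b")

	// Record one outgoing request per peer, as the MessageQueue does
	// after every send.
	ptm.RequestSent(responsive)
	ptm.RequestSent(silent)

	// Only one peer answers, as Bitswap.ReceiveMessage reports for any
	// message carrying blocks, HAVEs or DONT_HAVEs.
	ptm.ResponseReceived(responsive)

	// After the default 10s peerResponseTimeout the callback fires with
	// only the silent peer.
	time.Sleep(11 * time.Second)
}

In Bitswap itself this wiring lives in New(): each MessageQueue calls RequestSent after a send, ReceiveMessage calls ResponseReceived when a response arrives, and the timeout callback forwards to PeerManager.OnTimeout, which signals sessions that the peer is unavailable.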