This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Track timeouts per peer #274

Closed
wants to merge 7 commits
33 changes: 25 additions & 8 deletions bitswap.go
@@ -17,6 +17,7 @@ import (
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
bspm "github.com/ipfs/go-bitswap/internal/peermanager"
bsptm "github.com/ipfs/go-bitswap/internal/peertimeoutmanager"
bspqm "github.com/ipfs/go-bitswap/internal/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/internal/session"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
@@ -130,13 +131,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// Simulate a DONT_HAVE message arriving to the WantManager
wm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}

var pm *bspm.PeerManager
// onPeerResponseTimeout is triggered when a peer fails to respond to any
// request for a long time
onPeerResponseTimeout := func(peers []peer.ID) {
Review comment (Member): You can just pass pm.OnTimeout (without the function)
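
For context on that suggestion: in this hunk pm is declared first and only assigned after bsptm.New is called, and a Go method value binds its receiver at the point it is evaluated. A standalone sketch (hypothetical stand-in types, not code from this PR) of the difference between passing the method value directly and wrapping it in a closure:

// Illustrative sketch only (not part of this PR): why the wrapper function
// matters when pm is assigned later.
package main

import "fmt"

// Hypothetical stand-in for bspm.PeerManager.
type PeerManager struct{ name string }

func (pm *PeerManager) OnTimeout(peers []string) {
    fmt.Println(pm.name, "marking peers unavailable:", peers)
}

func main() {
    var pm *PeerManager

    // Method value: the receiver (currently nil) is captured right here;
    // calling this later would panic with a nil receiver.
    direct := pm.OnTimeout
    _ = direct

    // Closure: pm is read when the callback runs, so it sees the
    // PeerManager assigned below.
    wrapped := func(peers []string) { pm.OnTimeout(peers) }

    pm = &PeerManager{name: "pm"}
    wrapped([]string{"peerA"}) // prints: pm marking peers unavailable: [peerA]
}
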

// Tell the peer manager that the peer timed out
pm.OnTimeout(peers)
}
ptm := bsptm.New(ctx, onPeerResponseTimeout)
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, ptm.RequestSent)
}

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pm = bspm.New(ctx, peerQueueFactory, network.Self())
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

@@ -167,6 +177,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
ptm: ptm,
pqm: pqm,
sm: sm,
sim: sim,
@@ -209,7 +220,8 @@ type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager
pm *bspm.PeerManager
ptm *bsptm.PeerTimeoutManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager
@@ -396,24 +408,29 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
bs.counters.messagesRecvd++
bs.counterLk.Unlock()

iblocks := incoming.Blocks()
haves := incoming.Haves()
dontHaves := incoming.DontHaves()
receivedResponse := len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0
if receivedResponse {
// Tell the peer timeout manager that a response was received
bs.ptm.ResponseReceived(p)
}

// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(ctx, p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

iblocks := incoming.Blocks()

if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
}

haves := incoming.Haves()
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
if receivedResponse {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
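
The new internal/peertimeoutmanager package itself is not among the hunks shown on this page. Based only on the calls above (bsptm.New with a peer-timeout callback, ptm.RequestSent handed to the message queue factory, ptm.ResponseReceived in ReceiveMessage), here is a minimal sketch of what such a manager could look like; the 30 second timeout, the per-peer timer approach, and the libp2p import path are assumptions, and the real implementation may batch timed-out peers differently:

// Sketch of a peer timeout manager with the API implied above; the timeout
// value and one-peer-at-a-time reporting are assumptions.
package peertimeoutmanager

import (
    "context"
    "sync"
    "time"

    peer "github.com/libp2p/go-libp2p-core/peer"
)

// Assumed upper bound on how long a peer may leave a request unanswered.
const peerResponseTimeout = 30 * time.Second

// OnPeerTimeout is called with peers that failed to respond in time.
type OnPeerTimeout func([]peer.ID)

// PeerTimeoutManager tracks, per peer, whether a request is outstanding
// with no response received yet.
type PeerTimeoutManager struct {
    ctx       context.Context
    onTimeout OnPeerTimeout

    lk      sync.Mutex
    pending map[peer.ID]*time.Timer
}

// New creates a manager that reports unresponsive peers via onTimeout.
func New(ctx context.Context, onTimeout OnPeerTimeout) *PeerTimeoutManager {
    return &PeerTimeoutManager{
        ctx:       ctx,
        onTimeout: onTimeout,
        pending:   make(map[peer.ID]*time.Timer),
    }
}

// RequestSent records that a request went out to p; if no response arrives
// before the timeout, p is reported to the callback.
func (ptm *PeerTimeoutManager) RequestSent(p peer.ID) {
    ptm.lk.Lock()
    defer ptm.lk.Unlock()

    if _, ok := ptm.pending[p]; ok {
        return // already waiting on this peer
    }
    ptm.pending[p] = time.AfterFunc(peerResponseTimeout, func() {
        ptm.lk.Lock()
        delete(ptm.pending, p)
        ptm.lk.Unlock()

        if ptm.ctx.Err() == nil {
            ptm.onTimeout([]peer.ID{p})
        }
    })
}

// ResponseReceived records that p responded, cancelling any pending timeout.
func (ptm *PeerTimeoutManager) ResponseReceived(p peer.ID) {
    ptm.lk.Lock()
    defer ptm.lk.Unlock()

    if t, ok := ptm.pending[p]; ok {
        t.Stop()
        delete(ptm.pending, p)
    }
}
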
27 changes: 16 additions & 11 deletions internal/messagequeue/messagequeue.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"

bstimer "github.com/ipfs/go-bitswap/internal/timer"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
@@ -57,6 +58,7 @@ type MessageQueue struct {
dhTimeoutMgr DontHaveTimeoutManager
maxMessageSize int
sendErrorBackoff time.Duration
onRequestSent OnRequestSent

outgoingWork chan time.Time
done chan struct{}
@@ -131,6 +133,8 @@ func (pc *peerConn) Latency() time.Duration {
// older version of Bitswap that doesn't support DONT_HAVE messages.
type OnDontHaveTimeout func(peer.ID, []cid.Cid)

type OnRequestSent func(peer.ID)

// DontHaveTimeoutManager pings a peer to estimate latency so it can set a reasonable
// upper bound on when to consider a DONT_HAVE request as timed out (when connected to
// a peer that doesn't support DONT_HAVE messages)
@@ -147,17 +151,18 @@ type DontHaveTimeoutManager interface {
}

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, onRequestSent OnRequestSent) *MessageQueue {
onTimeout := func(ks []cid.Cid) {
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(ctx, newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr, onRequestSent)
}

// This constructor is used by the tests
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager,
onRequestSent OnRequestSent) *MessageQueue {

mq := &MessageQueue{
ctx: ctx,
@@ -172,6 +177,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
onRequestSent: onRequestSent,
priority: maxPriority,
}

@@ -286,11 +292,8 @@ func (mq *MessageQueue) runQueue() {

// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
// See: https://golang.org/pkg/time/#Timer.Stop
<-scheduleWork.C
}
defer scheduleWork.Stop()
bstimer.StopTimer(scheduleWork)

var workScheduled time.Time
for {
@@ -304,9 +307,8 @@
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
<-scheduleWork.C
} else {
bstimer.StopTimer(scheduleWork)
}

// If we have too many updates and/or we've waited too
@@ -413,6 +415,9 @@ func (mq *MessageQueue) sendMessage() {

mq.simulateDontHaveWithTimeout(message)

// Signal that a request was sent
mq.onRequestSent(mq.p)
Review comment (Member): Concerned that doing this after sending the request could cause us to handle this after receiving the response.
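
To illustrate that ordering concern: if the peer answers before onRequestSent runs, ResponseReceived clears nothing and the later RequestSent leaves a stale outstanding entry that can later be reported as a timeout. A self-contained sketch with hypothetical stand-ins (not code from this PR):

// Illustrative sketch only: the window between SendMsg and onRequestSent.
package main

import (
    "fmt"
    "sync"
)

// Hypothetical stand-in for the peer timeout manager's bookkeeping.
type tracker struct {
    mu          sync.Mutex
    outstanding map[string]bool
}

func (t *tracker) RequestSent(p string)      { t.mu.Lock(); t.outstanding[p] = true; t.mu.Unlock() }
func (t *tracker) ResponseReceived(p string) { t.mu.Lock(); delete(t.outstanding, p); t.mu.Unlock() }

func main() {
    tr := &tracker{outstanding: map[string]bool{}}

    // The peer's response is handled on another goroutine, and here it
    // happens to be processed before RequestSent is called.
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        tr.ResponseReceived("peerA") // no-op: nothing outstanding yet
    }()
    wg.Wait()

    // onRequestSent runs only after SendMsg returned, so the peer is now
    // recorded as outstanding even though it already responded.
    tr.RequestSent("peerA")

    fmt.Println(tr.outstanding) // map[peerA:true]
}
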


// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
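
The internal/timer package imported as bstimer is likewise not among the hunks shown. Judging from the inline stop-and-drain code it replaces in runQueue, a minimal sketch of the helper (assumed shape and package name) is:

// Sketch of the StopTimer helper implied by the bstimer.StopTimer calls above.
package timer

import "time"

// StopTimer stops t and, if the timer had already fired, drains its channel
// so that a later Reset does not observe a stale tick.
// See: https://golang.org/pkg/time/#Timer.Stop
func StopTimer(t *time.Timer) {
    if !t.Stop() {
        // Non-blocking drain: if the tick was already consumed elsewhere,
        // a bare <-t.C would block forever.
        select {
        case <-t.C:
        default:
        }
    }
}
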
25 changes: 13 additions & 12 deletions internal/messagequeue/messagequeue_test.go
@@ -124,6 +124,7 @@ func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; ret
func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave }

func mockTimeoutCb(peer.ID, []cid.Cid) {}
func mockOnRQSent(peer.ID) {}

func collectMessages(ctx context.Context,
t *testing.T,
@@ -159,7 +160,7 @@ func TestStartupAndShutdown(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
bcstwh := testutil.GenerateCids(10)

messageQueue.Startup()
@@ -201,7 +202,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
wantHaves := testutil.GenerateCids(10)
wantBlocks := testutil.GenerateCids(10)

@@ -224,7 +225,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
wantHaves := testutil.GenerateCids(10)
wantBlocks := testutil.GenerateCids(10)

@@ -247,7 +248,7 @@ func TestSendingMessagesPriority(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
wantHaves1 := testutil.GenerateCids(5)
wantHaves2 := testutil.GenerateCids(5)
wantHaves := append(wantHaves1, wantHaves2...)
@@ -316,7 +317,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
wantHaves := testutil.GenerateCids(2)
wantBlocks := testutil.GenerateCids(2)

@@ -350,7 +351,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
cancels := testutil.GenerateCids(3)

messageQueue.Startup()
@@ -383,7 +384,7 @@ func TestWantlistRebroadcast(t *testing.T) {
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
bcstwh := testutil.GenerateCids(10)
wantHaves := testutil.GenerateCids(10)
wantBlocks := testutil.GenerateCids(10)
@@ -485,7 +486,7 @@ func TestSendingLargeMessages(t *testing.T) {
wantBlocks := testutil.GenerateCids(10)
entrySize := 44
maxMsgSize := entrySize * 3 // 3 wants
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm, mockOnRQSent)

messageQueue.Startup()
messageQueue.AddWants(wantBlocks, []cid.Cid{})
@@ -512,7 +513,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]

messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, mockOnRQSent)
messageQueue.Startup()

// If the remote peer doesn't support HAVE / DONT_HAVE messages
@@ -569,7 +570,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
peerID := testutil.GeneratePeers(1)[0]

dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm, mockOnRQSent)
messageQueue.Startup()

wbs := testutil.GenerateCids(10)
Expand Down Expand Up @@ -602,7 +603,7 @@ func TestResendAfterError(t *testing.T) {
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
sendErrBackoff := 5 * time.Millisecond
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm, mockOnRQSent)
wantBlocks := testutil.GenerateCids(10)
wantHaves := testutil.GenerateCids(10)

@@ -641,7 +642,7 @@ func TestResendAfterMaxRetries(t *testing.T) {
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
sendErrBackoff := 2 * time.Millisecond
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm, mockOnRQSent)
wantBlocks := testutil.GenerateCids(10)
wantHaves := testutil.GenerateCids(10)
wantBlocks2 := testutil.GenerateCids(10)
11 changes: 11 additions & 0 deletions internal/peermanager/peermanager.go
@@ -129,6 +129,17 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
pm.pwm.RemovePeer(p)
}

// OnTimeout is called when one or more peers time out.
func (pm *PeerManager) OnTimeout(peers []peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

for _, p := range peers {
// Inform the sessions that the peer is not available
pm.signalAvailability(p, false)
}
}

// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to