From 076f7091f41c90be13a83c6290cf07b8b9cb558e Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 4 Apr 2019 14:05:24 -0700 Subject: [PATCH 1/3] feat(messagequeue): rebroadcast wantlist Provide a failsafe to losing wants on other end by rebroadcasting a wantlist every thirty seconds fix #99, fix #65 --- messagequeue/messagequeue.go | 85 +++++++++++++++++++++---------- messagequeue/messagequeue_test.go | 37 ++++++++++++++ 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 2b8f5f7c..d1a24ef4 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -14,7 +14,10 @@ import ( var log = logging.Logger("bitswap") -const maxRetries = 10 +const ( + defaultRebroadcastInterval = 30 * time.Second + maxRetries = 10 +) // MessageNetwork is any network that can connect peers and generate a message // sender. @@ -33,21 +36,25 @@ type MessageQueue struct { done chan struct{} // do not touch out of run loop - wl *wantlist.SessionTrackedWantlist - nextMessage bsmsg.BitSwapMessage - nextMessageLk sync.RWMutex - sender bsnet.MessageSender + wl *wantlist.SessionTrackedWantlist + nextMessage bsmsg.BitSwapMessage + nextMessageLk sync.RWMutex + sender bsnet.MessageSender + rebroadcastIntervalLk sync.RWMutex + rebroadcastInterval time.Duration + rebroadcastTimer *time.Timer } // New creats a new MessageQueue. func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ - ctx: ctx, - wl: wantlist.NewSessionTrackedWantlist(), - network: network, - p: p, - outgoingWork: make(chan struct{}, 1), - done: make(chan struct{}), + ctx: ctx, + wl: wantlist.NewSessionTrackedWantlist(), + network: network, + p: p, + outgoingWork: make(chan struct{}, 1), + done: make(chan struct{}), + rebroadcastInterval: defaultRebroadcastInterval, } } @@ -64,27 +71,24 @@ func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) { // AddWantlist adds a complete session tracked want list to a message queue func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) { - mq.nextMessageLk.Lock() - defer mq.nextMessageLk.Unlock() - initialWants.CopyWants(mq.wl) - if initialWants.Len() > 0 { - if mq.nextMessage == nil { - mq.nextMessage = bsmsg.New(false) - } - for _, e := range initialWants.Entries() { - mq.nextMessage.AddEntry(e.Cid, e.Priority) - } - select { - case mq.outgoingWork <- struct{}{}: - default: - } - } + mq.addWantlist() +} + +// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist +func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) { + mq.rebroadcastIntervalLk.Lock() + mq.rebroadcastInterval = delay + mq.rebroadcastTimer.Reset(delay) + mq.rebroadcastIntervalLk.Unlock() } // Startup starts the processing of messages, and creates an initial message // based on the given initial wantlist. func (mq *MessageQueue) Startup() { + mq.rebroadcastIntervalLk.RLock() + mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval) + mq.rebroadcastIntervalLk.RUnlock() go mq.runQueue() } @@ -96,6 +100,8 @@ func (mq *MessageQueue) Shutdown() { func (mq *MessageQueue) runQueue() { for { select { + case <-mq.rebroadcastTimer.C: + mq.rebroadcastWantlist() case <-mq.outgoingWork: mq.sendMessage() case <-mq.done: @@ -112,6 +118,33 @@ func (mq *MessageQueue) runQueue() { } } +func (mq *MessageQueue) addWantlist() { + + mq.nextMessageLk.Lock() + defer mq.nextMessageLk.Unlock() + + if mq.wl.Len() > 0 { + if mq.nextMessage == nil { + mq.nextMessage = bsmsg.New(false) + } + for _, e := range mq.wl.Entries() { + mq.nextMessage.AddEntry(e.Cid, e.Priority) + } + select { + case mq.outgoingWork <- struct{}{}: + default: + } + } +} + +func (mq *MessageQueue) rebroadcastWantlist() { + mq.rebroadcastIntervalLk.RLock() + mq.rebroadcastTimer.Reset(mq.rebroadcastInterval) + mq.rebroadcastIntervalLk.RUnlock() + + mq.addWantlist() +} + func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) bool { var work bool mq.nextMessageLk.Lock() diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index aeb903dd..eaba9b3c 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -158,3 +158,40 @@ func TestSendingMessagesPartialDupe(t *testing.T) { } } + +func TestWantlistRebroadcast(t *testing.T) { + + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(ctx, peerID, fakenet) + ses := testutil.GenerateSessionID() + wl := testutil.GenerateWantlist(10, ses) + + messageQueue.Startup() + messageQueue.AddWantlist(wl) + messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + if len(messages) != 1 { + t.Fatal("wrong number of messages were sent for initial wants") + } + + messageQueue.SetRebroadcastInterval(5 * time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + if len(messages) != 1 { + t.Fatal("wrong number of messages were sent for initial wants") + } + + firstMessage := messages[0] + if len(firstMessage.Wantlist()) != wl.Len() { + t.Fatal("did not add all wants to want list") + } + for _, entry := range firstMessage.Wantlist() { + if entry.Cancel { + t.Fatal("initial add sent cancel entry when it should not have") + } + } +} From 256e680ca4afef917d54f2d11e697fbb6578e365 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 4 Apr 2019 17:01:16 -0700 Subject: [PATCH 2/3] fix(messagequeue): add nil check Make sure rebroadcast timer doesn't get reset if it's nil --- messagequeue/messagequeue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index d1a24ef4..a7142508 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -79,7 +79,9 @@ func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlis func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) { mq.rebroadcastIntervalLk.Lock() mq.rebroadcastInterval = delay - mq.rebroadcastTimer.Reset(delay) + if mq.rebroadcastTimer != nil { + mq.rebroadcastTimer.Reset(delay) + } mq.rebroadcastIntervalLk.Unlock() } From 13e0a4dccf8455078fbba2732455f90dbd2224fe Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 10 Apr 2019 10:49:27 -0700 Subject: [PATCH 3/3] fix(messagequeue): test correction timing on test was failure prone, corrected --- messagequeue/messagequeue_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index eaba9b3c..146f2112 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -180,9 +180,9 @@ func TestWantlistRebroadcast(t *testing.T) { } messageQueue.SetRebroadcastInterval(5 * time.Millisecond) - messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond) if len(messages) != 1 { - t.Fatal("wrong number of messages were sent for initial wants") + t.Fatal("wrong number of messages were rebroadcast") } firstMessage := messages[0]