From 3165b98c43467d43efc68873b961eb9f906e1712 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 26 Nov 2018 19:10:17 -0800 Subject: [PATCH 1/5] refactor(WantManager): extract PeerManager Seperates the functions of tracking wants from tracking peers Unit tests for both peer manager and want manager Refactor internals of both to address synchonization issues discovered in tests This commit was moved from ipfs/go-bitswap@693085c9c90faa8fe516ffe6979e5bc8c749c478 --- bitswap/bitswap.go | 36 ++- bitswap/bitswap_test.go | 4 +- bitswap/peermanager/peermanager.go | 192 ++++++++++++++++ bitswap/peermanager/peermanager_test.go | 128 +++++++++++ bitswap/wantmanager/wantmanager.go | 277 +++++++++++------------- bitswap/wantmanager/wantmanager_test.go | 244 +++++++++++++++++++++ bitswap/workers.go | 24 +- 7 files changed, 739 insertions(+), 166 deletions(-) create mode 100644 bitswap/peermanager/peermanager.go create mode 100644 bitswap/peermanager/peermanager_test.go create mode 100644 bitswap/wantmanager/wantmanager_test.go diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 0e8fbf4e9..b3e472d2d 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -11,8 +11,10 @@ import ( decision "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" + bsmq "github.com/ipfs/go-bitswap/messagequeue" bsnet "github.com/ipfs/go-bitswap/network" notifications "github.com/ipfs/go-bitswap/notifications" + bspm "github.com/ipfs/go-bitswap/peermanager" bssm "github.com/ipfs/go-bitswap/sessionmanager" bswm "github.com/ipfs/go-bitswap/wantmanager" @@ -85,12 +87,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+ " data blocks recived").Histogram(metricsBuckets) + sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ + " this bitswap").Histogram(metricsBuckets) + notif := notifications.New() px := process.WithTeardown(func() error { notif.Shutdown() return nil }) + peerQueueFactory := func(p peer.ID) bspm.PeerQueue { + return bsmq.New(p, network) + } + bs := &Bitswap{ blockstore: bstore, notifications: notif, @@ -100,14 +109,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan cid.Cid, HasBlockBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: bswm.New(ctx, network), + wm: bswm.New(ctx), + pm: bspm.New(ctx, peerQueueFactory), sm: bssm.New(), counters: new(counters), - - dupMetric: dupHist, - allMetric: allHist, + dupMetric: dupHist, + allMetric: allHist, + sentHistogram: sentHistogram, } - go bs.wm.Run() + + bs.wm.SetDelegate(bs.pm) + bs.pm.Startup() + bs.wm.Startup() network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -128,6 +141,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, type Bitswap struct { // the peermanager manages sending messages to peers in a way that // wont block bitswap operation + pm *bspm.PeerManager + + // the wantlist tracks global wants for bitswap wm *bswm.WantManager // the engine is the bit of logic that decides who to send which blocks to @@ -160,8 +176,9 @@ type Bitswap struct { counters *counters // Metrics interface metrics - dupMetric metrics.Histogram - allMetric metrics.Histogram + dupMetric metrics.Histogram + allMetric metrics.Histogram + sentHistogram metrics.Histogram // the sessionmanager manages tracking sessions sm *bssm.SessionManager @@ -427,13 +444,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { - bs.wm.Connected(p) + initialWants := bs.wm.CurrentBroadcastWants() + bs.pm.Connected(p, initialWants) bs.engine.PeerConnected(p) } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.wm.Disconnected(p) + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index d55fd0733..ef2d73b8d 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -202,10 +202,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { nump := len(instances) - 1 // assert we're properly connected for _, inst := range instances { - peers := inst.Exchange.wm.ConnectedPeers() + peers := inst.Exchange.pm.ConnectedPeers() for i := 0; i < 10 && len(peers) != nump; i++ { time.Sleep(time.Millisecond * 50) - peers = inst.Exchange.wm.ConnectedPeers() + peers = inst.Exchange.pm.ConnectedPeers() } if len(peers) != nump { t.Fatal("not enough peers connected to instance") diff --git a/bitswap/peermanager/peermanager.go b/bitswap/peermanager/peermanager.go new file mode 100644 index 000000000..2fea3ef85 --- /dev/null +++ b/bitswap/peermanager/peermanager.go @@ -0,0 +1,192 @@ +package peermanager + +import ( + "context" + + bsmsg "github.com/ipfs/go-bitswap/message" + wantlist "github.com/ipfs/go-bitswap/wantlist" + logging "github.com/ipfs/go-log" + + peer "github.com/libp2p/go-libp2p-peer" +) + +var log = logging.Logger("bitswap") + +var ( + metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} +) + +type sendMessageParams struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} + +type connectParams struct { + peer peer.ID + initialEntries []*wantlist.Entry +} + +type peerMessageType int + +const ( + connect peerMessageType = iota + 1 + disconnect + getPeers + sendMessage +) + +type peerMessage struct { + messageType peerMessageType + params interface{} + resultsChan chan interface{} +} + +type PeerQueue interface { + RefIncrement() + RefDecrement() bool + AddMessage(entries []*bsmsg.Entry, ses uint64) + Startup(ctx context.Context, initialEntries []*wantlist.Entry) + Shutdown() +} + +type PeerQueueFactory func(p peer.ID) PeerQueue + +type PeerManager struct { + // sync channel for Run loop + peerMessages chan peerMessage + + // synchronized by Run loop, only touch inside there + peerQueues map[peer.ID]PeerQueue + + createPeerQueue PeerQueueFactory + ctx context.Context + cancel func() +} + +func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { + ctx, cancel := context.WithCancel(ctx) + return &PeerManager{ + peerMessages: make(chan peerMessage, 10), + peerQueues: make(map[peer.ID]PeerQueue), + createPeerQueue: createPeerQueue, + ctx: ctx, + cancel: cancel, + } +} + +func (pm *PeerManager) ConnectedPeers() []peer.ID { + resp := make(chan interface{}) + pm.peerMessages <- peerMessage{getPeers, nil, resp} + peers := <-resp + return peers.([]peer.ID) +} + +func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue { + mq, ok := pm.peerQueues[p] + if ok { + mq.RefIncrement() + return nil + } + + mq = pm.createPeerQueue(p) + pm.peerQueues[p] = mq + mq.Startup(pm.ctx, initialEntries) + return mq +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peerQueues[p] + if !ok { + // TODO: log error? + return + } + + if pq.RefDecrement() { + return + } + + pq.Shutdown() + delete(pm.peerQueues, p) +} + +func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { + select { + case pm.peerMessages <- peerMessage{connect, connectParams{peer: p, initialEntries: initialEntries}, nil}: + case <-pm.ctx.Done(): + } +} + +func (pm *PeerManager) Disconnected(p peer.ID) { + select { + case pm.peerMessages <- peerMessage{disconnect, p, nil}: + case <-pm.ctx.Done(): + } +} + +func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + select { + case pm.peerMessages <- peerMessage{ + sendMessage, + &sendMessageParams{entries: entries, targets: targets, from: from}, + nil, + }: + case <-pm.ctx.Done(): + } +} + +func (pm *PeerManager) Startup() { + go pm.run() +} + +func (pm *PeerManager) Shutdown() { + pm.cancel() +} + +// TODO: use goprocess here once i trust it +func (pm *PeerManager) run() { + // NOTE: Do not open any streams or connections from anywhere in this + // event loop. Really, just don't do anything likely to block. + for { + select { + case message := <-pm.peerMessages: + pm.handleMessage(message) + case <-pm.ctx.Done(): + return + } + } +} + +func (pm *PeerManager) handleMessage(message peerMessage) { + + switch message.messageType { + case sendMessage: + ms := message.params.(*sendMessageParams) + if len(ms.targets) == 0 { + for _, p := range pm.peerQueues { + p.AddMessage(ms.entries, ms.from) + } + } else { + for _, t := range ms.targets { + p, ok := pm.peerQueues[t] + if !ok { + log.Infof("tried sending wantlist change to non-partner peer: %s", t) + continue + } + p.AddMessage(ms.entries, ms.from) + } + } + case connect: + p := message.params.(connectParams) + pm.startPeerHandler(p.peer, p.initialEntries) + case disconnect: + disconnectPeer := message.params.(peer.ID) + pm.stopPeerHandler(disconnectPeer) + case getPeers: + peers := make([]peer.ID, 0, len(pm.peerQueues)) + for p := range pm.peerQueues { + peers = append(peers, p) + } + message.resultsChan <- peers + } +} diff --git a/bitswap/peermanager/peermanager_test.go b/bitswap/peermanager/peermanager_test.go new file mode 100644 index 000000000..c6260df69 --- /dev/null +++ b/bitswap/peermanager/peermanager_test.go @@ -0,0 +1,128 @@ +package peermanager + +import ( + "context" + "testing" + + bsmsg "github.com/ipfs/go-bitswap/message" + wantlist "github.com/ipfs/go-bitswap/wantlist" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p-peer" +) + +var blockGenerator = blocksutil.NewBlockGenerator() + +func generateCids(n int) []cid.Cid { + cids := make([]cid.Cid, 0, n) + for i := 0; i < n; i++ { + c := blockGenerator.Next().Cid() + cids = append(cids, c) + } + return cids +} + +var peerSeq int + +func generatePeers(n int) []peer.ID { + peerIds := make([]peer.ID, 0, n) + for i := 0; i < n; i++ { + peerSeq++ + p := peer.ID(peerSeq) + peerIds = append(peerIds, p) + } + return peerIds +} + +var nextSession uint64 + +func generateSessionID() uint64 { + nextSession++ + return uint64(nextSession) +} + +type messageSent struct { + p peer.ID + entries []*bsmsg.Entry + ses uint64 +} + +type fakePeer struct { + refcnt int + p peer.ID + messagesSent chan messageSent +} + +func containsPeer(peers []peer.ID, p peer.ID) bool { + for _, n := range peers { + if p == n { + return true + } + } + return false +} + +func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {} +func (fp *fakePeer) Shutdown() {} +func (fp *fakePeer) RefIncrement() { fp.refcnt++ } +func (fp *fakePeer) RefDecrement() bool { + fp.refcnt-- + return fp.refcnt > 0 +} +func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) { + fp.messagesSent <- messageSent{fp.p, entries, ses} +} + +func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory { + return func(p peer.ID) PeerQueue { + return &fakePeer{ + p: p, + refcnt: 1, + messagesSent: messagesSent, + } + } +} + +func TestAddingAndRemovingPeers(t *testing.T) { + ctx := context.Background() + peerQueueFactory := makePeerQueueFactory(nil) + + tp := generatePeers(5) + peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] + peerManager := New(ctx, peerQueueFactory) + peerManager.Startup() + + peerManager.Connected(peer1, nil) + peerManager.Connected(peer2, nil) + peerManager.Connected(peer3, nil) + + connectedPeers := peerManager.ConnectedPeers() + + if !containsPeer(connectedPeers, peer1) || + !containsPeer(connectedPeers, peer2) || + !containsPeer(connectedPeers, peer3) { + t.Fatal("Peers not connected that should be connected") + } + + if containsPeer(connectedPeers, peer4) || + containsPeer(connectedPeers, peer5) { + t.Fatal("Peers connected that shouldn't be connected") + } + + // removing a peer with only one reference + peerManager.Disconnected(peer1) + connectedPeers = peerManager.ConnectedPeers() + + if containsPeer(connectedPeers, peer1) { + t.Fatal("Peer should have been disconnected but was not") + } + + // connecting a peer twice, then disconnecting once, should stay in queue + peerManager.Connected(peer2, nil) + peerManager.Disconnected(peer2) + connectedPeers = peerManager.ConnectedPeers() + + if !containsPeer(connectedPeers, peer2) { + t.Fatal("Peer was disconnected but should not have been") + } +} diff --git a/bitswap/wantmanager/wantmanager.go b/bitswap/wantmanager/wantmanager.go index e3734290c..a9ea90163 100644 --- a/bitswap/wantmanager/wantmanager.go +++ b/bitswap/wantmanager/wantmanager.go @@ -4,10 +4,7 @@ import ( "context" "math" - engine "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" - bsmq "github.com/ipfs/go-bitswap/messagequeue" - bsnet "github.com/ipfs/go-bitswap/network" wantlist "github.com/ipfs/go-bitswap/wantlist" logging "github.com/ipfs/go-log" @@ -19,59 +16,72 @@ import ( var log = logging.Logger("bitswap") const ( - // kMaxPriority is the max priority as defined by the bitswap protocol - kMaxPriority = math.MaxInt32 + // maxPriority is the max priority as defined by the bitswap protocol + maxPriority = math.MaxInt32 ) -var ( - metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} +// WantSender sends changes out to the network as they get added to the wantlist +// managed by the WantManager +type WantSender interface { + SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) +} + +type wantMessageType int + +const ( + isWanted wantMessageType = iota + 1 + addWants + currentWants + currentBroadcastWants + wantCount ) +type wantMessage struct { + messageType wantMessageType + params interface{} + resultsChan chan interface{} +} + +// WantManager manages a global want list. It tracks two seperate want lists - +// one for all wants, and one for wants that are specifically broadcast to the +// internet type WantManager struct { - // sync channels for Run loop - incoming chan *wantSet - connectEvent chan peerStatus // notification channel for peers connecting/disconnecting - peerReqs chan chan []peer.ID // channel to request connected peers on + // channel requests to the run loop + // to get predictable behavior while running this in a go routine + // having only one channel is neccesary, so requests are processed serially + messageReqs chan wantMessage // synchronized by Run loop, only touch inside there - peers map[peer.ID]*bsmq.MessageQueue - wl *wantlist.ThreadSafe - bcwl *wantlist.ThreadSafe + wl *wantlist.ThreadSafe + bcwl *wantlist.ThreadSafe - network bsnet.BitSwapNetwork - ctx context.Context - cancel func() + ctx context.Context + cancel func() + wantSender WantSender wantlistGauge metrics.Gauge - sentHistogram metrics.Histogram -} - -type peerStatus struct { - connect bool - peer peer.ID } -func New(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { +// New initializes a new WantManager +func New(ctx context.Context) *WantManager { ctx, cancel := context.WithCancel(ctx) wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() - sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ - " this bitswap").Histogram(metricsBuckets) return &WantManager{ - incoming: make(chan *wantSet, 10), - connectEvent: make(chan peerStatus, 10), - peerReqs: make(chan chan []peer.ID), - peers: make(map[peer.ID]*bsmq.MessageQueue), + messageReqs: make(chan wantMessage, 10), wl: wantlist.NewThreadSafe(), bcwl: wantlist.NewThreadSafe(), - network: network, ctx: ctx, cancel: cancel, wantlistGauge: wantlistGauge, - sentHistogram: sentHistogram, } } +// SetDelegate specifies who will send want changes out to the internet +func (wm *WantManager) SetDelegate(wantSender WantSender) { + wm.wantSender = wantSender +} + // WantBlocks adds the given cids to the wantlist, tracked by the given session func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) @@ -94,158 +104,119 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, - Entry: wantlist.NewRefEntry(k, kMaxPriority-i), + Entry: wantlist.NewRefEntry(k, maxPriority-i), }) } select { - case wm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: + case wm.messageReqs <- wantMessage{ + messageType: addWants, + params: &wantSet{entries: entries, targets: targets, from: ses}, + }: case <-wm.ctx.Done(): case <-ctx.Done(): } } -func (wm *WantManager) ConnectedPeers() []peer.ID { - resp := make(chan []peer.ID) - wm.peerReqs <- resp - return <-resp -} - -func (wm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) { - // Blocks need to be sent synchronously to maintain proper backpressure - // throughout the network stack - defer env.Sent() - - msgSize := 0 - msg := bsmsg.New(false) - for _, block := range env.Message.Blocks() { - msgSize += len(block.RawData()) - msg.AddBlock(block) - log.Infof("Sending block %s to %s", block, env.Peer) - } - - wm.sentHistogram.Observe(float64(msgSize)) - err := wm.network.SendMessage(ctx, env.Peer, msg) - if err != nil { - log.Infof("sendblock error: %s", err) - } -} - -func (wm *WantManager) startPeerHandler(p peer.ID) *bsmq.MessageQueue { - mq, ok := wm.peers[p] - if ok { - mq.RefIncrement() - return nil - } - - mq = bsmq.New(p, wm.network) - wm.peers[p] = mq - mq.Startup(wm.ctx, wm.bcwl.Entries()) - return mq -} - -func (wm *WantManager) stopPeerHandler(p peer.ID) { - pq, ok := wm.peers[p] - if !ok { - // TODO: log error? - return - } - - if pq.RefDecrement() { - return - } - - pq.Shutdown() - delete(wm.peers, p) -} - -func (wm *WantManager) Connected(p peer.ID) { - select { - case wm.connectEvent <- peerStatus{peer: p, connect: true}: - case <-wm.ctx.Done(): - } +func (wm *WantManager) Startup() { + go wm.run() } -func (wm *WantManager) Disconnected(p peer.ID) { - select { - case wm.connectEvent <- peerStatus{peer: p, connect: false}: - case <-wm.ctx.Done(): - } +func (wm *WantManager) Shutdown() { + wm.cancel() } -// TODO: use goprocess here once i trust it -func (wm *WantManager) Run() { +func (wm *WantManager) run() { // NOTE: Do not open any streams or connections from anywhere in this // event loop. Really, just don't do anything likely to block. for { select { - case ws := <-wm.incoming: - - // is this a broadcast or not? - brdc := len(ws.targets) == 0 - - // add changes to our wantlist - for _, e := range ws.entries { - if e.Cancel { - if brdc { - wm.bcwl.Remove(e.Cid, ws.from) - } - - if wm.wl.Remove(e.Cid, ws.from) { - wm.wantlistGauge.Dec() - } - } else { - if brdc { - wm.bcwl.AddEntry(e.Entry, ws.from) - } - if wm.wl.AddEntry(e.Entry, ws.from) { - wm.wantlistGauge.Inc() - } + case message := <-wm.messageReqs: + wm.handleMessage(message) + case <-wm.ctx.Done(): + return + } + } +} + +func (wm *WantManager) handleMessage(message wantMessage) { + switch message.messageType { + case addWants: + ws := message.params.(*wantSet) + // is this a broadcast or not? + brdc := len(ws.targets) == 0 + + // add changes to our wantlist + for _, e := range ws.entries { + if e.Cancel { + if brdc { + wm.bcwl.Remove(e.Cid, ws.from) } - } - // broadcast those wantlist changes - if len(ws.targets) == 0 { - for _, p := range wm.peers { - p.AddMessage(ws.entries, ws.from) + if wm.wl.Remove(e.Cid, ws.from) { + wm.wantlistGauge.Dec() } } else { - for _, t := range ws.targets { - p, ok := wm.peers[t] - if !ok { - log.Infof("tried sending wantlist change to non-partner peer: %s", t) - continue - } - p.AddMessage(ws.entries, ws.from) + if brdc { + wm.bcwl.AddEntry(e.Entry, ws.from) + } + if wm.wl.AddEntry(e.Entry, ws.from) { + wm.wantlistGauge.Inc() } } - - case p := <-wm.connectEvent: - if p.connect { - wm.startPeerHandler(p.peer) - } else { - wm.stopPeerHandler(p.peer) - } - case req := <-wm.peerReqs: - peers := make([]peer.ID, 0, len(wm.peers)) - for p := range wm.peers { - peers = append(peers, p) - } - req <- peers - case <-wm.ctx.Done(): - return } + + // broadcast those wantlist changes + wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from) + case isWanted: + c := message.params.(cid.Cid) + _, isWanted := wm.wl.Contains(c) + message.resultsChan <- isWanted + case currentWants: + message.resultsChan <- wm.wl.Entries() + case currentBroadcastWants: + message.resultsChan <- wm.bcwl.Entries() + case wantCount: + message.resultsChan <- wm.wl.Len() } } func (wm *WantManager) IsWanted(c cid.Cid) bool { - _, isWanted := wm.wl.Contains(c) - return isWanted + resp := make(chan interface{}) + wm.messageReqs <- wantMessage{ + messageType: isWanted, + params: c, + resultsChan: resp, + } + result := <-resp + return result.(bool) } func (wm *WantManager) CurrentWants() []*wantlist.Entry { - return wm.wl.Entries() + resp := make(chan interface{}) + wm.messageReqs <- wantMessage{ + messageType: currentWants, + resultsChan: resp, + } + result := <-resp + return result.([]*wantlist.Entry) +} + +func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { + resp := make(chan interface{}) + wm.messageReqs <- wantMessage{ + messageType: currentBroadcastWants, + resultsChan: resp, + } + result := <-resp + return result.([]*wantlist.Entry) } func (wm *WantManager) WantCount() int { - return wm.wl.Len() + resp := make(chan interface{}) + wm.messageReqs <- wantMessage{ + messageType: wantCount, + resultsChan: resp, + } + result := <-resp + return result.(int) } diff --git a/bitswap/wantmanager/wantmanager_test.go b/bitswap/wantmanager/wantmanager_test.go new file mode 100644 index 000000000..54cab8345 --- /dev/null +++ b/bitswap/wantmanager/wantmanager_test.go @@ -0,0 +1,244 @@ +package wantmanager + +import ( + "context" + "reflect" + "sync" + "testing" + + bsmsg "github.com/ipfs/go-bitswap/message" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-blocksutil" + "github.com/libp2p/go-libp2p-peer" +) + +var blockGenerator = blocksutil.NewBlockGenerator() + +func generateCids(n int) []cid.Cid { + cids := make([]cid.Cid, 0, n) + for i := 0; i < n; i++ { + c := blockGenerator.Next().Cid() + cids = append(cids, c) + } + return cids +} + +var peerSeq int + +func generatePeers(n int) []peer.ID { + peerIds := make([]peer.ID, 0, n) + for i := 0; i < n; i++ { + peerSeq++ + p := peer.ID(peerSeq) + peerIds = append(peerIds, p) + } + return peerIds +} + +var nextSession uint64 + +func generateSessionID() uint64 { + nextSession++ + return uint64(nextSession) +} + +type fakeWantSender struct { + lk sync.RWMutex + lastWantSet wantSet +} + +func (fws *fakeWantSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + fws.lk.Lock() + fws.lastWantSet = wantSet{entries, targets, from} + fws.lk.Unlock() +} + +func (fws *fakeWantSender) getLastWantSet() wantSet { + fws.lk.Lock() + defer fws.lk.Unlock() + return fws.lastWantSet +} + +func setupTestFixturesAndInitialWantList() ( + context.Context, *fakeWantSender, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) { + ctx := context.Background() + + // setup fixtures + wantSender := &fakeWantSender{} + wantManager := New(ctx) + keys := generateCids(10) + otherKeys := generateCids(5) + peers := generatePeers(10) + session := generateSessionID() + otherSession := generateSessionID() + + // startup wantManager + wantManager.SetDelegate(wantSender) + wantManager.Startup() + + // add initial wants + wantManager.WantBlocks( + ctx, + keys, + peers, + session) + + return ctx, wantSender, wantManager, keys, otherKeys, peers, session, otherSession +} + +func TestInitialWantsAddedCorrectly(t *testing.T) { + + _, wantSender, wantManager, keys, _, peers, session, _ := + setupTestFixturesAndInitialWantList() + + bcwl := wantManager.CurrentBroadcastWants() + wl := wantManager.CurrentWants() + + if len(bcwl) > 0 { + t.Fatal("should not create broadcast wants when peers are specified") + } + + if len(wl) != len(keys) { + t.Fatal("did not add correct number of wants to want lsit") + } + + generatedWantSet := wantSender.getLastWantSet() + + if len(generatedWantSet.entries) != len(keys) { + t.Fatal("incorrect wants sent") + } + + for _, entry := range generatedWantSet.entries { + if entry.Cancel { + t.Fatal("did not send only non-cancel messages") + } + } + + if generatedWantSet.from != session { + t.Fatal("incorrect session used in sending") + } + + if !reflect.DeepEqual(generatedWantSet.targets, peers) { + t.Fatal("did not setup peers correctly") + } + + wantManager.Shutdown() +} + +func TestCancellingWants(t *testing.T) { + ctx, wantSender, wantManager, keys, _, peers, session, _ := + setupTestFixturesAndInitialWantList() + + wantManager.CancelWants(ctx, keys, peers, session) + + wl := wantManager.CurrentWants() + + if len(wl) != 0 { + t.Fatal("did not remove blocks from want list") + } + + generatedWantSet := wantSender.getLastWantSet() + + if len(generatedWantSet.entries) != len(keys) { + t.Fatal("incorrect wants sent") + } + + for _, entry := range generatedWantSet.entries { + if !entry.Cancel { + t.Fatal("did not send only cancel messages") + } + } + + if generatedWantSet.from != session { + t.Fatal("incorrect session used in sending") + } + + if !reflect.DeepEqual(generatedWantSet.targets, peers) { + t.Fatal("did not setup peers correctly") + } + + wantManager.Shutdown() + +} + +func TestCancellingWantsFromAnotherSessionHasNoEffect(t *testing.T) { + ctx, _, wantManager, keys, _, peers, _, otherSession := + setupTestFixturesAndInitialWantList() + + // cancelling wants from another session has no effect + wantManager.CancelWants(ctx, keys, peers, otherSession) + + wl := wantManager.CurrentWants() + + if len(wl) != len(keys) { + t.Fatal("should not cancel wants unless they match session that made them") + } + + wantManager.Shutdown() +} + +func TestAddingWantsWithNoPeersAddsToBroadcastAndRegularWantList(t *testing.T) { + ctx, _, wantManager, keys, otherKeys, _, session, _ := + setupTestFixturesAndInitialWantList() + + wantManager.WantBlocks(ctx, otherKeys, nil, session) + + bcwl := wantManager.CurrentBroadcastWants() + wl := wantManager.CurrentWants() + + if len(bcwl) != len(otherKeys) { + t.Fatal("want requests with no peers should get added to broadcast list") + } + + if len(wl) != len(otherKeys)+len(keys) { + t.Fatal("want requests with no peers should get added to regular want list") + } + + wantManager.Shutdown() +} + +func TestAddingRequestFromSecondSessionPreventsCancel(t *testing.T) { + ctx, wantSender, wantManager, keys, _, peers, session, otherSession := + setupTestFixturesAndInitialWantList() + + // add a second session requesting the first key + firstKeys := append([]cid.Cid(nil), keys[0]) + wantManager.WantBlocks(ctx, firstKeys, peers, otherSession) + + wl := wantManager.CurrentWants() + + if len(wl) != len(keys) { + t.Fatal("wants from other sessions should not get added seperately") + } + + generatedWantSet := wantSender.getLastWantSet() + if len(generatedWantSet.entries) != len(firstKeys) && + generatedWantSet.from != otherSession && + generatedWantSet.entries[0].Cid != firstKeys[0] && + generatedWantSet.entries[0].Cancel != false { + t.Fatal("should send additional message requesting want for new session") + } + + // cancel block from first session + wantManager.CancelWants(ctx, firstKeys, peers, session) + + wl = wantManager.CurrentWants() + + // want should still be on want list + if len(wl) != len(keys) { + t.Fatal("wants should not be removed until all sessions cancel wants") + } + + // cancel other block from first session + secondKeys := append([]cid.Cid(nil), keys[1]) + wantManager.CancelWants(ctx, secondKeys, peers, session) + + wl = wantManager.CurrentWants() + + // want should not be on want list, cause it was only tracked by one session + if len(wl) != len(keys)-1 { + t.Fatal("wants should be removed if all sessions have cancelled") + } + + wantManager.Shutdown() +} diff --git a/bitswap/workers.go b/bitswap/workers.go index 34b75bab2..99a967068 100644 --- a/bitswap/workers.go +++ b/bitswap/workers.go @@ -6,8 +6,8 @@ import ( "sync" "time" + engine "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" process "github.com/jbenet/goprocess" @@ -74,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } bs.engine.MessageSent(envelope.Peer, outgoing) - bs.wm.SendBlocks(ctx, envelope) + bs.sendBlocks(ctx, envelope) bs.counterLk.Lock() for _, block := range envelope.Message.Blocks() { bs.counters.blocksSent++ @@ -90,6 +90,26 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } +func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msgSize := 0 + msg := bsmsg.New(false) + for _, block := range env.Message.Blocks() { + msgSize += len(block.RawData()) + msg.AddBlock(block) + log.Infof("Sending block %s to %s", block, env.Peer) + } + + bs.sentHistogram.Observe(float64(msgSize)) + err := bs.network.SendMessage(ctx, env.Peer, msg) + if err != nil { + log.Infof("sendblock error: %s", err) + } +} + func (bs *Bitswap) provideWorker(px process.Process) { limit := make(chan struct{}, provideWorkerMax) From f5d003925f8e518e64fc72f01caa1dd9afc6d0ed Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Nov 2018 11:37:53 -0800 Subject: [PATCH 2/5] refactor(Managers): Further cleanup Finishing adding comments to WantManager and PeerManager, refactor message structure for type safety, add sending messages test This commit was moved from ipfs/go-bitswap@9ed150a736762ebc62bf7fc2d0d3639e52a50bc7 --- bitswap/peermanager/peermanager.go | 203 ++++++++++++----------- bitswap/peermanager/peermanager_test.go | 106 +++++++++++- bitswap/wantmanager/wantmanager.go | 212 ++++++++++++------------ 3 files changed, 309 insertions(+), 212 deletions(-) diff --git a/bitswap/peermanager/peermanager.go b/bitswap/peermanager/peermanager.go index 2fea3ef85..379fd4bd2 100644 --- a/bitswap/peermanager/peermanager.go +++ b/bitswap/peermanager/peermanager.go @@ -16,32 +16,7 @@ var ( metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} ) -type sendMessageParams struct { - entries []*bsmsg.Entry - targets []peer.ID - from uint64 -} - -type connectParams struct { - peer peer.ID - initialEntries []*wantlist.Entry -} - -type peerMessageType int - -const ( - connect peerMessageType = iota + 1 - disconnect - getPeers - sendMessage -) - -type peerMessage struct { - messageType peerMessageType - params interface{} - resultsChan chan interface{} -} - +// PeerQueue provides a queer of messages to be sent for a single peer type PeerQueue interface { RefIncrement() RefDecrement() bool @@ -50,8 +25,14 @@ type PeerQueue interface { Shutdown() } +// PeerQueueFactory provides a function that will create a PeerQueue type PeerQueueFactory func(p peer.ID) PeerQueue +type peerMessage interface { + handle(pm *PeerManager) +} + +// PeerManager manages a pool of peers and sends messages to peers in the pool type PeerManager struct { // sync channel for Run loop peerMessages chan peerMessage @@ -64,6 +45,7 @@ type PeerManager struct { cancel func() } +// New creates a new PeerManager, given a context and a peerQueueFactory func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { ctx, cancel := context.WithCancel(ctx) return &PeerManager{ @@ -75,118 +57,145 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { } } +// ConnectedPeers returns a list of peers this PeerManager is managing func (pm *PeerManager) ConnectedPeers() []peer.ID { - resp := make(chan interface{}) - pm.peerMessages <- peerMessage{getPeers, nil, resp} - peers := <-resp - return peers.([]peer.ID) -} - -func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue { - mq, ok := pm.peerQueues[p] - if ok { - mq.RefIncrement() - return nil - } - - mq = pm.createPeerQueue(p) - pm.peerQueues[p] = mq - mq.Startup(pm.ctx, initialEntries) - return mq -} - -func (pm *PeerManager) stopPeerHandler(p peer.ID) { - pq, ok := pm.peerQueues[p] - if !ok { - // TODO: log error? - return - } - - if pq.RefDecrement() { - return - } - - pq.Shutdown() - delete(pm.peerQueues, p) + resp := make(chan []peer.ID) + pm.peerMessages <- &getPeersMessage{resp} + return <-resp } +// Connected is called to add a new peer to the pool, and send it an initial set +// of wants func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { select { - case pm.peerMessages <- peerMessage{connect, connectParams{peer: p, initialEntries: initialEntries}, nil}: + case pm.peerMessages <- &connectPeerMessage{p, initialEntries}: case <-pm.ctx.Done(): } } +// Disconnected is called to remove a peer from the pool func (pm *PeerManager) Disconnected(p peer.ID) { select { - case pm.peerMessages <- peerMessage{disconnect, p, nil}: + case pm.peerMessages <- &disconnectPeerMessage{p}: case <-pm.ctx.Done(): } } +// SendMessage is called to send a message to all or some peers in the pool +// if targets is nil, it sends to all func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { select { - case pm.peerMessages <- peerMessage{ - sendMessage, - &sendMessageParams{entries: entries, targets: targets, from: from}, - nil, - }: + case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}: case <-pm.ctx.Done(): } } +// Startup enables the run loop for the PeerManager - no processing will occur +// if startup is not called func (pm *PeerManager) Startup() { go pm.run() } +// Shutdown shutsdown processing for the PeerManager func (pm *PeerManager) Shutdown() { pm.cancel() } -// TODO: use goprocess here once i trust it func (pm *PeerManager) run() { - // NOTE: Do not open any streams or connections from anywhere in this - // event loop. Really, just don't do anything likely to block. for { select { case message := <-pm.peerMessages: - pm.handleMessage(message) + message.handle(pm) case <-pm.ctx.Done(): return } } } -func (pm *PeerManager) handleMessage(message peerMessage) { +type sendPeerMessage struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} - switch message.messageType { - case sendMessage: - ms := message.params.(*sendMessageParams) - if len(ms.targets) == 0 { - for _, p := range pm.peerQueues { - p.AddMessage(ms.entries, ms.from) - } - } else { - for _, t := range ms.targets { - p, ok := pm.peerQueues[t] - if !ok { - log.Infof("tried sending wantlist change to non-partner peer: %s", t) - continue - } - p.AddMessage(ms.entries, ms.from) - } +func (s *sendPeerMessage) handle(pm *PeerManager) { + pm.sendMessage(s) +} + +type connectPeerMessage struct { + p peer.ID + initialEntries []*wantlist.Entry +} + +func (c *connectPeerMessage) handle(pm *PeerManager) { + pm.startPeerHandler(c.p, c.initialEntries) +} + +type disconnectPeerMessage struct { + p peer.ID +} + +func (dc *disconnectPeerMessage) handle(pm *PeerManager) { + pm.stopPeerHandler(dc.p) +} + +type getPeersMessage struct { + peerResp chan<- []peer.ID +} + +func (gp *getPeersMessage) handle(pm *PeerManager) { + pm.getPeers(gp.peerResp) +} + +func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) { + peers := make([]peer.ID, 0, len(pm.peerQueues)) + for p := range pm.peerQueues { + peers = append(peers, p) + } + peerResp <- peers +} + +func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue { + mq, ok := pm.peerQueues[p] + if ok { + mq.RefIncrement() + return nil + } + + mq = pm.createPeerQueue(p) + pm.peerQueues[p] = mq + mq.Startup(pm.ctx, initialEntries) + return mq +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peerQueues[p] + if !ok { + // TODO: log error? + return + } + + if pq.RefDecrement() { + return + } + + pq.Shutdown() + delete(pm.peerQueues, p) +} + +func (pm *PeerManager) sendMessage(ms *sendPeerMessage) { + if len(ms.targets) == 0 { + for _, p := range pm.peerQueues { + p.AddMessage(ms.entries, ms.from) } - case connect: - p := message.params.(connectParams) - pm.startPeerHandler(p.peer, p.initialEntries) - case disconnect: - disconnectPeer := message.params.(peer.ID) - pm.stopPeerHandler(disconnectPeer) - case getPeers: - peers := make([]peer.ID, 0, len(pm.peerQueues)) - for p := range pm.peerQueues { - peers = append(peers, p) + } else { + for _, t := range ms.targets { + p, ok := pm.peerQueues[t] + if !ok { + log.Infof("tried sending wantlist change to non-partner peer: %s", t) + continue + } + p.AddMessage(ms.entries, ms.from) } - message.resultsChan <- peers } } diff --git a/bitswap/peermanager/peermanager_test.go b/bitswap/peermanager/peermanager_test.go index c6260df69..67ba38ae4 100644 --- a/bitswap/peermanager/peermanager_test.go +++ b/bitswap/peermanager/peermanager_test.go @@ -2,24 +2,30 @@ package peermanager import ( "context" + "reflect" "testing" + "time" bsmsg "github.com/ipfs/go-bitswap/message" wantlist "github.com/ipfs/go-bitswap/wantlist" - "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs-blocksutil" "github.com/libp2p/go-libp2p-peer" ) var blockGenerator = blocksutil.NewBlockGenerator() +var prioritySeq int -func generateCids(n int) []cid.Cid { - cids := make([]cid.Cid, 0, n) +func generateEntries(n int, isCancel bool) []*bsmsg.Entry { + bsmsgs := make([]*bsmsg.Entry, 0, n) for i := 0; i < n; i++ { - c := blockGenerator.Next().Cid() - cids = append(cids, c) + prioritySeq++ + msg := &bsmsg.Entry{ + Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq), + Cancel: isCancel, + } + bsmsgs = append(bsmsgs, msg) } - return cids + return bsmsgs } var peerSeq int @@ -83,6 +89,32 @@ func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory { } } +func collectAndCheckMessages( + ctx context.Context, + t *testing.T, + messagesSent <-chan messageSent, + entries []*bsmsg.Entry, + ses uint64, + timeout time.Duration) []peer.ID { + var peersReceived []peer.ID + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for { + select { + case nextMessage := <-messagesSent: + if nextMessage.ses != ses { + t.Fatal("Message enqueued with wrong session") + } + if !reflect.DeepEqual(nextMessage.entries, entries) { + t.Fatal("Message enqueued with wrong wants") + } + peersReceived = append(peersReceived, nextMessage.p) + case <-timeoutCtx.Done(): + return peersReceived + } + } +} + func TestAddingAndRemovingPeers(t *testing.T) { ctx := context.Background() peerQueueFactory := makePeerQueueFactory(nil) @@ -126,3 +158,65 @@ func TestAddingAndRemovingPeers(t *testing.T) { t.Fatal("Peer was disconnected but should not have been") } } + +func TestSendingMessagesToPeers(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan messageSent) + peerQueueFactory := makePeerQueueFactory(messagesSent) + + tp := generatePeers(5) + + peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] + peerManager := New(ctx, peerQueueFactory) + peerManager.Startup() + + peerManager.Connected(peer1, nil) + peerManager.Connected(peer2, nil) + peerManager.Connected(peer3, nil) + + entries := generateEntries(5, false) + ses := generateSessionID() + + peerManager.SendMessage(entries, nil, ses) + + peersReceived := collectAndCheckMessages( + ctx, t, messagesSent, entries, ses, 200*time.Millisecond) + if len(peersReceived) != 3 { + t.Fatal("Incorrect number of peers received messages") + } + + if !containsPeer(peersReceived, peer1) || + !containsPeer(peersReceived, peer2) || + !containsPeer(peersReceived, peer3) { + t.Fatal("Peers should have received message but did not") + } + + if containsPeer(peersReceived, peer4) || + containsPeer(peersReceived, peer5) { + t.Fatal("Peers received message but should not have") + } + + var peersToSendTo []peer.ID + peersToSendTo = append(peersToSendTo, peer1, peer3, peer4) + peerManager.SendMessage(entries, peersToSendTo, ses) + peersReceived = collectAndCheckMessages( + ctx, t, messagesSent, entries, ses, 200*time.Millisecond) + + if len(peersReceived) != 2 { + t.Fatal("Incorrect number of peers received messages") + } + + if !containsPeer(peersReceived, peer1) || + !containsPeer(peersReceived, peer3) { + t.Fatal("Peers should have received message but did not") + } + + if containsPeer(peersReceived, peer2) || + containsPeer(peersReceived, peer5) { + t.Fatal("Peers received message but should not have") + } + + if containsPeer(peersReceived, peer4) { + t.Fatal("Peers targeted received message but was not connected") + } +} diff --git a/bitswap/wantmanager/wantmanager.go b/bitswap/wantmanager/wantmanager.go index a9ea90163..3dcff166b 100644 --- a/bitswap/wantmanager/wantmanager.go +++ b/bitswap/wantmanager/wantmanager.go @@ -26,20 +26,8 @@ type WantSender interface { SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) } -type wantMessageType int - -const ( - isWanted wantMessageType = iota + 1 - addWants - currentWants - currentBroadcastWants - wantCount -) - -type wantMessage struct { - messageType wantMessageType - params interface{} - resultsChan chan interface{} +type wantMessage interface { + handle(wm *WantManager) } // WantManager manages a global want list. It tracks two seperate want lists - @@ -49,7 +37,7 @@ type WantManager struct { // channel requests to the run loop // to get predictable behavior while running this in a go routine // having only one channel is neccesary, so requests are processed serially - messageReqs chan wantMessage + wantMessages chan wantMessage // synchronized by Run loop, only touch inside there wl *wantlist.ThreadSafe @@ -62,13 +50,13 @@ type WantManager struct { wantlistGauge metrics.Gauge } -// New initializes a new WantManager +// New initializes a new WantManager for a given context func New(ctx context.Context) *WantManager { ctx, cancel := context.WithCancel(ctx) wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() return &WantManager{ - messageReqs: make(chan wantMessage, 10), + wantMessages: make(chan wantMessage, 10), wl: wantlist.NewThreadSafe(), bcwl: wantlist.NewThreadSafe(), ctx: ctx, @@ -93,34 +81,40 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe wm.addEntries(context.Background(), ks, peers, true, ses) } -type wantSet struct { - entries []*bsmsg.Entry - targets []peer.ID - from uint64 +// IsWanted returns whether a CID is currently wanted +func (wm *WantManager) IsWanted(c cid.Cid) bool { + resp := make(chan bool) + wm.wantMessages <- &isWantedMessage{c, resp} + return <-resp } -func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { - entries := make([]*bsmsg.Entry, 0, len(ks)) - for i, k := range ks { - entries = append(entries, &bsmsg.Entry{ - Cancel: cancel, - Entry: wantlist.NewRefEntry(k, maxPriority-i), - }) - } - select { - case wm.messageReqs <- wantMessage{ - messageType: addWants, - params: &wantSet{entries: entries, targets: targets, from: ses}, - }: - case <-wm.ctx.Done(): - case <-ctx.Done(): - } +// CurrentWants returns the list of current wants +func (wm *WantManager) CurrentWants() []*wantlist.Entry { + resp := make(chan []*wantlist.Entry) + wm.wantMessages <- ¤tWantsMessage{resp} + return <-resp +} + +// CurrentBroadcastWants returns the current list of wants that are broadcasts +func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { + resp := make(chan []*wantlist.Entry) + wm.wantMessages <- ¤tBroadcastWantsMessage{resp} + return <-resp +} + +// WantCount returns the total count of wants +func (wm *WantManager) WantCount() int { + resp := make(chan int) + wm.wantMessages <- &wantCountMessage{resp} + return <-resp } +// Startup starts processing for the WantManager func (wm *WantManager) Startup() { go wm.run() } +// Shutdown ends processing for the want manager func (wm *WantManager) Shutdown() { wm.cancel() } @@ -130,93 +124,93 @@ func (wm *WantManager) run() { // event loop. Really, just don't do anything likely to block. for { select { - case message := <-wm.messageReqs: - wm.handleMessage(message) + case message := <-wm.wantMessages: + message.handle(wm) case <-wm.ctx.Done(): return } } } -func (wm *WantManager) handleMessage(message wantMessage) { - switch message.messageType { - case addWants: - ws := message.params.(*wantSet) - // is this a broadcast or not? - brdc := len(ws.targets) == 0 - - // add changes to our wantlist - for _, e := range ws.entries { - if e.Cancel { - if brdc { - wm.bcwl.Remove(e.Cid, ws.from) - } - - if wm.wl.Remove(e.Cid, ws.from) { - wm.wantlistGauge.Dec() - } - } else { - if brdc { - wm.bcwl.AddEntry(e.Entry, ws.from) - } - if wm.wl.AddEntry(e.Entry, ws.from) { - wm.wantlistGauge.Inc() - } +func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { + entries := make([]*bsmsg.Entry, 0, len(ks)) + for i, k := range ks { + entries = append(entries, &bsmsg.Entry{ + Cancel: cancel, + Entry: wantlist.NewRefEntry(k, maxPriority-i), + }) + } + select { + case wm.wantMessages <- &wantSet{entries: entries, targets: targets, from: ses}: + case <-wm.ctx.Done(): + case <-ctx.Done(): + } +} + +type wantSet struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} + +func (ws *wantSet) handle(wm *WantManager) { + // is this a broadcast or not? + brdc := len(ws.targets) == 0 + + // add changes to our wantlist + for _, e := range ws.entries { + if e.Cancel { + if brdc { + wm.bcwl.Remove(e.Cid, ws.from) } - } - // broadcast those wantlist changes - wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from) - case isWanted: - c := message.params.(cid.Cid) - _, isWanted := wm.wl.Contains(c) - message.resultsChan <- isWanted - case currentWants: - message.resultsChan <- wm.wl.Entries() - case currentBroadcastWants: - message.resultsChan <- wm.bcwl.Entries() - case wantCount: - message.resultsChan <- wm.wl.Len() + if wm.wl.Remove(e.Cid, ws.from) { + wm.wantlistGauge.Dec() + } + } else { + if brdc { + wm.bcwl.AddEntry(e.Entry, ws.from) + } + if wm.wl.AddEntry(e.Entry, ws.from) { + wm.wantlistGauge.Inc() + } + } } + + // broadcast those wantlist changes + wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from) } -func (wm *WantManager) IsWanted(c cid.Cid) bool { - resp := make(chan interface{}) - wm.messageReqs <- wantMessage{ - messageType: isWanted, - params: c, - resultsChan: resp, - } - result := <-resp - return result.(bool) +type isWantedMessage struct { + c cid.Cid + resp chan<- bool } -func (wm *WantManager) CurrentWants() []*wantlist.Entry { - resp := make(chan interface{}) - wm.messageReqs <- wantMessage{ - messageType: currentWants, - resultsChan: resp, - } - result := <-resp - return result.([]*wantlist.Entry) +func (iwm *isWantedMessage) handle(wm *WantManager) { + _, isWanted := wm.wl.Contains(iwm.c) + iwm.resp <- isWanted } -func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { - resp := make(chan interface{}) - wm.messageReqs <- wantMessage{ - messageType: currentBroadcastWants, - resultsChan: resp, - } - result := <-resp - return result.([]*wantlist.Entry) +type currentWantsMessage struct { + resp chan<- []*wantlist.Entry } -func (wm *WantManager) WantCount() int { - resp := make(chan interface{}) - wm.messageReqs <- wantMessage{ - messageType: wantCount, - resultsChan: resp, - } - result := <-resp - return result.(int) +func (cwm *currentWantsMessage) handle(wm *WantManager) { + cwm.resp <- wm.wl.Entries() +} + +type currentBroadcastWantsMessage struct { + resp chan<- []*wantlist.Entry +} + +func (cbcwm *currentBroadcastWantsMessage) handle(wm *WantManager) { + cbcwm.resp <- wm.bcwl.Entries() +} + +type wantCountMessage struct { + resp chan<- int +} + +func (wcm *wantCountMessage) handle(wm *WantManager) { + wcm.resp <- wm.wl.Len() } From 4f22a630bb25e1ddfcae3bc3bc6961f8b1efa49a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Nov 2018 13:15:42 -0800 Subject: [PATCH 3/5] refactor(testing): extract common test utils This commit was moved from ipfs/go-bitswap@9532d009dbd08019440f810ddbf11304fc3003e6 --- bitswap/peermanager/peermanager_test.go | 89 +++++++------------------ bitswap/testutil/testutil.go | 67 +++++++++++++++++++ bitswap/wantmanager/wantmanager_test.go | 43 ++---------- 3 files changed, 97 insertions(+), 102 deletions(-) create mode 100644 bitswap/testutil/testutil.go diff --git a/bitswap/peermanager/peermanager_test.go b/bitswap/peermanager/peermanager_test.go index 67ba38ae4..9b242b55b 100644 --- a/bitswap/peermanager/peermanager_test.go +++ b/bitswap/peermanager/peermanager_test.go @@ -6,47 +6,13 @@ import ( "testing" "time" + "github.com/ipfs/go-bitswap/testutil" + bsmsg "github.com/ipfs/go-bitswap/message" wantlist "github.com/ipfs/go-bitswap/wantlist" - "github.com/ipfs/go-ipfs-blocksutil" "github.com/libp2p/go-libp2p-peer" ) -var blockGenerator = blocksutil.NewBlockGenerator() -var prioritySeq int - -func generateEntries(n int, isCancel bool) []*bsmsg.Entry { - bsmsgs := make([]*bsmsg.Entry, 0, n) - for i := 0; i < n; i++ { - prioritySeq++ - msg := &bsmsg.Entry{ - Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq), - Cancel: isCancel, - } - bsmsgs = append(bsmsgs, msg) - } - return bsmsgs -} - -var peerSeq int - -func generatePeers(n int) []peer.ID { - peerIds := make([]peer.ID, 0, n) - for i := 0; i < n; i++ { - peerSeq++ - p := peer.ID(peerSeq) - peerIds = append(peerIds, p) - } - return peerIds -} - -var nextSession uint64 - -func generateSessionID() uint64 { - nextSession++ - return uint64(nextSession) -} - type messageSent struct { p peer.ID entries []*bsmsg.Entry @@ -59,15 +25,6 @@ type fakePeer struct { messagesSent chan messageSent } -func containsPeer(peers []peer.ID, p peer.ID) bool { - for _, n := range peers { - if p == n { - return true - } - } - return false -} - func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {} func (fp *fakePeer) Shutdown() {} func (fp *fakePeer) RefIncrement() { fp.refcnt++ } @@ -119,7 +76,7 @@ func TestAddingAndRemovingPeers(t *testing.T) { ctx := context.Background() peerQueueFactory := makePeerQueueFactory(nil) - tp := generatePeers(5) + tp := testutil.GeneratePeers(5) peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] peerManager := New(ctx, peerQueueFactory) peerManager.Startup() @@ -130,14 +87,14 @@ func TestAddingAndRemovingPeers(t *testing.T) { connectedPeers := peerManager.ConnectedPeers() - if !containsPeer(connectedPeers, peer1) || - !containsPeer(connectedPeers, peer2) || - !containsPeer(connectedPeers, peer3) { + if !testutil.ContainsPeer(connectedPeers, peer1) || + !testutil.ContainsPeer(connectedPeers, peer2) || + !testutil.ContainsPeer(connectedPeers, peer3) { t.Fatal("Peers not connected that should be connected") } - if containsPeer(connectedPeers, peer4) || - containsPeer(connectedPeers, peer5) { + if testutil.ContainsPeer(connectedPeers, peer4) || + testutil.ContainsPeer(connectedPeers, peer5) { t.Fatal("Peers connected that shouldn't be connected") } @@ -145,7 +102,7 @@ func TestAddingAndRemovingPeers(t *testing.T) { peerManager.Disconnected(peer1) connectedPeers = peerManager.ConnectedPeers() - if containsPeer(connectedPeers, peer1) { + if testutil.ContainsPeer(connectedPeers, peer1) { t.Fatal("Peer should have been disconnected but was not") } @@ -154,7 +111,7 @@ func TestAddingAndRemovingPeers(t *testing.T) { peerManager.Disconnected(peer2) connectedPeers = peerManager.ConnectedPeers() - if !containsPeer(connectedPeers, peer2) { + if !testutil.ContainsPeer(connectedPeers, peer2) { t.Fatal("Peer was disconnected but should not have been") } } @@ -164,7 +121,7 @@ func TestSendingMessagesToPeers(t *testing.T) { messagesSent := make(chan messageSent) peerQueueFactory := makePeerQueueFactory(messagesSent) - tp := generatePeers(5) + tp := testutil.GeneratePeers(5) peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] peerManager := New(ctx, peerQueueFactory) @@ -174,8 +131,8 @@ func TestSendingMessagesToPeers(t *testing.T) { peerManager.Connected(peer2, nil) peerManager.Connected(peer3, nil) - entries := generateEntries(5, false) - ses := generateSessionID() + entries := testutil.GenerateEntries(5, false) + ses := testutil.GenerateSessionID() peerManager.SendMessage(entries, nil, ses) @@ -185,14 +142,14 @@ func TestSendingMessagesToPeers(t *testing.T) { t.Fatal("Incorrect number of peers received messages") } - if !containsPeer(peersReceived, peer1) || - !containsPeer(peersReceived, peer2) || - !containsPeer(peersReceived, peer3) { + if !testutil.ContainsPeer(peersReceived, peer1) || + !testutil.ContainsPeer(peersReceived, peer2) || + !testutil.ContainsPeer(peersReceived, peer3) { t.Fatal("Peers should have received message but did not") } - if containsPeer(peersReceived, peer4) || - containsPeer(peersReceived, peer5) { + if testutil.ContainsPeer(peersReceived, peer4) || + testutil.ContainsPeer(peersReceived, peer5) { t.Fatal("Peers received message but should not have") } @@ -206,17 +163,17 @@ func TestSendingMessagesToPeers(t *testing.T) { t.Fatal("Incorrect number of peers received messages") } - if !containsPeer(peersReceived, peer1) || - !containsPeer(peersReceived, peer3) { + if !testutil.ContainsPeer(peersReceived, peer1) || + !testutil.ContainsPeer(peersReceived, peer3) { t.Fatal("Peers should have received message but did not") } - if containsPeer(peersReceived, peer2) || - containsPeer(peersReceived, peer5) { + if testutil.ContainsPeer(peersReceived, peer2) || + testutil.ContainsPeer(peersReceived, peer5) { t.Fatal("Peers received message but should not have") } - if containsPeer(peersReceived, peer4) { + if testutil.ContainsPeer(peersReceived, peer4) { t.Fatal("Peers targeted received message but was not connected") } } diff --git a/bitswap/testutil/testutil.go b/bitswap/testutil/testutil.go new file mode 100644 index 000000000..6ac7dcbfb --- /dev/null +++ b/bitswap/testutil/testutil.go @@ -0,0 +1,67 @@ +package testutil + +import ( + bsmsg "github.com/ipfs/go-bitswap/message" + "github.com/ipfs/go-bitswap/wantlist" + cid "github.com/ipfs/go-cid" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + peer "github.com/libp2p/go-libp2p-peer" +) + +var blockGenerator = blocksutil.NewBlockGenerator() +var prioritySeq int + +// GenerateCids produces n content identifiers +func GenerateCids(n int) []cid.Cid { + cids := make([]cid.Cid, 0, n) + for i := 0; i < n; i++ { + c := blockGenerator.Next().Cid() + cids = append(cids, c) + } + return cids +} + +// GenerateEntries makes fake bitswap message entries +func GenerateEntries(n int, isCancel bool) []*bsmsg.Entry { + bsmsgs := make([]*bsmsg.Entry, 0, n) + for i := 0; i < n; i++ { + prioritySeq++ + msg := &bsmsg.Entry{ + Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq), + Cancel: isCancel, + } + bsmsgs = append(bsmsgs, msg) + } + return bsmsgs +} + +var peerSeq int + +// GeneratePeers creates n peer ids +func GeneratePeers(n int) []peer.ID { + peerIds := make([]peer.ID, 0, n) + for i := 0; i < n; i++ { + peerSeq++ + p := peer.ID(peerSeq) + peerIds = append(peerIds, p) + } + return peerIds +} + +var nextSession uint64 + +// GenerateSessionID make a unit session identifier +func GenerateSessionID() uint64 { + nextSession++ + return uint64(nextSession) +} + +// ContainsPeer returns true if a peer is found n a list of peers +func ContainsPeer(peers []peer.ID, p peer.ID) bool { + for _, n := range peers { + if p == n { + return true + } + } + return false +} diff --git a/bitswap/wantmanager/wantmanager_test.go b/bitswap/wantmanager/wantmanager_test.go index 54cab8345..85590bb15 100644 --- a/bitswap/wantmanager/wantmanager_test.go +++ b/bitswap/wantmanager/wantmanager_test.go @@ -6,42 +6,13 @@ import ( "sync" "testing" + "github.com/ipfs/go-bitswap/testutil" + bsmsg "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blocksutil" "github.com/libp2p/go-libp2p-peer" ) -var blockGenerator = blocksutil.NewBlockGenerator() - -func generateCids(n int) []cid.Cid { - cids := make([]cid.Cid, 0, n) - for i := 0; i < n; i++ { - c := blockGenerator.Next().Cid() - cids = append(cids, c) - } - return cids -} - -var peerSeq int - -func generatePeers(n int) []peer.ID { - peerIds := make([]peer.ID, 0, n) - for i := 0; i < n; i++ { - peerSeq++ - p := peer.ID(peerSeq) - peerIds = append(peerIds, p) - } - return peerIds -} - -var nextSession uint64 - -func generateSessionID() uint64 { - nextSession++ - return uint64(nextSession) -} - type fakeWantSender struct { lk sync.RWMutex lastWantSet wantSet @@ -66,11 +37,11 @@ func setupTestFixturesAndInitialWantList() ( // setup fixtures wantSender := &fakeWantSender{} wantManager := New(ctx) - keys := generateCids(10) - otherKeys := generateCids(5) - peers := generatePeers(10) - session := generateSessionID() - otherSession := generateSessionID() + keys := testutil.GenerateCids(10) + otherKeys := testutil.GenerateCids(5) + peers := testutil.GeneratePeers(10) + session := testutil.GenerateSessionID() + otherSession := testutil.GenerateSessionID() // startup wantManager wantManager.SetDelegate(wantSender) From 7b958317afe28945db17f5b594438ef3e2142a8d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Nov 2018 20:13:06 -0800 Subject: [PATCH 4/5] test(messagequeue): Add test for messagequeue This commit was moved from ipfs/go-bitswap@3b7ae9b87a493a4b4abb331a29cfae3247688bfa --- bitswap/messagequeue/messagequeue.go | 28 ++-- bitswap/messagequeue/messagequeue_test.go | 161 ++++++++++++++++++++++ bitswap/peermanager/peermanager_test.go | 6 +- bitswap/testutil/testutil.go | 15 +- 4 files changed, 195 insertions(+), 15 deletions(-) create mode 100644 bitswap/messagequeue/messagequeue_test.go diff --git a/bitswap/messagequeue/messagequeue.go b/bitswap/messagequeue/messagequeue.go index f36117d65..d8421a15a 100644 --- a/bitswap/messagequeue/messagequeue.go +++ b/bitswap/messagequeue/messagequeue.go @@ -14,12 +14,17 @@ import ( var log = logging.Logger("bitswap") +type MessageNetwork interface { + ConnectTo(context.Context, peer.ID) error + NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) +} + type MessageQueue struct { p peer.ID outlk sync.Mutex out bsmsg.BitSwapMessage - network bsnet.BitSwapNetwork + network MessageNetwork wl *wantlist.ThreadSafe sender bsnet.MessageSender @@ -30,7 +35,7 @@ type MessageQueue struct { done chan struct{} } -func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue { +func New(p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ done: make(chan struct{}), work: make(chan struct{}, 1), @@ -90,22 +95,25 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New(true) - for _, e := range initialEntries { - for k := range e.SesTrk { - mq.wl.AddEntry(e, k) + if len(initialEntries) > 0 { + fullwantlist := bsmsg.New(true) + for _, e := range initialEntries { + for k := range e.SesTrk { + mq.wl.AddEntry(e, k) + } + fullwantlist.AddEntry(e.Cid, e.Priority) } - fullwantlist.AddEntry(e.Cid, e.Priority) + mq.out = fullwantlist + mq.work <- struct{}{} } - mq.out = fullwantlist - mq.work <- struct{}{} - go mq.runQueue(ctx) + } func (mq *MessageQueue) Shutdown() { close(mq.done) } + func (mq *MessageQueue) runQueue(ctx context.Context) { for { select { diff --git a/bitswap/messagequeue/messagequeue_test.go b/bitswap/messagequeue/messagequeue_test.go new file mode 100644 index 000000000..f3389fe7e --- /dev/null +++ b/bitswap/messagequeue/messagequeue_test.go @@ -0,0 +1,161 @@ +package messagequeue + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-bitswap/testutil" + + bsmsg "github.com/ipfs/go-bitswap/message" + bsnet "github.com/ipfs/go-bitswap/network" + peer "github.com/libp2p/go-libp2p-peer" +) + +type fakeMessageNetwork struct { + connectError error + messageSenderError error + messageSender bsnet.MessageSender +} + +func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { + return fmn.connectError +} + +func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) { + if fmn.messageSenderError == nil { + return fmn.messageSender, nil + } else { + return nil, fmn.messageSenderError + } +} + +type fakeMessageSender struct { + sendError error + fullClosed chan<- struct{} + reset chan<- struct{} + messagesSent chan<- bsmsg.BitSwapMessage +} + +func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + fms.messagesSent <- msg + return fms.sendError +} +func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil } +func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil } + +func collectMessages(ctx context.Context, + t *testing.T, + messagesSent <-chan bsmsg.BitSwapMessage, + timeout time.Duration) []bsmsg.BitSwapMessage { + var messagesReceived []bsmsg.BitSwapMessage + timeoutctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for { + select { + case messageReceived := <-messagesSent: + messagesReceived = append(messagesReceived, messageReceived) + case <-timeoutctx.Done(): + return messagesReceived + } + } +} + +func totalEntriesLength(messages []bsmsg.BitSwapMessage) int { + totalLength := 0 + for _, messages := range messages { + totalLength += len(messages.Wantlist()) + } + return totalLength +} + +func TestStartupAndShutdown(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, fakenet) + ses := testutil.GenerateSessionID() + wl := testutil.GenerateWantlist(10, ses) + + messageQueue.Startup(ctx, wl.Entries()) + + messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + if len(messages) != 1 { + t.Fatal("wrong number of messages were sent for initial wants") + } + + firstMessage := messages[0] + if len(firstMessage.Wantlist()) != wl.Len() { + t.Fatal("did not add all wants to want list") + } + for _, entry := range firstMessage.Wantlist() { + if entry.Cancel { + t.Fatal("initial add sent cancel entry when it should not have") + } + } + + messageQueue.Shutdown() + + timeoutctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + select { + case <-fullClosedChan: + case <-resetChan: + t.Fatal("message sender should have been closed but was reset") + case <-timeoutctx.Done(): + t.Fatal("message sender should have been closed but wasn't") + } +} + +func TestSendingMessagesDeduped(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, fakenet) + ses1 := testutil.GenerateSessionID() + ses2 := testutil.GenerateSessionID() + entries := testutil.GenerateMessageEntries(10, false) + messageQueue.Startup(ctx, nil) + + messageQueue.AddMessage(entries, ses1) + messageQueue.AddMessage(entries, ses2) + messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + if totalEntriesLength(messages) != len(entries) { + t.Fatal("Messages were not deduped") + } +} + +func TestSendingMessagesPartialDupe(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, fakenet) + ses1 := testutil.GenerateSessionID() + ses2 := testutil.GenerateSessionID() + entries := testutil.GenerateMessageEntries(10, false) + moreEntries := testutil.GenerateMessageEntries(5, false) + secondEntries := append(entries[5:], moreEntries...) + messageQueue.Startup(ctx, nil) + + messageQueue.AddMessage(entries, ses1) + messageQueue.AddMessage(secondEntries, ses2) + messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + + if totalEntriesLength(messages) != len(entries)+len(moreEntries) { + t.Fatal("messages were not correctly deduped") + } + +} diff --git a/bitswap/peermanager/peermanager_test.go b/bitswap/peermanager/peermanager_test.go index 9b242b55b..9617dad38 100644 --- a/bitswap/peermanager/peermanager_test.go +++ b/bitswap/peermanager/peermanager_test.go @@ -131,13 +131,13 @@ func TestSendingMessagesToPeers(t *testing.T) { peerManager.Connected(peer2, nil) peerManager.Connected(peer3, nil) - entries := testutil.GenerateEntries(5, false) + entries := testutil.GenerateMessageEntries(5, false) ses := testutil.GenerateSessionID() peerManager.SendMessage(entries, nil, ses) peersReceived := collectAndCheckMessages( - ctx, t, messagesSent, entries, ses, 200*time.Millisecond) + ctx, t, messagesSent, entries, ses, 10*time.Millisecond) if len(peersReceived) != 3 { t.Fatal("Incorrect number of peers received messages") } @@ -157,7 +157,7 @@ func TestSendingMessagesToPeers(t *testing.T) { peersToSendTo = append(peersToSendTo, peer1, peer3, peer4) peerManager.SendMessage(entries, peersToSendTo, ses) peersReceived = collectAndCheckMessages( - ctx, t, messagesSent, entries, ses, 200*time.Millisecond) + ctx, t, messagesSent, entries, ses, 10*time.Millisecond) if len(peersReceived) != 2 { t.Fatal("Incorrect number of peers received messages") diff --git a/bitswap/testutil/testutil.go b/bitswap/testutil/testutil.go index 6ac7dcbfb..f768f40dc 100644 --- a/bitswap/testutil/testutil.go +++ b/bitswap/testutil/testutil.go @@ -21,8 +21,19 @@ func GenerateCids(n int) []cid.Cid { return cids } -// GenerateEntries makes fake bitswap message entries -func GenerateEntries(n int, isCancel bool) []*bsmsg.Entry { +// GenerateWantlist makes a populated wantlist +func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe { + wl := wantlist.NewThreadSafe() + for i := 0; i < n; i++ { + prioritySeq++ + entry := wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq) + wl.AddEntry(entry, ses) + } + return wl +} + +// GenerateMessageEntries makes fake bitswap message entries +func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry { bsmsgs := make([]*bsmsg.Entry, 0, n) for i := 0; i < n; i++ { prioritySeq++ From 7c8803032e6bb8d365e80821fe3d8daa54903034 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Nov 2018 20:27:02 -0800 Subject: [PATCH 5/5] refactor(messagequeue): cleanup and comment This commit was moved from ipfs/go-bitswap@ac45ed058d5fc515ef53cf3803f1506df31b27db --- bitswap/messagequeue/messagequeue.go | 198 ++++++++++++++++----------- 1 file changed, 116 insertions(+), 82 deletions(-) diff --git a/bitswap/messagequeue/messagequeue.go b/bitswap/messagequeue/messagequeue.go index d8421a15a..bed0cd559 100644 --- a/bitswap/messagequeue/messagequeue.go +++ b/bitswap/messagequeue/messagequeue.go @@ -14,11 +14,14 @@ import ( var log = logging.Logger("bitswap") +// MessageNetwork is any network that can connect peers and generate a message +// sender type MessageNetwork interface { ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) } +// MessageQueue implements queuee of want messages to send to peers type MessageQueue struct { p peer.ID @@ -35,6 +38,7 @@ type MessageQueue struct { done chan struct{} } +// New creats a new MessageQueues func New(p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ done: make(chan struct{}), @@ -46,52 +50,31 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue { } } +// RefIncrement increments the refcount for a message queue func (mq *MessageQueue) RefIncrement() { mq.refcnt++ } +// RefDecrement decrements the refcount for a message queue and returns true +// if the refcount is now 0 func (mq *MessageQueue) RefDecrement() bool { mq.refcnt-- return mq.refcnt > 0 } +// AddMessage adds new entries to an outgoing message for a given session func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { - var work bool - mq.outlk.Lock() - defer func() { - mq.outlk.Unlock() - if !work { - return - } - select { - case mq.work <- struct{}{}: - default: - } - }() - - // if we have no message held allocate a new one - if mq.out == nil { - mq.out = bsmsg.New(false) + if !mq.addEntries(entries, ses) { + return } - - // TODO: add a msg.Combine(...) method - // otherwise, combine the one we are holding with the - // one passed in - for _, e := range entries { - if e.Cancel { - if mq.wl.Remove(e.Cid, ses) { - work = true - mq.out.Cancel(e.Cid) - } - } else { - if mq.wl.Add(e.Cid, e.Priority, ses) { - work = true - mq.out.AddEntry(e.Cid, e.Priority) - } - } + select { + case mq.work <- struct{}{}: + default: } } +// Startup starts the processing of messages, and creates an initial message +// based on the given initial wantlist func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { // new peer, we will want to give them our full wantlist @@ -110,6 +93,7 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist. } +// Shutdown stops the processing of messages for a message queue func (mq *MessageQueue) Shutdown() { close(mq.done) } @@ -133,84 +117,134 @@ func (mq *MessageQueue) runQueue(ctx context.Context) { } } -func (mq *MessageQueue) doWork(ctx context.Context) { - // grab outgoing message +func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool { + var work bool mq.outlk.Lock() - wlm := mq.out + defer mq.outlk.Unlock() + // if we have no message held allocate a new one + if mq.out == nil { + mq.out = bsmsg.New(false) + } + + // TODO: add a msg.Combine(...) method + // otherwise, combine the one we are holding with the + // one passed in + for _, e := range entries { + if e.Cancel { + if mq.wl.Remove(e.Cid, ses) { + work = true + mq.out.Cancel(e.Cid) + } + } else { + if mq.wl.Add(e.Cid, e.Priority, ses) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } + } + } + + return work +} + +func (mq *MessageQueue) doWork(ctx context.Context) { + + wlm := mq.extractOutgoingMessage() if wlm == nil || wlm.Empty() { - mq.outlk.Unlock() return } - mq.out = nil - mq.outlk.Unlock() // NB: only open a stream if we actually have data to send - if mq.sender == nil { - err := mq.openSender(ctx) - if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return - } + err := mq.initializeSender(ctx) + if err != nil { + log.Infof("cant open message sender to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return } // send wantlist updates for { // try to send this message until we fail. - err := mq.sender.SendMsg(ctx, wlm) - if err == nil { + if mq.attemptSendAndRecovery(ctx, wlm) { return } + } +} - log.Infof("bitswap send error: %s", err) - mq.sender.Reset() - mq.sender = nil +func (mq *MessageQueue) initializeSender(ctx context.Context) error { + if mq.sender != nil { + return nil + } + nsender, err := openSender(ctx, mq.network, mq.p) + if err != nil { + return err + } + mq.sender = nsender + return nil +} - select { - case <-mq.done: - return - case <-ctx.Done(): - return - case <-time.After(time.Millisecond * 100): - // wait 100ms in case disconnect notifications are still propogating - log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") - } +func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool { + err := mq.sender.SendMsg(ctx, wlm) + if err == nil { + return true + } - err = mq.openSender(ctx) - if err != nil { - log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) - // TODO(why): what do we do now? - // I think the *right* answer is to probably put the message we're - // trying to send back, and then return to waiting for new work or - // a disconnect. - return - } + log.Infof("bitswap send error: %s", err) + mq.sender.Reset() + mq.sender = nil + + select { + case <-mq.done: + return true + case <-ctx.Done(): + return true + case <-time.After(time.Millisecond * 100): + // wait 100ms in case disconnect notifications are still propogating + log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") + } - // TODO: Is this the same instance for the remote peer? - // If its not, we should resend our entire wantlist to them - /* - if mq.sender.InstanceID() != mq.lastSeenInstanceID { - wlm = mq.getFullWantlistMessage() - } - */ + err = mq.initializeSender(ctx) + if err != nil { + log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) + // TODO(why): what do we do now? + // I think the *right* answer is to probably put the message we're + // trying to send back, and then return to waiting for new work or + // a disconnect. + return true } + + // TODO: Is this the same instance for the remote peer? + // If its not, we should resend our entire wantlist to them + /* + if mq.sender.InstanceID() != mq.lastSeenInstanceID { + wlm = mq.getFullWantlistMessage() + } + */ + return false +} + +func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage { + // grab outgoing message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() + return wlm } -func (mq *MessageQueue) openSender(ctx context.Context) error { +func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) { // allow ten minutes for connections this includes looking them up in the // dht dialing them, and handshaking conctx, cancel := context.WithTimeout(ctx, time.Minute*10) defer cancel() - err := mq.network.ConnectTo(conctx, mq.p) + err := network.ConnectTo(conctx, p) if err != nil { - return err + return nil, err } - nsender, err := mq.network.NewMessageSender(ctx, mq.p) + nsender, err := network.NewMessageSender(ctx, p) if err != nil { - return err + return nil, err } - mq.sender = nsender - return nil + return nsender, nil }