diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 0e8fbf4e9..b3e472d2d 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -11,8 +11,10 @@ import ( decision "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" + bsmq "github.com/ipfs/go-bitswap/messagequeue" bsnet "github.com/ipfs/go-bitswap/network" notifications "github.com/ipfs/go-bitswap/notifications" + bspm "github.com/ipfs/go-bitswap/peermanager" bssm "github.com/ipfs/go-bitswap/sessionmanager" bswm "github.com/ipfs/go-bitswap/wantmanager" @@ -85,12 +87,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+ " data blocks recived").Histogram(metricsBuckets) + sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ + " this bitswap").Histogram(metricsBuckets) + notif := notifications.New() px := process.WithTeardown(func() error { notif.Shutdown() return nil }) + peerQueueFactory := func(p peer.ID) bspm.PeerQueue { + return bsmq.New(p, network) + } + bs := &Bitswap{ blockstore: bstore, notifications: notif, @@ -100,14 +109,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan cid.Cid, HasBlockBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: bswm.New(ctx, network), + wm: bswm.New(ctx), + pm: bspm.New(ctx, peerQueueFactory), sm: bssm.New(), counters: new(counters), - - dupMetric: dupHist, - allMetric: allHist, + dupMetric: dupHist, + allMetric: allHist, + sentHistogram: sentHistogram, } - go bs.wm.Run() + + bs.wm.SetDelegate(bs.pm) + bs.pm.Startup() + bs.wm.Startup() network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -128,6 +141,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, type Bitswap struct { // the peermanager manages sending messages to peers in a way that // wont block bitswap operation + pm *bspm.PeerManager + + // the wantlist tracks global wants for bitswap wm *bswm.WantManager // the engine is the bit of logic that decides who to send which blocks to @@ -160,8 +176,9 @@ type Bitswap struct { counters *counters // Metrics interface metrics - dupMetric metrics.Histogram - allMetric metrics.Histogram + dupMetric metrics.Histogram + allMetric metrics.Histogram + sentHistogram metrics.Histogram // the sessionmanager manages tracking sessions sm *bssm.SessionManager @@ -427,13 +444,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { - bs.wm.Connected(p) + initialWants := bs.wm.CurrentBroadcastWants() + bs.pm.Connected(p, initialWants) bs.engine.PeerConnected(p) } // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.wm.Disconnected(p) + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index d55fd0733..ef2d73b8d 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -202,10 +202,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { nump := len(instances) - 1 // assert we're properly connected for _, inst := range instances { - peers := inst.Exchange.wm.ConnectedPeers() + peers := inst.Exchange.pm.ConnectedPeers() for i := 0; i < 10 && len(peers) != nump; i++ { time.Sleep(time.Millisecond * 50) - peers = inst.Exchange.wm.ConnectedPeers() + peers = 
inst.Exchange.pm.ConnectedPeers() } if len(peers) != nump { t.Fatal("not enough peers connected to instance") diff --git a/bitswap/messagequeue/messagequeue.go b/bitswap/messagequeue/messagequeue.go index f36117d65..bed0cd559 100644 --- a/bitswap/messagequeue/messagequeue.go +++ b/bitswap/messagequeue/messagequeue.go @@ -14,12 +14,20 @@ import ( var log = logging.Logger("bitswap") +// MessageNetwork is any network that can connect peers and generate a message +// sender +type MessageNetwork interface { + ConnectTo(context.Context, peer.ID) error + NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) +} + +// MessageQueue implements queuee of want messages to send to peers type MessageQueue struct { p peer.ID outlk sync.Mutex out bsmsg.BitSwapMessage - network bsnet.BitSwapNetwork + network MessageNetwork wl *wantlist.ThreadSafe sender bsnet.MessageSender @@ -30,7 +38,8 @@ type MessageQueue struct { done chan struct{} } -func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue { +// New creats a new MessageQueues +func New(p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ done: make(chan struct{}), work: make(chan struct{}, 1), @@ -41,71 +50,54 @@ func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue { } } +// RefIncrement increments the refcount for a message queue func (mq *MessageQueue) RefIncrement() { mq.refcnt++ } +// RefDecrement decrements the refcount for a message queue and returns true +// if the refcount is now 0 func (mq *MessageQueue) RefDecrement() bool { mq.refcnt-- return mq.refcnt > 0 } +// AddMessage adds new entries to an outgoing message for a given session func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { - var work bool - mq.outlk.Lock() - defer func() { - mq.outlk.Unlock() - if !work { - return - } - select { - case mq.work <- struct{}{}: - default: - } - }() - - // if we have no message held allocate a new one - if mq.out == nil { - mq.out = bsmsg.New(false) + if !mq.addEntries(entries, ses) { + return } - - // TODO: add a msg.Combine(...) 
method - // otherwise, combine the one we are holding with the - // one passed in - for _, e := range entries { - if e.Cancel { - if mq.wl.Remove(e.Cid, ses) { - work = true - mq.out.Cancel(e.Cid) - } - } else { - if mq.wl.Add(e.Cid, e.Priority, ses) { - work = true - mq.out.AddEntry(e.Cid, e.Priority) - } - } + select { + case mq.work <- struct{}{}: + default: } } +// Startup starts the processing of messages, and creates an initial message +// based on the given initial wantlist func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New(true) - for _, e := range initialEntries { - for k := range e.SesTrk { - mq.wl.AddEntry(e, k) + if len(initialEntries) > 0 { + fullwantlist := bsmsg.New(true) + for _, e := range initialEntries { + for k := range e.SesTrk { + mq.wl.AddEntry(e, k) + } + fullwantlist.AddEntry(e.Cid, e.Priority) } - fullwantlist.AddEntry(e.Cid, e.Priority) + mq.out = fullwantlist + mq.work <- struct{}{} } - mq.out = fullwantlist - mq.work <- struct{}{} - go mq.runQueue(ctx) + } +// Shutdown stops the processing of messages for a message queue func (mq *MessageQueue) Shutdown() { close(mq.done) } + func (mq *MessageQueue) runQueue(ctx context.Context) { for { select { @@ -125,84 +117,134 @@ func (mq *MessageQueue) runQueue(ctx context.Context) { } } -func (mq *MessageQueue) doWork(ctx context.Context) { - // grab outgoing message +func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool { + var work bool mq.outlk.Lock() - wlm := mq.out + defer mq.outlk.Unlock() + // if we have no message held allocate a new one + if mq.out == nil { + mq.out = bsmsg.New(false) + } + + // TODO: add a msg.Combine(...) method + // otherwise, combine the one we are holding with the + // one passed in + for _, e := range entries { + if e.Cancel { + if mq.wl.Remove(e.Cid, ses) { + work = true + mq.out.Cancel(e.Cid) + } + } else { + if mq.wl.Add(e.Cid, e.Priority, ses) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } + } + } + + return work +} + +func (mq *MessageQueue) doWork(ctx context.Context) { + + wlm := mq.extractOutgoingMessage() if wlm == nil || wlm.Empty() { - mq.outlk.Unlock() return } - mq.out = nil - mq.outlk.Unlock() // NB: only open a stream if we actually have data to send - if mq.sender == nil { - err := mq.openSender(ctx) - if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return - } + err := mq.initializeSender(ctx) + if err != nil { + log.Infof("cant open message sender to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return } // send wantlist updates for { // try to send this message until we fail. 
- err := mq.sender.SendMsg(ctx, wlm) - if err == nil { + if mq.attemptSendAndRecovery(ctx, wlm) { return } + } +} - log.Infof("bitswap send error: %s", err) - mq.sender.Reset() - mq.sender = nil +func (mq *MessageQueue) initializeSender(ctx context.Context) error { + if mq.sender != nil { + return nil + } + nsender, err := openSender(ctx, mq.network, mq.p) + if err != nil { + return err + } + mq.sender = nsender + return nil +} - select { - case <-mq.done: - return - case <-ctx.Done(): - return - case <-time.After(time.Millisecond * 100): - // wait 100ms in case disconnect notifications are still propogating - log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") - } +func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool { + err := mq.sender.SendMsg(ctx, wlm) + if err == nil { + return true + } - err = mq.openSender(ctx) - if err != nil { - log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) - // TODO(why): what do we do now? - // I think the *right* answer is to probably put the message we're - // trying to send back, and then return to waiting for new work or - // a disconnect. - return - } + log.Infof("bitswap send error: %s", err) + mq.sender.Reset() + mq.sender = nil + + select { + case <-mq.done: + return true + case <-ctx.Done(): + return true + case <-time.After(time.Millisecond * 100): + // wait 100ms in case disconnect notifications are still propogating + log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") + } - // TODO: Is this the same instance for the remote peer? - // If its not, we should resend our entire wantlist to them - /* - if mq.sender.InstanceID() != mq.lastSeenInstanceID { - wlm = mq.getFullWantlistMessage() - } - */ + err = mq.initializeSender(ctx) + if err != nil { + log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) + // TODO(why): what do we do now? + // I think the *right* answer is to probably put the message we're + // trying to send back, and then return to waiting for new work or + // a disconnect. + return true } + + // TODO: Is this the same instance for the remote peer? 
+ // If its not, we should resend our entire wantlist to them + /* + if mq.sender.InstanceID() != mq.lastSeenInstanceID { + wlm = mq.getFullWantlistMessage() + } + */ + return false +} + +func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage { + // grab outgoing message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() + return wlm } -func (mq *MessageQueue) openSender(ctx context.Context) error { +func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) { // allow ten minutes for connections this includes looking them up in the // dht dialing them, and handshaking conctx, cancel := context.WithTimeout(ctx, time.Minute*10) defer cancel() - err := mq.network.ConnectTo(conctx, mq.p) + err := network.ConnectTo(conctx, p) if err != nil { - return err + return nil, err } - nsender, err := mq.network.NewMessageSender(ctx, mq.p) + nsender, err := network.NewMessageSender(ctx, p) if err != nil { - return err + return nil, err } - mq.sender = nsender - return nil + return nsender, nil } diff --git a/bitswap/messagequeue/messagequeue_test.go b/bitswap/messagequeue/messagequeue_test.go new file mode 100644 index 000000000..f3389fe7e --- /dev/null +++ b/bitswap/messagequeue/messagequeue_test.go @@ -0,0 +1,161 @@ +package messagequeue + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-bitswap/testutil" + + bsmsg "github.com/ipfs/go-bitswap/message" + bsnet "github.com/ipfs/go-bitswap/network" + peer "github.com/libp2p/go-libp2p-peer" +) + +type fakeMessageNetwork struct { + connectError error + messageSenderError error + messageSender bsnet.MessageSender +} + +func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { + return fmn.connectError +} + +func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) { + if fmn.messageSenderError == nil { + return fmn.messageSender, nil + } else { + return nil, fmn.messageSenderError + } +} + +type fakeMessageSender struct { + sendError error + fullClosed chan<- struct{} + reset chan<- struct{} + messagesSent chan<- bsmsg.BitSwapMessage +} + +func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + fms.messagesSent <- msg + return fms.sendError +} +func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil } +func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil } + +func collectMessages(ctx context.Context, + t *testing.T, + messagesSent <-chan bsmsg.BitSwapMessage, + timeout time.Duration) []bsmsg.BitSwapMessage { + var messagesReceived []bsmsg.BitSwapMessage + timeoutctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for { + select { + case messageReceived := <-messagesSent: + messagesReceived = append(messagesReceived, messageReceived) + case <-timeoutctx.Done(): + return messagesReceived + } + } +} + +func totalEntriesLength(messages []bsmsg.BitSwapMessage) int { + totalLength := 0 + for _, messages := range messages { + totalLength += len(messages.Wantlist()) + } + return totalLength +} + +func TestStartupAndShutdown(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, 
fakenet) + ses := testutil.GenerateSessionID() + wl := testutil.GenerateWantlist(10, ses) + + messageQueue.Startup(ctx, wl.Entries()) + + messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + if len(messages) != 1 { + t.Fatal("wrong number of messages were sent for initial wants") + } + + firstMessage := messages[0] + if len(firstMessage.Wantlist()) != wl.Len() { + t.Fatal("did not add all wants to want list") + } + for _, entry := range firstMessage.Wantlist() { + if entry.Cancel { + t.Fatal("initial add sent cancel entry when it should not have") + } + } + + messageQueue.Shutdown() + + timeoutctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + select { + case <-fullClosedChan: + case <-resetChan: + t.Fatal("message sender should have been closed but was reset") + case <-timeoutctx.Done(): + t.Fatal("message sender should have been closed but wasn't") + } +} + +func TestSendingMessagesDeduped(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, fakenet) + ses1 := testutil.GenerateSessionID() + ses2 := testutil.GenerateSessionID() + entries := testutil.GenerateMessageEntries(10, false) + messageQueue.Startup(ctx, nil) + + messageQueue.AddMessage(entries, ses1) + messageQueue.AddMessage(entries, ses2) + messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + if totalEntriesLength(messages) != len(entries) { + t.Fatal("Messages were not deduped") + } +} + +func TestSendingMessagesPartialDupe(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan bsmsg.BitSwapMessage) + resetChan := make(chan struct{}, 1) + fullClosedChan := make(chan struct{}, 1) + fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent} + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + messageQueue := New(peerID, fakenet) + ses1 := testutil.GenerateSessionID() + ses2 := testutil.GenerateSessionID() + entries := testutil.GenerateMessageEntries(10, false) + moreEntries := testutil.GenerateMessageEntries(5, false) + secondEntries := append(entries[5:], moreEntries...) 
+ messageQueue.Startup(ctx, nil) + + messageQueue.AddMessage(entries, ses1) + messageQueue.AddMessage(secondEntries, ses2) + messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + + if totalEntriesLength(messages) != len(entries)+len(moreEntries) { + t.Fatal("messages were not correctly deduped") + } + +} diff --git a/bitswap/peermanager/peermanager.go b/bitswap/peermanager/peermanager.go new file mode 100644 index 000000000..379fd4bd2 --- /dev/null +++ b/bitswap/peermanager/peermanager.go @@ -0,0 +1,201 @@ +package peermanager + +import ( + "context" + + bsmsg "github.com/ipfs/go-bitswap/message" + wantlist "github.com/ipfs/go-bitswap/wantlist" + logging "github.com/ipfs/go-log" + + peer "github.com/libp2p/go-libp2p-peer" +) + +var log = logging.Logger("bitswap") + +var ( + metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} +) + +// PeerQueue provides a queer of messages to be sent for a single peer +type PeerQueue interface { + RefIncrement() + RefDecrement() bool + AddMessage(entries []*bsmsg.Entry, ses uint64) + Startup(ctx context.Context, initialEntries []*wantlist.Entry) + Shutdown() +} + +// PeerQueueFactory provides a function that will create a PeerQueue +type PeerQueueFactory func(p peer.ID) PeerQueue + +type peerMessage interface { + handle(pm *PeerManager) +} + +// PeerManager manages a pool of peers and sends messages to peers in the pool +type PeerManager struct { + // sync channel for Run loop + peerMessages chan peerMessage + + // synchronized by Run loop, only touch inside there + peerQueues map[peer.ID]PeerQueue + + createPeerQueue PeerQueueFactory + ctx context.Context + cancel func() +} + +// New creates a new PeerManager, given a context and a peerQueueFactory +func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { + ctx, cancel := context.WithCancel(ctx) + return &PeerManager{ + peerMessages: make(chan peerMessage, 10), + peerQueues: make(map[peer.ID]PeerQueue), + createPeerQueue: createPeerQueue, + ctx: ctx, + cancel: cancel, + } +} + +// ConnectedPeers returns a list of peers this PeerManager is managing +func (pm *PeerManager) ConnectedPeers() []peer.ID { + resp := make(chan []peer.ID) + pm.peerMessages <- &getPeersMessage{resp} + return <-resp +} + +// Connected is called to add a new peer to the pool, and send it an initial set +// of wants +func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { + select { + case pm.peerMessages <- &connectPeerMessage{p, initialEntries}: + case <-pm.ctx.Done(): + } +} + +// Disconnected is called to remove a peer from the pool +func (pm *PeerManager) Disconnected(p peer.ID) { + select { + case pm.peerMessages <- &disconnectPeerMessage{p}: + case <-pm.ctx.Done(): + } +} + +// SendMessage is called to send a message to all or some peers in the pool +// if targets is nil, it sends to all +func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + select { + case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}: + case <-pm.ctx.Done(): + } +} + +// Startup enables the run loop for the PeerManager - no processing will occur +// if startup is not called +func (pm *PeerManager) Startup() { + go pm.run() +} + +// Shutdown shutsdown processing for the PeerManager +func (pm *PeerManager) Shutdown() { + pm.cancel() +} + +func (pm *PeerManager) run() { + for { + select { + case message := <-pm.peerMessages: + message.handle(pm) + case <-pm.ctx.Done(): + return + } + } +} + 
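Review note: everything below the run loop is the message vocabulary for that loop. The PeerManager never takes a lock; callers post a message onto peerMessages and, for queries like ConnectedPeers, block on a reply channel that the loop writes to. A minimal, self-contained sketch of that request/response-over-a-channel pattern (the names here are illustrative, not part of the package):

```go
package main

import "fmt"

// listPeersRequest carries a reply channel; the run goroutine is the only
// one that ever touches the map, so no mutex is needed.
type listPeersRequest struct {
	resp chan<- []string
}

func run(requests <-chan listPeersRequest, done <-chan struct{}) {
	peers := map[string]bool{"peerA": true, "peerB": true} // state owned by the loop
	for {
		select {
		case req := <-requests:
			out := make([]string, 0, len(peers))
			for p := range peers {
				out = append(out, p)
			}
			req.resp <- out // unblock the caller
		case <-done:
			return
		}
	}
}

func main() {
	requests := make(chan listPeersRequest)
	done := make(chan struct{})
	go run(requests, done)

	resp := make(chan []string)
	requests <- listPeersRequest{resp} // ConnectedPeers does the same hand-off
	fmt.Println(<-resp)
	close(done)
}
```

The same shape is reused by the WantManager later in this diff, which is why both components expose a plain Startup/Shutdown pair around a single run goroutine.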
+type sendPeerMessage struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} + +func (s *sendPeerMessage) handle(pm *PeerManager) { + pm.sendMessage(s) +} + +type connectPeerMessage struct { + p peer.ID + initialEntries []*wantlist.Entry +} + +func (c *connectPeerMessage) handle(pm *PeerManager) { + pm.startPeerHandler(c.p, c.initialEntries) +} + +type disconnectPeerMessage struct { + p peer.ID +} + +func (dc *disconnectPeerMessage) handle(pm *PeerManager) { + pm.stopPeerHandler(dc.p) +} + +type getPeersMessage struct { + peerResp chan<- []peer.ID +} + +func (gp *getPeersMessage) handle(pm *PeerManager) { + pm.getPeers(gp.peerResp) +} + +func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) { + peers := make([]peer.ID, 0, len(pm.peerQueues)) + for p := range pm.peerQueues { + peers = append(peers, p) + } + peerResp <- peers +} + +func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue { + mq, ok := pm.peerQueues[p] + if ok { + mq.RefIncrement() + return nil + } + + mq = pm.createPeerQueue(p) + pm.peerQueues[p] = mq + mq.Startup(pm.ctx, initialEntries) + return mq +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peerQueues[p] + if !ok { + // TODO: log error? + return + } + + if pq.RefDecrement() { + return + } + + pq.Shutdown() + delete(pm.peerQueues, p) +} + +func (pm *PeerManager) sendMessage(ms *sendPeerMessage) { + if len(ms.targets) == 0 { + for _, p := range pm.peerQueues { + p.AddMessage(ms.entries, ms.from) + } + } else { + for _, t := range ms.targets { + p, ok := pm.peerQueues[t] + if !ok { + log.Infof("tried sending wantlist change to non-partner peer: %s", t) + continue + } + p.AddMessage(ms.entries, ms.from) + } + } +} diff --git a/bitswap/peermanager/peermanager_test.go b/bitswap/peermanager/peermanager_test.go new file mode 100644 index 000000000..9617dad38 --- /dev/null +++ b/bitswap/peermanager/peermanager_test.go @@ -0,0 +1,179 @@ +package peermanager + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/ipfs/go-bitswap/testutil" + + bsmsg "github.com/ipfs/go-bitswap/message" + wantlist "github.com/ipfs/go-bitswap/wantlist" + "github.com/libp2p/go-libp2p-peer" +) + +type messageSent struct { + p peer.ID + entries []*bsmsg.Entry + ses uint64 +} + +type fakePeer struct { + refcnt int + p peer.ID + messagesSent chan messageSent +} + +func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {} +func (fp *fakePeer) Shutdown() {} +func (fp *fakePeer) RefIncrement() { fp.refcnt++ } +func (fp *fakePeer) RefDecrement() bool { + fp.refcnt-- + return fp.refcnt > 0 +} +func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) { + fp.messagesSent <- messageSent{fp.p, entries, ses} +} + +func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory { + return func(p peer.ID) PeerQueue { + return &fakePeer{ + p: p, + refcnt: 1, + messagesSent: messagesSent, + } + } +} + +func collectAndCheckMessages( + ctx context.Context, + t *testing.T, + messagesSent <-chan messageSent, + entries []*bsmsg.Entry, + ses uint64, + timeout time.Duration) []peer.ID { + var peersReceived []peer.ID + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + for { + select { + case nextMessage := <-messagesSent: + if nextMessage.ses != ses { + t.Fatal("Message enqueued with wrong session") + } + if !reflect.DeepEqual(nextMessage.entries, entries) { + t.Fatal("Message enqueued with wrong wants") + } + peersReceived = append(peersReceived, 
nextMessage.p) + case <-timeoutCtx.Done(): + return peersReceived + } + } +} + +func TestAddingAndRemovingPeers(t *testing.T) { + ctx := context.Background() + peerQueueFactory := makePeerQueueFactory(nil) + + tp := testutil.GeneratePeers(5) + peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] + peerManager := New(ctx, peerQueueFactory) + peerManager.Startup() + + peerManager.Connected(peer1, nil) + peerManager.Connected(peer2, nil) + peerManager.Connected(peer3, nil) + + connectedPeers := peerManager.ConnectedPeers() + + if !testutil.ContainsPeer(connectedPeers, peer1) || + !testutil.ContainsPeer(connectedPeers, peer2) || + !testutil.ContainsPeer(connectedPeers, peer3) { + t.Fatal("Peers not connected that should be connected") + } + + if testutil.ContainsPeer(connectedPeers, peer4) || + testutil.ContainsPeer(connectedPeers, peer5) { + t.Fatal("Peers connected that shouldn't be connected") + } + + // removing a peer with only one reference + peerManager.Disconnected(peer1) + connectedPeers = peerManager.ConnectedPeers() + + if testutil.ContainsPeer(connectedPeers, peer1) { + t.Fatal("Peer should have been disconnected but was not") + } + + // connecting a peer twice, then disconnecting once, should stay in queue + peerManager.Connected(peer2, nil) + peerManager.Disconnected(peer2) + connectedPeers = peerManager.ConnectedPeers() + + if !testutil.ContainsPeer(connectedPeers, peer2) { + t.Fatal("Peer was disconnected but should not have been") + } +} + +func TestSendingMessagesToPeers(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan messageSent) + peerQueueFactory := makePeerQueueFactory(messagesSent) + + tp := testutil.GeneratePeers(5) + + peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] + peerManager := New(ctx, peerQueueFactory) + peerManager.Startup() + + peerManager.Connected(peer1, nil) + peerManager.Connected(peer2, nil) + peerManager.Connected(peer3, nil) + + entries := testutil.GenerateMessageEntries(5, false) + ses := testutil.GenerateSessionID() + + peerManager.SendMessage(entries, nil, ses) + + peersReceived := collectAndCheckMessages( + ctx, t, messagesSent, entries, ses, 10*time.Millisecond) + if len(peersReceived) != 3 { + t.Fatal("Incorrect number of peers received messages") + } + + if !testutil.ContainsPeer(peersReceived, peer1) || + !testutil.ContainsPeer(peersReceived, peer2) || + !testutil.ContainsPeer(peersReceived, peer3) { + t.Fatal("Peers should have received message but did not") + } + + if testutil.ContainsPeer(peersReceived, peer4) || + testutil.ContainsPeer(peersReceived, peer5) { + t.Fatal("Peers received message but should not have") + } + + var peersToSendTo []peer.ID + peersToSendTo = append(peersToSendTo, peer1, peer3, peer4) + peerManager.SendMessage(entries, peersToSendTo, ses) + peersReceived = collectAndCheckMessages( + ctx, t, messagesSent, entries, ses, 10*time.Millisecond) + + if len(peersReceived) != 2 { + t.Fatal("Incorrect number of peers received messages") + } + + if !testutil.ContainsPeer(peersReceived, peer1) || + !testutil.ContainsPeer(peersReceived, peer3) { + t.Fatal("Peers should have received message but did not") + } + + if testutil.ContainsPeer(peersReceived, peer2) || + testutil.ContainsPeer(peersReceived, peer5) { + t.Fatal("Peers received message but should not have") + } + + if testutil.ContainsPeer(peersReceived, peer4) { + t.Fatal("Peers targeted received message but was not connected") + } +} diff --git a/bitswap/testutil/testutil.go b/bitswap/testutil/testutil.go 
new file mode 100644 index 000000000..f768f40dc --- /dev/null +++ b/bitswap/testutil/testutil.go @@ -0,0 +1,78 @@ +package testutil + +import ( + bsmsg "github.com/ipfs/go-bitswap/message" + "github.com/ipfs/go-bitswap/wantlist" + cid "github.com/ipfs/go-cid" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + peer "github.com/libp2p/go-libp2p-peer" +) + +var blockGenerator = blocksutil.NewBlockGenerator() +var prioritySeq int + +// GenerateCids produces n content identifiers +func GenerateCids(n int) []cid.Cid { + cids := make([]cid.Cid, 0, n) + for i := 0; i < n; i++ { + c := blockGenerator.Next().Cid() + cids = append(cids, c) + } + return cids +} + +// GenerateWantlist makes a populated wantlist +func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe { + wl := wantlist.NewThreadSafe() + for i := 0; i < n; i++ { + prioritySeq++ + entry := wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq) + wl.AddEntry(entry, ses) + } + return wl +} + +// GenerateMessageEntries makes fake bitswap message entries +func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry { + bsmsgs := make([]*bsmsg.Entry, 0, n) + for i := 0; i < n; i++ { + prioritySeq++ + msg := &bsmsg.Entry{ + Entry: wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq), + Cancel: isCancel, + } + bsmsgs = append(bsmsgs, msg) + } + return bsmsgs +} + +var peerSeq int + +// GeneratePeers creates n peer ids +func GeneratePeers(n int) []peer.ID { + peerIds := make([]peer.ID, 0, n) + for i := 0; i < n; i++ { + peerSeq++ + p := peer.ID(peerSeq) + peerIds = append(peerIds, p) + } + return peerIds +} + +var nextSession uint64 + +// GenerateSessionID make a unit session identifier +func GenerateSessionID() uint64 { + nextSession++ + return uint64(nextSession) +} + +// ContainsPeer returns true if a peer is found n a list of peers +func ContainsPeer(peers []peer.ID, p peer.ID) bool { + for _, n := range peers { + if p == n { + return true + } + } + return false +} diff --git a/bitswap/wantmanager/wantmanager.go b/bitswap/wantmanager/wantmanager.go index e3734290c..3dcff166b 100644 --- a/bitswap/wantmanager/wantmanager.go +++ b/bitswap/wantmanager/wantmanager.go @@ -4,10 +4,7 @@ import ( "context" "math" - engine "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" - bsmq "github.com/ipfs/go-bitswap/messagequeue" - bsnet "github.com/ipfs/go-bitswap/network" wantlist "github.com/ipfs/go-bitswap/wantlist" logging "github.com/ipfs/go-log" @@ -19,59 +16,60 @@ import ( var log = logging.Logger("bitswap") const ( - // kMaxPriority is the max priority as defined by the bitswap protocol - kMaxPriority = math.MaxInt32 + // maxPriority is the max priority as defined by the bitswap protocol + maxPriority = math.MaxInt32 ) -var ( - metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} -) +// WantSender sends changes out to the network as they get added to the wantlist +// managed by the WantManager +type WantSender interface { + SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) +} +type wantMessage interface { + handle(wm *WantManager) +} + +// WantManager manages a global want list. 
It tracks two seperate want lists - +// one for all wants, and one for wants that are specifically broadcast to the +// internet type WantManager struct { - // sync channels for Run loop - incoming chan *wantSet - connectEvent chan peerStatus // notification channel for peers connecting/disconnecting - peerReqs chan chan []peer.ID // channel to request connected peers on + // channel requests to the run loop + // to get predictable behavior while running this in a go routine + // having only one channel is neccesary, so requests are processed serially + wantMessages chan wantMessage // synchronized by Run loop, only touch inside there - peers map[peer.ID]*bsmq.MessageQueue - wl *wantlist.ThreadSafe - bcwl *wantlist.ThreadSafe + wl *wantlist.ThreadSafe + bcwl *wantlist.ThreadSafe - network bsnet.BitSwapNetwork - ctx context.Context - cancel func() + ctx context.Context + cancel func() + wantSender WantSender wantlistGauge metrics.Gauge - sentHistogram metrics.Histogram -} - -type peerStatus struct { - connect bool - peer peer.ID } -func New(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { +// New initializes a new WantManager for a given context +func New(ctx context.Context) *WantManager { ctx, cancel := context.WithCancel(ctx) wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge() - sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ - " this bitswap").Histogram(metricsBuckets) return &WantManager{ - incoming: make(chan *wantSet, 10), - connectEvent: make(chan peerStatus, 10), - peerReqs: make(chan chan []peer.ID), - peers: make(map[peer.ID]*bsmq.MessageQueue), + wantMessages: make(chan wantMessage, 10), wl: wantlist.NewThreadSafe(), bcwl: wantlist.NewThreadSafe(), - network: network, ctx: ctx, cancel: cancel, wantlistGauge: wantlistGauge, - sentHistogram: sentHistogram, } } +// SetDelegate specifies who will send want changes out to the internet +func (wm *WantManager) SetDelegate(wantSender WantSender) { + wm.wantSender = wantSender +} + // WantBlocks adds the given cids to the wantlist, tracked by the given session func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) @@ -83,10 +81,55 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe wm.addEntries(context.Background(), ks, peers, true, ses) } -type wantSet struct { - entries []*bsmsg.Entry - targets []peer.ID - from uint64 +// IsWanted returns whether a CID is currently wanted +func (wm *WantManager) IsWanted(c cid.Cid) bool { + resp := make(chan bool) + wm.wantMessages <- &isWantedMessage{c, resp} + return <-resp +} + +// CurrentWants returns the list of current wants +func (wm *WantManager) CurrentWants() []*wantlist.Entry { + resp := make(chan []*wantlist.Entry) + wm.wantMessages <- ¤tWantsMessage{resp} + return <-resp +} + +// CurrentBroadcastWants returns the current list of wants that are broadcasts +func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry { + resp := make(chan []*wantlist.Entry) + wm.wantMessages <- ¤tBroadcastWantsMessage{resp} + return <-resp +} + +// WantCount returns the total count of wants +func (wm *WantManager) WantCount() int { + resp := make(chan int) + wm.wantMessages <- &wantCountMessage{resp} + return <-resp +} + +// Startup starts processing for the WantManager +func (wm *WantManager) Startup() { + go wm.run() +} + +// Shutdown ends processing for the want manager +func (wm 
*WantManager) Shutdown() { + wm.cancel() +} + +func (wm *WantManager) run() { + // NOTE: Do not open any streams or connections from anywhere in this + // event loop. Really, just don't do anything likely to block. + for { + select { + case message := <-wm.wantMessages: + message.handle(wm) + case <-wm.ctx.Done(): + return + } + } } func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { @@ -94,158 +137,80 @@ func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, - Entry: wantlist.NewRefEntry(k, kMaxPriority-i), + Entry: wantlist.NewRefEntry(k, maxPriority-i), }) } select { - case wm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: + case wm.wantMessages <- &wantSet{entries: entries, targets: targets, from: ses}: case <-wm.ctx.Done(): case <-ctx.Done(): } } -func (wm *WantManager) ConnectedPeers() []peer.ID { - resp := make(chan []peer.ID) - wm.peerReqs <- resp - return <-resp +type wantSet struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 } -func (wm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) { - // Blocks need to be sent synchronously to maintain proper backpressure - // throughout the network stack - defer env.Sent() - - msgSize := 0 - msg := bsmsg.New(false) - for _, block := range env.Message.Blocks() { - msgSize += len(block.RawData()) - msg.AddBlock(block) - log.Infof("Sending block %s to %s", block, env.Peer) - } +func (ws *wantSet) handle(wm *WantManager) { + // is this a broadcast or not? + brdc := len(ws.targets) == 0 - wm.sentHistogram.Observe(float64(msgSize)) - err := wm.network.SendMessage(ctx, env.Peer, msg) - if err != nil { - log.Infof("sendblock error: %s", err) - } -} + // add changes to our wantlist + for _, e := range ws.entries { + if e.Cancel { + if brdc { + wm.bcwl.Remove(e.Cid, ws.from) + } -func (wm *WantManager) startPeerHandler(p peer.ID) *bsmq.MessageQueue { - mq, ok := wm.peers[p] - if ok { - mq.RefIncrement() - return nil + if wm.wl.Remove(e.Cid, ws.from) { + wm.wantlistGauge.Dec() + } + } else { + if brdc { + wm.bcwl.AddEntry(e.Entry, ws.from) + } + if wm.wl.AddEntry(e.Entry, ws.from) { + wm.wantlistGauge.Inc() + } + } } - mq = bsmq.New(p, wm.network) - wm.peers[p] = mq - mq.Startup(wm.ctx, wm.bcwl.Entries()) - return mq + // broadcast those wantlist changes + wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from) } -func (wm *WantManager) stopPeerHandler(p peer.ID) { - pq, ok := wm.peers[p] - if !ok { - // TODO: log error? - return - } - - if pq.RefDecrement() { - return - } - - pq.Shutdown() - delete(wm.peers, p) +type isWantedMessage struct { + c cid.Cid + resp chan<- bool } -func (wm *WantManager) Connected(p peer.ID) { - select { - case wm.connectEvent <- peerStatus{peer: p, connect: true}: - case <-wm.ctx.Done(): - } +func (iwm *isWantedMessage) handle(wm *WantManager) { + _, isWanted := wm.wl.Contains(iwm.c) + iwm.resp <- isWanted } -func (wm *WantManager) Disconnected(p peer.ID) { - select { - case wm.connectEvent <- peerStatus{peer: p, connect: false}: - case <-wm.ctx.Done(): - } +type currentWantsMessage struct { + resp chan<- []*wantlist.Entry } -// TODO: use goprocess here once i trust it -func (wm *WantManager) Run() { - // NOTE: Do not open any streams or connections from anywhere in this - // event loop. Really, just don't do anything likely to block. 
- for { - select { - case ws := <-wm.incoming: - - // is this a broadcast or not? - brdc := len(ws.targets) == 0 - - // add changes to our wantlist - for _, e := range ws.entries { - if e.Cancel { - if brdc { - wm.bcwl.Remove(e.Cid, ws.from) - } - - if wm.wl.Remove(e.Cid, ws.from) { - wm.wantlistGauge.Dec() - } - } else { - if brdc { - wm.bcwl.AddEntry(e.Entry, ws.from) - } - if wm.wl.AddEntry(e.Entry, ws.from) { - wm.wantlistGauge.Inc() - } - } - } - - // broadcast those wantlist changes - if len(ws.targets) == 0 { - for _, p := range wm.peers { - p.AddMessage(ws.entries, ws.from) - } - } else { - for _, t := range ws.targets { - p, ok := wm.peers[t] - if !ok { - log.Infof("tried sending wantlist change to non-partner peer: %s", t) - continue - } - p.AddMessage(ws.entries, ws.from) - } - } +func (cwm *currentWantsMessage) handle(wm *WantManager) { + cwm.resp <- wm.wl.Entries() +} - case p := <-wm.connectEvent: - if p.connect { - wm.startPeerHandler(p.peer) - } else { - wm.stopPeerHandler(p.peer) - } - case req := <-wm.peerReqs: - peers := make([]peer.ID, 0, len(wm.peers)) - for p := range wm.peers { - peers = append(peers, p) - } - req <- peers - case <-wm.ctx.Done(): - return - } - } +type currentBroadcastWantsMessage struct { + resp chan<- []*wantlist.Entry } -func (wm *WantManager) IsWanted(c cid.Cid) bool { - _, isWanted := wm.wl.Contains(c) - return isWanted +func (cbcwm *currentBroadcastWantsMessage) handle(wm *WantManager) { + cbcwm.resp <- wm.bcwl.Entries() } -func (wm *WantManager) CurrentWants() []*wantlist.Entry { - return wm.wl.Entries() +type wantCountMessage struct { + resp chan<- int } -func (wm *WantManager) WantCount() int { - return wm.wl.Len() +func (wcm *wantCountMessage) handle(wm *WantManager) { + wcm.resp <- wm.wl.Len() } diff --git a/bitswap/wantmanager/wantmanager_test.go b/bitswap/wantmanager/wantmanager_test.go new file mode 100644 index 000000000..85590bb15 --- /dev/null +++ b/bitswap/wantmanager/wantmanager_test.go @@ -0,0 +1,215 @@ +package wantmanager + +import ( + "context" + "reflect" + "sync" + "testing" + + "github.com/ipfs/go-bitswap/testutil" + + bsmsg "github.com/ipfs/go-bitswap/message" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-peer" +) + +type fakeWantSender struct { + lk sync.RWMutex + lastWantSet wantSet +} + +func (fws *fakeWantSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + fws.lk.Lock() + fws.lastWantSet = wantSet{entries, targets, from} + fws.lk.Unlock() +} + +func (fws *fakeWantSender) getLastWantSet() wantSet { + fws.lk.Lock() + defer fws.lk.Unlock() + return fws.lastWantSet +} + +func setupTestFixturesAndInitialWantList() ( + context.Context, *fakeWantSender, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) { + ctx := context.Background() + + // setup fixtures + wantSender := &fakeWantSender{} + wantManager := New(ctx) + keys := testutil.GenerateCids(10) + otherKeys := testutil.GenerateCids(5) + peers := testutil.GeneratePeers(10) + session := testutil.GenerateSessionID() + otherSession := testutil.GenerateSessionID() + + // startup wantManager + wantManager.SetDelegate(wantSender) + wantManager.Startup() + + // add initial wants + wantManager.WantBlocks( + ctx, + keys, + peers, + session) + + return ctx, wantSender, wantManager, keys, otherKeys, peers, session, otherSession +} + +func TestInitialWantsAddedCorrectly(t *testing.T) { + + _, wantSender, wantManager, keys, _, peers, session, _ := + setupTestFixturesAndInitialWantList() + + bcwl := wantManager.CurrentBroadcastWants() 
+ wl := wantManager.CurrentWants() + + if len(bcwl) > 0 { + t.Fatal("should not create broadcast wants when peers are specified") + } + + if len(wl) != len(keys) { + t.Fatal("did not add correct number of wants to want lsit") + } + + generatedWantSet := wantSender.getLastWantSet() + + if len(generatedWantSet.entries) != len(keys) { + t.Fatal("incorrect wants sent") + } + + for _, entry := range generatedWantSet.entries { + if entry.Cancel { + t.Fatal("did not send only non-cancel messages") + } + } + + if generatedWantSet.from != session { + t.Fatal("incorrect session used in sending") + } + + if !reflect.DeepEqual(generatedWantSet.targets, peers) { + t.Fatal("did not setup peers correctly") + } + + wantManager.Shutdown() +} + +func TestCancellingWants(t *testing.T) { + ctx, wantSender, wantManager, keys, _, peers, session, _ := + setupTestFixturesAndInitialWantList() + + wantManager.CancelWants(ctx, keys, peers, session) + + wl := wantManager.CurrentWants() + + if len(wl) != 0 { + t.Fatal("did not remove blocks from want list") + } + + generatedWantSet := wantSender.getLastWantSet() + + if len(generatedWantSet.entries) != len(keys) { + t.Fatal("incorrect wants sent") + } + + for _, entry := range generatedWantSet.entries { + if !entry.Cancel { + t.Fatal("did not send only cancel messages") + } + } + + if generatedWantSet.from != session { + t.Fatal("incorrect session used in sending") + } + + if !reflect.DeepEqual(generatedWantSet.targets, peers) { + t.Fatal("did not setup peers correctly") + } + + wantManager.Shutdown() + +} + +func TestCancellingWantsFromAnotherSessionHasNoEffect(t *testing.T) { + ctx, _, wantManager, keys, _, peers, _, otherSession := + setupTestFixturesAndInitialWantList() + + // cancelling wants from another session has no effect + wantManager.CancelWants(ctx, keys, peers, otherSession) + + wl := wantManager.CurrentWants() + + if len(wl) != len(keys) { + t.Fatal("should not cancel wants unless they match session that made them") + } + + wantManager.Shutdown() +} + +func TestAddingWantsWithNoPeersAddsToBroadcastAndRegularWantList(t *testing.T) { + ctx, _, wantManager, keys, otherKeys, _, session, _ := + setupTestFixturesAndInitialWantList() + + wantManager.WantBlocks(ctx, otherKeys, nil, session) + + bcwl := wantManager.CurrentBroadcastWants() + wl := wantManager.CurrentWants() + + if len(bcwl) != len(otherKeys) { + t.Fatal("want requests with no peers should get added to broadcast list") + } + + if len(wl) != len(otherKeys)+len(keys) { + t.Fatal("want requests with no peers should get added to regular want list") + } + + wantManager.Shutdown() +} + +func TestAddingRequestFromSecondSessionPreventsCancel(t *testing.T) { + ctx, wantSender, wantManager, keys, _, peers, session, otherSession := + setupTestFixturesAndInitialWantList() + + // add a second session requesting the first key + firstKeys := append([]cid.Cid(nil), keys[0]) + wantManager.WantBlocks(ctx, firstKeys, peers, otherSession) + + wl := wantManager.CurrentWants() + + if len(wl) != len(keys) { + t.Fatal("wants from other sessions should not get added seperately") + } + + generatedWantSet := wantSender.getLastWantSet() + if len(generatedWantSet.entries) != len(firstKeys) && + generatedWantSet.from != otherSession && + generatedWantSet.entries[0].Cid != firstKeys[0] && + generatedWantSet.entries[0].Cancel != false { + t.Fatal("should send additional message requesting want for new session") + } + + // cancel block from first session + wantManager.CancelWants(ctx, firstKeys, peers, session) + + wl = 
wantManager.CurrentWants() + + // want should still be on want list + if len(wl) != len(keys) { + t.Fatal("wants should not be removed until all sessions cancel wants") + } + + // cancel other block from first session + secondKeys := append([]cid.Cid(nil), keys[1]) + wantManager.CancelWants(ctx, secondKeys, peers, session) + + wl = wantManager.CurrentWants() + + // want should not be on want list, cause it was only tracked by one session + if len(wl) != len(keys)-1 { + t.Fatal("wants should be removed if all sessions have cancelled") + } + + wantManager.Shutdown() +} diff --git a/bitswap/workers.go b/bitswap/workers.go index 34b75bab2..99a967068 100644 --- a/bitswap/workers.go +++ b/bitswap/workers.go @@ -6,8 +6,8 @@ import ( "sync" "time" + engine "github.com/ipfs/go-bitswap/decision" bsmsg "github.com/ipfs/go-bitswap/message" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" process "github.com/jbenet/goprocess" @@ -74,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } bs.engine.MessageSent(envelope.Peer, outgoing) - bs.wm.SendBlocks(ctx, envelope) + bs.sendBlocks(ctx, envelope) bs.counterLk.Lock() for _, block := range envelope.Message.Blocks() { bs.counters.blocksSent++ @@ -90,6 +90,26 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } +func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msgSize := 0 + msg := bsmsg.New(false) + for _, block := range env.Message.Blocks() { + msgSize += len(block.RawData()) + msg.AddBlock(block) + log.Infof("Sending block %s to %s", block, env.Peer) + } + + bs.sentHistogram.Observe(float64(msgSize)) + err := bs.network.SendMessage(ctx, env.Peer, msg) + if err != nil { + log.Infof("sendblock error: %s", err) + } +} + func (bs *Bitswap) provideWorker(px process.Process) { limit := make(chan struct{}, provideWorkerMax)
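Some notes on the refactor as a whole. The construction in New now layers three components: the WantManager tracks what is wanted, the PeerManager tracks who to tell, and each peer gets a MessageQueue created on demand by the factory closure. A condensed sketch of that wiring using only the constructors and methods introduced in this diff (the wire function itself is illustrative; error handling and the rest of Bitswap are omitted):

```go
package example

import (
	"context"

	bsmq "github.com/ipfs/go-bitswap/messagequeue"
	bsnet "github.com/ipfs/go-bitswap/network"
	bspm "github.com/ipfs/go-bitswap/peermanager"
	bswm "github.com/ipfs/go-bitswap/wantmanager"

	peer "github.com/libp2p/go-libp2p-peer"
)

// wire mirrors the setup in Bitswap.New: the WantManager decides *what* is
// wanted, the PeerManager decides *who* to tell, and each peer gets its own
// MessageQueue created lazily by the factory.
func wire(ctx context.Context, network bsnet.BitSwapNetwork) (*bswm.WantManager, *bspm.PeerManager) {
	peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
		// BitSwapNetwork provides ConnectTo and NewMessageSender, so it
		// satisfies messagequeue.MessageNetwork.
		return bsmq.New(p, network)
	}

	wm := bswm.New(ctx)
	pm := bspm.New(ctx, peerQueueFactory)

	wm.SetDelegate(pm) // want changes flow out through the PeerManager
	pm.Startup()
	wm.Startup()
	return wm, pm
}
```

Note that SetDelegate runs before either Startup, mirroring New above: once the WantManager's loop is processing want sets, it needs a WantSender in place to hand them to.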
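MessageQueue.AddMessage only wakes the sender goroutine when addEntries actually changed the pending message, and the wake-up is a non-blocking send into a one-slot channel, so any number of rapid updates collapse into a single send pass. The pattern in isolation, with stand-in types (none of these names come from the package):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type queue struct {
	mu   sync.Mutex
	out  []string      // pending work, protected by mu
	work chan struct{} // capacity 1: a "dirty" flag, not a queue of events
}

func (q *queue) add(item string) {
	q.mu.Lock()
	q.out = append(q.out, item)
	q.mu.Unlock()

	select {
	case q.work <- struct{}{}: // wake the worker if it isn't already flagged
	default: // a wake-up is already pending; coalesce
	}
}

func (q *queue) run() {
	for range q.work {
		q.mu.Lock()
		batch := q.out
		q.out = nil
		q.mu.Unlock()
		fmt.Println("sending batch:", batch)
	}
}

func main() {
	q := &queue{work: make(chan struct{}, 1)}
	go q.run()
	for i := 0; i < 5; i++ {
		q.add(fmt.Sprintf("want-%d", i))
	}
	time.Sleep(50 * time.Millisecond) // let the demo flush before main exits
}
```

The real queue stops through its done channel and context rather than by closing work; the sleep here is only so the demo flushes before exiting.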
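doWork now delegates the send-with-reconnect loop to attemptSendAndRecovery, whose boolean answers one question: should the loop stop? Success, shutdown, and unrecoverable connection failure all return true; only a successful reconnect returns false so the send is retried. A reduced sketch of that control flow using stdlib types only (the helper names are made up for the example):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errFlaky = errors.New("stream reset")

// trySend reports whether the caller should stop retrying: success and
// unrecoverable failure both end the loop; only "reconnected, try again"
// returns false.
func trySend(ctx context.Context, send func() error, reconnect func() error) bool {
	if err := send(); err == nil {
		return true // sent
	}
	select {
	case <-ctx.Done():
		return true // shutting down
	case <-time.After(100 * time.Millisecond):
		// give disconnect notifications a moment to propagate
	}
	if err := reconnect(); err != nil {
		return true // cannot recover; drop the attempt
	}
	return false // reconnected, retry the send
}

func main() {
	ctx := context.Background()
	attempts := 0
	send := func() error {
		attempts++
		if attempts < 3 {
			return errFlaky
		}
		return nil
	}
	reconnect := func() error { return nil }

	for !trySend(ctx, send, reconnect) {
	}
	fmt.Println("sent after", attempts, "attempts")
}
```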
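PeerManager reference-counts peer queues: a second Connected for the same peer bumps the refcount instead of creating a new queue, and the queue is only shut down once Disconnected has been called as many times as Connected (this is what TestAddingAndRemovingPeers checks). A small usage sketch against the API in this diff; the nullQueue stand-in is hypothetical, real callers use the messagequeue-backed factory from New:

```go
package main

import (
	"context"
	"fmt"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bspm "github.com/ipfs/go-bitswap/peermanager"
	"github.com/ipfs/go-bitswap/testutil"
	wantlist "github.com/ipfs/go-bitswap/wantlist"

	peer "github.com/libp2p/go-libp2p-peer"
)

// nullQueue is a stand-in PeerQueue that does nothing but track references.
type nullQueue struct{ refcnt int }

func (q *nullQueue) RefIncrement()      { q.refcnt++ }
func (q *nullQueue) RefDecrement() bool { q.refcnt--; return q.refcnt > 0 }

func (q *nullQueue) AddMessage(entries []*bsmsg.Entry, ses uint64)          {}
func (q *nullQueue) Startup(ctx context.Context, initial []*wantlist.Entry) {}
func (q *nullQueue) Shutdown()                                              {}

func main() {
	ctx := context.Background()
	pm := bspm.New(ctx, func(p peer.ID) bspm.PeerQueue { return &nullQueue{refcnt: 1} })
	pm.Startup()
	defer pm.Shutdown()

	p := testutil.GeneratePeers(1)[0]
	pm.Connected(p, nil) // refcnt 1, queue created
	pm.Connected(p, nil) // same peer again: refcnt 2, no new queue
	pm.Disconnected(p)   // refcnt back to 1, queue kept

	fmt.Println(len(pm.ConnectedPeers())) // still 1
}
```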
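On the WantManager side, wants are tracked per session: cancelling a CID removes it from the global list only after every session that requested it has cancelled, which TestAddingRequestFromSecondSessionPreventsCancel exercises above. A condensed sketch of that behavior with a no-op WantSender standing in for the PeerManager (the nopSender type is invented for the example):

```go
package main

import (
	"context"
	"fmt"

	bsmsg "github.com/ipfs/go-bitswap/message"
	"github.com/ipfs/go-bitswap/testutil"
	bswm "github.com/ipfs/go-bitswap/wantmanager"

	peer "github.com/libp2p/go-libp2p-peer"
)

// nopSender satisfies WantSender but drops everything; bitswap proper wires
// in the PeerManager here.
type nopSender struct{}

func (nopSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {}

func main() {
	ctx := context.Background()
	wm := bswm.New(ctx)
	wm.SetDelegate(nopSender{})
	wm.Startup()
	defer wm.Shutdown()

	keys := testutil.GenerateCids(1)
	ses1 := testutil.GenerateSessionID()
	ses2 := testutil.GenerateSessionID()

	wm.WantBlocks(ctx, keys, nil, ses1) // broadcast want from session 1
	wm.WantBlocks(ctx, keys, nil, ses2) // same CID wanted by session 2

	wm.CancelWants(ctx, keys, nil, ses1)
	fmt.Println(wm.IsWanted(keys[0])) // true: session 2 still wants it

	wm.CancelWants(ctx, keys, nil, ses2)
	fmt.Println(wm.IsWanted(keys[0])) // false: the last session cancelled
}
```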