diff --git a/bitswap.go b/bitswap.go index 4b72b52d..0e8fbf4e 100644 --- a/bitswap.go +++ b/bitswap.go @@ -5,7 +5,6 @@ package bitswap import ( "context" "errors" - "math" "sync" "sync/atomic" "time" @@ -14,6 +13,8 @@ import ( bsmsg "github.com/ipfs/go-bitswap/message" bsnet "github.com/ipfs/go-bitswap/network" notifications "github.com/ipfs/go-bitswap/notifications" + bssm "github.com/ipfs/go-bitswap/sessionmanager" + bswm "github.com/ipfs/go-bitswap/wantmanager" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -42,8 +43,6 @@ const ( providerRequestTimeout = time.Second * 10 provideTimeout = time.Second * 15 sizeBatchRequestChan = 32 - // kMaxPriority is the max priority as defined by the bitswap protocol - kMaxPriority = math.MaxInt32 ) var ( @@ -101,7 +100,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan cid.Cid, HasBlockBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: NewWantManager(ctx, network), + wm: bswm.New(ctx, network), + sm: bssm.New(), counters: new(counters), dupMetric: dupHist, @@ -128,7 +128,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, type Bitswap struct { // the peermanager manages sending messages to peers in a way that // wont block bitswap operation - wm *WantManager + wm *bswm.WantManager // the engine is the bit of logic that decides who to send which blocks to engine *decision.Engine @@ -163,12 +163,8 @@ type Bitswap struct { dupMetric metrics.Histogram allMetric metrics.Histogram - // Sessions - sessions []*Session - sessLk sync.Mutex - - sessID uint64 - sessIDLk sync.Mutex + // the sessionmanager manages tracking sessions + sm *bssm.SessionManager } type counters struct { @@ -229,7 +225,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } - mses := bs.getNextSessionID() + mses := bs.sm.GetNextSessionID() bs.wm.WantBlocks(ctx, keys, nil, mses) @@ -294,13 +290,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks return out, nil } -func (bs *Bitswap) getNextSessionID() uint64 { - bs.sessIDLk.Lock() - defer bs.sessIDLk.Unlock() - bs.sessID++ - return bs.sessID -} - // CancelWant removes a given key from the wantlist func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) { if len(cids) == 0 { @@ -359,15 +348,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { // SessionsForBlock returns a slice of all sessions that may be interested in the given cid func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session { - bs.sessLk.Lock() - defer bs.sessLk.Unlock() - var out []*Session - for _, s := range bs.sessions { + bs.sm.IterateSessions(func(session exchange.Fetcher) { + s := session.(*Session) if s.interestedIn(c) { out = append(out, s) } - } + }) return out } @@ -398,7 +385,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg log.Debugf("got block %s from %s", b, p) // skip received blocks that are not in the wantlist - if _, contains := bs.wm.wl.Contains(b.Cid()); !contains { + if !bs.wm.IsWanted(b.Cid()) { return } @@ -461,7 +448,7 @@ func (bs *Bitswap) Close() error { } func (bs *Bitswap) GetWantlist() []cid.Cid { - entries := bs.wm.wl.Entries() + entries := bs.wm.CurrentWants() out := make([]cid.Cid, 0, len(entries)) for _, e := range entries { out = append(out, e.Cid) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go new file mode 100644 index 00000000..f36117d6 --- /dev/null +++ b/messagequeue/messagequeue.go @@ -0,0 +1,208 @@ +package messagequeue + +import ( + "context" + "sync" + "time" + + bsmsg "github.com/ipfs/go-bitswap/message" + bsnet "github.com/ipfs/go-bitswap/network" + wantlist "github.com/ipfs/go-bitswap/wantlist" + logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-peer" +) + +var log = logging.Logger("bitswap") + +type MessageQueue struct { + p peer.ID + + outlk sync.Mutex + out bsmsg.BitSwapMessage + network bsnet.BitSwapNetwork + wl *wantlist.ThreadSafe + + sender bsnet.MessageSender + + refcnt int + + work chan struct{} + done chan struct{} +} + +func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue { + return &MessageQueue{ + done: make(chan struct{}), + work: make(chan struct{}, 1), + wl: wantlist.NewThreadSafe(), + network: network, + p: p, + refcnt: 1, + } +} + +func (mq *MessageQueue) RefIncrement() { + mq.refcnt++ +} + +func (mq *MessageQueue) RefDecrement() bool { + mq.refcnt-- + return mq.refcnt > 0 +} + +func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { + var work bool + mq.outlk.Lock() + defer func() { + mq.outlk.Unlock() + if !work { + return + } + select { + case mq.work <- struct{}{}: + default: + } + }() + + // if we have no message held allocate a new one + if mq.out == nil { + mq.out = bsmsg.New(false) + } + + // TODO: add a msg.Combine(...) method + // otherwise, combine the one we are holding with the + // one passed in + for _, e := range entries { + if e.Cancel { + if mq.wl.Remove(e.Cid, ses) { + work = true + mq.out.Cancel(e.Cid) + } + } else { + if mq.wl.Add(e.Cid, e.Priority, ses) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } + } + } +} + +func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { + + // new peer, we will want to give them our full wantlist + fullwantlist := bsmsg.New(true) + for _, e := range initialEntries { + for k := range e.SesTrk { + mq.wl.AddEntry(e, k) + } + fullwantlist.AddEntry(e.Cid, e.Priority) + } + mq.out = fullwantlist + mq.work <- struct{}{} + + go mq.runQueue(ctx) +} + +func (mq *MessageQueue) Shutdown() { + close(mq.done) +} +func (mq *MessageQueue) runQueue(ctx context.Context) { + for { + select { + case <-mq.work: // there is work to be done + mq.doWork(ctx) + case <-mq.done: + if mq.sender != nil { + mq.sender.Close() + } + return + case <-ctx.Done(): + if mq.sender != nil { + mq.sender.Reset() + } + return + } + } +} + +func (mq *MessageQueue) doWork(ctx context.Context) { + // grab outgoing message + mq.outlk.Lock() + wlm := mq.out + if wlm == nil || wlm.Empty() { + mq.outlk.Unlock() + return + } + mq.out = nil + mq.outlk.Unlock() + + // NB: only open a stream if we actually have data to send + if mq.sender == nil { + err := mq.openSender(ctx) + if err != nil { + log.Infof("cant open message sender to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return + } + } + + // send wantlist updates + for { // try to send this message until we fail. + err := mq.sender.SendMsg(ctx, wlm) + if err == nil { + return + } + + log.Infof("bitswap send error: %s", err) + mq.sender.Reset() + mq.sender = nil + + select { + case <-mq.done: + return + case <-ctx.Done(): + return + case <-time.After(time.Millisecond * 100): + // wait 100ms in case disconnect notifications are still propogating + log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") + } + + err = mq.openSender(ctx) + if err != nil { + log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) + // TODO(why): what do we do now? + // I think the *right* answer is to probably put the message we're + // trying to send back, and then return to waiting for new work or + // a disconnect. + return + } + + // TODO: Is this the same instance for the remote peer? + // If its not, we should resend our entire wantlist to them + /* + if mq.sender.InstanceID() != mq.lastSeenInstanceID { + wlm = mq.getFullWantlistMessage() + } + */ + } +} + +func (mq *MessageQueue) openSender(ctx context.Context) error { + // allow ten minutes for connections this includes looking them up in the + // dht dialing them, and handshaking + conctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() + + err := mq.network.ConnectTo(conctx, mq.p) + if err != nil { + return err + } + + nsender, err := mq.network.NewMessageSender(ctx, mq.p) + if err != nil { + return err + } + + mq.sender = nsender + return nil +} diff --git a/session.go b/session.go index 9cbeb7db..cd5f645a 100644 --- a/session.go +++ b/session.go @@ -66,7 +66,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { notif: notifications.New(), uuid: loggables.Uuid("GetBlockRequest"), baseTickDelay: time.Millisecond * 500, - id: bs.getNextSessionID(), + id: bs.sm.GetNextSessionID(), } s.tag = fmt.Sprint("bs-ses-", s.id) @@ -74,10 +74,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { cache, _ := lru.New(2048) s.interest = cache - bs.sessLk.Lock() - bs.sessions = append(bs.sessions, s) - bs.sessLk.Unlock() - + bs.sm.AddSession(s) go s.run(ctx) return s @@ -92,15 +89,7 @@ func (bs *Bitswap) removeSession(s *Session) { } bs.CancelWants(live, s.id) - bs.sessLk.Lock() - defer bs.sessLk.Unlock() - for i := 0; i < len(bs.sessions); i++ { - if bs.sessions[i] == s { - bs.sessions[i] = bs.sessions[len(bs.sessions)-1] - bs.sessions = bs.sessions[:len(bs.sessions)-1] - return - } - } + bs.sm.RemoveSession(s) } type blkRecv struct { diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go new file mode 100644 index 00000000..1ebee2fd --- /dev/null +++ b/sessionmanager/sessionmanager.go @@ -0,0 +1,59 @@ +package sessionmanager + +import ( + "sync" + + exchange "github.com/ipfs/go-ipfs-exchange-interface" +) + +type SessionManager struct { + // Sessions + sessLk sync.Mutex + sessions []exchange.Fetcher + + // Session Index + sessIDLk sync.Mutex + sessID uint64 +} + +func New() *SessionManager { + return &SessionManager{} +} + +func (sm *SessionManager) AddSession(session exchange.Fetcher) { + sm.sessLk.Lock() + sm.sessions = append(sm.sessions, session) + sm.sessLk.Unlock() +} + +func (sm *SessionManager) RemoveSession(session exchange.Fetcher) { + sm.sessLk.Lock() + defer sm.sessLk.Unlock() + for i := 0; i < len(sm.sessions); i++ { + if sm.sessions[i] == session { + sm.sessions[i] = sm.sessions[len(sm.sessions)-1] + sm.sessions = sm.sessions[:len(sm.sessions)-1] + return + } + } +} + +func (sm *SessionManager) GetNextSessionID() uint64 { + sm.sessIDLk.Lock() + defer sm.sessIDLk.Unlock() + sm.sessID++ + return sm.sessID +} + +type IterateSessionFunc func(session exchange.Fetcher) + +// IterateSessions loops through all managed sessions and applies the given +// IterateSessionFunc +func (sm *SessionManager) IterateSessions(iterate IterateSessionFunc) { + sm.sessLk.Lock() + defer sm.sessLk.Unlock() + + for _, s := range sm.sessions { + iterate(s) + } +} diff --git a/wantmanager.go b/wantmanager.go deleted file mode 100644 index 8d033ff9..00000000 --- a/wantmanager.go +++ /dev/null @@ -1,404 +0,0 @@ -package bitswap - -import ( - "context" - "sync" - "time" - - engine "github.com/ipfs/go-bitswap/decision" - bsmsg "github.com/ipfs/go-bitswap/message" - bsnet "github.com/ipfs/go-bitswap/network" - wantlist "github.com/ipfs/go-bitswap/wantlist" - - cid "github.com/ipfs/go-cid" - metrics "github.com/ipfs/go-metrics-interface" - peer "github.com/libp2p/go-libp2p-peer" -) - -type WantManager struct { - // sync channels for Run loop - incoming chan *wantSet - connectEvent chan peerStatus // notification channel for peers connecting/disconnecting - peerReqs chan chan []peer.ID // channel to request connected peers on - - // synchronized by Run loop, only touch inside there - peers map[peer.ID]*msgQueue - wl *wantlist.ThreadSafe - bcwl *wantlist.ThreadSafe - - network bsnet.BitSwapNetwork - ctx context.Context - cancel func() - - wantlistGauge metrics.Gauge - sentHistogram metrics.Histogram -} - -type peerStatus struct { - connect bool - peer peer.ID -} - -func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { - ctx, cancel := context.WithCancel(ctx) - wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", - "Number of items in wantlist.").Gauge() - sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ - " this bitswap").Histogram(metricsBuckets) - return &WantManager{ - incoming: make(chan *wantSet, 10), - connectEvent: make(chan peerStatus, 10), - peerReqs: make(chan chan []peer.ID), - peers: make(map[peer.ID]*msgQueue), - wl: wantlist.NewThreadSafe(), - bcwl: wantlist.NewThreadSafe(), - network: network, - ctx: ctx, - cancel: cancel, - wantlistGauge: wantlistGauge, - sentHistogram: sentHistogram, - } -} - -type msgQueue struct { - p peer.ID - - outlk sync.Mutex - out bsmsg.BitSwapMessage - network bsnet.BitSwapNetwork - wl *wantlist.ThreadSafe - - sender bsnet.MessageSender - - refcnt int - - work chan struct{} - done chan struct{} -} - -// WantBlocks adds the given cids to the wantlist, tracked by the given session -func (pm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { - log.Infof("want blocks: %s", ks) - pm.addEntries(ctx, ks, peers, false, ses) -} - -// CancelWants removes the given cids from the wantlist, tracked by the given session -func (pm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { - pm.addEntries(context.Background(), ks, peers, true, ses) -} - -type wantSet struct { - entries []*bsmsg.Entry - targets []peer.ID - from uint64 -} - -func (pm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { - entries := make([]*bsmsg.Entry, 0, len(ks)) - for i, k := range ks { - entries = append(entries, &bsmsg.Entry{ - Cancel: cancel, - Entry: wantlist.NewRefEntry(k, kMaxPriority-i), - }) - } - select { - case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: - case <-pm.ctx.Done(): - case <-ctx.Done(): - } -} - -func (pm *WantManager) ConnectedPeers() []peer.ID { - resp := make(chan []peer.ID) - pm.peerReqs <- resp - return <-resp -} - -func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) { - // Blocks need to be sent synchronously to maintain proper backpressure - // throughout the network stack - defer env.Sent() - - msgSize := 0 - msg := bsmsg.New(false) - for _, block := range env.Message.Blocks() { - msgSize += len(block.RawData()) - msg.AddBlock(block) - log.Infof("Sending block %s to %s", block, env.Peer) - } - - pm.sentHistogram.Observe(float64(msgSize)) - err := pm.network.SendMessage(ctx, env.Peer, msg) - if err != nil { - log.Infof("sendblock error: %s", err) - } -} - -func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { - mq, ok := pm.peers[p] - if ok { - mq.refcnt++ - return nil - } - - mq = pm.newMsgQueue(p) - - // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New(true) - for _, e := range pm.bcwl.Entries() { - for k := range e.SesTrk { - mq.wl.AddEntry(e, k) - } - fullwantlist.AddEntry(e.Cid, e.Priority) - } - mq.out = fullwantlist - mq.work <- struct{}{} - - pm.peers[p] = mq - go mq.runQueue(pm.ctx) - return mq -} - -func (pm *WantManager) stopPeerHandler(p peer.ID) { - pq, ok := pm.peers[p] - if !ok { - // TODO: log error? - return - } - - pq.refcnt-- - if pq.refcnt > 0 { - return - } - - close(pq.done) - delete(pm.peers, p) -} - -func (mq *msgQueue) runQueue(ctx context.Context) { - for { - select { - case <-mq.work: // there is work to be done - mq.doWork(ctx) - case <-mq.done: - if mq.sender != nil { - mq.sender.Close() - } - return - case <-ctx.Done(): - if mq.sender != nil { - mq.sender.Reset() - } - return - } - } -} - -func (mq *msgQueue) doWork(ctx context.Context) { - // grab outgoing message - mq.outlk.Lock() - wlm := mq.out - if wlm == nil || wlm.Empty() { - mq.outlk.Unlock() - return - } - mq.out = nil - mq.outlk.Unlock() - - // NB: only open a stream if we actually have data to send - if mq.sender == nil { - err := mq.openSender(ctx) - if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return - } - } - - // send wantlist updates - for { // try to send this message until we fail. - err := mq.sender.SendMsg(ctx, wlm) - if err == nil { - return - } - - log.Infof("bitswap send error: %s", err) - mq.sender.Reset() - mq.sender = nil - - select { - case <-mq.done: - return - case <-ctx.Done(): - return - case <-time.After(time.Millisecond * 100): - // wait 100ms in case disconnect notifications are still propogating - log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") - } - - err = mq.openSender(ctx) - if err != nil { - log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) - // TODO(why): what do we do now? - // I think the *right* answer is to probably put the message we're - // trying to send back, and then return to waiting for new work or - // a disconnect. - return - } - - // TODO: Is this the same instance for the remote peer? - // If its not, we should resend our entire wantlist to them - /* - if mq.sender.InstanceID() != mq.lastSeenInstanceID { - wlm = mq.getFullWantlistMessage() - } - */ - } -} - -func (mq *msgQueue) openSender(ctx context.Context) error { - // allow ten minutes for connections this includes looking them up in the - // dht dialing them, and handshaking - conctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() - - err := mq.network.ConnectTo(conctx, mq.p) - if err != nil { - return err - } - - nsender, err := mq.network.NewMessageSender(ctx, mq.p) - if err != nil { - return err - } - - mq.sender = nsender - return nil -} - -func (pm *WantManager) Connected(p peer.ID) { - select { - case pm.connectEvent <- peerStatus{peer: p, connect: true}: - case <-pm.ctx.Done(): - } -} - -func (pm *WantManager) Disconnected(p peer.ID) { - select { - case pm.connectEvent <- peerStatus{peer: p, connect: false}: - case <-pm.ctx.Done(): - } -} - -// TODO: use goprocess here once i trust it -func (pm *WantManager) Run() { - // NOTE: Do not open any streams or connections from anywhere in this - // event loop. Really, just don't do anything likely to block. - for { - select { - case ws := <-pm.incoming: - - // is this a broadcast or not? - brdc := len(ws.targets) == 0 - - // add changes to our wantlist - for _, e := range ws.entries { - if e.Cancel { - if brdc { - pm.bcwl.Remove(e.Cid, ws.from) - } - - if pm.wl.Remove(e.Cid, ws.from) { - pm.wantlistGauge.Dec() - } - } else { - if brdc { - pm.bcwl.AddEntry(e.Entry, ws.from) - } - if pm.wl.AddEntry(e.Entry, ws.from) { - pm.wantlistGauge.Inc() - } - } - } - - // broadcast those wantlist changes - if len(ws.targets) == 0 { - for _, p := range pm.peers { - p.addMessage(ws.entries, ws.from) - } - } else { - for _, t := range ws.targets { - p, ok := pm.peers[t] - if !ok { - log.Infof("tried sending wantlist change to non-partner peer: %s", t) - continue - } - p.addMessage(ws.entries, ws.from) - } - } - - case p := <-pm.connectEvent: - if p.connect { - pm.startPeerHandler(p.peer) - } else { - pm.stopPeerHandler(p.peer) - } - case req := <-pm.peerReqs: - peers := make([]peer.ID, 0, len(pm.peers)) - for p := range pm.peers { - peers = append(peers, p) - } - req <- peers - case <-pm.ctx.Done(): - return - } - } -} - -func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { - return &msgQueue{ - done: make(chan struct{}), - work: make(chan struct{}, 1), - wl: wantlist.NewThreadSafe(), - network: wm.network, - p: p, - refcnt: 1, - } -} - -func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) { - var work bool - mq.outlk.Lock() - defer func() { - mq.outlk.Unlock() - if !work { - return - } - select { - case mq.work <- struct{}{}: - default: - } - }() - - // if we have no message held allocate a new one - if mq.out == nil { - mq.out = bsmsg.New(false) - } - - // TODO: add a msg.Combine(...) method - // otherwise, combine the one we are holding with the - // one passed in - for _, e := range entries { - if e.Cancel { - if mq.wl.Remove(e.Cid, ses) { - work = true - mq.out.Cancel(e.Cid) - } - } else { - if mq.wl.Add(e.Cid, e.Priority, ses) { - work = true - mq.out.AddEntry(e.Cid, e.Priority) - } - } - } -} diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go new file mode 100644 index 00000000..e3734290 --- /dev/null +++ b/wantmanager/wantmanager.go @@ -0,0 +1,251 @@ +package wantmanager + +import ( + "context" + "math" + + engine "github.com/ipfs/go-bitswap/decision" + bsmsg "github.com/ipfs/go-bitswap/message" + bsmq "github.com/ipfs/go-bitswap/messagequeue" + bsnet "github.com/ipfs/go-bitswap/network" + wantlist "github.com/ipfs/go-bitswap/wantlist" + logging "github.com/ipfs/go-log" + + cid "github.com/ipfs/go-cid" + metrics "github.com/ipfs/go-metrics-interface" + peer "github.com/libp2p/go-libp2p-peer" +) + +var log = logging.Logger("bitswap") + +const ( + // kMaxPriority is the max priority as defined by the bitswap protocol + kMaxPriority = math.MaxInt32 +) + +var ( + metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} +) + +type WantManager struct { + // sync channels for Run loop + incoming chan *wantSet + connectEvent chan peerStatus // notification channel for peers connecting/disconnecting + peerReqs chan chan []peer.ID // channel to request connected peers on + + // synchronized by Run loop, only touch inside there + peers map[peer.ID]*bsmq.MessageQueue + wl *wantlist.ThreadSafe + bcwl *wantlist.ThreadSafe + + network bsnet.BitSwapNetwork + ctx context.Context + cancel func() + + wantlistGauge metrics.Gauge + sentHistogram metrics.Histogram +} + +type peerStatus struct { + connect bool + peer peer.ID +} + +func New(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { + ctx, cancel := context.WithCancel(ctx) + wantlistGauge := metrics.NewCtx(ctx, "wantlist_total", + "Number of items in wantlist.").Gauge() + sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+ + " this bitswap").Histogram(metricsBuckets) + return &WantManager{ + incoming: make(chan *wantSet, 10), + connectEvent: make(chan peerStatus, 10), + peerReqs: make(chan chan []peer.ID), + peers: make(map[peer.ID]*bsmq.MessageQueue), + wl: wantlist.NewThreadSafe(), + bcwl: wantlist.NewThreadSafe(), + network: network, + ctx: ctx, + cancel: cancel, + wantlistGauge: wantlistGauge, + sentHistogram: sentHistogram, + } +} + +// WantBlocks adds the given cids to the wantlist, tracked by the given session +func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { + log.Infof("want blocks: %s", ks) + wm.addEntries(ctx, ks, peers, false, ses) +} + +// CancelWants removes the given cids from the wantlist, tracked by the given session +func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { + wm.addEntries(context.Background(), ks, peers, true, ses) +} + +type wantSet struct { + entries []*bsmsg.Entry + targets []peer.ID + from uint64 +} + +func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { + entries := make([]*bsmsg.Entry, 0, len(ks)) + for i, k := range ks { + entries = append(entries, &bsmsg.Entry{ + Cancel: cancel, + Entry: wantlist.NewRefEntry(k, kMaxPriority-i), + }) + } + select { + case wm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}: + case <-wm.ctx.Done(): + case <-ctx.Done(): + } +} + +func (wm *WantManager) ConnectedPeers() []peer.ID { + resp := make(chan []peer.ID) + wm.peerReqs <- resp + return <-resp +} + +func (wm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msgSize := 0 + msg := bsmsg.New(false) + for _, block := range env.Message.Blocks() { + msgSize += len(block.RawData()) + msg.AddBlock(block) + log.Infof("Sending block %s to %s", block, env.Peer) + } + + wm.sentHistogram.Observe(float64(msgSize)) + err := wm.network.SendMessage(ctx, env.Peer, msg) + if err != nil { + log.Infof("sendblock error: %s", err) + } +} + +func (wm *WantManager) startPeerHandler(p peer.ID) *bsmq.MessageQueue { + mq, ok := wm.peers[p] + if ok { + mq.RefIncrement() + return nil + } + + mq = bsmq.New(p, wm.network) + wm.peers[p] = mq + mq.Startup(wm.ctx, wm.bcwl.Entries()) + return mq +} + +func (wm *WantManager) stopPeerHandler(p peer.ID) { + pq, ok := wm.peers[p] + if !ok { + // TODO: log error? + return + } + + if pq.RefDecrement() { + return + } + + pq.Shutdown() + delete(wm.peers, p) +} + +func (wm *WantManager) Connected(p peer.ID) { + select { + case wm.connectEvent <- peerStatus{peer: p, connect: true}: + case <-wm.ctx.Done(): + } +} + +func (wm *WantManager) Disconnected(p peer.ID) { + select { + case wm.connectEvent <- peerStatus{peer: p, connect: false}: + case <-wm.ctx.Done(): + } +} + +// TODO: use goprocess here once i trust it +func (wm *WantManager) Run() { + // NOTE: Do not open any streams or connections from anywhere in this + // event loop. Really, just don't do anything likely to block. + for { + select { + case ws := <-wm.incoming: + + // is this a broadcast or not? + brdc := len(ws.targets) == 0 + + // add changes to our wantlist + for _, e := range ws.entries { + if e.Cancel { + if brdc { + wm.bcwl.Remove(e.Cid, ws.from) + } + + if wm.wl.Remove(e.Cid, ws.from) { + wm.wantlistGauge.Dec() + } + } else { + if brdc { + wm.bcwl.AddEntry(e.Entry, ws.from) + } + if wm.wl.AddEntry(e.Entry, ws.from) { + wm.wantlistGauge.Inc() + } + } + } + + // broadcast those wantlist changes + if len(ws.targets) == 0 { + for _, p := range wm.peers { + p.AddMessage(ws.entries, ws.from) + } + } else { + for _, t := range ws.targets { + p, ok := wm.peers[t] + if !ok { + log.Infof("tried sending wantlist change to non-partner peer: %s", t) + continue + } + p.AddMessage(ws.entries, ws.from) + } + } + + case p := <-wm.connectEvent: + if p.connect { + wm.startPeerHandler(p.peer) + } else { + wm.stopPeerHandler(p.peer) + } + case req := <-wm.peerReqs: + peers := make([]peer.ID, 0, len(wm.peers)) + for p := range wm.peers { + peers = append(peers, p) + } + req <- peers + case <-wm.ctx.Done(): + return + } + } +} + +func (wm *WantManager) IsWanted(c cid.Cid) bool { + _, isWanted := wm.wl.Contains(c) + return isWanted +} + +func (wm *WantManager) CurrentWants() []*wantlist.Entry { + return wm.wl.Entries() +} + +func (wm *WantManager) WantCount() int { + return wm.wl.Len() +} diff --git a/workers.go b/workers.go index 3fbe1bb1..34b75bab 100644 --- a/workers.go +++ b/workers.go @@ -183,13 +183,13 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { log.Event(ctx, "Bitswap.Rebroadcast.idle") select { case <-tick.C: - n := bs.wm.wl.Len() + n := bs.wm.WantCount() if n > 0 { log.Debug(n, " keys in bitswap wantlist") } case <-broadcastSignal.C: // resend unfulfilled wantlist keys log.Event(ctx, "Bitswap.Rebroadcast.active") - entries := bs.wm.wl.Entries() + entries := bs.wm.CurrentWants() if len(entries) == 0 { continue }