From 00f508d1ca56035f5da70653b546708ad65a25de Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Tue, 17 Dec 2024 10:53:03 -1000 Subject: [PATCH] Fix leak due to cid queue never getting cleaned up (#756) * Fix leak due to cid queue never getting cleaned up This removes entries from the queue portion of CidQueue that were removed from the map portion. * Remove unused cidQueue function. Do not export cidQueue functions. --- .../blockpresencemanager.go | 4 ++ bitswap/client/internal/session/cidqueue.go | 44 +++++++------------ .../client/internal/session/sessionwants.go | 23 +++++----- 3 files changed, 33 insertions(+), 38 deletions(-) diff --git a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go index 685981381..b88b12bd3 100644 --- a/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go +++ b/bitswap/client/internal/blockpresencemanager/blockpresencemanager.go @@ -111,6 +111,10 @@ func (bpm *BlockPresenceManager) allDontHave(peers []peer.ID, c cid.Cid) bool { // RemoveKeys cleans up the given keys from the block presence map func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) { + if len(ks) == 0 { + return + } + bpm.Lock() defer bpm.Unlock() diff --git a/bitswap/client/internal/session/cidqueue.go b/bitswap/client/internal/session/cidqueue.go index 2ecd0f672..b08635bf1 100644 --- a/bitswap/client/internal/session/cidqueue.go +++ b/bitswap/client/internal/session/cidqueue.go @@ -14,7 +14,7 @@ func newCidQueue() *cidQueue { return &cidQueue{eset: cid.NewSet()} } -func (cq *cidQueue) Pop() cid.Cid { +func (cq *cidQueue) pop() cid.Cid { for { if cq.elems.Len() == 0 { return cid.Cid{} @@ -29,43 +29,31 @@ func (cq *cidQueue) Pop() cid.Cid { } } -func (cq *cidQueue) Cids() []cid.Cid { - // Lazily delete from the list any cids that were removed from the set - if cq.elems.Len() > cq.eset.Len() { - for i := 0; i < cq.elems.Len(); i++ { - c := cq.elems.PopFront() - if cq.eset.Has(c) { - cq.elems.PushBack(c) - } - } - } - - if cq.elems.Len() == 0 { - return nil - } - - // Make a copy of the cids - cids := make([]cid.Cid, cq.elems.Len()) - for i := 0; i < cq.elems.Len(); i++ { - cids[i] = cq.elems.At(i) - } - return cids -} - -func (cq *cidQueue) Push(c cid.Cid) { +func (cq *cidQueue) push(c cid.Cid) { if cq.eset.Visit(c) { cq.elems.PushBack(c) } } -func (cq *cidQueue) Remove(c cid.Cid) { +func (cq *cidQueue) remove(c cid.Cid) { cq.eset.Remove(c) } -func (cq *cidQueue) Has(c cid.Cid) bool { +func (cq *cidQueue) has(c cid.Cid) bool { return cq.eset.Has(c) } -func (cq *cidQueue) Len() int { +func (cq *cidQueue) len() int { return cq.eset.Len() } + +func (cq *cidQueue) gc() { + if cq.elems.Len() > cq.eset.Len() { + for i := 0; i < cq.elems.Len(); i++ { + c := cq.elems.PopFront() + if cq.eset.Has(c) { + cq.elems.PushBack(c) + } + } + } +} diff --git a/bitswap/client/internal/session/sessionwants.go b/bitswap/client/internal/session/sessionwants.go index 4653ef872..b9811d620 100644 --- a/bitswap/client/internal/session/sessionwants.go +++ b/bitswap/client/internal/session/sessionwants.go @@ -35,13 +35,13 @@ func newSessionWants(broadcastLimit int) sessionWants { } func (sw *sessionWants) String() string { - return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants)) + return fmt.Sprintf("%d pending / %d live", sw.toFetch.len(), len(sw.liveWants)) } // BlocksRequested is called when the client makes a request for blocks func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) { for _, k := range newWants { - sw.toFetch.Push(k) + sw.toFetch.push(k) } } @@ -56,14 +56,14 @@ func (sw *sessionWants) GetNextWants() []cid.Cid { // limit) currentLiveCount := len(sw.liveWants) toAdd := sw.broadcastLimit - currentLiveCount - liveSize := min(toAdd, sw.toFetch.Len()) + liveSize := min(toAdd, sw.toFetch.len()) if liveSize <= 0 { return nil } live := make([]cid.Cid, 0, liveSize) - for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- { - c := sw.toFetch.Pop() + for ; toAdd > 0 && sw.toFetch.len() > 0; toAdd-- { + c := sw.toFetch.pop() live = append(live, c) sw.liveWantsOrder = append(sw.liveWantsOrder, c) sw.liveWants[c] = now @@ -76,12 +76,13 @@ func (sw *sessionWants) GetNextWants() []cid.Cid { func (sw *sessionWants) WantsSent(ks []cid.Cid) { now := time.Now() for _, c := range ks { - if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) { - sw.toFetch.Remove(c) + if _, ok := sw.liveWants[c]; !ok && sw.toFetch.has(c) { + sw.toFetch.remove(c) sw.liveWantsOrder = append(sw.liveWantsOrder, c) sw.liveWants[c] = now } } + sw.toFetch.gc() } // BlocksReceived removes received block CIDs from the live wants list and @@ -108,9 +109,10 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // Remove the CID from the live wants / toFetch queue delete(sw.liveWants, c) - sw.toFetch.Remove(c) + sw.toFetch.remove(c) } } + sw.toFetch.gc() // If the live wants ordering array is a long way out of sync with the // live wants map, clean up the ordering array @@ -152,9 +154,10 @@ func (sw *sessionWants) PrepareBroadcast() []cid.Cid { // CancelPending removes the given CIDs from the fetch queue. func (sw *sessionWants) CancelPending(keys []cid.Cid) { for _, k := range keys { - sw.toFetch.Remove(k) + sw.toFetch.remove(k) delete(sw.liveWants, k) } + sw.toFetch.gc() } // LiveWants returns a list of live wants @@ -193,7 +196,7 @@ func (sw *sessionWants) HasLiveWants() bool { func (sw *sessionWants) isWanted(c cid.Cid) bool { _, ok := sw.liveWants[c] if !ok { - ok = sw.toFetch.Has(c) + ok = sw.toFetch.has(c) } return ok }