Skip to content

Commit

Permalink
Fix leak due to cid queue never getting cleaned up (#756)
Browse files Browse the repository at this point in the history
* Fix leak due to cid queue never getting cleaned up

This removes entries from the queue portion of CidQueue that were removed from the map portion.

* Remove unused cidQueue function. Do not export cidQueue functions.
  • Loading branch information
gammazero authored Dec 17, 2024
1 parent e11e4ed commit 00f508d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (bpm *BlockPresenceManager) allDontHave(peers []peer.ID, c cid.Cid) bool {

// RemoveKeys cleans up the given keys from the block presence map
func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
if len(ks) == 0 {
return
}

bpm.Lock()
defer bpm.Unlock()

Expand Down
44 changes: 16 additions & 28 deletions bitswap/client/internal/session/cidqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}

func (cq *cidQueue) Pop() cid.Cid {
func (cq *cidQueue) pop() cid.Cid {
for {
if cq.elems.Len() == 0 {
return cid.Cid{}
Expand All @@ -29,43 +29,31 @@ func (cq *cidQueue) Pop() cid.Cid {
}
}

func (cq *cidQueue) Cids() []cid.Cid {
// Lazily delete from the list any cids that were removed from the set
if cq.elems.Len() > cq.eset.Len() {
for i := 0; i < cq.elems.Len(); i++ {
c := cq.elems.PopFront()
if cq.eset.Has(c) {
cq.elems.PushBack(c)
}
}
}

if cq.elems.Len() == 0 {
return nil
}

// Make a copy of the cids
cids := make([]cid.Cid, cq.elems.Len())
for i := 0; i < cq.elems.Len(); i++ {
cids[i] = cq.elems.At(i)
}
return cids
}

func (cq *cidQueue) Push(c cid.Cid) {
func (cq *cidQueue) push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems.PushBack(c)
}
}

func (cq *cidQueue) Remove(c cid.Cid) {
func (cq *cidQueue) remove(c cid.Cid) {
cq.eset.Remove(c)
}

func (cq *cidQueue) Has(c cid.Cid) bool {
func (cq *cidQueue) has(c cid.Cid) bool {
return cq.eset.Has(c)
}

func (cq *cidQueue) Len() int {
func (cq *cidQueue) len() int {
return cq.eset.Len()
}

func (cq *cidQueue) gc() {
if cq.elems.Len() > cq.eset.Len() {
for i := 0; i < cq.elems.Len(); i++ {
c := cq.elems.PopFront()
if cq.eset.Has(c) {
cq.elems.PushBack(c)
}
}
}
}
23 changes: 13 additions & 10 deletions bitswap/client/internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func newSessionWants(broadcastLimit int) sessionWants {
}

func (sw *sessionWants) String() string {
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
return fmt.Sprintf("%d pending / %d live", sw.toFetch.len(), len(sw.liveWants))
}

// BlocksRequested is called when the client makes a request for blocks
func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
for _, k := range newWants {
sw.toFetch.Push(k)
sw.toFetch.push(k)
}
}

Expand All @@ -56,14 +56,14 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount
liveSize := min(toAdd, sw.toFetch.Len())
liveSize := min(toAdd, sw.toFetch.len())
if liveSize <= 0 {
return nil
}

live := make([]cid.Cid, 0, liveSize)
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
for ; toAdd > 0 && sw.toFetch.len() > 0; toAdd-- {
c := sw.toFetch.pop()
live = append(live, c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
Expand All @@ -76,12 +76,13 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.has(c) {
sw.toFetch.remove(c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
sw.toFetch.gc()
}

// BlocksReceived removes received block CIDs from the live wants list and
Expand All @@ -108,9 +109,10 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)

// Remove the CID from the live wants / toFetch queue
delete(sw.liveWants, c)
sw.toFetch.Remove(c)
sw.toFetch.remove(c)
}
}
sw.toFetch.gc()

// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
Expand Down Expand Up @@ -152,9 +154,10 @@ func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// CancelPending removes the given CIDs from the fetch queue.
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
for _, k := range keys {
sw.toFetch.Remove(k)
sw.toFetch.remove(k)
delete(sw.liveWants, k)
}
sw.toFetch.gc()
}

// LiveWants returns a list of live wants
Expand Down Expand Up @@ -193,7 +196,7 @@ func (sw *sessionWants) HasLiveWants() bool {
func (sw *sessionWants) isWanted(c cid.Cid) bool {
_, ok := sw.liveWants[c]
if !ok {
ok = sw.toFetch.Has(c)
ok = sw.toFetch.has(c)
}
return ok
}

0 comments on commit 00f508d

Please sign in to comment.