Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor: improve sessionWants perf
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 12, 2020
1 parent b83a609 commit 73261ec
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 30 deletions.
85 changes: 57 additions & 28 deletions internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,34 @@ import (
cid "github.com/ipfs/go-cid"
)

// liveWantsOrder and liveWants will get out of sync as blocks are received.
// This constant is the maximum amount to allow them to be out of sync before
// cleaning up the ordering array.
const liveWantsOrderGCLimit = 32

// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants *cidQueue
// The time at which live wants were sent
sentAt map[cid.Cid]time.Time
liveWants map[cid.Cid]time.Time
// The order in which wants were requested
liveWantsOrder []cid.Cid
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}

func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: newCidQueue(),
sentAt: make(map[cid.Cid]time.Time),
liveWants: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}

func (sw *sessionWants) String() string {
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), sw.liveWants.Len())
return fmt.Sprintf("%d pending / %d live", sw.toFetch.Len(), len(sw.liveWants))
}

// BlocksRequested is called when the client makes a request for blocks
Expand All @@ -48,16 +52,17 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()

// Move CIDs from fetch queue to the live wants queue (up to the limit)
currentLiveCount := sw.liveWants.Len()
// Move CIDs from fetch queue to the live wants queue (up to the broadcast
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount

var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWants.Push(c)
sw.sentAt[c] = now
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}

return live
Expand All @@ -67,10 +72,10 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
func (sw *sessionWants) WantsSent(ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
if _, ok := sw.sentAt[c]; !ok && sw.toFetch.Has(c) {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWants.Push(c)
sw.sentAt[c] = now
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
}
Expand All @@ -85,38 +90,57 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
return wanted, totalLatency
}

// Filter for blocks that were actually wanted (as opposed to duplicates)
now := time.Now()
for _, c := range ks {
if sw.isWanted(c) {
wanted = append(wanted, c)

// Measure latency
sentAt, ok := sw.sentAt[c]
sentAt, ok := sw.liveWants[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
}

// Remove the CID from the live wants / toFetch queue
sw.liveWants.Remove(c)
delete(sw.sentAt, c)
delete(sw.liveWants, c)
sw.toFetch.Remove(c)
}
}

// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
cleaned := sw.liveWantsOrder[:0]
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
cleaned = append(cleaned, c)
}
}
sw.liveWantsOrder = cleaned
}

return wanted, totalLatency
}

// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
live := sw.liveWants.Cids()
if len(live) > sw.broadcastLimit {
live = live[:sw.broadcastLimit]
}
for _, c := range live {
sw.sentAt[c] = now
live := make([]cid.Cid, 0, len(sw.liveWants))
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
// to now as we're about to broadcast
sw.liveWants[c] = now

live = append(live, c)
if len(live) == sw.broadcastLimit {
break
}
}
}

return live
}

Expand All @@ -129,18 +153,23 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) {

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
return sw.liveWants.Cids()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
}

return live
}

// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.sentAt) == 0 {
if len(sw.liveWants) == 0 {
return cid.Cid{}
}

// picking a random live want
i := rand.Intn(len(sw.sentAt))
for k := range sw.sentAt {
i := rand.Intn(len(sw.liveWants))
for k := range sw.liveWants {
if i == 0 {
return k
}
Expand All @@ -151,12 +180,12 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid {

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
return sw.liveWants.Len() > 0
return len(sw.liveWants) > 0
}

// Indicates whether the want is in either of the fetch or live queues
func (sw *sessionWants) isWanted(c cid.Cid) bool {
ok := sw.liveWants.Has(c)
_, ok := sw.liveWants[c]
if !ok {
ok = sw.toFetch.Has(c)
}
Expand Down
24 changes: 22 additions & 2 deletions internal/session/sessionwants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestPrepareBroadcast(t *testing.T) {
// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[0:6])
sw.BlocksRequested(cids[:6])

// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
Expand All @@ -139,7 +139,7 @@ func TestPrepareBroadcast(t *testing.T) {

// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[0:1])
sw.BlocksReceived(cids[:1])
// toFetch Live
// 543 21_

Expand Down Expand Up @@ -167,3 +167,23 @@ func TestPrepareBroadcast(t *testing.T) {
}
}
}

// Test that even after GC broadcast returns correct wants
func TestPrepareBroadcastAfterGC(t *testing.T) {
sw := newSessionWants(5)
cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2)

sw.BlocksRequested(cids)

// Trigger a sessionWants internal GC of the live wants
sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1])
cids = cids[:liveWantsOrderGCLimit+1]

// Broadcast should contain wants in order
ws := sw.PrepareBroadcast()
for i, c := range ws {
if !c.Equals(cids[i]) {
t.Fatal("broadcast should always return wants in order")
}
}
}

0 comments on commit 73261ec

Please sign in to comment.