Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Feat: Track Session Peer Latency More Accurately #149

Merged
merged 4 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
defer wg.Done()

bs.updateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(p, b)
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)

// skip received blocks that are not in the wantlist
if !bs.wm.IsWanted(b.Cid()) {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), p)
Expand Down
18 changes: 10 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)

const (
Expand All @@ -34,15 +33,16 @@ type WantManager interface {
// requesting more when neccesary.
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
GetOptimizedPeers() []bssd.OptimizedPeer
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
RecordCancel(cid.Cid)
}

// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}
Expand Down Expand Up @@ -141,15 +141,15 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.pm.RecordCancel(blk.Cid())
s.wm.CancelWants(s.ctx, ks, nil, s.id)

}

// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
func (s *Session) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: true}:
case <-s.ctx.Done():
}
}
Expand Down Expand Up @@ -308,7 +308,6 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}

func (s *Session) handleIdleTick(ctx context.Context) {

live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
Expand Down Expand Up @@ -415,6 +414,9 @@ func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
if blk.from != "" {
s.pm.RecordPeerResponse(blk.from, ks)
}
}
}

Expand Down
19 changes: 14 additions & 5 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -52,10 +52,14 @@ func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
}
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
func (fpm *fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
fpm.lk.Lock()
defer fpm.lk.Unlock()
return fpm.peers
optimizedPeers := make([]bssd.OptimizedPeer, 0, len(fpm.peers))
for _, peer := range fpm.peers {
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: 1.0})
}
return optimizedPeers
}

func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
Expand All @@ -64,12 +68,17 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.peers = append(fpm.peers, p)
fpm.lk.Unlock()
}
func (fpm *fakePeerManager) RecordCancel(c cid.Cid) {}

type fakeRequestSplitter struct {
}

func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
peers := make([]peer.ID, len(optimizedPeers))
for i, optimizedPeer := range optimizedPeers {
peers[i] = optimizedPeer.Peer
}
return []bssd.PartialRequest{bssd.PartialRequest{Peers: peers, Keys: keys}}
}

func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
Expand Down
18 changes: 18 additions & 0 deletions sessiondata/sessiondata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package sessiondata

import (
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)

// OptimizedPeer describes a peer and its level of optimization from 0 to 1.
type OptimizedPeer struct {
Peer peer.ID
OptimizationRating float64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we comment on what this rating means?

}

// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
6 changes: 3 additions & 3 deletions sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlockFrom(peer.ID, blocks.Block)
UpdateReceiveCounters(blocks.Block)
UpdateReceiveCounters(peer.ID, blocks.Block)
}

type sesTrk struct {
Expand Down Expand Up @@ -128,11 +128,11 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {

// UpdateReceiveCounters records the fact that a block was received, allowing
// sessions to track duplicates
func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
func (sm *SessionManager) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()

for _, s := range sm.sessions {
s.session.UpdateReceiveCounters(blk)
s.session.UpdateReceiveCounters(from, blk)
}
}
13 changes: 7 additions & 6 deletions sessionmanager/sessionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"testing"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

bssession "github.com/ipfs/go-bitswap/session"
bssd "github.com/ipfs/go-bitswap/sessiondata"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand All @@ -30,23 +30,24 @@ func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
return nil, nil
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
func (fs *fakeSession) UpdateReceiveCounters(blocks.Block) { fs.updateReceiveCounters = true }
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
func (fs *fakeSession) UpdateReceiveCounters(peer.ID, blocks.Block) { fs.updateReceiveCounters = true }

type fakePeerManager struct {
id uint64
}

func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid) {}
func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
func (*fakePeerManager) RecordCancel(c cid.Cid) {}

type fakeRequestSplitter struct {
}

func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
Expand Down
75 changes: 75 additions & 0 deletions sessionpeermanager/latencytracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package sessionpeermanager

import (
"time"

"github.com/ipfs/go-cid"
)

type requestData struct {
startedAt time.Time
wasCancelled bool
timeoutFunc *time.Timer
}

type latencyTracker struct {
requests map[cid.Cid]*requestData
}

func newLatencyTracker() *latencyTracker {
return &latencyTracker{requests: make(map[cid.Cid]*requestData)}
}

type afterTimeoutFunc func(cid.Cid)

func (lt *latencyTracker) SetupRequests(keys []cid.Cid, timeoutDuration time.Duration, afterTimeout afterTimeoutFunc) {
startedAt := time.Now()
for _, k := range keys {
if _, ok := lt.requests[k]; !ok {
lt.requests[k] = &requestData{
startedAt,
false,
time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k)),
}
}
}
}

func makeAfterTimeout(afterTimeout afterTimeoutFunc, k cid.Cid) func() {
return func() { afterTimeout(k) }
}

func (lt *latencyTracker) CheckDuration(key cid.Cid) (time.Duration, bool) {
request, ok := lt.requests[key]
var latency time.Duration
if ok {
latency = time.Now().Sub(request.startedAt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: time.Since(request.startedAt)

}
return latency, ok
}

func (lt *latencyTracker) RemoveRequest(key cid.Cid) {
request, ok := lt.requests[key]
if ok {
request.timeoutFunc.Stop()
delete(lt.requests, key)
}
}

func (lt *latencyTracker) RecordCancel(key cid.Cid) {
request, ok := lt.requests[key]
if ok {
request.wasCancelled = true
}
}

func (lt *latencyTracker) WasCancelled(key cid.Cid) bool {
request, ok := lt.requests[key]
return ok && request.wasCancelled
}

func (lt *latencyTracker) Shutdown() {
for _, request := range lt.requests {
request.timeoutFunc.Stop()
}
}
41 changes: 41 additions & 0 deletions sessionpeermanager/peerdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sessionpeermanager

import (
"time"

"github.com/ipfs/go-cid"
)

const (
newLatencyWeight = 0.5
)

type peerData struct {
hasLatency bool
latency time.Duration
lt *latencyTracker
}

func newPeerData() *peerData {
return &peerData{
hasLatency: false,
lt: newLatencyTracker(),
latency: 0,
}
}

func (pd *peerData) AdjustLatency(k cid.Cid, hasFallbackLatency bool, fallbackLatency time.Duration) {
latency, hasLatency := pd.lt.CheckDuration(k)
pd.lt.RemoveRequest(k)
if !hasLatency {
latency, hasLatency = fallbackLatency, hasFallbackLatency
}
if hasLatency {
if pd.hasLatency {
pd.latency = time.Duration(float64(pd.latency)*(1.0-newLatencyWeight) + float64(latency)*newLatencyWeight)
} else {
pd.latency = latency
pd.hasLatency = true
}
}
}
Loading