Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Implement ReputationManager #581

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-bitswap/client/internal/notifications"
bspm "github.com/ipfs/go-bitswap/client/internal/peermanager"
bspqm "github.com/ipfs/go-bitswap/client/internal/providerquerymanager"
bsrm "github.com/ipfs/go-bitswap/client/internal/reputationmanager"
bssession "github.com/ipfs/go-bitswap/client/internal/session"
bssim "github.com/ipfs/go-bitswap/client/internal/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/client/internal/sessionmanager"
Expand Down Expand Up @@ -88,6 +89,16 @@ type BlockReceivedNotifier interface {
ReceivedBlocks(peer.ID, []blocks.Block)
}

func WithReputationManager(params *ReputationManagerParams, thresholds *ReputationManagerThresholds, scoreKeeper ReputationManagerScoreKeeper) Option {
return func(c *Client) {
c.rm = bsrm.NewReputationManager(params, thresholds, scoreKeeper)
}
}

type ReputationManagerParams = bsrm.ReputationManagerParams
type ReputationManagerThresholds = bsrm.ReputationManagerThresholds
type ReputationManagerScoreKeeper = bsrm.ScoreKeeper

// New initializes a Bitswap client that runs until client.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Client {
// important to use provided parent context (since it may include important
Expand Down Expand Up @@ -165,6 +176,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
option(bs)
}

go bs.rm.Start(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIT it's just fancy counters, sounds like a thing you could do with locks.
Is one more goroutine really needed ?

bs.pqm.Startup()

// bind the context and process.
Expand All @@ -184,6 +196,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
type Client struct {
pm *bspm.PeerManager

rm *bsrm.ReputationManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager

Expand Down Expand Up @@ -255,6 +269,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error)
func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()
bs.rm.AddWants(keys)
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}
Expand Down Expand Up @@ -302,6 +317,13 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}

if from != "" {
presences := make([]cid.Cid, 0, len(haves)+len(dontHaves))
presences = append(presences, haves...)
presences = append(presences, dontHaves...)
bs.rm.ReceivedFrom(from, blks, presences)
}

allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
Expand Down Expand Up @@ -338,6 +360,10 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Client) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
if !bs.rm.AcceptFrom(p) {
return
}

bs.counterLk.Lock()
bs.counters.messagesRecvd++
bs.counterLk.Unlock()
Expand Down Expand Up @@ -424,12 +450,14 @@ func (bs *Client) blockstoreHas(blks []blocks.Block) []bool {
// when a peer initiates a new connection to bitswap.
func (bs *Client) PeerConnected(p peer.ID) {
bs.pm.Connected(p)
bs.rm.PeerConnected(p)
}

// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Client) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.rm.PeerDisconnected(p)
}

// ReceiveError is called by the network interface when an error happens
Expand Down
90 changes: 90 additions & 0 deletions client/internal/reputationmanager/recentwantlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package reputationmanager

import (
"context"
"sync"
"time"

"github.com/AndreasBriese/bbloom"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

type recentWantList struct {
params *recentWantListParams
oldWants bbloom.Bloom
newWants bbloom.Bloom
lk sync.RWMutex
}

type recentWantListParams struct {
numEntries float64
wrongPositives float64
refreshInterval time.Duration
}

func newRecentWantList(params *recentWantListParams) *recentWantList {
r := &recentWantList{
params: params,
oldWants: bbloom.New(params.numEntries, params.wrongPositives),
newWants: bbloom.New(params.numEntries, params.wrongPositives),
}

return r
}

func (r *recentWantList) loop(ctx context.Context) {
ticker := time.NewTicker(r.params.refreshInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.swapWantLists()
}
}
}

func (r *recentWantList) swapWantLists() {
r.lk.Lock()
defer r.lk.Unlock()

r.oldWants = r.newWants
r.newWants = bbloom.New(r.params.numEntries, r.params.wrongPositives)
}

func (r *recentWantList) addWants(cids []cid.Cid) {
r.lk.Lock()
defer r.lk.Unlock()

for _, c := range cids {
r.newWants.Add(c.Bytes())
}
}

func (r *recentWantList) splitRecentlyWanted(blks []blocks.Block, presences []cid.Cid) (wantedBlks []blocks.Block, unwantedBlks []blocks.Block, wantedPresences []cid.Cid, unwantedPresences []cid.Cid) {
r.lk.RLock()
defer r.lk.RUnlock()

for _, blk := range blks {
entry := blk.Cid().Bytes()
if r.newWants.Has(entry) || r.oldWants.Has(entry) {
wantedBlks = append(wantedBlks, blk)
} else {
unwantedBlks = append(unwantedBlks, blk)
}
}

for _, presence := range presences {
entry := presence.Bytes()
if r.newWants.Has(entry) || r.oldWants.Has(entry) {
wantedPresences = append(wantedPresences, presence)
} else {
unwantedPresences = append(unwantedPresences, presence)
}
}

return wantedBlks, unwantedBlks, wantedPresences, unwantedPresences
}
98 changes: 98 additions & 0 deletions client/internal/reputationmanager/reputationmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package reputationmanager

import (
"context"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)

type ReputationManager struct {
rwl *recentWantList
scoreKeeper ScoreKeeper
params *ReputationManagerParams
thresholds *ReputationManagerThresholds
}

type ReputationManagerParams struct {
RetainScore time.Duration
ScoreRefreshInterval time.Duration
}

type ReputationManagerThresholds struct {
GrayListThreshold float64
}

type ScoreKeeper interface {
Score(p peer.ID) float64
Update(p peer.ID, wantedBlks, unwantedBlks []blocks.Block, wantedPresences, unwantedPresences []cid.Cid)
PeerConnected(p peer.ID)
PeerDisconnected(p peer.ID)
}

func NewReputationManager(params *ReputationManagerParams, thresholds *ReputationManagerThresholds, scoreKeeper ScoreKeeper) *ReputationManager {
rm := &ReputationManager{
params: params,
thresholds: thresholds,
scoreKeeper: scoreKeeper,
// TODO: make these params configurable
rwl: newRecentWantList(&recentWantListParams{
numEntries: float64(1 << 12),
wrongPositives: float64(0.01),
refreshInterval: 2 * time.Minute,
}),
}

return rm
}

func (r *ReputationManager) AcceptFrom(pid peer.ID) bool {
if r == nil {
return true
}

return r.scoreKeeper.Score(pid) > r.thresholds.GrayListThreshold
}

func (r *ReputationManager) AddWants(cids []cid.Cid) {
if r == nil {
return
}

r.rwl.addWants(cids)
}

func (r *ReputationManager) ReceivedFrom(pid peer.ID, blks []blocks.Block, presences []cid.Cid) {
if r == nil {
return
}

wantedBlks, unwantedBlks, wantedPresences, unwantedPresences := r.rwl.splitRecentlyWanted(blks, presences)
r.scoreKeeper.Update(pid, wantedBlks, unwantedBlks, wantedPresences, unwantedPresences)
}

func (r *ReputationManager) PeerConnected(pid peer.ID) {
if r == nil {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this, if I call this incorrectly I want to know about it so I can fix the bug. (fail fast & fail loudly)
I like when this check is explicit in the consumer.
For example tracing:

go-bitswap/bitswap.go

Lines 175 to 177 in bc4fd4a

if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
}

(I know tracing is virtual so we couldn't do the same thing you did here but anyway)

Applies to this in all other methods.


r.scoreKeeper.PeerConnected(pid)
}

func (r *ReputationManager) PeerDisconnected(pid peer.ID) {
if r == nil {
return
}

r.scoreKeeper.PeerDisconnected(pid)
}

func (r *ReputationManager) Start(ctx context.Context) {
if r == nil {
return
}

go r.rwl.loop(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already used a goroutine earlier (I don't mind where you start it, just once not twice).

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (sim *SessionInterestManager) RecordSessionInterest(ses uint64, ks []cid.Ci
}
}

// When the session shuts down it calls RemoveSessionInterest().
// When the session shuts down it calls RemoveSession().
// Returns the keys that no session is interested in any more.
func (sim *SessionInterestManager) RemoveSession(ses uint64) []cid.Cid {
sim.lk.Lock()
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
)

require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/btcsuite/btcd v0.21.0-beta // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand All @@ -54,7 +55,9 @@ require (
github.com/libp2p/go-addr-util v0.0.2 // indirect
github.com/libp2p/go-eventbus v0.2.1 // indirect
github.com/libp2p/go-libp2p-autonat v0.4.2 // indirect
github.com/libp2p/go-libp2p-crypto v0.1.0 // indirect
github.com/libp2p/go-libp2p-nat v0.0.6 // indirect
github.com/libp2p/go-libp2p-peer v0.2.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.2.7 // indirect
github.com/libp2p/go-libp2p-record v0.1.0 // indirect
github.com/libp2p/go-nat v0.0.5 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
Expand Down Expand Up @@ -422,6 +424,7 @@ github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw=
github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
Expand All @@ -443,6 +446,7 @@ github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLK
github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU=
github.com/libp2p/go-libp2p-noise v0.2.0 h1:wmk5nhB9a2w2RxMOyvsoKjizgJOEaJdfAakr0jN8gds=
github.com/libp2p/go-libp2p-noise v0.2.0/go.mod h1:IEbYhBBzGyvdLBoxxULL/SGbJARhUeqlO8lVSREYu2Q=
github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY=
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI=
Expand Down