Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#78 from ipfs/fix/lockless
Browse files Browse the repository at this point in the history
make the WantlistManager own the PeerHandler

This commit was moved from ipfs/go-bitswap@401b87d
  • Loading branch information
Stebalien authored Apr 29, 2019
2 parents 30d02fd + f8582a7 commit 1d30b5b
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 47 deletions.
8 changes: 1 addition & 7 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx)
wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
Expand All @@ -115,15 +115,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
}

bs.wm.SetDelegate(bs.pm)
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
Expand All @@ -144,10 +142,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
pm *bspm.PeerManager

// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

Expand Down
13 changes: 0 additions & 13 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {

t.Log("Give the blocks to the first instance")

nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.pm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.pm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
}
}

var blkeys []cid.Cid
first := instances[0]
for _, b := range blocks {
Expand Down
20 changes: 1 addition & 19 deletions bitswap/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package peermanager

import (
"context"
"sync"

bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
Expand Down Expand Up @@ -40,8 +39,7 @@ type peerQueueInstance struct {
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// peerQueues -- interact through internal utility functions get/set/remove/iterate
peerQueues map[peer.ID]*peerQueueInstance
peerQueuesLk sync.RWMutex
peerQueues map[peer.ID]*peerQueueInstance

createPeerQueue PeerQueueFactory
ctx context.Context
Expand All @@ -58,8 +56,6 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {

// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.peerQueuesLk.RLock()
defer pm.peerQueuesLk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerQueues))
for p := range pm.peerQueues {
peers = append(peers, p)
Expand All @@ -70,56 +66,42 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {
pm.peerQueuesLk.Lock()

pq := pm.getOrCreate(p)

if pq.refcnt == 0 {
pq.pq.AddWantlist(initialWants)
}

pq.refcnt++

pm.peerQueuesLk.Unlock()
}

// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerQueuesLk.Lock()
pq, ok := pm.peerQueues[p]

if !ok {
pm.peerQueuesLk.Unlock()
return
}

pq.refcnt--
if pq.refcnt > 0 {
pm.peerQueuesLk.Unlock()
return
}

delete(pm.peerQueues, p)
pm.peerQueuesLk.Unlock()

pq.pq.Shutdown()

}

// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []bsmsg.Entry, targets []peer.ID, from uint64) {
if len(targets) == 0 {
pm.peerQueuesLk.RLock()
for _, p := range pm.peerQueues {
p.pq.AddMessage(entries, from)
}
pm.peerQueuesLk.RUnlock()
} else {
for _, t := range targets {
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(t)
pm.peerQueuesLk.Unlock()
pqi.pq.AddMessage(entries, from)
}
}
Expand Down
8 changes: 2 additions & 6 deletions bitswap/wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type WantManager struct {
}

// New initializes a new WantManager for a given context.
func New(ctx context.Context) *WantManager {
func New(ctx context.Context, peerHandler PeerHandler) *WantManager {
ctx, cancel := context.WithCancel(ctx)
wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
"Number of items in wantlist.").Gauge()
Expand All @@ -63,15 +63,11 @@ func New(ctx context.Context) *WantManager {
bcwl: wantlist.NewSessionTrackedWantlist(),
ctx: ctx,
cancel: cancel,
peerHandler: peerHandler,
wantlistGauge: wantlistGauge,
}
}

// SetDelegate specifies who will send want changes out to the internet.
func (wm *WantManager) SetDelegate(peerHandler PeerHandler) {
wm.peerHandler = peerHandler
}

// WantBlocks adds the given cids to the wantlist, tracked by the given session.
func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
log.Infof("want blocks: %s", ks)
Expand Down
3 changes: 1 addition & 2 deletions bitswap/wantmanager/wantmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ func setupTestFixturesAndInitialWantList() (

// setup fixtures
wantSender := &fakePeerHandler{}
wantManager := New(ctx)
wantManager := New(ctx, wantSender)
keys := testutil.GenerateCids(10)
otherKeys := testutil.GenerateCids(5)
peers := testutil.GeneratePeers(10)
session := testutil.GenerateSessionID()
otherSession := testutil.GenerateSessionID()

// startup wantManager
wantManager.SetDelegate(wantSender)
wantManager.Startup()

// add initial wants
Expand Down

0 comments on commit 1d30b5b

Please sign in to comment.