Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#29 from ipfs/feat/extract-managers
Browse files Browse the repository at this point in the history
Bitswap Refactor ipfs#2: Extract PeerManager From Want Manager + Unit Test

This commit was moved from ipfs/go-bitswap@e9b80b6
  • Loading branch information
hannahhoward authored Dec 11, 2018
2 parents 8418da5 + 7c88030 commit 86c73e9
Show file tree
Hide file tree
Showing 10 changed files with 1,146 additions and 267 deletions.
36 changes: 27 additions & 9 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

decision "github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bspm "github.com/ipfs/go-bitswap/peermanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"

Expand Down Expand Up @@ -85,12 +87,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
" data blocks recived").Histogram(metricsBuckets)

sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
" this bitswap").Histogram(metricsBuckets)

notif := notifications.New()
px := process.WithTeardown(func() error {
notif.Shutdown()
return nil
})

peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
return bsmq.New(p, network)
}

bs := &Bitswap{
blockstore: bstore,
notifications: notif,
Expand All @@ -100,14 +109,18 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx, network),
wm: bswm.New(ctx),
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(),
counters: new(counters),

dupMetric: dupHist,
allMetric: allHist,
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
}
go bs.wm.Run()

bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
network.SetDelegate(bs)

// Start up bitswaps async worker routines
Expand All @@ -128,6 +141,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
pm *bspm.PeerManager

// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

// the engine is the bit of logic that decides who to send which blocks to
Expand Down Expand Up @@ -160,8 +176,9 @@ type Bitswap struct {
counters *counters

// Metrics interface metrics
dupMetric metrics.Histogram
allMetric metrics.Histogram
dupMetric metrics.Histogram
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// the sessionmanager manages tracking sessions
sm *bssm.SessionManager
Expand Down Expand Up @@ -427,13 +444,14 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
initialWants := bs.wm.CurrentBroadcastWants()
bs.pm.Connected(p, initialWants)
bs.engine.PeerConnected(p)
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
4 changes: 2 additions & 2 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.wm.ConnectedPeers()
peers := inst.Exchange.pm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.wm.ConnectedPeers()
peers = inst.Exchange.pm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
Expand Down
Loading

0 comments on commit 86c73e9

Please sign in to comment.