Skip to content

Commit

Permalink
fix: use one less go-routine per session (ipfs#377)
Browse files Browse the repository at this point in the history
* fix: use one less go-routine per session

* fix: send cancel when GetBlocks() is cancelled (ipfs#383)

* fix: send cancel when GetBlocks() is cancelled

* fix: in SessionManager shutdown nil out sessions

* fix: sessionWantSender perf

* make sessionWantSender.SignalAvailability() non-blocking

* Refactor SessionInterestManager (ipfs#384)

* refactor: customize SessionInterestManager

* refactor: SessionInterestManager perf

This commit was moved from ipfs/go-bitswap@a2dd024
  • Loading branch information
dirkmc authored May 1, 2020
1 parent 62d249d commit 6c9536b
Show file tree
Hide file tree
Showing 12 changed files with 538 additions and 497 deletions.
9 changes: 7 additions & 2 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,19 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)

sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(
sessctx context.Context,
sessmgr bssession.SessionManager,
id uint64,
spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
Expand Down Expand Up @@ -193,6 +197,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// do it over here to avoid closing before all setup is done.
go func() {
<-px.Closing() // process closes first
sm.Shutdown()
cancelFunc()
notif.Shutdown()
}()
Expand Down
10 changes: 10 additions & 0 deletions bitswap/internal/blockpresencemanager/blockpresencemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ func (bpm *BlockPresenceManager) RemoveKeys(ks []cid.Cid) {
delete(bpm.presence, c)
}
}

// HasKey indicates whether the BlockPresenceManager is tracking the given key
// (used by the tests)
func (bpm *BlockPresenceManager) HasKey(c cid.Cid) bool {
bpm.Lock()
defer bpm.Unlock()

_, ok := bpm.presence[c]
return ok
}
47 changes: 26 additions & 21 deletions bitswap/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ type PeerManager interface {
SendCancels(context.Context, []cid.Cid)
}

// SessionManager manages all the sessions
type SessionManager interface {
// Remove a session (called when the session shuts down)
RemoveSession(sesid uint64)
// Cancel wants (called when a call to GetBlocks() is cancelled)
CancelSessionWants(sid uint64, wants []cid.Cid)
}

// SessionPeerManager keeps track of peers in the session
type SessionPeerManager interface {
// PeersDiscovered indicates if any peers have been discovered yet
Expand Down Expand Up @@ -91,10 +99,10 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
bsctx context.Context // context for bitswap
ctx context.Context // context for session
ctx context.Context
shutdown func()
sm SessionManager
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
providerFinder ProviderFinder
sim *bssim.SessionInterestManager
Expand Down Expand Up @@ -126,8 +134,8 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
ctx context.Context,
sm SessionManager,
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand All @@ -138,13 +146,15 @@ func New(
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID) *Session {

ctx, cancel := context.WithCancel(ctx)
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
shutdown: cancel,
sm: sm,
pm: pm,
bpm: bpm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
Expand All @@ -158,7 +168,7 @@ func New(
periodicSearchDelay: periodicSearchDelay,
self: self,
}
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)

go s.run(ctx)

Expand All @@ -169,6 +179,10 @@ func (s *Session) ID() uint64 {
return s.id
}

func (s *Session) Shutdown() {
s.shutdown()
}

// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// The SessionManager tells each Session about all keys that it may be
Expand Down Expand Up @@ -295,6 +309,7 @@ func (s *Session) run(ctx context.Context) {
case opCancel:
// Wants were cancelled
s.sw.CancelPending(oper.keys)
s.sws.Cancel(oper.keys)
case opWantsSent:
// Wants were sent to a peer
s.sw.WantsSent(oper.keys)
Expand Down Expand Up @@ -389,19 +404,9 @@ func (s *Session) handleShutdown() {
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()

// Remove session's interest in the given blocks.
cancelKs := s.sim.RemoveSessionInterest(s.id)

// Free up block presence tracking for keys that no session is interested
// in anymore
s.bpm.RemoveKeys(cancelKs)

// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
// Signal to the SessionManager that the session has been shutdown
// and can be cleaned up
s.sm.RemoveSession(s.id)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down
109 changes: 86 additions & 23 deletions bitswap/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,40 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

type mockSessionMgr struct {
lk sync.Mutex
removeSession bool
cancels []cid.Cid
}

func newMockSessionMgr() *mockSessionMgr {
return &mockSessionMgr{}
}

func (msm *mockSessionMgr) removeSessionCalled() bool {
msm.lk.Lock()
defer msm.lk.Unlock()
return msm.removeSession
}

func (msm *mockSessionMgr) cancelled() []cid.Cid {
msm.lk.Lock()
defer msm.lk.Unlock()
return msm.cancels
}

func (msm *mockSessionMgr) RemoveSession(sesid uint64) {
msm.lk.Lock()
defer msm.lk.Unlock()
msm.removeSession = true
}

func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
msm.lk.Lock()
defer msm.lk.Unlock()
msm.cancels = append(msm.cancels, wants...)
}

func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
return bsspm.New(1, newFakePeerTagger())
}
Expand Down Expand Up @@ -61,8 +95,6 @@ type wantReq struct {

type fakePeerManager struct {
wantReqs chan wantReq
lk sync.Mutex
cancels []cid.Cid
}

func newFakePeerManager() *fakePeerManager {
Expand All @@ -82,16 +114,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
pm.lk.Lock()
defer pm.lk.Unlock()
pm.cancels = append(pm.cancels, cancels...)
}
func (pm *fakePeerManager) allCancels() []cid.Cid {
pm.lk.Lock()
defer pm.lk.Unlock()
return append([]cid.Cid{}, pm.cancels...)
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand All @@ -103,7 +126,8 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
Expand Down Expand Up @@ -181,9 +205,9 @@ func TestSessionGetBlocks(t *testing.T) {

time.Sleep(10 * time.Millisecond)

// Verify wants were cancelled
if len(fpm.allCancels()) != len(blks) {
t.Fatal("expected cancels to be sent for all wants")
// Verify session was removed
if !sm.removeSessionCalled() {
t.Fatal("expected session to be removed")
}
}

Expand All @@ -198,7 +222,8 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
Expand Down Expand Up @@ -272,7 +297,8 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
Expand Down Expand Up @@ -316,7 +342,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
Expand Down Expand Up @@ -428,10 +455,11 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
Expand Down Expand Up @@ -459,10 +487,44 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
case <-timerCtx.Done():
t.Fatal("expected channel to be closed before timeout")
}

time.Sleep(10 * time.Millisecond)

// Expect RemoveSession to be called
if !sm.removeSessionCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionOnShutdownCalled(t *testing.T) {
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
sim := bssim.New()
bpm := bsbpm.New()
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
sm := newMockSessionMgr()

// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer sesscancel()
session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

// Shutdown the session
session.Shutdown()

time.Sleep(10 * time.Millisecond)

// Expect RemoveSession to be called
if !sm.removeSessionCalled() {
t.Fatal("expected onShutdown to be called")
}
}

func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond)
func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond)
fpm := newFakePeerManager()
fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder()
Expand All @@ -472,7 +534,8 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
sm := newMockSessionMgr()
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
Expand Down
Loading

0 comments on commit 6c9536b

Please sign in to comment.