Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix bug with signaling peer availability to sessions #247

Merged
merged 1 commit into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
pm.pwm.RemovePeer(p)
}

// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
// the peer.
func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
Expand All @@ -140,6 +144,8 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C
}
}

// SendWants sends the given want-blocks and want-haves to the given peer.
// It filters out wants that have previously been sent to the peer.
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
Expand All @@ -150,6 +156,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
}
}

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
Expand All @@ -162,13 +170,15 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
}
}

// CurrentWants returns the list of pending want-blocks
func (pm *PeerManager) CurrentWants() []cid.Cid {
pm.pqLk.RLock()
defer pm.pqLk.RUnlock()

return pm.pwm.GetWantBlocks()
}

// CurrentWantHaves returns the list of pending want-haves
func (pm *PeerManager) CurrentWantHaves() []cid.Cid {
pm.pqLk.RLock()
defer pm.pqLk.RUnlock()
Expand All @@ -187,6 +197,8 @@ func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
return pqi
}

// RegisterSession tells the PeerManager that the given session is interested
// in events about the given peer.
func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
pm.psLk.Lock()
defer pm.psLk.Unlock()
Expand All @@ -204,6 +216,8 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
return ok
}

// UnregisterSession tells the PeerManager that the given session is no longer
// interested in PeerManager events.
func (pm *PeerManager) UnregisterSession(ses uint64) {
pm.psLk.Lock()
defer pm.psLk.Unlock()
Expand All @@ -218,12 +232,16 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
delete(pm.sessions, ses)
}

// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
for p, sesIds := range pm.peerSessions {
for sesId := range sesIds {
if s, ok := pm.sessions[sesId]; ok {
s.SignalAvailability(p, isConnected)
}
sesIds, ok := pm.peerSessions[p]
if !ok {
return
}
for sesId := range sesIds {
if s, ok := pm.sessions[sesId]; ok {
s.SignalAvailability(p, isConnected)
}
}
}
15 changes: 13 additions & 2 deletions internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ func TestSessionRegistration(t *testing.T) {
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)

tp := testutil.GeneratePeers(2)
self, p1 := tp[0], tp[1]
tp := testutil.GeneratePeers(3)
self, p1, p2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)

id := uint64(1)
Expand All @@ -282,16 +282,27 @@ func TestSessionRegistration(t *testing.T) {
if s.available[p1] {
t.Fatal("Expected peer not be available till connected")
}
peerManager.RegisterSession(p2, s)
if s.available[p2] {
t.Fatal("Expected peer not be available till connected")
}

peerManager.Connected(p1, nil)
if !s.available[p1] {
t.Fatal("Expected signal callback")
}
peerManager.Connected(p2, nil)
if !s.available[p2] {
t.Fatal("Expected signal callback")
}

peerManager.Disconnected(p1)
if s.available[p1] {
t.Fatal("Expected signal callback")
}
if !s.available[p2] {
t.Fatal("Expected signal callback only for disconnected peer")
}

peerManager.UnregisterSession(id)

Expand Down