Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

wait for sessionWantSender to shutdown before completing session shutdown #317

Merged
merged 3 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ func (s *Session) handleShutdown() {
s.idleTick.Stop()
// Shut down the session peer manager
s.sprm.Shutdown()
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)
}
Expand Down
30 changes: 21 additions & 9 deletions internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ type onPeersExhaustedFn func([]cid.Cid)
// consults the peer response tracker (records which peers sent us blocks).
//
type sessionWantSender struct {
// When the context is cancelled, sessionWantSender shuts down
// The context is used when sending wants
ctx context.Context
// The sessionWantSender uses these channels when it's shutting down
closing chan struct{}
closed chan struct{}
// The session ID
sessionID uint64
// A channel that collects incoming changes (events)
Expand Down Expand Up @@ -102,6 +105,8 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm S

sws := sessionWantSender{
ctx: ctx,
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
closing: make(chan struct{}),
closed: make(chan struct{}),
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
Expand Down Expand Up @@ -157,26 +162,33 @@ func (sws *sessionWantSender) Run() {
select {
case ch := <-sws.changes:
sws.onChange([]change{ch})
case <-sws.ctx.Done():
sws.shutdown()
case <-sws.closing:
// Close the 'closed' channel to signal to Shutdown() that the run
// loop has exited
close(sws.closed)
return
}
}
}

// Shutdown the sessionWantSender
func (sws *sessionWantSender) Shutdown() {
// Signal to the run loop to stop processing
close(sws.closing)
// Unregister the session with the PeerManager
sws.pm.UnregisterSession(sws.sessionID)
// Wait for run loop to complete
<-sws.closed
}

// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
select {
case sws.changes <- c:
case <-sws.ctx.Done():
case <-sws.closing:
}
}

// shutdown unregisters the session with the PeerManager
func (sws *sessionWantSender) shutdown() {
sws.pm.UnregisterSession(sws.sessionID)
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
Expand Down