Skip to content

Commit

Permalink
fix(autorelay): Move relayFinder peer disconnect cleanup to separate …
Browse files Browse the repository at this point in the history
…goroutine (#3105)
  • Loading branch information
MarcoPolo authored Dec 18, 2024
1 parent 88ae979 commit aa7c048
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,45 @@ type scheduledWorkTimes struct {
nextAllowedCallToPeerSource time.Time
}

func (rf *relayFinder) cleanupDisconnectedPeers(ctx context.Context) {
subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"), eventbus.BufSize(32))
if err != nil {
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
return
}
defer subConnectedness.Close()
for {
select {
case <-ctx.Done():
return
case ev, ok := <-subConnectedness.Out():
if !ok {
return
}
evt := ev.(event.EvtPeerConnectednessChanged)
if evt.Connectedness != network.NotConnected {
continue
}
push := false

rf.relayMx.Lock()
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
log.Debugw("disconnected from relay", "id", evt.Peer)
delete(rf.relays, evt.Peer)
rf.notifyMaybeConnectToRelay()
rf.notifyMaybeNeedNewCandidates()
push = true
}
rf.relayMx.Unlock()

if push {
rf.clearCachedAddrsAndSignalAddressChange()
rf.metricsTracer.ReservationEnded(1)
}
}
}
}

func (rf *relayFinder) background(ctx context.Context) {
peerSourceRateLimiter := make(chan struct{}, 1)
rf.refCount.Add(1)
Expand All @@ -129,13 +168,6 @@ func (rf *relayFinder) background(ctx context.Context) {
rf.handleNewCandidates(ctx)
}()

subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"))
if err != nil {
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
return
}
defer subConnectedness.Close()

now := rf.conf.clock.Now()
bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay))
defer bootDelayTimer.Stop()
Expand Down Expand Up @@ -164,32 +196,10 @@ func (rf *relayFinder) background(ctx context.Context) {
workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter))
defer workTimer.Stop()

go rf.cleanupDisconnectedPeers(ctx)

for {
select {
case ev, ok := <-subConnectedness.Out():
if !ok {
return
}
evt := ev.(event.EvtPeerConnectednessChanged)
if evt.Connectedness != network.NotConnected {
continue
}
push := false

rf.relayMx.Lock()
if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
log.Debugw("disconnected from relay", "id", evt.Peer)
delete(rf.relays, evt.Peer)
rf.notifyMaybeConnectToRelay()
rf.notifyMaybeNeedNewCandidates()
push = true
}
rf.relayMx.Unlock()

if push {
rf.clearCachedAddrsAndSignalAddressChange()
rf.metricsTracer.ReservationEnded(1)
}
case <-rf.candidateFound:
rf.notifyMaybeConnectToRelay()
case <-bootDelayTimer.Ch():
Expand Down Expand Up @@ -264,7 +274,7 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche
if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
nextTime = scheduledWork.nextOldCandidateCheck
}
if nextTime == now {
if nextTime.Equal(now) {
// Only happens in CI with a mock clock
nextTime = nextTime.Add(1) // avoids an infinite loop
}
Expand Down

0 comments on commit aa7c048

Please sign in to comment.