From aa7c048980b4e351f6a76bc973e6ee1dd779e13c Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 18 Dec 2024 13:02:56 -0800 Subject: [PATCH] fix(autorelay): Move relayFinder peer disconnect cleanup to separate goroutine (#3105) --- p2p/host/autorelay/relay_finder.go | 74 +++++++++++++++++------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 13f8c63e6b..54e38a1517 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -115,6 +115,45 @@ type scheduledWorkTimes struct { nextAllowedCallToPeerSource time.Time } +func (rf *relayFinder) cleanupDisconnectedPeers(ctx context.Context) { + subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"), eventbus.BufSize(32)) + if err != nil { + log.Error("failed to subscribe to the EvtPeerConnectednessChanged") + return + } + defer subConnectedness.Close() + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-subConnectedness.Out(): + if !ok { + return + } + evt := ev.(event.EvtPeerConnectednessChanged) + if evt.Connectedness != network.NotConnected { + continue + } + push := false + + rf.relayMx.Lock() + if rf.usingRelay(evt.Peer) { // we were disconnected from a relay + log.Debugw("disconnected from relay", "id", evt.Peer) + delete(rf.relays, evt.Peer) + rf.notifyMaybeConnectToRelay() + rf.notifyMaybeNeedNewCandidates() + push = true + } + rf.relayMx.Unlock() + + if push { + rf.clearCachedAddrsAndSignalAddressChange() + rf.metricsTracer.ReservationEnded(1) + } + } + } +} + func (rf *relayFinder) background(ctx context.Context) { peerSourceRateLimiter := make(chan struct{}, 1) rf.refCount.Add(1) @@ -129,13 +168,6 @@ func (rf *relayFinder) background(ctx context.Context) { rf.handleNewCandidates(ctx) }() - subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)")) - if err != nil { - log.Error("failed to subscribe to the EvtPeerConnectednessChanged") - return - } - defer subConnectedness.Close() - now := rf.conf.clock.Now() bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay)) defer bootDelayTimer.Stop() @@ -164,32 +196,10 @@ func (rf *relayFinder) background(ctx context.Context) { workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)) defer workTimer.Stop() + go rf.cleanupDisconnectedPeers(ctx) + for { select { - case ev, ok := <-subConnectedness.Out(): - if !ok { - return - } - evt := ev.(event.EvtPeerConnectednessChanged) - if evt.Connectedness != network.NotConnected { - continue - } - push := false - - rf.relayMx.Lock() - if rf.usingRelay(evt.Peer) { // we were disconnected from a relay - log.Debugw("disconnected from relay", "id", evt.Peer) - delete(rf.relays, evt.Peer) - rf.notifyMaybeConnectToRelay() - rf.notifyMaybeNeedNewCandidates() - push = true - } - rf.relayMx.Unlock() - - if push { - rf.clearCachedAddrsAndSignalAddressChange() - rf.metricsTracer.ReservationEnded(1) - } case <-rf.candidateFound: rf.notifyMaybeConnectToRelay() case <-bootDelayTimer.Ch(): @@ -264,7 +274,7 @@ func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, sche if scheduledWork.nextOldCandidateCheck.Before(nextTime) { nextTime = scheduledWork.nextOldCandidateCheck } - if nextTime == now { + if nextTime.Equal(now) { // Only happens in CI with a mock clock nextTime = nextTime.Add(1) // avoids an infinite loop }