From 5b646fb1cc62a6b1043d64cb0240ee5c6df6d0c1 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Mon, 18 Apr 2022 20:15:00 +0100
Subject: [PATCH] make sure to not oversubscribe to relays

---
 p2p/host/autorelay/relay_finder.go | 200 +++++++++++++++++------------
 1 file changed, 120 insertions(+), 80 deletions(-)

diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go
index 6672523ef6..63c02ded67 100644
--- a/p2p/host/autorelay/relay_finder.go
+++ b/p2p/host/autorelay/relay_finder.go
@@ -66,10 +66,11 @@ type relayFinder struct {
 
 	peerChan <-chan peer.AddrInfo
 
-	candidateFound      chan struct{} // receives every time we find a new relay candidate
-	candidateMx         sync.Mutex
-	candidates          map[peer.ID]*candidate
-	candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
+	candidateFound            chan struct{} // receives every time we find a new relay candidate
+	candidateMx               sync.Mutex
+	candidates                map[peer.ID]*candidate
+	candidatesOnBackoff       []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
+	handleNewCandidateTrigger chan struct{} // cap: 1
 
 	relayUpdated chan struct{}
 
@@ -82,14 +83,15 @@ type relayFinder struct {
 
 func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *config) *relayFinder {
 	r := &relayFinder{
-		bootTime:       time.Now(),
-		host:           host,
-		conf:           conf,
-		peerChan:       peerChan,
-		candidates:     make(map[peer.ID]*candidate),
-		candidateFound: make(chan struct{}, 1),
-		relays:         make(map[peer.ID]*circuitv2.Reservation),
-		relayUpdated:   make(chan struct{}, 1),
+		bootTime:                  time.Now(),
+		host:                      host,
+		conf:                      conf,
+		peerChan:                  peerChan,
+		candidates:                make(map[peer.ID]*candidate),
+		candidateFound:            make(chan struct{}, 1),
+		handleNewCandidateTrigger: make(chan struct{}, 1),
+		relays:                    make(map[peer.ID]*circuitv2.Reservation),
+		relayUpdated:              make(chan struct{}, 1),
 	}
 	return r
 }
@@ -100,6 +102,11 @@ func (rf *relayFinder) background(ctx context.Context) {
 		defer rf.refCount.Done()
 		rf.findNodes(ctx)
 	}()
+	rf.refCount.Add(1)
+	go func() {
+		defer rf.refCount.Done()
+		rf.handleNewCandidates(ctx)
+	}()
 
 	subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
 	if err != nil {
@@ -136,9 +143,15 @@ func (rf *relayFinder) background(ctx context.Context) {
 			}
 			rf.relayMx.Unlock()
 		case <-rf.candidateFound:
-			rf.handleNewCandidate(ctx)
+			select {
+			case rf.handleNewCandidateTrigger <- struct{}{}:
+			default:
+			}
 		case <-bootDelayTimer.C:
-			rf.handleNewCandidate(ctx)
+			select {
+			case rf.handleNewCandidateTrigger <- struct{}{}:
+			default:
+			}
 		case <-rf.relayUpdated:
 			push = true
 		case now := <-refreshTicker.C:
@@ -296,6 +309,27 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
 	return false, nil
 }
 
+// When a new node that could be a relay is found, we receive a notification on the handleNewCandidateTrigger chan.
+// This function makes sure that we only run one instance of handleNewCandidate at once, and buffers
+// exactly one more trigger event to run handleNewCandidate.
+func (rf *relayFinder) handleNewCandidates(ctx context.Context) {
+	sem := make(chan struct{}, 1)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-rf.handleNewCandidateTrigger:
+			select {
+			case <-ctx.Done():
+				return
+			case sem <- struct{}{}:
+			}
+			rf.handleNewCandidate(ctx)
+			<-sem
+		}
+	}
+}
+
 func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
 	rf.relayMx.Lock()
 	numRelays := len(rf.relays)
@@ -306,92 +340,98 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
 	}
 
 	rf.candidateMx.Lock()
-	defer rf.candidateMx.Unlock()
-	if len(rf.candidates) == 0 {
-		return
-	}
-
 	if len(rf.conf.staticRelays) != 0 {
 		// make sure we read all static relays before continuing
 		if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
+			rf.candidateMx.Unlock()
 			return
 		}
 	} else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
 		// During the startup phase, we don't want to connect to the first candidate that we find.
 		// Instead, we wait until we've found at least minCandidates, and then select the best of those.
 		// However, if that takes too long (longer than bootDelay), we still go ahead.
+		rf.candidateMx.Unlock()
+		return
+	}
+	if len(rf.candidates) == 0 {
+		rf.candidateMx.Unlock()
 		return
 	}
-
 	candidates := rf.selectCandidates()
+	rf.candidateMx.Unlock()
 
 	// We now iterate over the candidates, attempting (sequentially) to get reservations with them, until
 	// we reach the desired number of relays.
-	rf.refCount.Add(1)
-	go func() {
-		defer rf.refCount.Done()
+	for _, cand := range candidates {
+		id := cand.ai.ID
+		rf.relayMx.Lock()
+		usingRelay := rf.usingRelay(id)
+		rf.relayMx.Unlock()
+		if usingRelay {
+			continue
+		}
+		rsvp, err := rf.connectToRelay(ctx, cand)
+		if err != nil {
+			log.Debugw("failed to connect to relay", "peer", id, "error", err)
+			continue
+		}
+		log.Debugw("adding new relay", "id", id)
+		rf.relayMx.Lock()
+		rf.relays[id] = rsvp
+		numRelays := len(rf.relays)
+		rf.relayMx.Unlock()
 
-		for _, cand := range candidates {
-			id := cand.ai.ID
-			rf.relayMx.Lock()
-			usingRelay := rf.usingRelay(id)
-			rf.relayMx.Unlock()
-			if usingRelay {
-				continue
-			}
+		rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection
 
-			var failed bool
-			var rsvp *circuitv2.Reservation
-
-			// make sure we're still connected.
-			if rf.host.Network().Connectedness(id) != network.Connected {
-				if err := rf.host.Connect(ctx, cand.ai); err != nil {
-					log.Debugw("failed to reconnect", "peer", cand.ai, "error", err)
-					rf.candidateMx.Lock()
-					delete(rf.candidates, cand.ai.ID)
-					rf.candidateMx.Unlock()
-					continue
-				}
-			}
-			if cand.supportsRelayV2 {
-				var err error
-				rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
-				if err != nil {
-					failed = true
-					log.Debugw("failed to reserve slot", "id", id, "error", err)
-				}
-			}
-			rf.candidateMx.Lock()
-			if failed {
-				cand.numAttempts++
-				delete(rf.candidates, id)
-				// We failed to obtain a reservation for too many times. We give up.
-				if cand.numAttempts >= rf.conf.maxAttempts {
-					log.Debugw("failed to obtain a reservation with. Giving up.", "id", id, "num attempts", cand.numAttempts)
-				} else {
-					rf.moveCandidateToBackoff(cand)
-				}
-				rf.candidateMx.Unlock()
-				continue
-			}
-			rf.candidateMx.Unlock()
-			log.Debugw("adding new relay", "id", id)
-			rf.relayMx.Lock()
-			rf.relays[id] = rsvp
-			numRelays := len(rf.relays)
-			rf.relayMx.Unlock()
+		select {
+		case rf.relayUpdated <- struct{}{}:
+		default:
+		}
+		if numRelays >= rf.conf.desiredRelays {
+			break
+		}
+	}
+}
 
-			rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection
+func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*circuitv2.Reservation, error) {
+	id := cand.ai.ID
 
-			select {
-			case rf.relayUpdated <- struct{}{}:
-			default:
-			}
-			if numRelays >= rf.conf.desiredRelays {
-				break
-			}
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	var failed bool
+	var rsvp *circuitv2.Reservation
+
+	// make sure we're still connected.
+	if rf.host.Network().Connectedness(id) != network.Connected {
+		if err := rf.host.Connect(ctx, cand.ai); err != nil {
+			rf.candidateMx.Lock()
+			delete(rf.candidates, cand.ai.ID)
+			rf.candidateMx.Unlock()
+			return nil, fmt.Errorf("failed to connect: %w", err)
 		}
-	}()
+	}
+	var err error
+	if cand.supportsRelayV2 {
+		rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
+		if err != nil {
+			failed = true
+			err = fmt.Errorf("failed to reserve slot: %w", err)
+		}
+	}
+	rf.candidateMx.Lock()
+	defer rf.candidateMx.Unlock()
+	if failed {
+		cand.numAttempts++
+		delete(rf.candidates, id)
+		// We failed to obtain a reservation for too many times. We give up.
+		if cand.numAttempts >= rf.conf.maxAttempts {
+			return nil, fmt.Errorf("failed to obtain a reservation too many times: %w", err)
+		}
+		rf.moveCandidateToBackoff(cand)
+		return nil, err
+	}
+	return rsvp, nil
 }
 
 // must be called with mutex locked
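
The handleNewCandidates loop introduced above pairs a capacity-1 trigger channel with a one-slot sem channel, so at most one handleNewCandidate call runs at a time and at most one further run stays queued. The standalone sketch below (illustrative only, with hypothetical names such as coalescer and kick; not code from this patch) shows the same coalescing pattern in isolation: a burst of triggers collapses into the run in progress plus at most one queued run.

// Illustrative sketch of the "capacity-1 trigger channel + single worker" pattern.
// The types and functions here are hypothetical and do not appear in the patch.
package main

import (
	"context"
	"fmt"
	"time"
)

type coalescer struct {
	trigger chan struct{} // cap 1: holds at most one pending run
}

func newCoalescer() *coalescer {
	return &coalescer{trigger: make(chan struct{}, 1)}
}

// kick requests a run without blocking; while one run is already queued,
// further requests are dropped. This mirrors the non-blocking sends to
// handleNewCandidateTrigger in the patch.
func (c *coalescer) kick() {
	select {
	case c.trigger <- struct{}{}:
	default:
	}
}

// run executes job once per queued trigger, strictly sequentially, so two
// runs can never overlap (the role handleNewCandidates plays in the patch).
func (c *coalescer) run(ctx context.Context, job func(context.Context)) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.trigger:
			job(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	c := newCoalescer()
	go c.run(ctx, func(context.Context) {
		fmt.Println("handling new candidates")
		time.Sleep(100 * time.Millisecond)
	})

	// A burst of ten triggers coalesces into the current run plus at most one queued run.
	for i := 0; i < 10; i++ {
		c.kick()
	}
	<-ctx.Done()
}

In this simplified sketch the single worker goroutine alone keeps runs from overlapping; the patch additionally acquires the one-slot sem channel before each call to handleNewCandidate, which provides the same guarantee inside relayFinder.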