make sure to not oversubscribe to relays
marten-seemann committed Apr 18, 2022
1 parent 59ea27e · commit 5b646fb
Showing 1 changed file with 120 additions and 80 deletions.
200 changes: 120 additions & 80 deletions p2p/host/autorelay/relay_finder.go
@@ -66,10 +66,11 @@ type relayFinder struct {

peerChan <-chan peer.AddrInfo

candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
candidates map[peer.ID]*candidate
candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
candidates map[peer.ID]*candidate
candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time
handleNewCandidateTrigger chan struct{} // cap: 1

relayUpdated chan struct{}

@@ -82,14 +83,15 @@ type relayFinder struct {

func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *config) *relayFinder {
r := &relayFinder{
bootTime: time.Now(),
host: host,
conf: conf,
peerChan: peerChan,
candidates: make(map[peer.ID]*candidate),
candidateFound: make(chan struct{}, 1),
relays: make(map[peer.ID]*circuitv2.Reservation),
relayUpdated: make(chan struct{}, 1),
bootTime: time.Now(),
host: host,
conf: conf,
peerChan: peerChan,
candidates: make(map[peer.ID]*candidate),
candidateFound: make(chan struct{}, 1),
handleNewCandidateTrigger: make(chan struct{}, 1),
relays: make(map[peer.ID]*circuitv2.Reservation),
relayUpdated: make(chan struct{}, 1),
}
return r
}
@@ -100,6 +102,11 @@ func (rf *relayFinder) background(ctx context.Context) {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.handleNewCandidates(ctx)
}()

subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
@@ -136,9 +143,15 @@ func (rf *relayFinder) background(ctx context.Context) {
}
rf.relayMx.Unlock()
case <-rf.candidateFound:
rf.handleNewCandidate(ctx)
select {
case rf.handleNewCandidateTrigger <- struct{}{}:
default:
}
case <-bootDelayTimer.C:
rf.handleNewCandidate(ctx)
select {
case rf.handleNewCandidateTrigger <- struct{}{}:
default:
}
case <-rf.relayUpdated:
push = true
case now := <-refreshTicker.C:
@@ -296,6 +309,27 @@ func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsR
return false, nil
}

// When a new node that could be a relay is found, we receive a notification on the handleNewCandidateTrigger chan.
// This function makes sure that we only run one instance of handleNewCandidate at once, and buffers
// exactly one more trigger event to run handleNewCandidate.
func (rf *relayFinder) handleNewCandidates(ctx context.Context) {
sem := make(chan struct{}, 1)
for {
select {
case <-ctx.Done():
return
case <-rf.handleNewCandidateTrigger:
select {
case <-ctx.Done():
return
case sem <- struct{}{}:
}
rf.handleNewCandidate(ctx)
<-sem
}
}
}

func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
rf.relayMx.Lock()
numRelays := len(rf.relays)
@@ -306,92 +340,98 @@ func (rf *relayFinder) handleNewCandidate(ctx context.Context) {
}

rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
if len(rf.candidates) == 0 {
return
}

if len(rf.conf.staticRelays) != 0 {
// make sure we read all static relays before continuing
if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
rf.candidateMx.Unlock()
return
}
} else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay {
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay), we still go ahead.
rf.candidateMx.Unlock()
return
}
if len(rf.candidates) == 0 {
rf.candidateMx.Unlock()
return
}

candidates := rf.selectCandidates()
rf.candidateMx.Unlock()

// We now iterate over the candidates, attempting (sequentially) to get reservations with them, until
// we reach the desired number of relays.
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
for _, cand := range candidates {
id := cand.ai.ID
rf.relayMx.Lock()
usingRelay := rf.usingRelay(id)
rf.relayMx.Unlock()
if usingRelay {
continue
}
rsvp, err := rf.connectToRelay(ctx, cand)
if err != nil {
log.Debugw("failed to connect to relay", "peer", id, "error", err)
continue
}
log.Debugw("adding new relay", "id", id)
rf.relayMx.Lock()
rf.relays[id] = rsvp
numRelays := len(rf.relays)
rf.relayMx.Unlock()

for _, cand := range candidates {
id := cand.ai.ID
rf.relayMx.Lock()
usingRelay := rf.usingRelay(id)
rf.relayMx.Unlock()
if usingRelay {
continue
}
rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection

var failed bool
var rsvp *circuitv2.Reservation

// make sure we're still connected.
if rf.host.Network().Connectedness(id) != network.Connected {
if err := rf.host.Connect(ctx, cand.ai); err != nil {
log.Debugw("failed to reconnect", "peer", cand.ai, "error", err)
rf.candidateMx.Lock()
delete(rf.candidates, cand.ai.ID)
rf.candidateMx.Unlock()
continue
}
}
if cand.supportsRelayV2 {
var err error
rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
if err != nil {
failed = true
log.Debugw("failed to reserve slot", "id", id, "error", err)
}
}
rf.candidateMx.Lock()
if failed {
cand.numAttempts++
delete(rf.candidates, id)
// We failed to obtain a reservation for too many times. We give up.
if cand.numAttempts >= rf.conf.maxAttempts {
log.Debugw("failed to obtain a reservation with. Giving up.", "id", id, "num attempts", cand.numAttempts)
} else {
rf.moveCandidateToBackoff(cand)
}
rf.candidateMx.Unlock()
continue
}
rf.candidateMx.Unlock()
log.Debugw("adding new relay", "id", id)
rf.relayMx.Lock()
rf.relays[id] = rsvp
numRelays := len(rf.relays)
rf.relayMx.Unlock()
select {
case rf.relayUpdated <- struct{}{}:
default:
}
if numRelays >= rf.conf.desiredRelays {
break
}
}
}

rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection
func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*circuitv2.Reservation, error) {
id := cand.ai.ID

select {
case rf.relayUpdated <- struct{}{}:
default:
}
if numRelays >= rf.conf.desiredRelays {
break
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

var failed bool
var rsvp *circuitv2.Reservation

// make sure we're still connected.
if rf.host.Network().Connectedness(id) != network.Connected {
if err := rf.host.Connect(ctx, cand.ai); err != nil {
rf.candidateMx.Lock()
delete(rf.candidates, cand.ai.ID)
rf.candidateMx.Unlock()
return nil, fmt.Errorf("failed to connect: %w", err)
}
}()
}
var err error
if cand.supportsRelayV2 {
rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
if err != nil {
failed = true
err = fmt.Errorf("failed to reserve slot: %w", err)
}
}
rf.candidateMx.Lock()
defer rf.candidateMx.Unlock()
if failed {
cand.numAttempts++
delete(rf.candidates, id)
		// We failed to obtain a reservation too many times. We give up.
		if cand.numAttempts >= rf.conf.maxAttempts {
			return nil, fmt.Errorf("failed to obtain a reservation too many times: %w", err)
}
rf.moveCandidateToBackoff(cand)
return nil, err
}
return rsvp, nil
}

// must be called with mutex locked
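The pattern this commit introduces is worth spelling out: handleNewCandidateTrigger has capacity 1, producers send to it non-blockingly, and handleNewCandidates drains it in a loop that calls handleNewCandidate synchronously (guarded by a one-slot semaphore so context cancellation is observed before each run). The net effect is that at most one instance runs at a time, and a burst of notifications collapses into at most one queued follow-up run. A minimal standalone sketch of the same coalescing-trigger pattern; the names `worker`, `trigger`, and `doWork` are illustrative, not from the commit:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker serializes runs of doWork: it executes at most one run at a time,
// and because trigger has capacity 1, any burst of signals arriving mid-run
// collapses into exactly one follow-up run.
func worker(ctx context.Context, trigger <-chan struct{}, doWork func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-trigger:
			doWork()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	trigger := make(chan struct{}, 1) // cap: 1, like handleNewCandidateTrigger

	go worker(ctx, trigger, func() {
		fmt.Println("handling new candidates")
		time.Sleep(100 * time.Millisecond) // simulate slow work
	})

	// Producers use a non-blocking send: if a follow-up run is already
	// queued, the extra signal is dropped instead of piling up.
	for i := 0; i < 20; i++ {
		select {
		case trigger <- struct{}{}:
		default:
		}
		time.Sleep(10 * time.Millisecond)
	}
	<-ctx.Done()
}
```

The same non-blocking-send shape appears twice more in the diff: for rf.handleNewCandidateTrigger in the background loop, and for rf.relayUpdated after a reservation is added.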

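The new connectToRelay helper also changes how failures propagate: instead of logging inline, it wraps errors with %w, and it bounds the whole dial-plus-reservation attempt with a single 10-second deadline, so one unresponsive candidate cannot stall the sequential loop for long. A sketch of that shape, with `dial` and `reserve` as hypothetical stand-ins for rf.host.Connect and circuitv2.Reserve:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithTimeout mirrors the shape of connectToRelay: the dial and the
// reservation share one 10-second budget, and failures come back as wrapped
// errors the caller can log or inspect.
func connectWithTimeout(ctx context.Context, dial, reserve func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	if err := dial(ctx); err != nil {
		return fmt.Errorf("failed to connect: %w", err)
	}
	if err := reserve(ctx); err != nil {
		return fmt.Errorf("failed to reserve slot: %w", err)
	}
	return nil
}

func main() {
	errNoSlot := errors.New("relay is out of reservation slots")
	err := connectWithTimeout(context.Background(),
		func(ctx context.Context) error { return nil }, // dial succeeds
		func(ctx context.Context) error { return errNoSlot },
	)
	fmt.Println(err)                       // failed to reserve slot: relay is out of reservation slots
	fmt.Println(errors.Is(err, errNoSlot)) // true: %w keeps the cause inspectable
}
```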
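Failed candidates are not discarded outright: connectToRelay increments numAttempts, gives up only once rf.conf.maxAttempts is reached, and otherwise hands the candidate to moveCandidateToBackoff (truncated in this listing), which keeps the backoff slice sorted by next connection attempt time. A small sketch of that give-up-or-back-off bookkeeping; the `candidate` fields, the attempt limit, and the 30-second delay here are illustrative assumptions, not values from the commit:

```go
package main

import (
	"fmt"
	"time"
)

const maxAttempts = 3 // assumed limit; the real value comes from the config

type candidate struct {
	id          string
	numAttempts int
	nextAttempt time.Time
}

// recordFailure returns true if the candidate should be retried later,
// false once it has exhausted its attempts and should be dropped.
func recordFailure(c *candidate, backoff time.Duration) bool {
	c.numAttempts++
	if c.numAttempts >= maxAttempts {
		return false // too many failed reservations: give up
	}
	c.nextAttempt = time.Now().Add(backoff)
	return true
}

func main() {
	c := &candidate{id: "QmRelayCandidate"}
	for i := 0; i < maxAttempts; i++ {
		if recordFailure(c, 30*time.Second) {
			fmt.Printf("attempt %d failed, retrying in %s\n", c.numAttempts, time.Until(c.nextAttempt).Round(time.Second))
		} else {
			fmt.Printf("attempt %d failed, giving up\n", c.numAttempts)
		}
	}
}
```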