Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

react to incoming events #65

Merged
merged 4 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 118 additions & 44 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ type AmbientAutoNAT struct {
// If it is <3, then multiple autoNAT peers may be contacted for dialback
// If only a single autoNAT peer is known, then the confidence increases
// for each failure until it reaches 3.
confidence int
lastInbound time.Time
lastProbe time.Time
confidence int
lastInbound time.Time
lastProbeTry time.Time
lastProbe time.Time
recentProbes map[peer.ID]time.Time

subAddrUpdated event.Subscription
service *autoNATService
service *autoNATService

emitReachabilityChanged event.Emitter
subscriber event.Subscription
}

// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
Expand Down Expand Up @@ -100,22 +102,25 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
}, nil
}

subAddrUpdated, _ := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))

as := &AmbientAutoNAT{
ctx: ctx,
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),

subAddrUpdated: subAddrUpdated,

emitReachabilityChanged: emitReachabilityChanged,
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})

subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)})
if err != nil {
return nil, err
}
as.subscriber = subscriber

h.Network().Notify(as)
go as.background()

Expand Down Expand Up @@ -159,32 +164,45 @@ func (as *AmbientAutoNAT) background() {
delay := as.config.bootDelay

var lastAddrUpdated time.Time
addrUpdatedChan := as.subAddrUpdated.Out()
defer as.subAddrUpdated.Close()
subChan := as.subscriber.Out()
defer as.subscriber.Close()
defer as.emitReachabilityChanged.Close()

timer := time.NewTimer(delay)
defer timer.Stop()
timerRunning := true
var peer peer.ID

for {
select {
// new connection occured.
// new inbound connection.
case conn := <-as.inboundConn:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check "is the remote addr public". We can do this in a followup PR.

localAddrs := as.host.Addrs()
ca := as.status.Load().(autoNATResult)
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
if !ipInList(conn.RemoteMultiaddr(), localAddrs) {
if manet.IsPublicAddr(conn.RemoteMultiaddr()) &&
!ipInList(conn.RemoteMultiaddr(), localAddrs) {
as.lastInbound = time.Now()
}

case <-addrUpdatedChan:
if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
lastAddrUpdated = time.Now()
if as.confidence > 1 {
as.confidence--
case e := <-subChan:
switch e.(type) {
case event.EvtLocalAddressesUpdated:
if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
lastAddrUpdated = time.Now()
if as.confidence > 1 {
as.confidence--
}
}
case event.EvtPeerIdentificationCompleted:
peer = e.(event.EvtPeerIdentificationCompleted).Peer
if s, err := as.host.Peerstore().SupportsProtocols(peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load().(autoNATResult)
if currentStatus.Reachability == network.ReachabilityUnknown {
as.tryProbe(peer)
}
}
}

Expand All @@ -195,6 +213,8 @@ func (as *AmbientAutoNAT) background() {
}
as.recordObservation(result)
case <-timer.C:
peer = as.getPeerToProbe()
as.tryProbe(peer)
timerRunning = false
case <-as.ctx.Done():
return
Expand All @@ -209,17 +229,33 @@ func (as *AmbientAutoNAT) background() {
}
}

// scheduleProbe calculates when the next probe should be scheduled for,
// and launches it if that time is now.
func (as *AmbientAutoNAT) cleanupRecentProbes() {
fixedNow := time.Now()
for k, v := range as.recentProbes {
if fixedNow.Sub(v) > as.throttlePeerPeriod {
delete(as.recentProbes, k)
}
}
}

// scheduleProbe calculates when the next probe should be scheduled for.
func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// Our baseline is a probe every 'AutoNATRefreshInterval'
// This is modulated by:
// * recent inbound connections make us willing to wait up to 2x longer between probes.
// * low confidence makes us speed up between probes.
// * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval'
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)

nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
if !as.lastProbeTry.IsZero() {
backoff := as.lastProbeTry.Add(time.Second)
if backoff.After(nextProbe) {
nextProbe = backoff
}
}
if !as.lastProbe.IsZero() {
untilNext := as.config.refreshInterval
if currentStatus.Reachability == network.ReachabilityUnknown {
Expand All @@ -228,13 +264,15 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
untilNext = as.config.retryInterval
} else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext /= 5
}

if as.lastProbe.Add(untilNext).After(nextProbe) {
nextProbe = as.lastProbe.Add(untilNext)
}
nextProbe = as.lastProbe.Add(untilNext)
}
if fixedNow.After(nextProbe) || fixedNow == nextProbe {
go as.probeNextPeer()
return as.config.retryInterval
}

return nextProbe.Sub(fixedNow)
}

Expand Down Expand Up @@ -302,32 +340,64 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
}
}

func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool {
as.lastProbeTry = time.Now()
if p.Validate() != nil {
return false
}

if lastTime, ok := as.recentProbes[p]; ok {
if time.Since(lastTime) < as.throttlePeerPeriod {
return false
}
}
as.cleanupRecentProbes()

info := as.host.Peerstore().PeerInfo(p)

if !as.config.dialPolicy.skipPeer(info.Addrs) {
willscott marked this conversation as resolved.
Show resolved Hide resolved
as.recentProbes[p] = time.Now()
as.lastProbe = time.Now()
go as.probe(&info)
return true
}
return false
}

func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
cli := NewAutoNATClient(as.host, as.config.addressFunc)
ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout)
defer cancel()

a, err := cli.DialBack(ctx, pi.ID)

var result autoNATResult
switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
as.observations <- autoNATResult{network.ReachabilityPublic, a}
result.Reachability = network.ReachabilityPublic
result.address = a
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
as.observations <- autoNATResult{network.ReachabilityPrivate, nil}
result.Reachability = network.ReachabilityPrivate
default:
as.observations <- autoNATResult{network.ReachabilityUnknown, nil}
result.Reachability = network.ReachabilityUnknown
}

select {
case as.observations <- result:
case <-as.ctx.Done():
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OT: above: we need to select on the service context in case we shutdown.

}

func (as *AmbientAutoNAT) probeNextPeer() {
func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
peers := as.host.Network().Peers()
if len(peers) == 0 {
return
return ""
}

addrs := make([]peer.AddrInfo, 0, len(peers))
candidates := make([]peer.ID, 0, len(peers))

for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
Expand All @@ -336,24 +406,28 @@ func (as *AmbientAutoNAT) probeNextPeer() {
continue
}

if !as.config.dialPolicy.skipPeer(info.Addrs) {
addrs = append(addrs, info)
// Exclude peers in backoff.
if lastTime, ok := as.recentProbes[p]; ok {
if time.Since(lastTime) < as.throttlePeerPeriod {
continue
}
}
addrs = append(addrs, info)
}
// TODO: track and exclude recently probed peers.

if len(addrs) == 0 {
return
if as.config.dialPolicy.skipPeer(info.Addrs) {
continue
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use if ... { continue } for all checks, then append at the end. It makes this kind of logic a bit easier to read.

candidates = append(candidates, p)
}

shufflePeers(addrs)
if len(candidates) == 0 {
return ""
}

as.lastProbe = time.Now()
as.probe(&addrs[0])
shufflePeers(candidates)
return candidates[0]
}

func shufflePeers(peers []peer.AddrInfo) {
func shufflePeers(peers []peer.ID) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
Expand Down
41 changes: 39 additions & 2 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,16 @@ func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, A
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
a, _ := New(ctx, h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a
}

func identifyAsServer(server, recip host.Host) {
recip.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(server.ID(), AutoNATProto)

}

func connect(t *testing.T, a, b host.Host) {
pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()}
err := b.Connect(context.Background(), pinfo)
Expand Down Expand Up @@ -129,7 +136,7 @@ func TestAutoNATPublic(t *testing.T) {
}

connect(t, hs, hc)
time.Sleep(200 * time.Millisecond)
time.Sleep(1500 * time.Millisecond)

status = an.Status()
if status != network.ReachabilityPublic {
Expand Down Expand Up @@ -158,7 +165,7 @@ func TestAutoNATPublictoPrivate(t *testing.T) {
}

connect(t, hs, hc)
time.Sleep(200 * time.Millisecond)
time.Sleep(1500 * time.Millisecond)

status = an.Status()
if status != network.ReachabilityPublic {
Expand All @@ -168,14 +175,44 @@ func TestAutoNATPublictoPrivate(t *testing.T) {
expectEvent(t, s, network.ReachabilityPublic)

hs.SetStreamHandler(AutoNATProto, sayAutoNATPrivate)
hps := makeAutoNATServicePrivate(ctx, t)
connect(t, hps, hc)
identifyAsServer(hps, hc)

time.Sleep(2 * time.Second)

expectEvent(t, s, network.ReachabilityPrivate)

status = an.Status()
if status != network.ReachabilityPrivate {
t.Fatalf("unexpected NAT status: %d", status)
}
}

func TestAutoNATIncomingEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hs := makeAutoNATServicePrivate(ctx, t)
hc, ani := makeAutoNAT(ctx, t, hs)
an := ani.(*AmbientAutoNAT)

status := an.Status()
if status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}

connect(t, hs, hc)

em, _ := hc.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
em.Emit(event.EvtPeerIdentificationCompleted{Peer: hs.ID()})

time.Sleep(10 * time.Millisecond)
if an.Status() == network.ReachabilityUnknown {
t.Fatalf("Expected probe due to identification of autonat service")
}
}

func TestAutoNATObservationRecording(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
Loading