Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve observed address handling #585

Merged
merged 7 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
h.ids = opts.IdentifyService
} else {
// we can't set this as a default above because it depends on the *BasicHost.
h.ids = identify.NewIDService(h)
h.ids = identify.NewIDService(ctx, h)
}

if uint64(opts.NegotiationTimeout) != 0 {
Expand Down
9 changes: 5 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ type IDService struct {

// our own observed addresses.
// TODO: instead of expiring, remove these when we disconnect
observedAddrs ObservedAddrSet
observedAddrs *ObservedAddrSet
}

// NewIDService constructs a new *IDService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(h host.Host) *IDService {
func NewIDService(ctx context.Context, h host.Host) *IDService {
s := &IDService{
Host: h,
currid: make(map[inet.Conn]chan struct{}),
Host: h,
currid: make(map[inet.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(ctx),
}
h.SetStreamHandler(ID, s.requestHandler)
h.SetStreamHandler(IDPush, s.pushHandler)
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func subtestIDService(t *testing.T) {
h1p := h1.ID()
h2p := h2.ID()

ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
ids1 := identify.NewIDService(ctx, h1)
ids2 := identify.NewIDService(ctx, h2)

testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
Expand Down
116 changes: 79 additions & 37 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package identify

import (
"context"
"sync"
"time"

Expand All @@ -11,6 +12,8 @@ import (

const ActivationThresh = 4

var GCInterval = 10 * time.Minute

type observation struct {
seenTime time.Time
connDirection net.Direction
Expand Down Expand Up @@ -42,23 +45,40 @@ func (oa *ObservedAddr) activated(ttl time.Duration) bool {
return len(oa.SeenBy) >= ActivationThresh
}

type newObservation struct {
observed, local, observer ma.Multiaddr
direction net.Direction
}

// ObservedAddrSet keeps track of a set of ObservedAddrs
// the zero-value is ready to be used.
type ObservedAddrSet struct {
sync.Mutex // guards whole datastruct.
sync.RWMutex // guards whole datastruct.

// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration

// this is the worker channel
wch chan newObservation
}

func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet {
oas := &ObservedAddrSet{
addrs: make(map[string][]*ObservedAddr),
ttl: pstore.OwnObservedAddrTTL,
wch: make(chan newObservation, 1),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this longer, like 64 or something (or at least 16). We want as many observed addrs as we can on initial start.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will set to 16 (= 4 * activation-threshold)

}
go oas.worker(ctx)
return oas
}

// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()
oas.RLock()
defer oas.RUnlock()

// for zero-value.
if len(oas.addrs) == 0 {
return nil
}
Expand All @@ -70,53 +90,85 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
}

now := time.Now()
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
if len(filteredAddrs) > 0 {
oas.addrs[key] = filteredAddrs
} else {
delete(oas.addrs, key)
}

return addrs
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()
oas.RLock()
defer oas.RUnlock()

// for zero-value.
if len(oas.addrs) == 0 {
return nil
}

now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
return addrs
}

func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
direction net.Direction) {
select {
case oas.wch <- newObservation{observed: observed, local: local, observer: observer, direction: direction}:
default:
log.Debugf("dropping address observation of %s; buffer full", observed)
}
}

func (oas *ObservedAddrSet) worker(ctx context.Context) {
ticker := time.NewTicker(GCInterval)
defer ticker.Stop()

for {
select {
case obs := <-oas.wch:
oas.doAdd(obs.observed, obs.local, obs.observer, obs.direction)

case <-ticker.C:
oas.gc()

case <-ctx.Done():
return
}
}
}

func (oas *ObservedAddrSet) gc() {
oas.Lock()
defer oas.Unlock()

now := time.Now()
for local, observedAddrs := range oas.addrs {
// TODO we can do this without allocating by compacting the array in place
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
oas.addrs[local] = filteredAddrs
if len(filteredAddrs) > 0 {
oas.addrs[local] = filteredAddrs
} else {
delete(oas.addrs, local)
}
}
return addrs
}

func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr,
direction net.Direction) {

now := time.Now()
Expand All @@ -130,12 +182,6 @@ func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
oas.Lock()
defer oas.Unlock()

// for zero-value.
if oas.addrs == nil {
oas.addrs = make(map[string][]*ObservedAddr)
oas.ttl = pstore.OwnObservedAddrTTL
}

observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
Expand Down Expand Up @@ -178,11 +224,7 @@ func (oas *ObservedAddrSet) SetTTL(ttl time.Duration) {
}

func (oas *ObservedAddrSet) TTL() time.Duration {
oas.Lock()
defer oas.Unlock()
// for zero-value.
if oas.addrs == nil {
oas.ttl = pstore.OwnObservedAddrTTL
}
oas.RLock()
defer oas.RUnlock()
return oas.ttl
}
12 changes: 10 additions & 2 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package identify

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -52,7 +53,9 @@ func TestObsAddrSet(t *testing.T) {
b4 := m("/ip4/1.2.3.9/tcp/1237")
b5 := m("/ip4/1.2.3.10/tcp/1237")

oas := &ObservedAddrSet{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)

if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should be empty")
Expand All @@ -63,6 +66,7 @@ func TestObsAddrSet(t *testing.T) {
dummyDirection := net.DirOutbound

oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
}

add(oas, a1, a4)
Expand Down Expand Up @@ -131,13 +135,17 @@ func TestAddAddrsProfile(b *testing.T) {
}
return m
}
oas := &ObservedAddrSet{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)

add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
dummyDirection := net.DirOutbound

oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
}

a1 := m("/ip4/1.2.3.4/tcp/1231")
Expand Down