Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter observed addresses #917

Merged
merged 4 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 74 additions & 14 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package identify

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -30,6 +31,10 @@ var GCInterval = 10 * time.Minute
// for adding to an ObservedAddrManager.
var observedAddrManagerWorkerChannelSize = 16

// maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

type observation struct {
seenTime time.Time
connDirection network.Direction
Expand All @@ -46,13 +51,36 @@ type ObservedAddr struct {
LastSeen time.Time
}

func (oa *ObservedAddr) activated(ttl time.Duration) bool {
func (oa *ObservedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
}

func (oa *ObservedAddr) GroupKey() string {
key := ""
protos := oa.Addr.Protocols()

for i := range protos {
key = key + protos[i].Name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine but kind of sloppy. We should either use a separator (/ip4/udp/quic) or concatenate protocol.VCodes (these are prefix-free codes so we don't need separators).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a separator now.

}

return key
}
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

type newObservation struct {
conn network.Conn
observed ma.Multiaddr
Expand Down Expand Up @@ -111,33 +139,65 @@ func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiadd
return
}

now := time.Now()
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}

return addrs
return oas.filter(observedAddrs)
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
oas.mu.RLock()
defer oas.mu.RUnlock()

if len(oas.addrs) == 0 {
return nil
}

var allObserved []*ObservedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
if len(pat) != 0 {
pmap[pat] = append(pmap[pat], a)
} else {
log.Debugw("unable to group observed addr into IPx/(TCP or UDP) patterm", "address",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm... it can "theoretically" happen if the address dosen't have any protocols but it probably wouldn't make it so far if that were that case. Removing it to reduce clutter.

a.Addr.String())
}
}
}

addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
for pat := range pmap {
s := pmap[pat]

// We prefer inbound connection observations over outbound.
// For ties, we prefer the ones with more votes.
sort.Slice(s, func(i int, j int) bool {
first := s[i]
second := s[j]

if first.numInbound() > second.numInbound() {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
}
}

return addrs
}

Expand Down
91 changes: 87 additions & 4 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
p2putil "github.com/libp2p/go-libp2p-netutil"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

type harness struct {
Expand Down Expand Up @@ -51,10 +52,11 @@ func (h *harness) conn(observer peer.ID) network.Conn {
return c
}

func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) {
func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) network.Conn {
c := h.conn(observer)
h.oas.Record(c, observed)
time.Sleep(1 * time.Millisecond) // let the worker run
time.Sleep(50 * time.Millisecond) // let the worker run
return c
}

func newHarness(ctx context.Context, t *testing.T) harness {
Expand Down Expand Up @@ -245,3 +247,84 @@ func TestAddAddrsProfile(t *testing.T) {

wg.Wait()
}

func TestObservedAddrFiltering(t *testing.T) {
addrsMarch := func(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

for _, aa := range a {
found := false
for _, bb := range b {
if aa.Equal(bb) {
found = true
break
}
}
if !found {
return false
}
}
return true
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
}

// IP4/TCP
it1 := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
it2 := ma.StringCast("/ip4/1.2.3.4/tcp/1232")
it3 := ma.StringCast("/ip4/1.2.3.4/tcp/1233")
it4 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
it5 := ma.StringCast("/ip4/1.2.3.4/tcp/1235")
it6 := ma.StringCast("/ip4/1.2.3.4/tcp/1236")
it7 := ma.StringCast("/ip4/1.2.3.4/tcp/1237")

// observers
b1 := ma.StringCast("/ip4/1.2.3.6/tcp/1236")
b2 := ma.StringCast("/ip4/1.2.3.7/tcp/1237")
b3 := ma.StringCast("/ip4/1.2.3.8/tcp/1237")
b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")

peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
for i := 0; i < 4; i++ {
harness.observe(it1, peers[i])
harness.observe(it2, peers[i])
harness.observe(it3, peers[i])
harness.observe(it4, peers[i])
harness.observe(it5, peers[i])
harness.observe(it6, peers[i])
harness.observe(it7, peers[i])
}

harness.observe(it1, peers[4])
harness.observe(it7, peers[4])

addrs := harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it1)
require.Contains(t, addrs, it7)

}

func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
require.Equal(t, "ip4tcp", oa1.GroupKey())

oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/2222")}
require.Equal(t, "ip4tcp", oa2.GroupKey())

oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
require.Equal(t, "ip4udp", oa3.GroupKey())
oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531")}
require.Equal(t, "ip4udp", oa4.GroupKey())

oa5 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531/quic")}
require.Equal(t, "ip4udpquic", oa5.GroupKey())
}