Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signal address change #851

Merged
merged 4 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p-autonat v0.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down
82 changes: 31 additions & 51 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand Down Expand Up @@ -84,12 +83,12 @@ type BasicHost struct {

proc goprocess.Process

mx sync.Mutex
lastAddrs []ma.Multiaddr
emitters struct {
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}

addrChangeChan chan struct{}
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -130,12 +129,13 @@ type HostOpts struct {
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) {
h := &BasicHost{
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
addrChangeChan: make(chan struct{}, 1),
}

var err error
Expand Down Expand Up @@ -230,6 +230,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}

h, err := NewHost(context.Background(), net, hostopts)
h.Start()
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
Expand Down Expand Up @@ -300,24 +301,13 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}

// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so.
// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
// changed.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) CheckForAddressChanges() {
h.mx.Lock()
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs)
if changeEvt != nil {
h.lastAddrs = addrs
}
h.mx.Unlock()

if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
func (h *BasicHost) SignalAddressChange() {
select {
case h.addrChangeChan <- struct{}{}:
default:
}
}

Expand Down Expand Up @@ -360,41 +350,31 @@ func (h *BasicHost) background(p goprocess.Process) {
defer ticker.Stop()

// initialize lastAddrs
h.mx.Lock()
if h.lastAddrs == nil {
h.lastAddrs = h.Addrs()
}
h.mx.Unlock()
lastAddrs := h.Addrs()

for {
select {
case <-ticker.C:
h.CheckForAddressChanges()

case <-h.addrChangeChan:
case <-p.Closing():
return
}
}
}

func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
}
// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
if changeEvt != nil {
lastAddrs = addrs
}

for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
if !ok {
return false
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we don't even need to store this emitter in the host, we can just create it and close it within this loop.

if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}

return true
}

// ID returns the (local) peer.ID associated with this Host
Expand Down
48 changes: 23 additions & 25 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"reflect"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -518,7 +519,7 @@ func TestHostAddrChangeDetection(t *testing.T) {
{ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
}

// The events we expect the host to emit when CheckForAddressChanges is called
// The events we expect the host to emit when SignalAddressChange is called
// and the changes between addr sets are detected
expectedEvents := []event.EvtLocalAddressesUpdated{
{
Expand Down Expand Up @@ -548,8 +549,11 @@ func TestHostAddrChangeDetection(t *testing.T) {
},
}

var lk sync.Mutex
currentAddrSet := 0
addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
lk.Lock()
defer lk.Unlock()
return addrSets[currentAddrSet]
}

Expand All @@ -563,46 +567,40 @@ func TestHostAddrChangeDetection(t *testing.T) {
}
defer sub.Close()

// wait for the host background thread to start
time.Sleep(1 * time.Second)
// host should start with no addrs (addrSet 0)
addrs := h.Addrs()
if len(addrs) != 0 {
t.Fatalf("expected 0 addrs, got %d", len(addrs))
}

// Advance between addrSets
// change addr, signal and assert event
for i := 1; i < len(addrSets); i++ {
lk.Lock()
currentAddrSet = i
h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update
lk.Unlock()
h.SignalAddressChange()
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
}

}
}

// drain events from the subscription
var receivedEvents []event.EvtLocalAddressesUpdated
readEvents:
func waitForAddrChangeEvent(ctx context.Context, sub event.Subscription, t *testing.T) event.EvtLocalAddressesUpdated {
for {
select {
case evt, more := <-sub.Out():
if !more {
break readEvents
}
receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated))
if len(receivedEvents) == len(expectedEvents) {
break readEvents
t.Fatal("channel should not be closed")
}
return evt.(event.EvtLocalAddressesUpdated)
case <-ctx.Done():
break readEvents
case <-time.After(1 * time.Second):
break readEvents
}
}

// assert that we received the events we expected
if len(receivedEvents) != len(expectedEvents) {
t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents))
}
for i, expected := range expectedEvents {
actual := receivedEvents[i]
if !updatedAddrEventsEqual(expected, actual) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual)
t.Fatal("context should not have cancelled")
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for address change event")
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (ar *AutoRelay) background(ctx context.Context) {
ar.cachedAddrs = nil
ar.mx.Unlock()
push = false
ar.host.CheckForAddressChanges()
ar.host.SignalAddressChange()
}
}
}
Expand Down