diff --git a/go.mod b/go.mod index f1f71d4252..d5f125b0ad 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-log v1.0.2 github.com/jbenet/go-cienv v0.1.0 - github.com/jbenet/goprocess v0.1.3 + github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-conn-security-multistream v0.1.0 github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p-autonat v0.2.0 diff --git a/go.sum b/go.sum index 8e40e26b97..11e6861d3e 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,8 @@ github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index bce9e580e8..f64e1840f8 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -4,7 +4,6 @@ import ( "context" "io" "net" - "sync" "time" "github.com/libp2p/go-libp2p/p2p/protocol/identify" @@ -84,12 +83,12 @@ type BasicHost struct { proc goprocess.Process - mx sync.Mutex - lastAddrs []ma.Multiaddr - emitters struct { + emitters struct { evtLocalProtocolsUpdated event.Emitter evtLocalAddrsUpdated event.Emitter } + + addrChangeChan chan struct{} } var _ host.Host = (*BasicHost)(nil) @@ -130,12 +129,13 @@ type HostOpts struct { // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) { h := &BasicHost{ - network: net, - mux: msmux.NewMultistreamMuxer(), - negtimeout: DefaultNegotiationTimeout, - AddrsFactory: DefaultAddrsFactory, - maResolver: madns.DefaultResolver, - eventbus: eventbus.NewBus(), + network: net, + mux: msmux.NewMultistreamMuxer(), + negtimeout: DefaultNegotiationTimeout, + AddrsFactory: DefaultAddrsFactory, + maResolver: madns.DefaultResolver, + eventbus: eventbus.NewBus(), + addrChangeChan: make(chan struct{}, 1), } var err error @@ -230,6 +230,7 @@ func New(net network.Network, opts ...interface{}) *BasicHost { } h, err := NewHost(context.Background(), net, hostopts) + h.Start() if err != nil { // this cannot happen with legacy options // plus we want to keep the (deprecated) legacy interface unchanged @@ -300,24 +301,13 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { go handle(protoID, s) } -// CheckForAddressChanges determines whether our listen addresses have recently -// changed and emits an EvtLocalAddressesUpdatedEvent & a Push Identify if so. +// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently +// changed. // Warning: this interface is unstable and may disappear in the future. -func (h *BasicHost) CheckForAddressChanges() { - h.mx.Lock() - addrs := h.Addrs() - changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs) - if changeEvt != nil { - h.lastAddrs = addrs - } - h.mx.Unlock() - - if changeEvt != nil { - err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt) - if err != nil { - log.Warnf("error emitting event for updated addrs: %s", err) - } - h.ids.Push() +func (h *BasicHost) SignalAddressChange() { + select { + case h.addrChangeChan <- struct{}{}: + default: } } @@ -360,41 +350,31 @@ func (h *BasicHost) background(p goprocess.Process) { defer ticker.Stop() // initialize lastAddrs - h.mx.Lock() - if h.lastAddrs == nil { - h.lastAddrs = h.Addrs() - } - h.mx.Unlock() + lastAddrs := h.Addrs() for { select { case <-ticker.C: - h.CheckForAddressChanges() - + case <-h.addrChangeChan: case <-p.Closing(): return } - } -} - -func sameAddrs(a, b []ma.Multiaddr) bool { - if len(a) != len(b) { - return false - } - bmap := make(map[string]struct{}, len(b)) - for _, addr := range b { - bmap[string(addr.Bytes())] = struct{}{} - } + // emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed. + addrs := h.Addrs() + changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs) + if changeEvt != nil { + lastAddrs = addrs + } - for _, addr := range a { - _, ok := bmap[string(addr.Bytes())] - if !ok { - return false + if changeEvt != nil { + err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt) + if err != nil { + log.Warnf("error emitting event for updated addrs: %s", err) + } + h.ids.Push() } } - - return true } // ID returns the (local) peer.ID associated with this Host diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index dd3593a894..c0dee8e571 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -6,6 +6,7 @@ import ( "io" "reflect" "sort" + "sync" "testing" "time" @@ -518,7 +519,7 @@ func TestHostAddrChangeDetection(t *testing.T) { {ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")}, } - // The events we expect the host to emit when CheckForAddressChanges is called + // The events we expect the host to emit when SignalAddressChange is called // and the changes between addr sets are detected expectedEvents := []event.EvtLocalAddressesUpdated{ { @@ -548,8 +549,11 @@ func TestHostAddrChangeDetection(t *testing.T) { }, } + var lk sync.Mutex currentAddrSet := 0 addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr { + lk.Lock() + defer lk.Unlock() return addrSets[currentAddrSet] } @@ -563,46 +567,40 @@ func TestHostAddrChangeDetection(t *testing.T) { } defer sub.Close() + // wait for the host background thread to start + time.Sleep(1 * time.Second) // host should start with no addrs (addrSet 0) addrs := h.Addrs() if len(addrs) != 0 { t.Fatalf("expected 0 addrs, got %d", len(addrs)) } - // Advance between addrSets + // change addr, signal and assert event for i := 1; i < len(addrSets); i++ { + lk.Lock() currentAddrSet = i - h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update + lk.Unlock() + h.SignalAddressChange() + evt := waitForAddrChangeEvent(ctx, sub, t) + if !updatedAddrEventsEqual(expectedEvents[i-1], evt) { + t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt) + } + } +} - // drain events from the subscription - var receivedEvents []event.EvtLocalAddressesUpdated -readEvents: +func waitForAddrChangeEvent(ctx context.Context, sub event.Subscription, t *testing.T) event.EvtLocalAddressesUpdated { for { select { case evt, more := <-sub.Out(): if !more { - break readEvents - } - receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated)) - if len(receivedEvents) == len(expectedEvents) { - break readEvents + t.Fatal("channel should not be closed") } + return evt.(event.EvtLocalAddressesUpdated) case <-ctx.Done(): - break readEvents - case <-time.After(1 * time.Second): - break readEvents - } - } - - // assert that we received the events we expected - if len(receivedEvents) != len(expectedEvents) { - t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents)) - } - for i, expected := range expectedEvents { - actual := receivedEvents[i] - if !updatedAddrEventsEqual(expected, actual) { - t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual) + t.Fatal("context should not have cancelled") + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for address change event") } } } diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index 3ef741b08a..a1691efb26 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -119,7 +119,7 @@ func (ar *AutoRelay) background(ctx context.Context) { ar.cachedAddrs = nil ar.mx.Unlock() push = false - ar.host.CheckForAddressChanges() + ar.host.SignalAddressChange() } } }