Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autorelay support for circuitv2 relays #1198

Merged
merged 16 commits into from
Sep 25, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"

"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/relay"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
Expand Down Expand Up @@ -209,7 +209,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
// TODO: We shouldn't be doing this here.
oldFactory := h.AddrsFactory
h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
return oldFactory(relay.Filter(addrs))
return oldFactory(autorelay.Filter(addrs))
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func (cfg *Config) NewNode() (host.Host, error) {

// Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is
// used by AutoNAT below.
var autorelay *relay.AutoRelay
var ar *autorelay.AutoRelay
addrF := h.AddrsFactory
if cfg.EnableAutoRelay {
if !cfg.Relay {
Expand All @@ -246,7 +246,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

if len(cfg.StaticRelays) > 0 {
autorelay = relay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, nil, router, cfg.StaticRelays)
} else {
if router == nil {
h.Close()
Expand All @@ -259,7 +259,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

discovery := discovery.NewRoutingDiscovery(crouter)
autorelay = relay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
ar = autorelay.NewAutoRelay(h, discovery, router, cfg.StaticRelays)
}
}

Expand Down Expand Up @@ -330,22 +330,12 @@ func (cfg *Config) NewNode() (host.Host, error) {
if router != nil {
ho = routed.Wrap(h, router)
}
if autorelay != nil {
return &autoRelayHost{Host: ho, autoRelay: autorelay}, nil
if ar != nil {
return autorelay.NewAutoRelayHost(ho, ar), nil
}
return ho, nil
}

type autoRelayHost struct {
host.Host
autoRelay *relay.AutoRelay
}

func (h *autoRelayHost) Close() error {
_ = h.autoRelay.Close()
return h.Host.Close()
}

// Option is a libp2p config option that can be given to the libp2p constructor
// (`libp2p.New`).
type Option func(cfg *Config) error
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/libp2p/go-libp2p-core/pnet"

"github.com/libp2p/go-libp2p/config"
autorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
autorelay "github.com/libp2p/go-libp2p/p2p/host/relay"
holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

ma "github.com/multiformats/go-multiaddr"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"testing"
Expand Down
134 changes: 123 additions & 11 deletions p2p/host/relay/autorelay.go → p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"context"
Expand All @@ -15,6 +15,8 @@ import (

basic "github.com/libp2p/go-libp2p/p2p/host/basic"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand All @@ -30,7 +32,7 @@ var (
BootDelay = 20 * time.Second
)

// These are the known PL-operated relays
// These are the known PL-operated v1 relays; will be decommissioned in 2022.
var DefaultRelays = []string{
"/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
"/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
Expand All @@ -55,7 +57,7 @@ type AutoRelay struct {
disconnect chan struct{}

mx sync.Mutex
relays map[peer.ID]struct{}
relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay
status network.Reachability

cachedAddrs []ma.Multiaddr
Expand All @@ -71,14 +73,15 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router
router: router,
addrsF: bhost.AddrsFactory,
static: static,
relays: make(map[peer.ID]struct{}),
relays: make(map[peer.ID]*circuitv2.Reservation),
disconnect: make(chan struct{}, 1),
status: network.ReachabilityUnknown,
}
bhost.AddrsFactory = ar.hostAddrs
bhost.Network().Notify(ar)
ar.refCount.Add(1)
go ar.background(ctx)
go ar.refresh(ctx)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
return ar
}

Expand Down Expand Up @@ -135,6 +138,75 @@ func (ar *AutoRelay) background(ctx context.Context) {
}
}

func (ar *AutoRelay) refresh(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
vyzo marked this conversation as resolved.
Show resolved Hide resolved
var toRefresh []peer.ID

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must have missed this in my earlier review, but do we need a check to garbage collect expired reservations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, we'll try to refresh them. Do you see any way to create garbage here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, we delete the reservation if refreshing fails.

ar.mx.Lock()
if ar.status == network.ReachabilityPublic {
// we are public, forget about the relays, unprotect peers
for p := range ar.relays {
ar.host.ConnManager().Unprotect(p, "autorelay")
vyzo marked this conversation as resolved.
Show resolved Hide resolved
delete(ar.relays, p)
}

ar.mx.Unlock()
continue
}

// find reservations about to expire
now := time.Now()
for p, rsvp := range ar.relays {
if rsvp == nil {
continue
}

if now.Add(time.Minute).Before(rsvp.Expiration) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
continue
}

toRefresh = append(toRefresh, p)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
ar.mx.Unlock()

// refresh reservations about to expire in parallel
var wg sync.WaitGroup
for _, p := range toRefresh {
wg.Add(1)

go func(p peer.ID) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p})
ar.mx.Lock()
if err != nil {
log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err)
delete(ar.relays, p)
// unprotect the connection
ar.host.ConnManager().Unprotect(p, "autorelay")
// notify of relay disconnection
select {
case ar.disconnect <- struct{}{}:
default:
}
} else {
log.Debugf("refreshed relay slot reservation with %s", p)
ar.relays[p] = rsvp
}
ar.mx.Unlock()
}(p)
}
wg.Wait()

case <-ctx.Done():
return
}
}
}

func (ar *AutoRelay) findRelays(ctx context.Context) bool {
if ar.numRelays() >= DesiredRelays {
return false
Expand Down Expand Up @@ -204,14 +276,48 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
return false
}

ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
protoIDv1 := string(relayv1.ProtoID)
protoIDv2 := string(circuitv2_proto.ProtoIDv2Hop)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2)
if err != nil {
log.Debugf("error querying relay: %s", err.Error())
log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err)
return false
}

if !ok {
// not a hop relay
var supportsv1, supportsv2 bool
for _, proto := range protos {
switch proto {
case protoIDv1:
supportsv1 = true
case protoIDv2:
supportsv2 = true
}
}

var rsvp *circuitv2.Reservation

switch {
case supportsv2:
rsvp, err = circuitv2.Reserve(ctx, ar.host, pi)
if err != nil {
log.Debugf("error reserving slot with %s: %s", pi.ID, err)
return false
}

case supportsv1:
ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
if err != nil {
log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err)
return false
}

if !ok {
// not a hop relay
return false
}

default:
// supports neither, unusable relay.
return false
}

Expand All @@ -222,7 +328,11 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
if ar.host.Network().Connectedness(pi.ID) != network.Connected {
return false
}
ar.relays[pi.ID] = struct{}{}

ar.relays[pi.ID] = rsvp

// protect the connection
ar.host.ConnManager().Protect(pi.ID, "autorelay")

return true
}
Expand All @@ -246,8 +356,10 @@ func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool {
return false
}

// tag the connection as very important
ar.host.ConnManager().TagPeer(pi.ID, "relay", 42)
// wait for identify to complete so that we can check the supported protocols
// TODO we should do this without a delay/sleep.
time.Sleep(time.Second)
vyzo marked this conversation as resolved.
Show resolved Hide resolved

return true
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay_test
package autorelay_test

import (
"context"
Expand All @@ -9,7 +9,7 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/host/relay"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay"

"github.com/libp2p/go-libp2p-core/event"
Expand All @@ -27,8 +27,8 @@ import (

// test specific parameters
func init() {
relay.BootDelay = 1 * time.Second
relay.AdvertiseBootDelay = 100 * time.Millisecond
autorelay.BootDelay = 1 * time.Second
autorelay.AdvertiseBootDelay = 100 * time.Millisecond
}

// mock routing
Expand Down Expand Up @@ -133,16 +133,18 @@ func TestAutoRelay(t *testing.T) {
// this is the relay host
// announce dns addrs because filter out private addresses from relays,
// and we consider dns addresses "public".
relayHost, err := libp2p.New(libp2p.DisableRelay(), libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
relayHost, err := libp2p.New(
libp2p.DisableRelay(),
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
for i, addr := range addrs {
saddr := addr.String()
if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") {
addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1")
addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP)
}
}
}
return addrs
}))
return addrs
}))
if err != nil {
t.Fatal(err)
}
Expand All @@ -160,7 +162,7 @@ func TestAutoRelay(t *testing.T) {
t.Fatal(err)
}
relayDiscovery := discovery.NewRoutingDiscovery(relayRouting)
relay.Advertise(ctx, relayDiscovery)
autorelay.Advertise(ctx, relayDiscovery)

// the client hosts
h1, err := libp2p.New(libp2p.EnableRelay())
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/doc.go → p2p/host/autorelay/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ How it works:
advertising relay addresses. The new set of addresses is propagated to
connected peers through the `identify/push` protocol.
*/
package relay
package autorelay
19 changes: 19 additions & 0 deletions p2p/host/autorelay/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package autorelay

import (
"github.com/libp2p/go-libp2p-core/host"
)

type AutoRelayHost struct {
host.Host
ar *AutoRelay
}

func (h *AutoRelayHost) Close() error {
_ = h.ar.Close()
return h.Host.Close()
}

func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost {
return &AutoRelayHost{Host: h, ar: ar}
}
2 changes: 1 addition & 1 deletion p2p/host/relay/log.go → p2p/host/autorelay/log.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
logging "github.com/ipfs/go-log/v2"
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/relay/relay.go → p2p/host/autorelay/relay.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package autorelay

import (
"context"
Expand Down