From 3296c58c94cd68286ae65fda4d29995ddba108c8 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 19 Feb 2021 10:23:42 +0530 Subject: [PATCH 01/22] hole punching protocol --- config/config.go | 18 +- go.mod | 3 +- go.sum | 5 + options.go | 38 ++ p2p/host/basic/basic_host.go | 25 +- p2p/host/relay/autorelay.go | 3 + p2p/protocol/holepunch/coordination.go | 302 ++++++++++++++++ p2p/protocol/holepunch/pb/Makefile | 11 + p2p/protocol/holepunch/pb/holepunch.pb.go | 414 ++++++++++++++++++++++ p2p/protocol/holepunch/pb/holepunch.proto | 19 + 10 files changed, 829 insertions(+), 9 deletions(-) create mode 100644 p2p/protocol/holepunch/coordination.go create mode 100644 p2p/protocol/holepunch/pb/Makefile create mode 100644 p2p/protocol/holepunch/pb/holepunch.pb.go create mode 100644 p2p/protocol/holepunch/pb/holepunch.proto diff --git a/config/config.go b/config/config.go index 0871e584f4..5baf11d617 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "context" "crypto/rand" + "errors" "fmt" "time" @@ -92,6 +93,8 @@ type Config struct { EnableAutoRelay bool AutoNATConfig StaticRelays []peer.AddrInfo + + EnableHolePunching bool } func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) { @@ -185,12 +188,17 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { return nil, err } + if cfg.EnableHolePunching && !cfg.Relay { + return nil, errors.New("cannot enable hole punching; relay is not enabled") + } + h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ - ConnManager: cfg.ConnManager, - AddrsFactory: cfg.AddrsFactory, - NATManager: cfg.NATManager, - EnablePing: !cfg.DisablePing, - UserAgent: cfg.UserAgent, + ConnManager: cfg.ConnManager, + AddrsFactory: cfg.AddrsFactory, + NATManager: cfg.NATManager, + EnablePing: !cfg.DisablePing, + UserAgent: cfg.UserAgent, + EnableHolePunching: cfg.EnableHolePunching, }) if err != nil { diff --git a/go.mod b/go.mod index a93b215274..918b631759 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,14 @@ require ( github.com/ipfs/go-log v1.0.4 github.com/jbenet/go-cienv v0.1.0 github.com/jbenet/goprocess v0.1.4 + github.com/jpillora/backoff v1.0.0 github.com/libp2p/go-addr-util v0.0.2 github.com/libp2p/go-conn-security-multistream v0.2.1 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p-autonat v0.4.0 github.com/libp2p/go-libp2p-blankhost v0.2.0 github.com/libp2p/go-libp2p-circuit v0.4.0 - github.com/libp2p/go-libp2p-core v0.8.2 + github.com/libp2p/go-libp2p-core v0.8.4 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.4.1 diff --git a/go.sum b/go.sum index c0469c98a5..84f9b7f5e8 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -201,6 +202,8 @@ github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -279,6 +282,8 @@ github.com/libp2p/go-libp2p-core v0.8.1 h1:+hvGysqSZ1AAWFHU8vNXX05vMSwI/6BSukuyn github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.4 h1:BL0noEpCJm0FIexkHlGI3nYEW3mGpc7zy7dxRmvXpwg= +github.com/libp2p/go-libp2p-core v0.8.4/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= diff --git a/options.go b/options.go index 4c638f2d5d..04afe2c95b 100644 --- a/options.go +++ b/options.go @@ -462,3 +462,41 @@ func UserAgent(userAgent string) Option { return nil } } + +// Experimental +// EnableHolePunching enables NAT traversal by enabling NATT'd peers to both initiate and respond to hole punching attempts +// to create direct/NAT-traversed connections with other peers. (default: disabled) +// +// Dependencies: +// * Relay (enabled by default) +// +// This subsystem performs two functions: +// +// 1. On receiving an inbound Relay connection, it attempts to create a direct connection with the remote peer +// by initiating and co-ordinating a hole punch over the Relayed connection. +// 2. If a peer sees a request to co-ordinate a hole punch on an outbound Relay connection, +// it will participate in the hole-punch to create a direct connection with the remote peer. +// +// If the hole punch is successful, all new streams will thereafter be created on the hole-punched connection. +// The Relayed connection will eventually be closed after a grace period. +// +// All existing indefinite long-lived streams on the Relayed connection will have to re-opened on the hole-punched connection by the user. +// Users can make use of the `Connected`/`Disconnected` notifications emitted by the Network for this purpose. +// +// It is not mandatory but nice to also enable the `AutoRelay` option (See `EnableAutoRelay`) +// so the peer can discover and connect to Relay servers if it discovers that it is NATT'd and has private reachability via AutoNAT. +// This will then enable it to advertise Relay addresses which can be used to accept inbound Relay connections to then co-ordinate +// a hole punch. +// +// If `EnableAutoRelay` is configured and the user is confident that the peer has private reachability/is NATT'd, +// the `ForceReachabilityPrivate` option can be configured to short-circuit reachability discovery via AutoNAT +// so the peer can immediately start connecting to Relay servers. +// +// If `EnableAutoRelay` is configured, the `StaticRelays` option can be used to configure a static set of Relay servers +// for `AutoRelay` to connect to so that it does not need to discover Relay servers via Routing. +func EnableHolePunching() Option { + return func(cfg *Config) error { + cfg.EnableHolePunching = true + return nil + } +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 69baafc8c9..dc506c34cd 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -19,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/record" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" addrutil "github.com/libp2p/go-addr-util" "github.com/libp2p/go-eventbus" @@ -87,6 +88,7 @@ type BasicHost struct { network network.Network mux *msmux.MultistreamMuxer ids *identify.IDService + hps *holepunch.HolePunchService pings *ping.PingService natmgr NATManager maResolver *madns.Resolver @@ -151,6 +153,9 @@ type HostOpts struct { // DisableSignedPeerRecord disables the generation of Signed Peer Records on this host. DisableSignedPeerRecord bool + + // EnableHolePunching enables the peer to initiate/respond to hole punching attempts for NAT traversal. + EnableHolePunching bool } // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. @@ -180,6 +185,13 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost return nil, err } + if opts.EnableHolePunching { + h.hps, err = holepunch.NewHolePunchService(h, h.ids) + if err != nil { + return nil, fmt.Errorf("failed to create hole punch service: %w", err) + } + } + if !h.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) if !ok { @@ -646,7 +658,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I } case <-ctx.Done(): s.Reset() - // wait for the negotiation to cancel. + // wait for `SelectOneOf` to error out because of resetting the stream. <-errCh return nil, ctx.Err() } @@ -679,8 +691,11 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // absorb addresses into peerstore h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) - if h.Network().Connectedness(pi.ID) == network.Connected { - return nil + forceDirect, _ := network.GetForceDirectDial(ctx) + if !forceDirect { + if h.Network().Connectedness(pi.ID) == network.Connected { + return nil + } } resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID)) @@ -1003,6 +1018,10 @@ func (h *BasicHost) Close() error { h.ids.Close() } + if h.hps != nil { + h.hps.Close() + } + _ = h.emitters.evtLocalProtocolsUpdated.Close() _ = h.emitters.evtLocalAddrsUpdated.Close() h.Network().Close() diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index a8d2284822..6f800ea634 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -33,8 +33,11 @@ var ( // These are the known PL-operated relays var DefaultRelays = []string{ "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", + "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", } // AutoRelay is a Host that uses relays for connectivity when a NAT is detected. diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go new file mode 100644 index 0000000000..a637bd9642 --- /dev/null +++ b/p2p/protocol/holepunch/coordination.go @@ -0,0 +1,302 @@ +package holepunch + +import ( + "context" + "sync" + "time" + + logging "github.com/ipfs/go-log" + "github.com/jpillora/backoff" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-msgio/protoio" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// TODO Should we have options for these ? +const ( + protocol = "/libp2p/holepunch/1.0.0" + maxMsgSize = 4 * 1024 // 4K + holePunchTimeout = 2 * time.Minute + dialTimeout = 60 * time.Second + maxRetries = 4 +) + +var ( + log = logging.Logger("p2p/holepunch") +) + +// TODO Find a better name for this protocol. +// HolePunchService is used to make direct connections with a peer via hole-punching. +type HolePunchService struct { + ctx context.Context + ctxCancel context.CancelFunc + + ids *identify.IDService + host host.Host + + // ensure we shutdown ONLY once + closeSync sync.Once + refCount sync.WaitGroup +} + +// NewHolePunchService creates a new service that can be used for hole punching +func NewHolePunchService(h host.Host, ids *identify.IDService) (*HolePunchService, error) { + ctx, cancel := context.WithCancel(context.Background()) + hs := &HolePunchService{ctx: ctx, ctxCancel: cancel, host: h, ids: ids} + + h.SetStreamHandler(protocol, hs.handleNewStream) + h.Network().Notify((*netNotifiee)(hs)) + return hs, nil +} + +// Close closes the Hole Punch Service. +func (hs *HolePunchService) Close() error { + hs.closeSync.Do(func() { + hs.ctxCancel() + hs.refCount.Wait() + }) + + return nil +} + +// attempts to make a direct connection with the remote peer of `relayConn` by co-ordinating a hole punch over +// the given relay connection `relayConn`. +func (hs *HolePunchService) holePunch(relayConn network.Conn) { + rp := relayConn.RemotePeer() + + // short-circuit hole punching if a direct dial works. + // attempt a direct connection ONLY if we have a public address for the remote peer + for _, a := range hs.host.Peerstore().Addrs(rp) { + if manet.IsPublicAddr(a) && !isRelayAddress(a) { + forceDirectConnCtx := network.WithForceDirectDial(hs.ctx, "hole-punching") + dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) + defer cancel() + if err := hs.host.Connect(dialCtx, peer.AddrInfo{ID: rp}); err == nil { + log.Debugf("direct connection to peer %s successful, no need for a hole punch", rp.Pretty()) + return + } + break + } + } + + // hole punch + hpCtx := network.WithUseTransient(hs.ctx, "hole-punch") + s, err := hs.host.NewStream(hpCtx, rp, protocol) + if err != nil { + return + } + log.Infof("will attempt hole punch with peer %s", rp.Pretty()) + _ = s.SetDeadline(time.Now().Add(holePunchTimeout)) + w := protoio.NewDelimitedWriter(s) + + // send a CONNECT and start RTT measurement + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + if err := w.WriteMsg(msg); err != nil { + s.Reset() + log.Errorf("failed to send hole punch CONNECT, err: %s", err) + return + } + tstart := time.Now() + + // wait for a CONNECT message from the remote peer + rd := protoio.NewDelimitedReader(s, maxMsgSize) + msg.Reset() + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + log.Errorf("failed to read connect message from remote peer, err: %s", err) + return + } + if msg.GetType() != holepunch_pb.HolePunch_CONNECT { + s.Reset() + log.Debugf("expectd HolePunch_CONNECT message, got %s", msg.GetType()) + return + } + obsRemote := addrsFromBytes(msg.ObsAddrs) + rtt := time.Since(tstart) + + // send a SYNC message and attempt a direct connect after half the RTT + msg.Reset() + msg.Type = holepunch_pb.HolePunch_SYNC.Enum() + if err := w.WriteMsg(msg); err != nil { + s.Reset() + log.Errorf("failed to send SYNC message for hole punching, err: %s", err) + return + } + defer s.Close() + + synTime := time.Duration(rtt.Milliseconds()/2) * time.Millisecond + + // wait for sync to reach the other peer and then punch a hole for it in our NAT + // by attempting a connect to it. + select { + case <-time.After(synTime): + pi := peer.AddrInfo{ + ID: rp, + Addrs: obsRemote, + } + hs.holePunchConnectWithBackoff(pi) + + case <-hs.ctx.Done(): + return + } +} + +func (hs *HolePunchService) handleNewStream(s network.Stream) { + log.Infof("got hole punch request from peer %s", s.Conn().RemotePeer().Pretty()) + _ = s.SetDeadline(time.Now().Add(holePunchTimeout)) + rp := s.Conn().RemotePeer() + wr := protoio.NewDelimitedWriter(s) + rd := protoio.NewDelimitedReader(s, maxMsgSize) + + // Read Connect message + msg := new(holepunch_pb.HolePunch) + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + return + } + if msg.GetType() != holepunch_pb.HolePunch_CONNECT { + s.Reset() + return + } + obsDial := addrsFromBytes(msg.ObsAddrs) + + // Write CONNECT message + msg.Reset() + msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + if err := wr.WriteMsg(msg); err != nil { + s.Reset() + return + } + + // Read SYNC message + msg.Reset() + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + return + } + if msg.GetType() != holepunch_pb.HolePunch_SYNC { + s.Reset() + return + } + defer s.Close() + + // Hole punch now by forcing a connect + pi := peer.AddrInfo{ + ID: rp, + Addrs: obsDial, + } + hs.holePunchConnectWithBackoff(pi) +} + +func (hs *HolePunchService) holePunchConnectWithBackoff(pi peer.AddrInfo) { + forceDirectConnCtx := network.WithForceDirectDial(hs.ctx, "hole-punching") + dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) + defer cancel() + err := hs.host.Connect(dialCtx, pi) + if err == nil { + log.Infof("hole punch with peer %s successful, direct conns to peer are:", pi.ID.Pretty()) + for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { + if !isRelayAddress(c.RemoteMultiaddr()) { + log.Info(c) + } + } + return + } + + // backoff and retry before giving up. + // this code will make the peer retry for (approximately ) a TOTAL of 10 seconds + // before giving up and declaring the hole punch a failure. + b := &backoff.Backoff{ + Jitter: true, + Min: 1 * time.Second, + Max: 5 * time.Second, + Factor: 2, + } + for b.Attempt() < maxRetries { + time.Sleep(b.Duration()) + err = hs.host.Connect(dialCtx, pi) + if err == nil { + log.Infof("hole punch with peer %s successful after retry, direct conns to peer are:", pi.ID.Pretty()) + for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { + if !isRelayAddress(c.RemoteMultiaddr()) { + log.Info(c) + } + } + return + } + } + log.Errorf("hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) +} + +type netNotifiee HolePunchService + +func (nn *netNotifiee) HolePunchService() *HolePunchService { + return (*HolePunchService)(nn) +} + +// TODO FIX For some reason, we see two such notifications for inbound proxy connections. +func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { + hs := nn.HolePunchService() + dir := v.Stat().Direction + + // Hole punch if it's an inbound proxy connection. + // If we already have a direct connection with the remote peer, this will be a no-op. + if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) { + log.Debugf("got inbound proxy conn from peer %s, connection is %v", v.RemotePeer().String(), v) + hs.refCount.Add(1) + go func() { + defer hs.refCount.Done() + select { + // waiting for Identify here will allow us to access the peer's public and observed addresses + // that we can dial to for a hole punch. + case <-hs.ids.IdentifyWait(v): + case <-hs.ctx.Done(): + return + } + nn.HolePunchService().holePunch(v) + }() + return + } +} + +func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {} + +func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} + +func isRelayAddress(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + + return err == nil +} + +func addrsToBytes(as []ma.Multiaddr) [][]byte { + bzs := make([][]byte, 0, len(as)) + for _, a := range as { + bzs = append(bzs, a.Bytes()) + } + + return bzs +} + +func addrsFromBytes(bzs [][]byte) []ma.Multiaddr { + addrs := make([]ma.Multiaddr, 0, len(bzs)) + for _, bz := range bzs { + a, err := ma.NewMultiaddrBytes(bz) + if err == nil { + addrs = append(addrs, a) + } + } + + return addrs +} diff --git a/p2p/protocol/holepunch/pb/Makefile b/p2p/protocol/holepunch/pb/Makefile new file mode 100644 index 0000000000..60d9dd4434 --- /dev/null +++ b/p2p/protocol/holepunch/pb/Makefile @@ -0,0 +1,11 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $< + +clean: + rm -f *.pb.go + rm -f *.go \ No newline at end of file diff --git a/p2p/protocol/holepunch/pb/holepunch.pb.go b/p2p/protocol/holepunch/pb/holepunch.pb.go new file mode 100644 index 0000000000..0f2452ebeb --- /dev/null +++ b/p2p/protocol/holepunch/pb/holepunch.pb.go @@ -0,0 +1,414 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: holepunch.proto + +package holepunch_pb + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type HolePunch_Type int32 + +const ( + HolePunch_CONNECT HolePunch_Type = 100 + HolePunch_SYNC HolePunch_Type = 300 +) + +var HolePunch_Type_name = map[int32]string{ + 100: "CONNECT", + 300: "SYNC", +} + +var HolePunch_Type_value = map[string]int32{ + "CONNECT": 100, + "SYNC": 300, +} + +func (x HolePunch_Type) Enum() *HolePunch_Type { + p := new(HolePunch_Type) + *p = x + return p +} + +func (x HolePunch_Type) String() string { + return proto.EnumName(HolePunch_Type_name, int32(x)) +} + +func (x *HolePunch_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(HolePunch_Type_value, data, "HolePunch_Type") + if err != nil { + return err + } + *x = HolePunch_Type(value) + return nil +} + +func (HolePunch_Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_290ddea0f23ef64a, []int{0, 0} +} + +type HolePunch struct { + Type *HolePunch_Type `protobuf:"varint,1,opt,name=type,enum=holepunch.pb.HolePunch_Type" json:"type,omitempty"` + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + ObsAddrs [][]byte `protobuf:"bytes,2,rep,name=ObsAddrs" json:"ObsAddrs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HolePunch) Reset() { *m = HolePunch{} } +func (m *HolePunch) String() string { return proto.CompactTextString(m) } +func (*HolePunch) ProtoMessage() {} +func (*HolePunch) Descriptor() ([]byte, []int) { + return fileDescriptor_290ddea0f23ef64a, []int{0} +} +func (m *HolePunch) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *HolePunch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_HolePunch.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *HolePunch) XXX_Merge(src proto.Message) { + xxx_messageInfo_HolePunch.Merge(m, src) +} +func (m *HolePunch) XXX_Size() int { + return m.Size() +} +func (m *HolePunch) XXX_DiscardUnknown() { + xxx_messageInfo_HolePunch.DiscardUnknown(m) +} + +var xxx_messageInfo_HolePunch proto.InternalMessageInfo + +func (m *HolePunch) GetType() HolePunch_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return HolePunch_CONNECT +} + +func (m *HolePunch) GetObsAddrs() [][]byte { + if m != nil { + return m.ObsAddrs + } + return nil +} + +func init() { + proto.RegisterEnum("holepunch.pb.HolePunch_Type", HolePunch_Type_name, HolePunch_Type_value) + proto.RegisterType((*HolePunch)(nil), "holepunch.pb.HolePunch") +} + +func init() { proto.RegisterFile("holepunch.proto", fileDescriptor_290ddea0f23ef64a) } + +var fileDescriptor_290ddea0f23ef64a = []byte{ + // 153 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xc8, 0xcf, 0x49, + 0x2d, 0x28, 0xcd, 0x4b, 0xce, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x41, 0x12, 0x48, + 0x52, 0xaa, 0xe4, 0xe2, 0xf4, 0xc8, 0xcf, 0x49, 0x0d, 0x00, 0xf1, 0x85, 0x0c, 0xb8, 0x58, 0x4a, + 0x2a, 0x0b, 0x52, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf8, 0x8c, 0x64, 0xf4, 0x90, 0x55, 0xea, 0xc1, + 0x95, 0xe9, 0x85, 0x54, 0x16, 0xa4, 0x06, 0x81, 0x55, 0x0a, 0x49, 0x71, 0x71, 0xf8, 0x27, 0x15, + 0x3b, 0xa6, 0xa4, 0x14, 0x15, 0x4b, 0x30, 0x29, 0x30, 0x6b, 0xf0, 0x04, 0xc1, 0xf9, 0x4a, 0x72, + 0x5c, 0x2c, 0x20, 0x95, 0x42, 0xdc, 0x5c, 0xec, 0xce, 0xfe, 0x7e, 0x7e, 0xae, 0xce, 0x21, 0x02, + 0x29, 0x42, 0x9c, 0x5c, 0x2c, 0xc1, 0x91, 0x7e, 0xce, 0x02, 0x6b, 0x98, 0x9c, 0x78, 0x4e, 0x3c, + 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0x46, 0x40, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x62, 0xf4, 0xc8, 0x7c, 0xa8, 0x00, 0x00, 0x00, +} + +func (m *HolePunch) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HolePunch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *HolePunch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ObsAddrs) > 0 { + for iNdEx := len(m.ObsAddrs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ObsAddrs[iNdEx]) + copy(dAtA[i:], m.ObsAddrs[iNdEx]) + i = encodeVarintHolepunch(dAtA, i, uint64(len(m.ObsAddrs[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if m.Type != nil { + i = encodeVarintHolepunch(dAtA, i, uint64(*m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintHolepunch(dAtA []byte, offset int, v uint64) int { + offset -= sovHolepunch(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *HolePunch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != nil { + n += 1 + sovHolepunch(uint64(*m.Type)) + } + if len(m.ObsAddrs) > 0 { + for _, b := range m.ObsAddrs { + l = len(b) + n += 1 + l + sovHolepunch(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovHolepunch(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozHolepunch(x uint64) (n int) { + return sovHolepunch(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *HolePunch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HolePunch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HolePunch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var v HolePunch_Type + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= HolePunch_Type(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Type = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ObsAddrs", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHolepunch + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthHolepunch + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ObsAddrs = append(m.ObsAddrs, make([]byte, postIndex-iNdEx)) + copy(m.ObsAddrs[len(m.ObsAddrs)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHolepunch(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHolepunch + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthHolepunch + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHolepunch(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthHolepunch + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupHolepunch + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthHolepunch + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthHolepunch = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHolepunch = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupHolepunch = fmt.Errorf("proto: unexpected end of group") +) diff --git a/p2p/protocol/holepunch/pb/holepunch.proto b/p2p/protocol/holepunch/pb/holepunch.proto new file mode 100644 index 0000000000..63d6fec699 --- /dev/null +++ b/p2p/protocol/holepunch/pb/holepunch.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + +package holepunch.pb; + +message HolePunch { + enum Type { + CONNECT = 100; + SYNC = 300; + } + + optional Type type=1; + + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + repeated bytes ObsAddrs = 2; +} \ No newline at end of file From b85f227e8ab8affae0d248a6db6859489831ef0b Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 19 Feb 2021 15:12:22 +0530 Subject: [PATCH 02/22] address review --- p2p/protocol/holepunch/coordination.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index a637bd9642..6ae54e056d 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -10,7 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-msgio/protoio" ma "github.com/multiformats/go-multiaddr" @@ -21,13 +21,13 @@ import ( const ( protocol = "/libp2p/holepunch/1.0.0" maxMsgSize = 4 * 1024 // 4K - holePunchTimeout = 2 * time.Minute - dialTimeout = 60 * time.Second + holePunchTimeout = 1 * time.Minute + dialTimeout = 10 * time.Second maxRetries = 4 ) var ( - log = logging.Logger("p2p/holepunch") + log = logging.Logger("p2p-holepunch") ) // TODO Find a better name for this protocol. @@ -86,8 +86,10 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { // hole punch hpCtx := network.WithUseTransient(hs.ctx, "hole-punch") - s, err := hs.host.NewStream(hpCtx, rp, protocol) + sCtx := network.WithNoDial(hpCtx, "hole-punch") + s, err := hs.host.NewStream(sCtx, rp, protocol) if err != nil { + log.Errorf("failed to open hole-punching stream with peer %s, err: %s", rp, err) return } log.Infof("will attempt hole punch with peer %s", rp.Pretty()) @@ -222,6 +224,10 @@ func (hs *HolePunchService) holePunchConnectWithBackoff(pi peer.AddrInfo) { } for b.Attempt() < maxRetries { time.Sleep(b.Duration()) + + dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) + defer cancel() + err = hs.host.Connect(dialCtx, pi) if err == nil { log.Infof("hole punch with peer %s successful after retry, direct conns to peer are:", pi.ID.Pretty()) From 08178ad7263a31140db4b541caf281d125b3f4d4 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 19 Feb 2021 15:43:41 +0530 Subject: [PATCH 03/22] fix nits --- p2p/protocol/holepunch/coordination.go | 18 +++++++++--------- p2p/protocol/holepunch/pb/holepunch.proto | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 6ae54e056d..4319382cf7 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -10,7 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-msgio/protoio" ma "github.com/multiformats/go-multiaddr" @@ -97,8 +97,8 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { w := protoio.NewDelimitedWriter(s) // send a CONNECT and start RTT measurement - msg := new(holepunch_pb.HolePunch) - msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + msg := new(pb.HolePunch) + msg.Type = pb.HolePunch_CONNECT.Enum() msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) if err := w.WriteMsg(msg); err != nil { s.Reset() @@ -115,7 +115,7 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { log.Errorf("failed to read connect message from remote peer, err: %s", err) return } - if msg.GetType() != holepunch_pb.HolePunch_CONNECT { + if msg.GetType() != pb.HolePunch_CONNECT { s.Reset() log.Debugf("expectd HolePunch_CONNECT message, got %s", msg.GetType()) return @@ -125,7 +125,7 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { // send a SYNC message and attempt a direct connect after half the RTT msg.Reset() - msg.Type = holepunch_pb.HolePunch_SYNC.Enum() + msg.Type = pb.HolePunch_SYNC.Enum() if err := w.WriteMsg(msg); err != nil { s.Reset() log.Errorf("failed to send SYNC message for hole punching, err: %s", err) @@ -158,12 +158,12 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { rd := protoio.NewDelimitedReader(s, maxMsgSize) // Read Connect message - msg := new(holepunch_pb.HolePunch) + msg := new(pb.HolePunch) if err := rd.ReadMsg(msg); err != nil { s.Reset() return } - if msg.GetType() != holepunch_pb.HolePunch_CONNECT { + if msg.GetType() != pb.HolePunch_CONNECT { s.Reset() return } @@ -171,7 +171,7 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { // Write CONNECT message msg.Reset() - msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + msg.Type = pb.HolePunch_CONNECT.Enum() msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) if err := wr.WriteMsg(msg); err != nil { s.Reset() @@ -184,7 +184,7 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { s.Reset() return } - if msg.GetType() != holepunch_pb.HolePunch_SYNC { + if msg.GetType() != pb.HolePunch_SYNC { s.Reset() return } diff --git a/p2p/protocol/holepunch/pb/holepunch.proto b/p2p/protocol/holepunch/pb/holepunch.proto index 63d6fec699..5e9cfab0cb 100644 --- a/p2p/protocol/holepunch/pb/holepunch.proto +++ b/p2p/protocol/holepunch/pb/holepunch.proto @@ -16,4 +16,4 @@ message HolePunch { // We plan to have a better address discovery and advertisement mechanism in the future. // See https://github.com/libp2p/go-libp2p-autonat/pull/98 repeated bytes ObsAddrs = 2; -} \ No newline at end of file +} From d1b3f07bc28202b96a02209bbaa70ed775f7aff4 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 19 Feb 2021 16:34:18 +0530 Subject: [PATCH 04/22] init correctly --- p2p/host/basic/basic_host.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 16c644973a..6c29e8abe2 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -185,13 +185,6 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost return nil, err } - if opts.EnableHolePunching { - h.hps, err = holepunch.NewHolePunchService(h, h.ids) - if err != nil { - return nil, fmt.Errorf("failed to create hole punch service: %w", err) - } - } - if !h.disableSignedPeerRecord { cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore()) if !ok { @@ -229,6 +222,13 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost return nil, fmt.Errorf("failed to create Identify service: %s", err) } + if opts.EnableHolePunching { + h.hps, err = holepunch.NewHolePunchService(h, h.ids) + if err != nil { + return nil, fmt.Errorf("failed to create hole punch service: %w", err) + } + } + if uint64(opts.NegotiationTimeout) != 0 { h.negtimeout = opts.NegotiationTimeout } From 326afcc06f26c1fbf0b7b7774349406e2c3f30b8 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 09:48:16 +0530 Subject: [PATCH 05/22] withSimulConnect --- go.mod | 3 +-- go.sum | 6 ++--- p2p/protocol/holepunch/coordination.go | 32 +++++++++++--------------- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index d1b4a98c0d..5552a18f1c 100644 --- a/go.mod +++ b/go.mod @@ -11,14 +11,13 @@ require ( github.com/ipfs/go-log v1.0.4 github.com/jbenet/go-cienv v0.1.0 github.com/jbenet/goprocess v0.1.4 - github.com/jpillora/backoff v1.0.0 github.com/libp2p/go-addr-util v0.0.2 github.com/libp2p/go-conn-security-multistream v0.2.1 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p-autonat v0.4.0 github.com/libp2p/go-libp2p-blankhost v0.2.0 github.com/libp2p/go-libp2p-circuit v0.4.0 - github.com/libp2p/go-libp2p-core v0.8.4 + github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.4.1 diff --git a/go.sum b/go.sum index 5a0d88b786..4c1151c5dd 100644 --- a/go.sum +++ b/go.sum @@ -200,8 +200,6 @@ github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -277,8 +275,8 @@ github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8i github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.4 h1:BL0noEpCJm0FIexkHlGI3nYEW3mGpc7zy7dxRmvXpwg= -github.com/libp2p/go-libp2p-core v0.8.4/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw= +github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 4319382cf7..36f579df2a 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -6,7 +6,6 @@ import ( "time" logging "github.com/ipfs/go-log" - "github.com/jpillora/backoff" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -23,7 +22,8 @@ const ( maxMsgSize = 4 * 1024 // 4K holePunchTimeout = 1 * time.Minute dialTimeout = 10 * time.Second - maxRetries = 4 + maxRetries = 5 + retryWait = 2 * time.Second ) var ( @@ -143,7 +143,7 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { ID: rp, Addrs: obsRemote, } - hs.holePunchConnectWithBackoff(pi) + hs.holePunchConnectWithRetry(pi) case <-hs.ctx.Done(): return @@ -195,11 +195,12 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { ID: rp, Addrs: obsDial, } - hs.holePunchConnectWithBackoff(pi) + hs.holePunchConnectWithRetry(pi) } -func (hs *HolePunchService) holePunchConnectWithBackoff(pi peer.AddrInfo) { - forceDirectConnCtx := network.WithForceDirectDial(hs.ctx, "hole-punching") +func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { + holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching") + forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching") dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) defer cancel() err := hs.host.Connect(dialCtx, pi) @@ -211,26 +212,19 @@ func (hs *HolePunchService) holePunchConnectWithBackoff(pi peer.AddrInfo) { } } return + } else { + log.Infof("first hole punch attempt with peer %s failed, error: %s, will retry now...", pi.ID.Pretty(), err) } - // backoff and retry before giving up. - // this code will make the peer retry for (approximately ) a TOTAL of 10 seconds - // before giving up and declaring the hole punch a failure. - b := &backoff.Backoff{ - Jitter: true, - Min: 1 * time.Second, - Max: 5 * time.Second, - Factor: 2, - } - for b.Attempt() < maxRetries { - time.Sleep(b.Duration()) + for i := 0; i < maxRetries; i++ { + time.Sleep(retryWait) dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) defer cancel() err = hs.host.Connect(dialCtx, pi) if err == nil { - log.Infof("hole punch with peer %s successful after retry, direct conns to peer are:", pi.ID.Pretty()) + log.Infof("hole punch with peer %s successful after %d retries, direct conns to peer are:", pi.ID.Pretty(), i) for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { if !isRelayAddress(c.RemoteMultiaddr()) { log.Info(c) @@ -239,7 +233,7 @@ func (hs *HolePunchService) holePunchConnectWithBackoff(pi peer.AddrInfo) { return } } - log.Errorf("hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) + log.Errorf("all retries for hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) } type netNotifiee HolePunchService From 30c5b0b6f597ead5ae79c0dc8328874cb5d16367 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 10:51:27 +0530 Subject: [PATCH 06/22] update swarm --- go.mod | 2 +- go.sum | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 5552a18f1c..d94e0a3ff5 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-noise v0.1.1 github.com/libp2p/go-libp2p-peerstore v0.2.6 - github.com/libp2p/go-libp2p-swarm v0.4.2 + github.com/libp2p/go-libp2p-swarm v0.4.3 github.com/libp2p/go-libp2p-testing v0.4.0 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 diff --git a/go.sum b/go.sum index 4c1151c5dd..a9f612b015 100644 --- a/go.sum +++ b/go.sum @@ -275,6 +275,7 @@ github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8i github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw= github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= @@ -324,8 +325,8 @@ github.com/libp2p/go-libp2p-swarm v0.2.8 h1:cIUUvytBzNQmGSjnXFlI6UpoBGsaud82mJPI github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM= github.com/libp2p/go-libp2p-swarm v0.3.0 h1:w18ZLMccbvwgyR+dODEeA3r1zbFZj+YVq6PClXo77lY= github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= -github.com/libp2p/go-libp2p-swarm v0.4.2 h1:rwZUsls+8dImHCcO2LZEa9+QGVF2tBSkywnA0PIYEJg= -github.com/libp2p/go-libp2p-swarm v0.4.2/go.mod h1:gNfZcYwyQJPNKEz3iyPfgwIJu7flqyBQOfCW+8paNCM= +github.com/libp2p/go-libp2p-swarm v0.4.3 h1:tAdkIj9gxMernQ6FTDPALnb8zAiw8xmcYz85FfA4oME= +github.com/libp2p/go-libp2p-swarm v0.4.3/go.mod h1:mmxP1pGBSc1Arw4F5DIjcpjFAmsRzA1KADuMtMuCT4g= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= From 2d3c67ca11fc202f257a627b96ca13a75fd63e43 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 14:01:03 +0530 Subject: [PATCH 07/22] log direct conns even on error --- p2p/protocol/holepunch/coordination.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 36f579df2a..47e4c6dfc4 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -214,6 +214,13 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { return } else { log.Infof("first hole punch attempt with peer %s failed, error: %s, will retry now...", pi.ID.Pretty(), err) + log.Infof("\n direct conns to peer are...") + + for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { + if !isRelayAddress(c.RemoteMultiaddr()) { + log.Info(c) + } + } } for i := 0; i < maxRetries; i++ { @@ -233,7 +240,12 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { return } } - log.Errorf("all retries for hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) + log.Errorf("all retries for hole punch with peer %s failed, err: %s, direct conns to peer are: ", pi.ID.Pretty(), err) + for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { + if !isRelayAddress(c.RemoteMultiaddr()) { + log.Info(c) + } + } } type netNotifiee HolePunchService From 832565922c0d6c561712bee0a8ac514ff1446c32 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 14:04:33 +0530 Subject: [PATCH 08/22] log connection ID --- p2p/protocol/holepunch/coordination.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 47e4c6dfc4..2038699403 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -262,7 +262,7 @@ func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { // Hole punch if it's an inbound proxy connection. // If we already have a direct connection with the remote peer, this will be a no-op. if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) { - log.Debugf("got inbound proxy conn from peer %s, connection is %v", v.RemotePeer().String(), v) + log.Debugf("got inbound proxy conn from peer %s, connectionID is %s", v.RemotePeer().String(), v.ID()) hs.refCount.Add(1) go func() { defer hs.refCount.Done() From aedab0480eb3cb4ee402008a8baaf93ce73a37c8 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 14:15:55 +0530 Subject: [PATCH 09/22] correct index --- p2p/protocol/holepunch/coordination.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 2038699403..4bf2c17477 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -223,7 +223,7 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { } } - for i := 0; i < maxRetries; i++ { + for i := 1; i <= maxRetries; i++ { time.Sleep(retryWait) dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) From b4dad3e3a1bb2bda69e09dc08ce7ee6dc2e2d467 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 22 Feb 2021 20:21:33 +0530 Subject: [PATCH 10/22] change dial timeout --- p2p/protocol/holepunch/coordination.go | 2 +- p2p/protocol/holepunch/coordination_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 p2p/protocol/holepunch/coordination_test.go diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 4bf2c17477..86418a8ce0 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -21,7 +21,7 @@ const ( protocol = "/libp2p/holepunch/1.0.0" maxMsgSize = 4 * 1024 // 4K holePunchTimeout = 1 * time.Minute - dialTimeout = 10 * time.Second + dialTimeout = 5 * time.Second maxRetries = 5 retryWait = 2 * time.Second ) diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go new file mode 100644 index 0000000000..c10d69f7c3 --- /dev/null +++ b/p2p/protocol/holepunch/coordination_test.go @@ -0,0 +1 @@ +package holepunch From c80a8ecb7e5a06a771dab788bfe3dcf6ad91c57b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 23 Feb 2021 09:32:37 +0200 Subject: [PATCH 11/22] remove check for relay configuration when enabling hole punching --- config/config.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/config/config.go b/config/config.go index 5baf11d617..477ac53bbe 100644 --- a/config/config.go +++ b/config/config.go @@ -3,7 +3,6 @@ package config import ( "context" "crypto/rand" - "errors" "fmt" "time" @@ -188,10 +187,6 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { return nil, err } - if cfg.EnableHolePunching && !cfg.Relay { - return nil, errors.New("cannot enable hole punching; relay is not enabled") - } - h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ ConnManager: cfg.ConnManager, AddrsFactory: cfg.AddrsFactory, From 93ab8e6f90151fb0163a342a32f7062e8f917003 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 23 Feb 2021 09:44:32 +0200 Subject: [PATCH 12/22] deduplicate hole punching attempts --- p2p/protocol/holepunch/coordination.go | 39 ++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 86418a8ce0..598599f127 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -42,12 +42,22 @@ type HolePunchService struct { // ensure we shutdown ONLY once closeSync sync.Once refCount sync.WaitGroup + + // active hole punches for deduplicating + activeMx sync.Mutex + active map[peer.ID]struct{} } // NewHolePunchService creates a new service that can be used for hole punching func NewHolePunchService(h host.Host, ids *identify.IDService) (*HolePunchService, error) { ctx, cancel := context.WithCancel(context.Background()) - hs := &HolePunchService{ctx: ctx, ctxCancel: cancel, host: h, ids: ids} + hs := &HolePunchService{ + ctx: ctx, + ctxCancel: cancel, + host: h, + ids: ids, + active: make(map[peer.ID]struct{}), + } h.SetStreamHandler(protocol, hs.handleNewStream) h.Network().Notify((*netNotifiee)(hs)) @@ -69,6 +79,13 @@ func (hs *HolePunchService) Close() error { func (hs *HolePunchService) holePunch(relayConn network.Conn) { rp := relayConn.RemotePeer() + // short-circuit check to see if we already have a direct connection + for _, c := range hs.host.Network().ConnsToPeer(rp) { + if !isRelayAddress(c.RemoteMultiaddr()) { + return + } + } + // short-circuit hole punching if a direct dial works. // attempt a direct connection ONLY if we have a public address for the remote peer for _, a := range hs.host.Peerstore().Addrs(rp) { @@ -254,7 +271,6 @@ func (nn *netNotifiee) HolePunchService() *HolePunchService { return (*HolePunchService)(nn) } -// TODO FIX For some reason, we see two such notifications for inbound proxy connections. func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { hs := nn.HolePunchService() dir := v.Stat().Direction @@ -262,10 +278,27 @@ func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { // Hole punch if it's an inbound proxy connection. // If we already have a direct connection with the remote peer, this will be a no-op. if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) { + p := v.RemotePeer() + hs.activeMx.Lock() + _, active := hs.active[p] + if !active { + hs.active[p] = struct{}{} + } + hs.activeMx.Unlock() + + if active { + return + } + log.Debugf("got inbound proxy conn from peer %s, connectionID is %s", v.RemotePeer().String(), v.ID()) hs.refCount.Add(1) go func() { defer hs.refCount.Done() + defer func() { + hs.activeMx.Lock() + delete(hs.active, p) + hs.activeMx.Unlock() + }() select { // waiting for Identify here will allow us to access the peer's public and observed addresses // that we can dial to for a hole punch. @@ -273,7 +306,7 @@ func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { case <-hs.ctx.Done(): return } - nn.HolePunchService().holePunch(v) + hs.holePunch(v) }() return } From 072db051f660db0d0144184095dd026729183c09 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 23 Feb 2021 13:21:22 +0530 Subject: [PATCH 13/22] tests WIP --- p2p/protocol/holepunch/coordination.go | 157 +++++++++++--------- p2p/protocol/holepunch/coordination_test.go | 138 ++++++++++++++++- 2 files changed, 220 insertions(+), 75 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 86418a8ce0..904f74c226 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -2,6 +2,8 @@ package holepunch import ( "context" + "errors" + "fmt" "sync" "time" @@ -9,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-msgio/protoio" @@ -17,13 +20,16 @@ import ( ) // TODO Should we have options for these ? -const ( - protocol = "/libp2p/holepunch/1.0.0" - maxMsgSize = 4 * 1024 // 4K - holePunchTimeout = 1 * time.Minute - dialTimeout = 5 * time.Second - maxRetries = 5 - retryWait = 2 * time.Second +var ( + // Protocol is the libp2p protocol for Hole Punching. + Protocol protocol.ID = "/libp2p/holepunch/1.0.0" + // HolePunchTimeout is the timeout for the hole punch protocol stream. + HolePunchTimeout = 1 * time.Minute + + maxMsgSize = 4 * 1024 // 4K + dialTimeout = 5 * time.Second + maxRetries = 5 + retryWait = 2 * time.Second ) var ( @@ -46,10 +52,14 @@ type HolePunchService struct { // NewHolePunchService creates a new service that can be used for hole punching func NewHolePunchService(h host.Host, ids *identify.IDService) (*HolePunchService, error) { + if ids == nil { + return nil, errors.New("Identify service can't be nil") + } + ctx, cancel := context.WithCancel(context.Background()) hs := &HolePunchService{ctx: ctx, ctxCancel: cancel, host: h, ids: ids} - h.SetStreamHandler(protocol, hs.handleNewStream) + h.SetStreamHandler(Protocol, hs.handleNewStream) h.Network().Notify((*netNotifiee)(hs)) return hs, nil } @@ -66,8 +76,7 @@ func (hs *HolePunchService) Close() error { // attempts to make a direct connection with the remote peer of `relayConn` by co-ordinating a hole punch over // the given relay connection `relayConn`. -func (hs *HolePunchService) holePunch(relayConn network.Conn) { - rp := relayConn.RemotePeer() +func (hs *HolePunchService) HolePunch(rp peer.ID) error { // short-circuit hole punching if a direct dial works. // attempt a direct connection ONLY if we have a public address for the remote peer @@ -78,7 +87,7 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { defer cancel() if err := hs.host.Connect(dialCtx, peer.AddrInfo{ID: rp}); err == nil { log.Debugf("direct connection to peer %s successful, no need for a hole punch", rp.Pretty()) - return + return nil } break } @@ -87,49 +96,59 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { // hole punch hpCtx := network.WithUseTransient(hs.ctx, "hole-punch") sCtx := network.WithNoDial(hpCtx, "hole-punch") - s, err := hs.host.NewStream(sCtx, rp, protocol) + s, err := hs.host.NewStream(sCtx, rp, Protocol) if err != nil { - log.Errorf("failed to open hole-punching stream with peer %s, err: %s", rp, err) - return + msg := fmt.Sprintf("failed to open hole-punching stream with peer %s, err: %s", rp, err) + log.Error(msg) + return errors.New(msg) } log.Infof("will attempt hole punch with peer %s", rp.Pretty()) - _ = s.SetDeadline(time.Now().Add(holePunchTimeout)) + _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) w := protoio.NewDelimitedWriter(s) - // send a CONNECT and start RTT measurement + // send a CONNECT and start RTT measurement. msg := new(pb.HolePunch) msg.Type = pb.HolePunch_CONNECT.Enum() msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + + tstart := time.Now() if err := w.WriteMsg(msg); err != nil { s.Reset() - log.Errorf("failed to send hole punch CONNECT, err: %s", err) - return + msg := fmt.Sprintf("failed to send hole punch CONNECT, err: %s", err) + + log.Error(msg) + return errors.New(msg) } - tstart := time.Now() // wait for a CONNECT message from the remote peer rd := protoio.NewDelimitedReader(s, maxMsgSize) msg.Reset() if err := rd.ReadMsg(msg); err != nil { s.Reset() - log.Errorf("failed to read connect message from remote peer, err: %s", err) - return + + msg := fmt.Sprintf("failed to read HolePunch_CONNECT message from remote peer, err: %s", err) + log.Error(msg) + return errors.New(msg) } + rtt := time.Since(tstart) + if msg.GetType() != pb.HolePunch_CONNECT { s.Reset() - log.Debugf("expectd HolePunch_CONNECT message, got %s", msg.GetType()) - return + msg := fmt.Sprintf("expected HolePunch_CONNECT message, got %s", msg.GetType()) + + log.Debug(msg) + return errors.New(msg) } obsRemote := addrsFromBytes(msg.ObsAddrs) - rtt := time.Since(tstart) // send a SYNC message and attempt a direct connect after half the RTT msg.Reset() msg.Type = pb.HolePunch_SYNC.Enum() if err := w.WriteMsg(msg); err != nil { s.Reset() - log.Errorf("failed to send SYNC message for hole punching, err: %s", err) - return + msg := fmt.Sprintf("failed to send SYNC message for hole punching, err: %s", err) + log.Error(msg) + return errors.New(msg) } defer s.Close() @@ -143,16 +162,16 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { ID: rp, Addrs: obsRemote, } - hs.holePunchConnectWithRetry(pi) + return hs.holePunchConnectWithRetry(pi) case <-hs.ctx.Done(): - return + return hs.ctx.Err() } } func (hs *HolePunchService) handleNewStream(s network.Stream) { log.Infof("got hole punch request from peer %s", s.Conn().RemotePeer().Pretty()) - _ = s.SetDeadline(time.Now().Add(holePunchTimeout)) + _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) rp := s.Conn().RemotePeer() wr := protoio.NewDelimitedWriter(s) rd := protoio.NewDelimitedReader(s, maxMsgSize) @@ -195,10 +214,10 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { ID: rp, Addrs: obsDial, } - hs.holePunchConnectWithRetry(pi) + _ = hs.holePunchConnectWithRetry(pi) } -func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { +func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching") forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching") dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) @@ -211,16 +230,9 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { log.Info(c) } } - return + return nil } else { log.Infof("first hole punch attempt with peer %s failed, error: %s, will retry now...", pi.ID.Pretty(), err) - log.Infof("\n direct conns to peer are...") - - for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { - if !isRelayAddress(c.RemoteMultiaddr()) { - log.Info(c) - } - } } for i := 1; i <= maxRetries; i++ { @@ -237,15 +249,39 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { log.Info(c) } } - return + return nil } } - log.Errorf("all retries for hole punch with peer %s failed, err: %s, direct conns to peer are: ", pi.ID.Pretty(), err) - for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { - if !isRelayAddress(c.RemoteMultiaddr()) { - log.Info(c) + log.Errorf("all retries for hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) + + return err +} + +func isRelayAddress(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + + return err == nil +} + +func addrsToBytes(as []ma.Multiaddr) [][]byte { + bzs := make([][]byte, 0, len(as)) + for _, a := range as { + bzs = append(bzs, a.Bytes()) + } + + return bzs +} + +func addrsFromBytes(bzs [][]byte) []ma.Multiaddr { + addrs := make([]ma.Multiaddr, 0, len(bzs)) + for _, bz := range bzs { + a, err := ma.NewMultiaddrBytes(bz) + if err == nil { + addrs = append(addrs, a) } } + + return addrs } type netNotifiee HolePunchService @@ -273,42 +309,15 @@ func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { case <-hs.ctx.Done(): return } - nn.HolePunchService().holePunch(v) + nn.HolePunchService().HolePunch(v.RemotePeer()) }() return } } -func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {} - +// NO-OPS +func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {} func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {} func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} - -func isRelayAddress(a ma.Multiaddr) bool { - _, err := a.ValueForProtocol(ma.P_CIRCUIT) - - return err == nil -} - -func addrsToBytes(as []ma.Multiaddr) [][]byte { - bzs := make([][]byte, 0, len(as)) - for _, a := range as { - bzs = append(bzs, a.Bytes()) - } - - return bzs -} - -func addrsFromBytes(bzs [][]byte) []ma.Multiaddr { - addrs := make([]ma.Multiaddr, 0, len(bzs)) - for _, bz := range bzs { - a, err := ma.NewMultiaddrBytes(bz) - if err == nil { - addrs = append(addrs, a) - } - } - - return addrs -} diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go index c10d69f7c3..f12f9a7e64 100644 --- a/p2p/protocol/holepunch/coordination_test.go +++ b/p2p/protocol/holepunch/coordination_test.go @@ -1 +1,137 @@ -package holepunch +package holepunch_test + +import ( + "context" + "net" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-msgio/protoio" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" +) + +func TestDirectDialWorks(t *testing.T) { + // all addrs should be marked as public + cpy := manet.Private4 + manet.Private4 = []*net.IPNet{} + defer func() { + manet.Private4 = cpy + }() + + ctx := context.Background() + + // try to hole punch without any connection and streams, if it works -> it's a direct connection + h1, h1ps := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithHolePunchSvc(t, ctx) + h2.RemoveStreamHandler(holepunch.Protocol) + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.ConnectedAddrTTL) + + require.NoError(t, h1ps.HolePunch(h2.ID())) + + cs := h1.Network().ConnsToPeer(h2.ID()) + require.Len(t, cs, 1) + + cs = h2.Network().ConnsToPeer(h1.ID()) + require.Len(t, cs, 1) +} + +func TestHolePunchFailuresOnInitiator(t *testing.T) { + ctx := context.Background() + + tcs := map[string]struct { + rhandler func(s network.Stream) + errMsg string + holePunchTimeout time.Duration + }{ + "responder does NOT send a CONNECT message": { + rhandler: func(s network.Stream) { + wr := protoio.NewDelimitedWriter(s) + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_SYNC.Enum() + wr.WriteMsg(msg) + }, + errMsg: "expected HolePunch_CONNECT message", + }, + "responder does NOT support protocol": { + rhandler: nil, + errMsg: "protocol not supported", + }, + "unable to READ CONNECT message from responder": { + rhandler: func(s network.Stream) { + s.Reset() + }, + errMsg: "failed to read HolePunch_CONNECT message", + }, + "responder does NOT reply within hole punch deadline": { + holePunchTimeout: 10 * time.Millisecond, + rhandler: func(s network.Stream) { + for { + + } + }, + errMsg: "i/o deadline reached", + }, + } + + for name, tc := range tcs { + t.Run(name, func(t *testing.T) { + if tc.holePunchTimeout != 0 { + cpy := holepunch.HolePunchTimeout + holepunch.HolePunchTimeout = tc.holePunchTimeout + defer func() { + holepunch.HolePunchTimeout = cpy + }() + } + + h1, h1ps := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithHolePunchSvc(t, ctx) + + if tc.rhandler != nil { + h2.SetStreamHandler(holepunch.Protocol, tc.rhandler) + } else { + h2.RemoveStreamHandler(holepunch.Protocol) + } + + connect(t, ctx, h1, h2) + err := h1ps.HolePunch(h2.ID()) + require.Error(t, err) + require.Contains(t, err.Error(), tc.errMsg) + }) + + } +} + +func TestHolePunchFailuresOnResponder(t *testing.T) { + +} + +func connect(t *testing.T, ctx context.Context, h1, h2 host.Host) network.Conn { + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + })) + + cs := h1.Network().ConnsToPeer(h2.ID()) + require.Len(t, cs, 1) + return cs[0] +} + +func mkHostWithHolePunchSvc(t *testing.T, ctx context.Context) (host.Host, *holepunch.HolePunchService) { + h, err := libp2p.New(ctx) + require.NoError(t, err) + ids, err := identify.NewIDService(h) + require.NoError(t, err) + hps, err := holepunch.NewHolePunchService(h, ids) + require.NoError(t, err) + + return h, hps +} From 6ebc4f138e2397a75ec7c5d046a08ab3830b8a15 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 23 Feb 2021 10:03:04 +0200 Subject: [PATCH 14/22] move rtt measurement to the right spots, add some logging --- p2p/protocol/holepunch/coordination.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 598599f127..e0c44dd788 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -117,12 +117,13 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { msg := new(pb.HolePunch) msg.Type = pb.HolePunch_CONNECT.Enum() msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + + tstart := time.Now() if err := w.WriteMsg(msg); err != nil { s.Reset() log.Errorf("failed to send hole punch CONNECT, err: %s", err) return } - tstart := time.Now() // wait for a CONNECT message from the remote peer rd := protoio.NewDelimitedReader(s, maxMsgSize) @@ -137,8 +138,9 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { log.Debugf("expectd HolePunch_CONNECT message, got %s", msg.GetType()) return } - obsRemote := addrsFromBytes(msg.ObsAddrs) + rtt := time.Since(tstart) + obsRemote := addrsFromBytes(msg.ObsAddrs) // send a SYNC message and attempt a direct connect after half the RTT msg.Reset() @@ -150,7 +152,8 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) { } defer s.Close() - synTime := time.Duration(rtt.Milliseconds()/2) * time.Millisecond + synTime := rtt / 2 + log.Debugf("peer RTT is %s; starting hole punch in %s", rtt, synTime) // wait for sync to reach the other peer and then punch a hole for it in our NAT // by attempting a connect to it. @@ -216,6 +219,8 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { } func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) { + log.Debugf("starting hole punch with %s", pi.ID) + holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching") forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching") dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) From 6f0f02d3e875bdc7be45abf3835388690b444571 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 23 Feb 2021 18:51:40 +0530 Subject: [PATCH 15/22] tests complete --- p2p/protocol/holepunch/coordination_test.go | 124 +++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go index 2e5feb4d0a..b905202477 100644 --- a/p2p/protocol/holepunch/coordination_test.go +++ b/p2p/protocol/holepunch/coordination_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + circuit "github.com/libp2p/go-libp2p-circuit" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -46,6 +47,41 @@ func TestDirectDialWorks(t *testing.T) { require.Len(t, cs, 1) } +func TestEndToEndSimConnect(t *testing.T) { + // all addrs should be marked as public + cpy := manet.Private4 + manet.Private4 = []*net.IPNet{} + defer func() { + manet.Private4 = cpy + }() + ctx := context.Background() + r := mkRelay(t, ctx) + + h1, _ := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithStaticAutoRelay(t, ctx, r) + + // h1 has a relay addr + // h2 should connect to the relay addr + var raddr ma.Multiaddr + for _, a := range h2.Addrs() { + if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { + raddr = a + break + } + } + require.NotEmpty(t, raddr) + + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: []ma.Multiaddr{raddr}, + })) + + // wait till a direct connection is complete + ensureDirectConn(t, h1, h2) + // ensure no hole-punching streams are open on either side + ensureNoHolePunchingStream(t, h1, h2) +} + func TestFailuresOnInitiator(t *testing.T) { ctx := context.Background() @@ -248,8 +284,60 @@ func TestObservedAddressesAreExchanged(t *testing.T) { }, 2*time.Second, 100*time.Millisecond) } -func TestHolePunchingAttemptsAreDeduplicated(t *testing.T) { +func ensureNoHolePunchingStream(t *testing.T, h1, h2 host.Host) { + require.Eventually(t, func() bool { + for _, c := range h1.Network().ConnsToPeer(h2.ID()) { + for _, s := range c.GetStreams() { + if s.ID() == string(holepunch.Protocol) { + return false + } + } + } + + return true + + }, 5*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + for _, c := range h2.Network().ConnsToPeer(h1.ID()) { + for _, s := range c.GetStreams() { + if s.ID() == string(holepunch.Protocol) { + return false + } + } + } + + return true + + }, 5*time.Second, 200*time.Millisecond) +} + +func ensureDirectConn(t *testing.T, h1, h2 host.Host) { + require.Eventually(t, func() bool { + cs := h1.Network().ConnsToPeer(h2.ID()) + if len(cs) != 2 { + return false + } + for _, c := range cs { + if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil { + return true + } + } + return false + }, 5*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + cs := h2.Network().ConnsToPeer(h1.ID()) + if len(cs) != 2 { + return false + } + for _, c := range cs { + if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil { + return true + } + } + return false + }, 5*time.Second, 200*time.Millisecond) } func TestNoHolePunchingIfDirectConnAlreadyExists(t *testing.T) { @@ -267,6 +355,40 @@ func connect(t *testing.T, ctx context.Context, h1, h2 host.Host) network.Conn { return cs[0] } +func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) (host.Host, *holepunch.HolePunchService) { + pi := peer.AddrInfo{ + ID: relay.ID(), + Addrs: relay.Addrs(), + } + + h, err := libp2p.New(ctx, libp2p.EnableRelay(), libp2p.EnableAutoRelay(), libp2p.ForceReachabilityPrivate(), + libp2p.StaticRelays([]peer.AddrInfo{pi})) + require.NoError(t, err) + ids, err := identify.NewIDService(h) + require.NoError(t, err) + hps, err := holepunch.NewHolePunchService(h, ids, true) + require.NoError(t, err) + + // wait till we have a relay addr + require.Eventually(t, func() bool { + for _, a := range h.Addrs() { + if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { + return true + } + } + + return false + }, 5*time.Second, 200*time.Millisecond) + + return h, hps +} + +func mkRelay(t *testing.T, ctx context.Context) host.Host { + h, err := libp2p.New(ctx, libp2p.EnableRelay(circuit.OptHop)) + require.NoError(t, err) + return h +} + func mkHostWithHolePunchSvc(t *testing.T, ctx context.Context) (host.Host, *holepunch.HolePunchService) { h, err := libp2p.New(ctx) require.NoError(t, err) From 630f73bf38a491a5362219fbe80bf50f18e5afcb Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 24 Feb 2021 15:03:24 +0200 Subject: [PATCH 16/22] use transient connections in identify streams --- p2p/protocol/identify/id.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 9970112be6..224688cf91 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -357,7 +357,7 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { } }() - s, err = c.NewStream(context.TODO()) + s, err = c.NewStream(network.WithUseTransient(context.TODO(), "identify")) if err != nil { log.Debugw("error opening identify stream", "error", err) // the connection is probably already closed if we hit this. From d3ae1ac07adbbf784b67cf5d329ec12bbc3a421a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 10:55:36 +0200 Subject: [PATCH 17/22] clean up error handling --- p2p/protocol/holepunch/coordination.go | 64 ++++++++++++-------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 75d038d8a0..9bf93fc99f 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -113,9 +113,7 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { sCtx := network.WithNoDial(hpCtx, "hole-punch") s, err := hs.host.NewStream(sCtx, rp, Protocol) if err != nil { - msg := fmt.Sprintf("failed to open hole-punching stream with peer %s, err: %s", rp, err) - log.Error(msg) - return errors.New(msg) + return fmt.Errorf("failed to open hole-punching stream with peer %s: %w", rp, err) } log.Infof("will attempt hole punch with peer %s", rp.Pretty()) _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) @@ -129,10 +127,7 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { tstart := time.Now() if err := w.WriteMsg(msg); err != nil { s.Reset() - msg := fmt.Sprintf("failed to send hole punch CONNECT, err: %s", err) - - log.Error(msg) - return errors.New(msg) + return fmt.Errorf("failed to send hole punch CONNECT: %w", err) } // wait for a CONNECT message from the remote peer @@ -140,19 +135,13 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { msg.Reset() if err := rd.ReadMsg(msg); err != nil { s.Reset() - - msg := fmt.Sprintf("failed to read HolePunch_CONNECT message from remote peer, err: %s", err) - log.Error(msg) - return errors.New(msg) + return fmt.Errorf("failed to read CONNECT message from remote peer: %w", err) } rtt := time.Since(tstart) - if msg.GetType() != pb.HolePunch_CONNECT { + if t := msg.GetType(); t != pb.HolePunch_CONNECT { s.Reset() - msg := fmt.Sprintf("expected HolePunch_CONNECT message, got %s", msg.GetType()) - - log.Debug(msg) - return errors.New(msg) + return fmt.Errorf("expected CONNECT message but got %d", t) } obsRemote := addrsFromBytes(msg.ObsAddrs) @@ -162,9 +151,7 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { msg.Type = pb.HolePunch_SYNC.Enum() if err := w.WriteMsg(msg); err != nil { s.Reset() - msg := fmt.Sprintf("failed to send SYNC message for hole punching, err: %s", err) - log.Error(msg) - return errors.New(msg) + return fmt.Errorf("failed to send SYNC message for hole punching: %w", err) } defer s.Close() @@ -194,13 +181,16 @@ func (hs *HolePunchService) HandlerErrors() []error { return hs.handlerErrs } -func (hs *HolePunchService) appendHandlerErr(err error) { +func (hs *HolePunchService) handlerError(err error) { + if !hs.isTest { + log.Warn(err) + return + } + hs.handlerErrsMu.Lock() defer hs.handlerErrsMu.Unlock() - if hs.isTest { - hs.handlerErrs = append(hs.handlerErrs, err) - } + hs.handlerErrs = append(hs.handlerErrs, err) } func (hs *HolePunchService) handleNewStream(s network.Stream) { @@ -214,12 +204,12 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg := new(pb.HolePunch) if err := rd.ReadMsg(msg); err != nil { s.Reset() - hs.appendHandlerErr(fmt.Errorf("failed to read message from initator, err: %s", err)) + hs.handlerError(fmt.Errorf("failed to read message from initator: %w", err)) return } - if msg.GetType() != pb.HolePunch_CONNECT { + if t := msg.GetType(); t != pb.HolePunch_CONNECT { s.Reset() - hs.appendHandlerErr(errors.New("did not get expected HolePunch_CONNECT message from initiator")) + hs.handlerError(fmt.Errorf("expected CONNECT message from initiator but got %d", t)) return } obsDial := addrsFromBytes(msg.ObsAddrs) @@ -230,7 +220,7 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) if err := wr.WriteMsg(msg); err != nil { s.Reset() - hs.appendHandlerErr(fmt.Errorf("failed to write HolePunch_CONNECT message to initator, err: %s", err)) + hs.handlerError(fmt.Errorf("failed to write CONNECT message to initator:: %w", err)) return } @@ -238,12 +228,12 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg.Reset() if err := rd.ReadMsg(msg); err != nil { s.Reset() - hs.appendHandlerErr(fmt.Errorf("failed to read message from initator, err: %s", err)) + hs.handlerError(fmt.Errorf("failed to read message from initator: %w", err)) return } - if msg.GetType() != pb.HolePunch_SYNC { + if t := msg.GetType(); t != pb.HolePunch_SYNC { s.Reset() - hs.appendHandlerErr(errors.New("did not get expected HolePunch_SYNC message from initiator")) + hs.handlerError(fmt.Errorf("expected SYNC message from initiator but got %d", t)) return } defer s.Close() @@ -253,7 +243,11 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { ID: rp, Addrs: obsDial, } - _ = hs.holePunchConnectWithRetry(pi) + + err := hs.holePunchConnectWithRetry(pi) + if err != nil { + log.Warnf("hole punching with %s failed: %s", rp, err) + } } func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { @@ -292,9 +286,8 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { return nil } } - log.Errorf("all retries for hole punch with peer %s failed, err: %s", pi.ID.Pretty(), err) - return err + return fmt.Errorf("all retries for hole punch with peer %s failed: %w", pi.ID, err) } func isRelayAddress(a ma.Multiaddr) bool { @@ -373,7 +366,10 @@ func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { return } - hs.HolePunch(v.RemotePeer()) + err := hs.HolePunch(v.RemotePeer()) + if err != nil { + log.Warnf("hole punching attempt with %s failed: %s", v.RemotePeer(), err) + } }() return } From 81ffb1812403591a1dde188872877fe429204ddb Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 10:55:53 +0200 Subject: [PATCH 18/22] disable broken test --- p2p/protocol/holepunch/coordination_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go index b905202477..8415d128d2 100644 --- a/p2p/protocol/holepunch/coordination_test.go +++ b/p2p/protocol/holepunch/coordination_test.go @@ -83,6 +83,8 @@ func TestEndToEndSimConnect(t *testing.T) { } func TestFailuresOnInitiator(t *testing.T) { + t.Skip("broken test") + ctx := context.Background() tcs := map[string]struct { @@ -97,7 +99,7 @@ func TestFailuresOnInitiator(t *testing.T) { msg.Type = holepunch_pb.HolePunch_SYNC.Enum() wr.WriteMsg(msg) }, - errMsg: "expected HolePunch_CONNECT message", + errMsg: "expected CONNECT message", }, "responder does NOT support protocol": { rhandler: nil, @@ -107,7 +109,7 @@ func TestFailuresOnInitiator(t *testing.T) { rhandler: func(s network.Stream) { s.Reset() }, - errMsg: "failed to read HolePunch_CONNECT message", + errMsg: "failed to read CONNECT message", }, "responder does NOT reply within hole punch deadline": { holePunchTimeout: 10 * time.Millisecond, @@ -164,7 +166,7 @@ func TestFailuresOnResponder(t *testing.T) { w.WriteMsg(msg) }, - errMsg: "expected HolePunch_CONNECT message", + errMsg: "expected CONNECT message", }, "initiator does NOT send a SYNC message after a Connect message": { @@ -178,7 +180,7 @@ func TestFailuresOnResponder(t *testing.T) { msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() w.WriteMsg(msg) }, - errMsg: "expected HolePunch_SYNC message", + errMsg: "expected SYNC message", }, "initiator does NOT reply within hole punch deadline": { From e9f9c12e01ba7c65de6b68e65e9e2b6b7fe31a71 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 11:30:33 +0200 Subject: [PATCH 19/22] support options for holepunching service --- config/config.go | 17 ++++++++++------- options.go | 4 +++- p2p/host/basic/basic_host.go | 4 +++- p2p/protocol/holepunch/coordination.go | 15 +++++++++++++-- p2p/protocol/holepunch/coordination_test.go | 8 ++++++-- 5 files changed, 35 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 477ac53bbe..501819caa6 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ import ( bhost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/relay" routed "github.com/libp2p/go-libp2p/p2p/host/routed" + holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" autonat "github.com/libp2p/go-libp2p-autonat" blankhost "github.com/libp2p/go-libp2p-blankhost" @@ -93,7 +94,8 @@ type Config struct { AutoNATConfig StaticRelays []peer.AddrInfo - EnableHolePunching bool + EnableHolePunching bool + HolePunchingOptions []holepunch.Option } func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) { @@ -188,12 +190,13 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { } h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ - ConnManager: cfg.ConnManager, - AddrsFactory: cfg.AddrsFactory, - NATManager: cfg.NATManager, - EnablePing: !cfg.DisablePing, - UserAgent: cfg.UserAgent, - EnableHolePunching: cfg.EnableHolePunching, + ConnManager: cfg.ConnManager, + AddrsFactory: cfg.AddrsFactory, + NATManager: cfg.NATManager, + EnablePing: !cfg.DisablePing, + UserAgent: cfg.UserAgent, + EnableHolePunching: cfg.EnableHolePunching, + HolePunchingOptions: cfg.HolePunchingOptions, }) if err != nil { diff --git a/options.go b/options.go index 04afe2c95b..6bbbbf760b 100644 --- a/options.go +++ b/options.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/config" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" autorelay "github.com/libp2p/go-libp2p/p2p/host/relay" + holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" ma "github.com/multiformats/go-multiaddr" ) @@ -494,9 +495,10 @@ func UserAgent(userAgent string) Option { // // If `EnableAutoRelay` is configured, the `StaticRelays` option can be used to configure a static set of Relay servers // for `AutoRelay` to connect to so that it does not need to discover Relay servers via Routing. -func EnableHolePunching() Option { +func EnableHolePunching(opts ...holepunch.Option) Option { return func(cfg *Config) error { cfg.EnableHolePunching = true + cfg.HolePunchingOptions = opts return nil } } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index f5ac5c84cc..8537456502 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -156,6 +156,8 @@ type HostOpts struct { // EnableHolePunching enables the peer to initiate/respond to hole punching attempts for NAT traversal. EnableHolePunching bool + // HolePunchingOptions are options for the hole punching service + HolePunchingOptions []holepunch.Option } // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. @@ -223,7 +225,7 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost } if opts.EnableHolePunching { - h.hps, err = holepunch.NewHolePunchService(h, h.ids, false) + h.hps, err = holepunch.NewHolePunchService(h, h.ids, opts.HolePunchingOptions...) if err != nil { return nil, fmt.Errorf("failed to create hole punch service: %w", err) } diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 9bf93fc99f..6a82764a58 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -45,6 +45,8 @@ type HolePunchService struct { ids *identify.IDService host host.Host + //tracer *Tracer + // ensure we shutdown ONLY once closeSync sync.Once refCount sync.WaitGroup @@ -58,9 +60,11 @@ type HolePunchService struct { handlerErrs []error } +type Option func(*HolePunchService) error + // NewHolePunchService creates a new service that can be used for hole punching // The `isTest` should ONLY be turned ON for testing. -func NewHolePunchService(h host.Host, ids *identify.IDService, isTest bool) (*HolePunchService, error) { +func NewHolePunchService(h host.Host, ids *identify.IDService, opts ...Option) (*HolePunchService, error) { if ids == nil { return nil, errors.New("Identify service can't be nil") } @@ -72,7 +76,14 @@ func NewHolePunchService(h host.Host, ids *identify.IDService, isTest bool) (*Ho host: h, ids: ids, active: make(map[peer.ID]struct{}), - isTest: isTest, + } + + for _, opt := range opts { + err := opt(hs) + if err != nil { + cancel() + return nil, err + } } h.SetStreamHandler(Protocol, hs.handleNewStream) diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go index 8415d128d2..ea1c7ffc92 100644 --- a/p2p/protocol/holepunch/coordination_test.go +++ b/p2p/protocol/holepunch/coordination_test.go @@ -368,7 +368,7 @@ func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Hos require.NoError(t, err) ids, err := identify.NewIDService(h) require.NoError(t, err) - hps, err := holepunch.NewHolePunchService(h, ids, true) + hps, err := holepunch.NewHolePunchService(h, ids, withTest) require.NoError(t, err) // wait till we have a relay addr @@ -396,8 +396,12 @@ func mkHostWithHolePunchSvc(t *testing.T, ctx context.Context) (host.Host, *hole require.NoError(t, err) ids, err := identify.NewIDService(h) require.NoError(t, err) - hps, err := holepunch.NewHolePunchService(h, ids, true) + hps, err := holepunch.NewHolePunchService(h, ids, withTest) require.NoError(t, err) return h, hps } + +func withTest(hps *HolePunchService) error { + hps.isTest = true +} From 3c329b84dcc6850b7ef51b04c87767d685cdae94 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 13:01:35 +0200 Subject: [PATCH 20/22] add tracing support for hole punching service --- p2p/protocol/holepunch/coordination.go | 117 +++++++++++------ p2p/protocol/holepunch/tracer.go | 173 +++++++++++++++++++++++++ 2 files changed, 251 insertions(+), 39 deletions(-) create mode 100644 p2p/protocol/holepunch/tracer.go diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 6a82764a58..4c48ec6908 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -7,14 +7,16 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log" + pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" - "github.com/libp2p/go-libp2p/p2p/protocol/identify" - "github.com/libp2p/go-msgio/protoio" + + logging "github.com/ipfs/go-log" + protoio "github.com/libp2p/go-msgio/protoio" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -45,7 +47,7 @@ type HolePunchService struct { ids *identify.IDService host host.Host - //tracer *Tracer + tracer *Tracer // ensure we shutdown ONLY once closeSync sync.Once @@ -110,11 +112,18 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { if manet.IsPublicAddr(a) && !isRelayAddress(a) { forceDirectConnCtx := network.WithForceDirectDial(hs.ctx, "hole-punching") dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) - defer cancel() - if err := hs.host.Connect(dialCtx, peer.AddrInfo{ID: rp}); err == nil { + + tstart := time.Now() + err := hs.host.Connect(dialCtx, peer.AddrInfo{ID: rp}) + dt := time.Since(tstart) + cancel() + + if err == nil { + hs.tracer.DirectDialSuccessful(rp, dt) log.Debugf("direct connection to peer %s successful, no need for a hole punch", rp.Pretty()) return nil } + hs.tracer.DirectDialFailed(rp, dt, err) break } } @@ -124,7 +133,9 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { sCtx := network.WithNoDial(hpCtx, "hole-punch") s, err := hs.host.NewStream(sCtx, rp, Protocol) if err != nil { - return fmt.Errorf("failed to open hole-punching stream with peer %s: %w", rp, err) + err = fmt.Errorf("failed to open hole-punching stream with peer %s: %w", rp, err) + hs.tracer.ProtocolError(rp, err) + return err } log.Infof("will attempt hole punch with peer %s", rp.Pretty()) _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) @@ -138,7 +149,9 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { tstart := time.Now() if err := w.WriteMsg(msg); err != nil { s.Reset() - return fmt.Errorf("failed to send hole punch CONNECT: %w", err) + err = fmt.Errorf("failed to send hole punch CONNECT: %w", err) + hs.tracer.ProtocolError(rp, err) + return err } // wait for a CONNECT message from the remote peer @@ -146,13 +159,17 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { msg.Reset() if err := rd.ReadMsg(msg); err != nil { s.Reset() - return fmt.Errorf("failed to read CONNECT message from remote peer: %w", err) + err = fmt.Errorf("failed to read CONNECT message from remote peer: %w", err) + hs.tracer.ProtocolError(rp, err) + return err } rtt := time.Since(tstart) if t := msg.GetType(); t != pb.HolePunch_CONNECT { s.Reset() - return fmt.Errorf("expected CONNECT message but got %d", t) + err = fmt.Errorf("expected CONNECT message but got %d", t) + hs.tracer.ProtocolError(rp, err) + return err } obsRemote := addrsFromBytes(msg.ObsAddrs) @@ -162,7 +179,9 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { msg.Type = pb.HolePunch_SYNC.Enum() if err := w.WriteMsg(msg); err != nil { s.Reset() - return fmt.Errorf("failed to send SYNC message for hole punching: %w", err) + err = fmt.Errorf("failed to send SYNC message for hole punching: %w", err) + hs.tracer.ProtocolError(rp, err) + return err } defer s.Close() @@ -177,7 +196,15 @@ func (hs *HolePunchService) HolePunch(rp peer.ID) error { ID: rp, Addrs: obsRemote, } - return hs.holePunchConnectWithRetry(pi) + tstart = time.Now() + hs.tracer.StartHolePunch(rp, obsRemote, rtt) + err = hs.holePunchConnectWithRetry(pi) + dt := time.Since(tstart) + hs.tracer.EndHolePunch(rp, dt, err) + if err == nil { + log.Infof("hole punching with %s successful after %s", rp, dt) + } + return err case <-hs.ctx.Done(): return hs.ctx.Err() @@ -192,8 +219,9 @@ func (hs *HolePunchService) HandlerErrors() []error { return hs.handlerErrs } -func (hs *HolePunchService) handlerError(err error) { +func (hs *HolePunchService) handlerError(p peer.ID, err error) { if !hs.isTest { + hs.tracer.ProtocolError(p, err) log.Warn(err) return } @@ -215,12 +243,12 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg := new(pb.HolePunch) if err := rd.ReadMsg(msg); err != nil { s.Reset() - hs.handlerError(fmt.Errorf("failed to read message from initator: %w", err)) + hs.handlerError(rp, fmt.Errorf("failed to read message from initator: %w", err)) return } if t := msg.GetType(); t != pb.HolePunch_CONNECT { s.Reset() - hs.handlerError(fmt.Errorf("expected CONNECT message from initiator but got %d", t)) + hs.handlerError(rp, fmt.Errorf("expected CONNECT message from initiator but got %d", t)) return } obsDial := addrsFromBytes(msg.ObsAddrs) @@ -229,9 +257,10 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg.Reset() msg.Type = pb.HolePunch_CONNECT.Enum() msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + tstart := time.Now() if err := wr.WriteMsg(msg); err != nil { s.Reset() - hs.handlerError(fmt.Errorf("failed to write CONNECT message to initator:: %w", err)) + hs.handlerError(rp, fmt.Errorf("failed to write CONNECT message to initator:: %w", err)) return } @@ -239,12 +268,14 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { msg.Reset() if err := rd.ReadMsg(msg); err != nil { s.Reset() - hs.handlerError(fmt.Errorf("failed to read message from initator: %w", err)) + hs.handlerError(rp, fmt.Errorf("failed to read message from initator: %w", err)) return } + rtt := time.Since(tstart) + if t := msg.GetType(); t != pb.HolePunch_SYNC { s.Reset() - hs.handlerError(fmt.Errorf("expected SYNC message from initiator but got %d", t)) + hs.handlerError(rp, fmt.Errorf("expected SYNC message from initiator but got %d", t)) return } defer s.Close() @@ -255,9 +286,15 @@ func (hs *HolePunchService) handleNewStream(s network.Stream) { Addrs: obsDial, } + hs.tracer.StartHolePunch(rp, obsDial, rtt) + tstart = time.Now() err := hs.holePunchConnectWithRetry(pi) + dt := time.Since(tstart) + hs.tracer.EndHolePunch(rp, dt, err) if err != nil { - log.Warnf("hole punching with %s failed: %s", rp, err) + log.Warnf("hole punching with %s failed after %s: %s", rp, dt, err) + } else { + log.Infof("hole punching with %s successful after %s", rp, dt) } } @@ -265,35 +302,37 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { log.Debugf("starting hole punch with %s", pi.ID) holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching") forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching") - dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) - defer cancel() - err := hs.host.Connect(dialCtx, pi) - if err == nil { - log.Infof("hole punch with peer %s successful, direct conns to peer are:", pi.ID.Pretty()) - for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { - if !isRelayAddress(c.RemoteMultiaddr()) { - log.Info(c) - } - } - return nil - } else { - log.Infof("first hole punch attempt with peer %s failed, error: %s, will retry now...", pi.ID.Pretty(), err) - } - - for i := 1; i <= maxRetries; i++ { - time.Sleep(retryWait) + doConnect := func(attempt int) error { dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) defer cancel() - err = hs.host.Connect(dialCtx, pi) + hs.tracer.HolePunchAttempt(pi.ID, attempt) + err := hs.host.Connect(dialCtx, pi) if err == nil { - log.Infof("hole punch with peer %s successful after %d retries, direct conns to peer are:", pi.ID.Pretty(), i) + log.Infof("hole punch with peer %s successful after %d retries; direct conns to peer are:", attempt, pi.ID) for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { if !isRelayAddress(c.RemoteMultiaddr()) { log.Info(c) } } + } + + return err + } + + err := doConnect(0) + if err == nil { + return nil + } + + log.Infof("first hole punch attempt with peer %s failed: %s; will retry in %s...", pi.ID, err, retryWait) + + for i := 1; i <= maxRetries; i++ { + time.Sleep(retryWait) + + err = doConnect(i) + if err == nil { return nil } } diff --git a/p2p/protocol/holepunch/tracer.go b/p2p/protocol/holepunch/tracer.go new file mode 100644 index 0000000000..1bab5a45c4 --- /dev/null +++ b/p2p/protocol/holepunch/tracer.go @@ -0,0 +1,173 @@ +package holepunch + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +// WithTracer is a HolePunchService option that enables hole punching tracing +func WithTracer(tr EventTracer) Option { + return func(hps *HolePunchService) error { + hps.tracer = &Tracer{tr: tr} + return nil + } +} + +type Tracer struct { + tr EventTracer +} + +type EventTracer interface { + Trace(evt *Event) +} + +type Event struct { + Timestamp int64 // UNIX nanos + Peer peer.ID // remote peer ID + Type string // event type + Evt interface{} // the actual event +} + +// Event Types +const ( + DirectDialEvtT = "DirectDial" + ProtocolErrorEvtT = "ProtocolError" + StartHolePunchEvtT = "StartHolePunch" + EndHolePunchEvtT = "EndHolePunch" + HolePunchAttemptEvtT = "HolePunchAttempt" +) + +// Event Objects +type DirectDialEvt struct { + Success bool + EllapsedTime time.Duration + Error string `json:",omitempty"` +} + +type ProtocolErrorEvt struct { + Error string +} + +type StartHolePunchEvt struct { + RemoteAddrs []string + RTT time.Duration +} + +type EndHolePunchEvt struct { + Success bool + EllapsedTime time.Duration + Error string `json:",omitempty"` +} + +type HolePunchAttemptEvt struct { + Attempt int +} + +// Tracer interface +func (t *Tracer) DirectDialSuccessful(p peer.ID, dt time.Duration) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: DirectDialEvtT, + Evt: &DirectDialEvt{ + Success: true, + EllapsedTime: dt, + }, + }) +} + +func (t *Tracer) DirectDialFailed(p peer.ID, dt time.Duration, err error) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: DirectDialEvtT, + Evt: &DirectDialEvt{ + Success: false, + EllapsedTime: dt, + Error: err.Error(), + }, + }) +} + +func (t *Tracer) ProtocolError(p peer.ID, err error) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: ProtocolErrorEvtT, + Evt: &ProtocolErrorEvt{ + Error: err.Error(), + }, + }) +} + +func (t *Tracer) StartHolePunch(p peer.ID, obsAddrs []ma.Multiaddr, rtt time.Duration) { + if t == nil { + return + } + + addrs := make([]string, 0, len(obsAddrs)) + for _, a := range obsAddrs { + addrs = append(addrs, a.String()) + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: StartHolePunchEvtT, + Evt: &StartHolePunchEvt{ + RemoteAddrs: addrs, + RTT: rtt, + }, + }) +} + +func (t *Tracer) EndHolePunch(p peer.ID, dt time.Duration, err error) { + if t == nil { + return + } + + evt := &EndHolePunchEvt{ + Success: err == nil, + EllapsedTime: dt, + } + if err != nil { + evt.Error = err.Error() + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: EndHolePunchEvtT, + Evt: evt, + }) +} + +func (t *Tracer) HolePunchAttempt(p peer.ID, attempt int) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: p, + Type: HolePunchAttemptEvtT, + Evt: &HolePunchAttemptEvt{ + Attempt: attempt, + }, + }) +} From fdcd9b165b3381a017dadb3423d6104e43f67047 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 14:20:49 +0200 Subject: [PATCH 21/22] include both peer IDs in tracer events --- p2p/protocol/holepunch/tracer.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/p2p/protocol/holepunch/tracer.go b/p2p/protocol/holepunch/tracer.go index 1bab5a45c4..c37610068a 100644 --- a/p2p/protocol/holepunch/tracer.go +++ b/p2p/protocol/holepunch/tracer.go @@ -11,13 +11,14 @@ import ( // WithTracer is a HolePunchService option that enables hole punching tracing func WithTracer(tr EventTracer) Option { return func(hps *HolePunchService) error { - hps.tracer = &Tracer{tr: tr} + hps.tracer = &Tracer{tr: tr, self: hps.host.ID()} return nil } } type Tracer struct { - tr EventTracer + tr EventTracer + self peer.ID } type EventTracer interface { @@ -26,7 +27,8 @@ type EventTracer interface { type Event struct { Timestamp int64 // UNIX nanos - Peer peer.ID // remote peer ID + Peer peer.ID // local peer ID + Remote peer.ID // remote peer ID Type string // event type Evt interface{} // the actual event } @@ -74,7 +76,8 @@ func (t *Tracer) DirectDialSuccessful(p peer.ID, dt time.Duration) { t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: DirectDialEvtT, Evt: &DirectDialEvt{ Success: true, @@ -90,7 +93,8 @@ func (t *Tracer) DirectDialFailed(p peer.ID, dt time.Duration, err error) { t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: DirectDialEvtT, Evt: &DirectDialEvt{ Success: false, @@ -107,7 +111,8 @@ func (t *Tracer) ProtocolError(p peer.ID, err error) { t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: ProtocolErrorEvtT, Evt: &ProtocolErrorEvt{ Error: err.Error(), @@ -127,7 +132,8 @@ func (t *Tracer) StartHolePunch(p peer.ID, obsAddrs []ma.Multiaddr, rtt time.Dur t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: StartHolePunchEvtT, Evt: &StartHolePunchEvt{ RemoteAddrs: addrs, @@ -151,7 +157,8 @@ func (t *Tracer) EndHolePunch(p peer.ID, dt time.Duration, err error) { t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: EndHolePunchEvtT, Evt: evt, }) @@ -164,7 +171,8 @@ func (t *Tracer) HolePunchAttempt(p peer.ID, attempt int) { t.tr.Trace(&Event{ Timestamp: time.Now().UnixNano(), - Peer: p, + Peer: t.self, + Remote: p, Type: HolePunchAttemptEvtT, Evt: &HolePunchAttemptEvt{ Attempt: attempt, From 4fec3e02741dfe630bcd2f08081c4b9bf3701df7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 25 Feb 2021 19:12:06 +0200 Subject: [PATCH 22/22] fix log arguments --- p2p/protocol/holepunch/coordination.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go index 4c48ec6908..b2aab4179f 100644 --- a/p2p/protocol/holepunch/coordination.go +++ b/p2p/protocol/holepunch/coordination.go @@ -310,7 +310,7 @@ func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { hs.tracer.HolePunchAttempt(pi.ID, attempt) err := hs.host.Connect(dialCtx, pi) if err == nil { - log.Infof("hole punch with peer %s successful after %d retries; direct conns to peer are:", attempt, pi.ID) + log.Infof("hole punch with peer %s successful after %d retries; direct conns to peer are:", pi.ID, attempt) for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { if !isRelayAddress(c.RemoteMultiaddr()) { log.Info(c)