Skip to content

Commit

Permalink
move deduplication logic to the holepunch service
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 7, 2021
1 parent 2605890 commit 84900aa
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 35 deletions.
73 changes: 43 additions & 30 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (

var (
log = logging.Logger("p2p-holepunch")
// ErrHolePunchActive is returned from DirectConnect when another hole punching attempt is currently running
ErrHolePunchActive = errors.New("another hole punching attempt to this peer is active")
// ErrClosed is returned when the hole punching is closed
ErrClosed = errors.New("hole punching service closing")
)

// The Service is used to make direct connections with a peer via hole-punching.
Expand Down Expand Up @@ -149,10 +153,42 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
return addrs, rtt, nil
}

func (hs *Service) beginDirectConnect(p peer.ID) error {
hs.closeMx.RLock()
defer hs.closeMx.RUnlock()
if hs.closed {
return ErrClosed
}

hs.activeMx.Lock()
defer hs.activeMx.Unlock()
if _, ok := hs.active[p]; ok {
return ErrHolePunchActive
}

hs.active[p] = struct{}{}
return nil
}

// DirectConnect attempts to make a direct connection with a remote peer.
// It first attempts a direct dial (if we have a public address of that peer), and then
// coordinates a hole punch over the given relay connection.
func (hs *Service) DirectConnect(rp peer.ID) error {
func (hs *Service) DirectConnect(p peer.ID) error {
log.Debugw("got inbound proxy conn from peer", p)
if err := hs.beginDirectConnect(p); err != nil {
return err
}

defer func() {
hs.activeMx.Lock()
delete(hs.active, p)
hs.activeMx.Unlock()
}()

return hs.directConnect(p)
}

func (hs *Service) directConnect(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
for _, c := range hs.host.Network().ConnsToPeer(rp) {
if !isRelayAddress(c.RemoteMultiaddr()) {
Expand Down Expand Up @@ -339,48 +375,25 @@ func addrsFromBytes(bzs [][]byte) []ma.Multiaddr {

type netNotifiee Service

func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) {
func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
hs := (*Service)(nn)
dir := v.Stat().Direction

// Hole punch if it's an inbound proxy connection.
// If we already have a direct connection with the remote peer, this will be a no-op.
if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) {
p := v.RemotePeer()
hs.activeMx.Lock()
hs.closeMx.RLock()
closed := hs.closed
_, active := hs.active[p]
if !active && !closed {
hs.refCount.Add(1)
hs.active[p] = struct{}{}
}
hs.closeMx.RUnlock()
hs.activeMx.Unlock()

if active || closed {
return
}

log.Debugw("got inbound proxy conn from peer", v.RemotePeer())
if conn.Stat().Direction == network.DirInbound && isRelayAddress(conn.RemoteMultiaddr()) {
hs.refCount.Add(1)
go func() {
defer hs.refCount.Done()
defer func() {
hs.activeMx.Lock()
delete(hs.active, p)
hs.activeMx.Unlock()
}()

select {
// waiting for Identify here will allow us to access the peer's public and observed addresses
// that we can dial to for a hole punch.
case <-hs.ids.IdentifyWait(v):
case <-hs.ids.IdentifyWait(conn):
case <-hs.ctx.Done():
return
}

_ = hs.DirectConnect(v.RemotePeer())
_ = hs.DirectConnect(conn.RemotePeer())
}()
return
}
}

Expand Down
13 changes: 8 additions & 5 deletions p2p/protocol/holepunch/coordination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestDirectDialWorks(t *testing.T) {

func TestEndToEndSimConnect(t *testing.T) {
tr := &mockEventTracer{}
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr))
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), true)
defer h1.Close()
defer h2.Close()
defer relay.Close()
Expand Down Expand Up @@ -149,10 +149,11 @@ func TestFailuresOnInitiator(t *testing.T) {
}

tr := &mockEventTracer{}
h1, h2, relay, hps := makeRelayedHosts(t, holepunch.WithTracer(tr))
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
defer h1.Close()
defer h2.Close()
defer relay.Close()
hps := addHolePunchService(t, h2)

if tc.rhandler != nil {
h1.SetStreamHandler(holepunch.Protocol, tc.rhandler)
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestFailuresOnResponder(t *testing.T) {
}

tr := &mockEventTracer{}
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr))
h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
defer h1.Close()
defer h2.Close()
defer relay.Close()
Expand Down Expand Up @@ -317,7 +318,7 @@ func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Hos
return h
}

func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option) (h1, h2, relay host.Host, hps *holepunch.Service) {
func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
t.Helper()
h1, _ = mkHostWithHolePunchSvc(t, h1Opt)
var err error
Expand All @@ -327,7 +328,9 @@ func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option) (h1, h2, relay host.
)
require.NoError(t, err)
h2 = mkHostWithStaticAutoRelay(t, context.Background(), relay)
hps = addHolePunchService(t, h2)
if addHolePuncher {
hps = addHolePunchService(t, h2)
}

// h1 has a relay addr
// h2 should connect to the relay addr
Expand Down

0 comments on commit 84900aa

Please sign in to comment.