Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Support for Hole punching #233

Merged
merged 12 commits into from
Feb 18, 2021
6 changes: 3 additions & 3 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (ad *activeDial) start(ctx context.Context) {
ad.cancel()
}

func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial {
ds.dialsLk.Lock()
defer ds.dialsLk.Unlock()

actd, ok := ds.dials[p]
if !ok {
adctx, cancel := context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to use the background context. Otherwise, if the first dialer cancels, we'll cancel the overall dial.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also why the tests are failing.

adctx, cancel := context.WithCancel(ctx)
actd = &activeDial{
id: p,
cancel: cancel,
Expand All @@ -123,7 +123,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
// DialLock initiates a dial to the given peer if there are none in progress
// then waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
return ds.getActiveDial(p).wait(ctx)
return ds.getActiveDial(ctx, p).wait(ctx)
}

// CancelDial cancels all in-progress dials to the given peer.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.1
github.com/libp2p/go-libp2p-core v0.8.2
github.com/libp2p/go-libp2p-core v0.8.3
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-quic-transport v0.10.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ=
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.3 h1:BZTReEF6o8g/n4DwxTyeFannOeae35Xy0TD+mES3CNE=
github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE=
Expand Down
22 changes: 17 additions & 5 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error
// a non-closed connection.
dials := 0
for {
// will prefer direct connections over relayed connections for opening streams
c := s.bestConnToPeer(p)
if c == nil {
if nodial, _ := network.GetNoDial(ctx); nodial {
Expand Down Expand Up @@ -392,9 +393,10 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {

// bestConnToPeer returns the best connection to peer.
func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
// Selects the best connection we have to the peer.
// TODO: Prefer some transports over others. Currently, we just select
// the newest non-closed connection with the most streams.

// TODO: Prefer some transports over others.
// For now, prefers direct connections over Relayed connections.
// For tie-breaking, select the newest non-closed connection with the most streams.
vyzo marked this conversation as resolved.
Show resolved Hide resolved
s.conns.RLock()
defer s.conns.RUnlock()

Expand All @@ -409,15 +411,25 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
cLen := len(c.streams.m)
c.streams.Unlock()

if cLen >= bestLen {
// We will never prefer a Relayed connection over a direct connection.
if isDirectConn(best) && !isDirectConn(c) {
continue
}

// 1. Always prefer a direct connection over a relayed connection.
// 2. If both conns are direct or relayed, pick the one with as many or more streams.
if (!isDirectConn(best) && isDirectConn(c)) || (cLen >= bestLen) {
best = c
bestLen = cLen
}

}
return best
}

func isDirectConn(c *Conn) bool {
return c != nil && !c.conn.Transport().Proxy()
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
}

// Connectedness returns our "connectedness" state with the given peer.
//
// To check if we have an open connection, use `s.Connectedness(p) ==
Expand Down
54 changes: 40 additions & 14 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {

defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

// check if we already have an open connection first
conn := s.bestConnToPeer(p)
if conn != nil {
forceDirect, _ := network.GetForceDirectDial(ctx)
if forceDirect {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
if isDirectConn(conn) {
return conn, nil
}
} else if conn != nil {
// check if we already have an open connection first
return conn, nil
}

Expand Down Expand Up @@ -287,8 +292,13 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// Short circuit.
// By the time we take the dial lock, we may already *have* a connection
// to the peer.
forceDirect, _ := network.GetForceDirectDial(ctx)
c := s.bestConnToPeer(p)
if c != nil {
if forceDirect {
if isDirectConn(c) {
return c, nil
}
} else if c != nil {
return c, nil
}

Expand All @@ -301,12 +311,17 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
conn, err := s.dial(ctx, p)
if err != nil {
conn = s.bestConnToPeer(p)
if conn != nil {
if forceDirect {
if isDirectConn(conn) {
log.Debugf("ignoring dial error because we already have a direct connection: %s", err)
return conn, nil
}
} else if conn != nil {
// Hm? What error?
// Could have canceled the dial because we received a
// connection or some other random reason.
// Just ignore the error and return the connection.
log.Debugf("ignoring dial error because we have a connection: %s", err)
log.Debugf("ignoring dial error because we already have a connection: %s", err)
return conn, nil
}

vyzo marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -321,6 +336,11 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool {
return t != nil && t.CanDial(addr)
}

func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
Comment on lines +340 to +341
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could panic. I know we don't call it in a way that can panic, but it could still panic and someone else could call it.

}

// ranks addresses in descending order of preference for dialing
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
Expand Down Expand Up @@ -362,6 +382,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
forceDirect, _ := network.GetForceDirectDial(ctx)
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
log.Event(ctx, "swarmDialDoDialSelf", logdial)
Expand All @@ -383,20 +404,25 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
}
goodAddrs := s.filterKnownUndialables(p, peerAddrs)
if forceDirect {
goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
}
if len(goodAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
}

/////// Check backoff andnRank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
if !forceDirect {
/////// Check backoff andnRank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
}
}
if !nonBackoff {
return nil, ErrDialBackoff
}
}
if !nonBackoff {
return nil, ErrDialBackoff
}

connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs))
Expand Down