Skip to content

Commit

Permalink
transport.Listener,quic: Support multiple QUIC versions with the same…
Browse files Browse the repository at this point in the history
… Listener. Only return a single multiaddr per listener. (#1923)

* Revert "transport.Listener  returns a list of multiaddrs"

This reverts commit 8962b2a.

* Support multiple QUIC versions on the same listener

* No long running accept loop

* Don't use a goroutine

* PR comments
  • Loading branch information
MarcoPolo authored Dec 1, 2022
1 parent dc7f64e commit 1c8eaab
Show file tree
Hide file tree
Showing 21 changed files with 404 additions and 161 deletions.
2 changes: 1 addition & 1 deletion core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Listener interface {
Accept() (CapableConn, error)
Close() error
Addr() net.Addr
Multiaddrs() []ma.Multiaddr
Multiaddr() ma.Multiaddr
}

// TransportNetwork is an inet.Network with methods for managing transports.
Expand Down
6 changes: 3 additions & 3 deletions p2p/host/autonat/dialpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func (l *mockL) Accept() (transport.CapableConn, error) {
<-l.ctx.Done()
return nil, errors.New("expected in mocked test")
}
func (l *mockL) Close() error { return nil }
func (l *mockL) Addr() net.Addr { return nil }
func (l *mockL) Multiaddrs() []multiaddr.Multiaddr { return []multiaddr.Multiaddr{l.addr} }
func (l *mockL) Close() error { return nil }
func (l *mockL) Addr() net.Addr { return nil }
func (l *mockL) Multiaddr() multiaddr.Multiaddr { return l.addr }

func TestSkipDial(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr {
func (s *Swarm) listenAddressesNoLock() []ma.Multiaddr {
addrs := make([]ma.Multiaddr, 0, len(s.listeners.m)+10) // A bit extra so we may avoid an extra allocation in the for loop below.
for l := range s.listeners.m {
addrs = append(addrs, l.Multiaddrs()...)
addrs = append(addrs, l.Multiaddr())
}
return addrs
}
Expand Down
23 changes: 7 additions & 16 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Swarm) ListenClose(addrs ...ma.Multiaddr) {

s.listeners.Lock()
for l := range s.listeners.m {
if !containsSomeMultiaddr(addrs, l.Multiaddrs()) {
if !containsMultiaddr(addrs, l.Multiaddr()) {
continue
}

Expand Down Expand Up @@ -96,13 +96,11 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
s.listeners.cacheEOL = time.Time{}
s.listeners.Unlock()

maddrs := list.Multiaddrs()
maddr := list.Multiaddr()

// signal to our notifiees on listen.
s.notifyAll(func(n network.Notifiee) {
for _, maddr := range maddrs {
n.Listen(s, maddr)
}
n.Listen(s, maddr)
})

go func() {
Expand All @@ -122,9 +120,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {

// signal to our notifiees on listen close.
s.notifyAll(func(n network.Notifiee) {
for _, maddr := range maddrs {
n.ListenClose(s, maddr)
}
n.ListenClose(s, maddr)
})
s.refs.Done()
}()
Expand Down Expand Up @@ -155,16 +151,11 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
return nil
}

func containsSomeMultiaddr(hayStack []ma.Multiaddr, needles []ma.Multiaddr) bool {
seenSet := make(map[string]struct{}, len(needles))
for _, a := range needles {
seenSet[string(a.Bytes())] = struct{}{}
}
for _, a := range hayStack {
if _, found := seenSet[string(a.Bytes())]; found {
func containsMultiaddr(addrs []ma.Multiaddr, addr ma.Multiaddr) bool {
for _, a := range addrs {
if addr == a {
return true
}
}
return false

}
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func TestListenCloseCount(t *testing.T) {
t.Fatal(err)
}
listenedAddrs := s.ListenAddresses()
require.Equal(t, 3, len(listenedAddrs))
require.Equal(t, 2, len(listenedAddrs))

s.ListenClose(listenedAddrs...)

Expand Down
5 changes: 0 additions & 5 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

logging "github.com/ipfs/go-log/v2"
tec "github.com/jbenet/go-temp-err-catcher"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

Expand Down Expand Up @@ -176,8 +175,4 @@ func (l *listener) String() string {
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

func (l *listener) Multiaddrs() []multiaddr.Multiaddr {
return []multiaddr.Multiaddr{l.Multiaddr()}
}

var _ transport.Listener = (*listener)(nil)
36 changes: 18 additions & 18 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestAcceptSingleConn(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

cconn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

sconn, err := ln.Accept()
Expand All @@ -64,7 +64,7 @@ func TestAcceptMultipleConns(t *testing.T) {
}()

for i := 0; i < 10; i++ {
cconn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
toClose = append(toClose, cconn)

Expand All @@ -88,7 +88,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

conn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

errCh := make(chan error)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestFailedUpgradeOnListen(t *testing.T) {
errCh <- err
}()

_, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
_, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)

// close the listener.
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestListenerClose(t *testing.T) {
require.Contains(err.Error(), "use of closed network connection")

// doesn't accept new connections when it is closed
_, err = dial(t, u, ln.Multiaddrs()[0], peer.ID("1"), &network.NullScope{})
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{})
require.Error(err)
}

Expand All @@ -173,7 +173,7 @@ func TestListenerCloseClosesQueued(t *testing.T) {

var conns []transport.CapableConn
for i := 0; i < 10; i++ {
conn, err := dial(t, upgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
conns = append(conns, conn)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestConcurrentAccept(t *testing.T) {
go func() {
defer wg.Done()

conn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestAcceptQueueBacklogged(t *testing.T) {
// setup AcceptQueueLength connections, but don't accept any of them
var counter int32 // to be used atomically
doDial := func() {
conn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
atomic.AddInt32(&counter, 1)
t.Cleanup(func() { conn.Close() })
Expand Down Expand Up @@ -299,36 +299,36 @@ func TestListenerConnectionGater(t *testing.T) {
defer ln.Close()

// no gating.
conn, err := dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddrs()[0], "invalid", &network.NullScope{})
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger firupgrader.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddrs()[0], "invalid", &network.NullScope{})
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddrs()[0], "invalid", &network.NullScope{})
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err = dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
Expand All @@ -344,13 +344,13 @@ func TestListenerResourceManagement(t *testing.T) {

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
gomock.InOrder(
rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddrs()[0])).Return(connScope, nil),
rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddr())).Return(connScope, nil),
connScope.EXPECT().PeerScope(),
connScope.EXPECT().SetPeer(id),
connScope.EXPECT().PeerScope(),
)

cconn, err := dial(t, upgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
cconn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(t, err)
defer cconn.Close()

Expand All @@ -367,8 +367,8 @@ func TestListenerResourceManagementDenied(t *testing.T) {
id, upgrader := createUpgraderWithResourceManager(t, rcmgr)
ln := createListener(t, upgrader)

rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddrs()[0])).Return(nil, errors.New("nope"))
_, err := dial(t, upgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddr())).Return(nil, errors.New("nope"))
_, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(t, err)

done := make(chan struct{})
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestNoCommonSecurityProto(t *testing.T) {
ln.Accept()
}()

_, err = dial(t, ub, ln.Multiaddrs()[0], idA, &network.NullScope{})
_, err = dial(t, ub, ln.Multiaddr(), idA, &network.NullScope{})
require.EqualError(t, err, "failed to negotiate security protocol: protocol not supported")
select {
case <-done:
Expand Down
10 changes: 5 additions & 5 deletions p2p/net/upgrader/upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,21 @@ func TestOutboundConnectionGating(t *testing.T) {

testGater := &testGater{}
_, dialUpgrader := createUpgraderWithConnGater(t, testGater)
conn, err := dial(t, dialUpgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// blocking accepts doesn't affect the dialling side, only the listener.
testGater.BlockAccept(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// now let's block all connections after being secured.
testGater.BlockSecured(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddrs()[0], id, &network.NullScope{})
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)
require.Contains(err.Error(), "gater rejected connection")
require.Nil(conn)
Expand All @@ -176,7 +176,7 @@ func TestOutboundResourceManagement(t *testing.T) {
connScope.EXPECT().PeerScope().Return(&network.NullScope{}),
)
_, dialUpgrader := createUpgrader(t)
conn, err := dial(t, dialUpgrader, ln.Multiaddrs()[0], id, connScope)
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, connScope)
require.NoError(t, err)
require.NotNil(t, conn)
connScope.EXPECT().Done()
Expand All @@ -198,7 +198,7 @@ func TestOutboundResourceManagement(t *testing.T) {
connScope.EXPECT().Done(),
)
_, dialUpgrader := createUpgrader(t)
_, err := dial(t, dialUpgrader, ln.Multiaddrs()[0], id, connScope)
_, err := dial(t, dialUpgrader, ln.Multiaddr(), id, connScope)
require.Error(t, err)
})
}
2 changes: 1 addition & 1 deletion p2p/transport/quic/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func run(port string) error {
if err != nil {
return err
}
fmt.Printf("Listening. Now run: go run cmd/client/main.go %s %s\n", ln.Multiaddrs()[0], peerID)
fmt.Printf("Listening. Now run: go run cmd/client/main.go %s %s\n", ln.Multiaddr(), peerID)
for {
conn, err := ln.Accept()
if err != nil {
Expand Down
Loading

0 comments on commit 1c8eaab

Please sign in to comment.