Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #249 from libp2p/rcmgr
Browse files Browse the repository at this point in the history
use the Resource Manager
  • Loading branch information
marten-seemann authored Jan 18, 2022
2 parents 5c11755 + 2933aa4 commit b7a53c7
Show file tree
Hide file tree
Showing 17 changed files with 330 additions and 90 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/interop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ jobs:
if [[ `git merge-base --is-ancestor HEAD 3123af36d6cec13e31dac75058c8046e6e4a6690; echo $?` != "1" ]]; then
TAGS+=("stream_open_no_context")
fi
# This command doesn't take into account off-master releases. That's why we need a special case for the v0.11.2 release.
if [[ `git merge-base --is-ancestor HEAD 5c11755be71c11950e107f0c2c7b900e1d59ce6d; echo $?` != "1" || `git merge-base --is-ancestor HEAD v0.11.2; echo $?` != "1" ]]; then
TAGS+=("new_transport_no_rcmgr")
fi
if [[ "${{ matrix.cfg.retireBugBackwardsCompatiblityMode }}" == "true" ]]; then
TAGS+=("retirebugcompatmode")
fi
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(raddr string, p string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(port string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
22 changes: 17 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

Expand All @@ -15,7 +15,8 @@ import (
type conn struct {
sess quic.Session
pconn *reuseConn
transport tpt.Transport
transport *transport
scope network.ConnManagementScope

localPeer peer.ID
privKey ic.PrivKey
Expand All @@ -32,23 +33,30 @@ var _ tpt.CapableConn = &conn{}
// It must be called even if the peer closed the connection in order for
// garbage collection to properly work in this package.
func (c *conn) Close() error {
c.transport.removeConn(c.sess)
err := c.sess.CloseWithError(0, "")
c.pconn.DecreaseCount()
return c.sess.CloseWithError(0, "")
c.scope.Done()
return err
}

// IsClosed returns whether a connection is fully closed.
func (c *conn) IsClosed() bool {
return c.sess.Context().Err() != nil
}

func (c *conn) allowWindowIncrease(size uint64) bool {
return c.scope.ReserveMemory(int(size), network.ReservationPriorityMedium) == nil
}

// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
qstr, err := c.sess.OpenStreamSync(ctx)
return &stream{Stream: qstr}, err
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (mux.MuxedStream, error) {
func (c *conn) AcceptStream() (network.MuxedStream, error) {
qstr, err := c.sess.AcceptStream(context.Background())
return &stream{Stream: qstr}, err
}
Expand Down Expand Up @@ -86,3 +94,7 @@ func (c *conn) RemoteMultiaddr() ma.Multiaddr {
func (c *conn) Transport() tpt.Transport {
return c.transport
}

func (c *conn) Scope() network.ConnScope {
return c.scope
}
159 changes: 135 additions & 24 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -14,9 +15,12 @@ import (
"time"

ic "github.com/libp2p/go-libp2p-core/crypto"
n "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network"

quicproxy "github.com/lucas-clemente/quic-go/integrationtests/tools/proxy"
ma "github.com/multiformats/go-multiaddr"

Expand Down Expand Up @@ -46,24 +50,22 @@ func createPeer(t *testing.T) (peer.ID, ic.PrivKey) {
return id, priv
}

func runServer(t *testing.T, tr tpt.Transport, multiaddr string) tpt.Listener {
func runServer(t *testing.T, tr tpt.Transport, addr string) tpt.Listener {
t.Helper()
addr, err := ma.NewMultiaddr(multiaddr)
require.NoError(t, err)
ln, err := tr.Listen(addr)
ln, err := tr.Listen(ma.StringCast(addr))
require.NoError(t, err)
return ln
}

func TestHandshake(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()

handshake := func(t *testing.T, ln tpt.Listener) {
clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand Down Expand Up @@ -97,17 +99,126 @@ func TestHandshake(t *testing.T) {
})
}

func TestResourceManagerSuccess(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

serverRcmgr := mocknetwork.NewMockResourceManager(ctrl)
serverTransport, err := NewTransport(serverKey, nil, nil, serverRcmgr)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln, err := serverTransport.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic"))
require.NoError(t, err)
defer ln.Close()

clientRcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, clientRcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

connChan := make(chan tpt.CapableConn)
serverConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
go func() {
serverRcmgr.EXPECT().OpenConnection(network.DirInbound, false).Return(serverConnScope, nil)
serverConnScope.EXPECT().SetPeer(clientID)
serverConn, err := ln.Accept()
require.NoError(t, err)
connChan <- serverConn
}()

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
clientRcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(connScope, nil)
connScope.EXPECT().SetPeer(serverID)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
require.NoError(t, err)
serverConn := <-connChan
t.Log("received conn")
connScope.EXPECT().Done().MinTimes(1) // for dialed connections, we might call Done multiple times
conn.Close()
serverConnScope.EXPECT().Done()
serverConn.Close()
}

func TestResourceManagerDialDenied(t *testing.T) {
_, clientKey := createPeer(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

rcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, rcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
rcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(connScope, nil)
rerr := errors.New("nope")
p := peer.ID("server")
connScope.EXPECT().SetPeer(p).Return(rerr)
connScope.EXPECT().Done()

_, err = clientTransport.Dial(context.Background(), ma.StringCast("/ip4/127.0.0.1/udp/1234/quic"), p)
require.ErrorIs(t, err, rerr)
}

func TestResourceManagerAcceptDenied(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

clientRcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, clientRcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

serverRcmgr := mocknetwork.NewMockResourceManager(ctrl)
serverConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
rerr := errors.New("denied")
gomock.InOrder(
serverRcmgr.EXPECT().OpenConnection(network.DirInbound, false).Return(serverConnScope, nil),
serverConnScope.EXPECT().SetPeer(clientID).Return(rerr),
serverConnScope.EXPECT().Done(),
)
serverTransport, err := NewTransport(serverKey, nil, nil, serverRcmgr)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln, err := serverTransport.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic"))
require.NoError(t, err)
defer ln.Close()
connChan := make(chan tpt.CapableConn)
go func() {
ln.Accept()
close(connChan)
}()

clientConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
clientRcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(clientConnScope, nil)
clientConnScope.EXPECT().SetPeer(serverID)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
require.NoError(t, err)
_, err = conn.AcceptStream()
require.Error(t, err)
select {
case <-connChan:
t.Fatal("didn't expect to accept a connection")
default:
}
}

func TestStreams(t *testing.T) {
serverID, serverKey := createPeer(t)
_, clientKey := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand All @@ -134,12 +245,12 @@ func TestHandshakeFailPeerIDMismatch(t *testing.T) {
_, clientKey := createPeer(t)
thirdPartyID, _ := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
// dial, but expect the wrong peer ID
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), thirdPartyID)
Expand Down Expand Up @@ -172,7 +283,7 @@ func TestConnectionGating(t *testing.T) {
cg := NewMockConnectionGater(mockCtrl)

t.Run("accepted connections", func(t *testing.T) {
serverTransport, err := NewTransport(serverKey, nil, cg)
serverTransport, err := NewTransport(serverKey, nil, cg, nil)
defer serverTransport.(io.Closer).Close()
require.NoError(t, err)
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -187,7 +298,7 @@ func TestConnectionGating(t *testing.T) {
require.NoError(t, err)
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
// make sure that connection attempts fails
Expand Down Expand Up @@ -215,7 +326,7 @@ func TestConnectionGating(t *testing.T) {
})

t.Run("secured connections", func(t *testing.T) {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -224,7 +335,7 @@ func TestConnectionGating(t *testing.T) {
cg := NewMockConnectionGater(mockCtrl)
cg.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any())

clientTransport, err := NewTransport(clientKey, nil, cg)
clientTransport, err := NewTransport(clientKey, nil, cg, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

Expand All @@ -247,12 +358,12 @@ func TestDialTwo(t *testing.T) {
_, clientKey := createPeer(t)
serverID2, serverKey2 := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln1 := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln1.Close()
serverTransport2, err := NewTransport(serverKey2, nil, nil)
serverTransport2, err := NewTransport(serverKey2, nil, nil, nil)
require.NoError(t, err)
defer serverTransport2.(io.Closer).Close()
ln2 := runServer(t, serverTransport2, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -278,7 +389,7 @@ func TestDialTwo(t *testing.T) {
}
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
c1, err := clientTransport.Dial(context.Background(), ln1.Multiaddr(), serverID)
Expand Down Expand Up @@ -329,7 +440,7 @@ func TestStatelessReset(t *testing.T) {
serverID, serverKey := createPeer(t)
_, clientKey := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -346,7 +457,7 @@ func TestStatelessReset(t *testing.T) {
defer proxy.Close()

// establish a connection
clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
proxyAddr, err := toQuicMultiaddr(proxy.LocalAddr())
Expand Down Expand Up @@ -395,7 +506,7 @@ func TestHolePunching(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)

t1, err := NewTransport(serverKey, nil, nil)
t1, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer t1.(io.Closer).Close()
laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -409,7 +520,7 @@ func TestHolePunching(t *testing.T) {
require.Error(t, err, "didn't expect to accept any connections")
}()

t2, err := NewTransport(clientKey, nil, nil)
t2, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer t2.(io.Closer).Close()
ln2, err := t2.Listen(laddr)
Expand All @@ -423,15 +534,15 @@ func TestHolePunching(t *testing.T) {
connChan := make(chan tpt.CapableConn)
go func() {
conn, err := t2.Dial(
n.WithSimultaneousConnect(context.Background(), false, ""),
network.WithSimultaneousConnect(context.Background(), false, ""),
ln1.Multiaddr(),
serverID,
)
require.NoError(t, err)
connChan <- conn
}()
conn1, err := t1.Dial(
n.WithSimultaneousConnect(context.Background(), true, ""),
network.WithSimultaneousConnect(context.Background(), true, ""),
ln2.Multiaddr(),
clientID,
)
Expand Down
Loading

0 comments on commit b7a53c7

Please sign in to comment.