Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
speed up the dial tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Dec 20, 2021
1 parent 69f15a7 commit 3338d5f
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 127 deletions.
118 changes: 30 additions & 88 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ func acceptAndHang(l net.Listener) {
}

func TestSimultDials(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()

ctx := context.Background()
swarms := makeSwarms(t, 2, swarmt.OptDisableReuseport)
defer closeSwarms(swarms)

// connect everyone
{
Expand Down Expand Up @@ -128,10 +128,6 @@ func TestSimultDials(t *testing.T) {
if c10l > 2 {
t.Error("1->0 has", c10l)
}

for _, s := range swarms {
s.Close()
}
}

func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
Expand All @@ -154,8 +150,6 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
}

func TestDialWait(t *testing.T) {
t.Parallel()

const dialTimeout = 250 * time.Millisecond

swarms := makeSwarms(t, 1, swarmt.DialTimeout(dialTimeout))
Expand Down Expand Up @@ -193,22 +187,18 @@ func TestDialBackoff(t *testing.T) {
if ci.IsRunning() {
t.Skip("travis will never have fun with this test")
}

t.Parallel()

const dialTimeout = 250 * time.Millisecond

ctx := context.Background()
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

s2addrs, err := s2.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, peerstore.PermanentAddrTTL)

// dial to a non-existent peer.
Expand Down Expand Up @@ -405,13 +395,10 @@ func TestDialBackoffClears(t *testing.T) {
t.Parallel()

const dialTimeout = 250 * time.Millisecond

ctx := context.Background()
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

// use another address first, that accept and hang on conns
_, s2bad, s2l := newSilentPeer(t)
Expand All @@ -422,13 +409,8 @@ func TestDialBackoffClears(t *testing.T) {
s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL)

before := time.Now()
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err == nil {
defer c.Close()
t.Fatal("dialing to broken addr worked...", err)
} else {
t.Log("correctly got error:", err)
}
_, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.Error(t, err, "dialing to broken addr worked...")
duration := time.Since(before)

if duration < dialTimeout*DialAttempts {
Expand All @@ -437,46 +419,24 @@ func TestDialBackoffClears(t *testing.T) {
if duration > 2*dialTimeout*DialAttempts {
t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should now be on backoff")
} else {
t.Log("correctly added to backoff")
}
require.True(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should now be on backoff")

// phase 2 -- add the working address. dial should succeed.
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
ifaceAddrs1, err := s2.InterfaceListenAddresses()
require.NoError(t, err)
s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)

if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
c.Close()
t.Log("backoffs are per address, not peer")
}

time.Sleep(BackoffBase)

if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
t.Fatal(err)
} else {
c.Close()
t.Log("correctly connected")
}

if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should no longer be on backoff")
} else {
t.Log("correctly cleared backoff")
}
// backoffs are per address, not peer
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)
defer c.Close()
require.False(t, s1.Backoff().Backoff(s2.LocalPeer(), s2bad), "s2 should no longer be on backoff")
}

func TestDialPeerFailed(t *testing.T) {
t.Parallel()
ctx := context.Background()

swarms := makeSwarms(t, 2)
swarms := makeSwarms(t, 2, swarmt.DialTimeout(250*time.Millisecond))
defer closeSwarms(swarms)
testedSwarm, targetSwarm := swarms[0], swarms[1]

Expand All @@ -492,10 +452,8 @@ func TestDialPeerFailed(t *testing.T) {
peerstore.PermanentAddrTTL)
}

_, err := testedSwarm.DialPeer(ctx, targetSwarm.LocalPeer())
if err == nil {
t.Fatal(err)
}
_, err := testedSwarm.DialPeer(context.Background(), targetSwarm.LocalPeer())
require.Error(t, err)

// dial_test.go:508: correctly get a combined error: failed to dial PEER: all dials failed
// * [/ip4/127.0.0.1/tcp/46485] failed to negotiate security protocol: context deadline exceeded
Expand All @@ -513,28 +471,20 @@ func TestDialPeerFailed(t *testing.T) {
}

func TestDialExistingConnection(t *testing.T) {
ctx := context.Background()

swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]

s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)

c1, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
c1, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)

c2, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
c2, err := s1.DialPeer(context.Background(), s2.LocalPeer())
require.NoError(t, err)

if c1 != c2 {
t.Fatal("expecting the same connection from both dials")
}
require.Equal(t, c1, c2, "expecting the same connection from both dials")
}

func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) {
Expand All @@ -556,16 +506,12 @@ func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) {
}

func TestDialSimultaneousJoin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const dialTimeout = 250 * time.Millisecond

swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()

s2silentAddrs, s2silentListener := newSilentListener(t)
go acceptAndHang(s2silentListener)
Expand All @@ -577,7 +523,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
go func() {
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL)

c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -602,7 +548,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL)

c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -620,7 +566,7 @@ func TestDialSimultaneousJoin(t *testing.T) {

// start a third dial to s2, this should get the existing connection from the successful dial
go func() {
c, err := s1.DialPeer(ctx, s2.LocalPeer())
c, err := s1.DialPeer(context.Background(), s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
Expand All @@ -637,10 +583,7 @@ func TestDialSimultaneousJoin(t *testing.T) {

// raise any errors from the previous goroutines
for i := 0; i < 3; i++ {
err := <-errs
if err != nil {
t.Fatal(err)
}
require.NoError(t, <-errs)
}

if c2 != c3 {
Expand All @@ -660,13 +603,12 @@ func TestDialSimultaneousJoin(t *testing.T) {
}

func TestDialSelf(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Parallel()

swarms := makeSwarms(t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
defer s1.Close()

_, err := s1.DialPeer(ctx, s1.LocalPeer())
_, err := s1.DialPeer(context.Background(), s1.LocalPeer())
require.ErrorIs(t, err, ErrDialToSelf, "expected error from self dial")
}
43 changes: 21 additions & 22 deletions limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ import (
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
// setDialTimeout overrides the package-level transport dial timeout for
// the duration of a test and hands back a function that restores the
// previous value. Intended usage:
//
//	reset := setDialTimeout(250 * time.Millisecond)
//	defer reset()
func setDialTimeout(t time.Duration) (reset func()) {
	previous := transport.DialTimeout
	transport.DialTimeout = t
	return func() {
		transport.DialTimeout = previous
	}
}

func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
// addrWithPort builds a loopback TCP multiaddr for the given port.
// ma.StringCast panics on malformed input; that is acceptable here
// because the format string always produces a valid multiaddr.
func addrWithPort(p int) ma.Multiaddr {
	s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)
	return ma.StringCast(s)
}

// in these tests I use addresses with tcp ports over a certain number to
Expand Down Expand Up @@ -84,8 +82,8 @@ func TestLimiterBasicDials(t *testing.T) {

l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
good := addrWithPort(20)

resch := make(chan dialResult)
pid := peer.ID("testpeer")
Expand Down Expand Up @@ -133,9 +131,9 @@ func TestFDLimiting(t *testing.T) {
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
goodTCP := addrWithPort(t, 20)
goodTCP := addrWithPort(20)

ctx := context.Background()
resch := make(chan dialResult)
Expand Down Expand Up @@ -163,7 +161,7 @@ func TestFDLimiting(t *testing.T) {
}

pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
utpaddr := ma.StringCast("/ip4/127.0.0.1/udp/7777/utp")

// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
Expand All @@ -180,7 +178,7 @@ func TestFDLimiting(t *testing.T) {
// A relay address with tcp transport will complete because we do not consume fds for dials
// with relay addresses as the fd will be consumed when we actually dial the relay server.
pid6 := test.RandPeerIDFatal(t)
relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
relayAddr := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})

select {
Expand Down Expand Up @@ -209,7 +207,7 @@ func TestTokenRedistribution(t *testing.T) {
}
l := newDialLimiterWithParams(df, 8, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
bads := []ma.Multiaddr{addrWithPort(1), addrWithPort(2), addrWithPort(3), addrWithPort(4)}
pids := []peer.ID{"testpeer1", "testpeer2"}

ctx := context.Background()
Expand All @@ -224,13 +222,11 @@ func TestTokenRedistribution(t *testing.T) {
tryDialAddrs(ctx, l, pid, bads, resch)
}

good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")

// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
addr: ma.StringCast("/ip4/127.0.0.1/tcp/1001"),
resp: resch,
})

Expand Down Expand Up @@ -263,7 +259,7 @@ func TestTokenRedistribution(t *testing.T) {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
addr: addrWithPort(7),
resp: resch,
})

Expand Down Expand Up @@ -304,10 +300,10 @@ func TestStressLimiter(t *testing.T) {

var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
bads = append(bads, addrWithPort(i))
}

addresses := append(bads, addrWithPort(t, 2000))
addresses := append(bads, addrWithPort(2000))
success := make(chan struct{})

for i := 0; i < 20; i++ {
Expand Down Expand Up @@ -345,6 +341,9 @@ func TestStressLimiter(t *testing.T) {
}

func TestFDLimitUnderflow(t *testing.T) {
reset := setDialTimeout(250 * time.Millisecond)
defer reset()

df := func(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
select {
case <-ctx.Done():
Expand All @@ -358,7 +357,7 @@ func TestFDLimitUnderflow(t *testing.T) {

var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
addrs = append(addrs, addrWithPort(t, i))
addrs = append(addrs, addrWithPort(i))
}

wg := sync.WaitGroup{}
Expand Down
Loading

0 comments on commit 3338d5f

Please sign in to comment.