Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webrtc: add a test for establishing many connections #2801

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.6
github.com/pion/ice/v2 v2.3.24
github.com/pion/ice/v2 v2.3.25
github.com/pion/logging v0.2.2
github.com/pion/sctp v1.8.16
github.com/pion/stun v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error
errC := make(chan error, 1)
var once sync.Once
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
switch pc.ConnectionState() {
case webrtc.PeerConnectionStateConnected:
once.Do(func() { close(errC) })
case webrtc.PeerConnectionStateFailed:
Expand Down
117 changes: 117 additions & 0 deletions p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
tpt "github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multibase"
Expand Down Expand Up @@ -867,3 +869,118 @@ func TestGenUfrag(t *testing.T) {
require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/"))
}
}

func TestManyConnections(t *testing.T) {
var listeners []tpt.Listener
var listenerPeerIDs []peer.ID

const numListeners = 5
const dialersPerListener = 5
const connsPerDialer = 10
errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer)
successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer)

for i := 0; i < numListeners; i++ {
tr, lp := getTransport(t)
listenerPeerIDs = append(listenerPeerIDs, lp)
ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct"))
require.NoError(t, err)
defer ln.Close()
listeners = append(listeners, ln)
}

runListenConn := func(conn tpt.CapableConn) {
defer conn.Close()
s, err := conn.AcceptStream()
if err != nil {
t.Errorf("accept stream failed for listener: %s", err)
errCh <- err
return
}
var b [4]byte
if _, err := s.Read(b[:]); err != nil {
t.Errorf("read stream failed for listener: %s", err)
errCh <- err
return
}
s.Write(b[:])
_, err = s.Read(b[:]) // peer will close the connection after read
if !assert.Error(t, err) {
err = errors.New("invalid read: expected conn to close")
errCh <- err
return
}
successCh <- struct{}{}
}

runDialConn := func(conn tpt.CapableConn) {
defer conn.Close()

s, err := conn.OpenStream(context.Background())
if err != nil {
t.Errorf("accept stream failed for listener: %s", err)
errCh <- err
return
}
var b [4]byte
if _, err := s.Write(b[:]); err != nil {
t.Errorf("write stream failed for dialer: %s", err)
errCh <- err
return
}
if _, err := s.Read(b[:]); err != nil {
t.Errorf("read stream failed for dialer: %s", err)
errCh <- err
return
}
s.Close()
}

runListener := func(ln tpt.Listener) {
for i := 0; i < dialersPerListener*connsPerDialer; i++ {
conn, err := ln.Accept()
if err != nil {
t.Errorf("listener failed to accept conneciton: %s", err)
return
}
go runListenConn(conn)
}
}

runDialer := func(ln tpt.Listener, lp peer.ID) {
tp, _ := getTransport(t)
for i := 0; i < connsPerDialer; i++ {
// We want to test for deadlocks, set a high timeout
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
conn, err := tp.Dial(ctx, ln.Multiaddr(), lp)
if err != nil {
t.Errorf("dial failed: %s", err)
errCh <- err
cancel()
return
}
runDialConn(conn)
cancel()
}
}

for i := 0; i < numListeners; i++ {
go runListener(listeners[i])
}
for i := 0; i < numListeners; i++ {
for j := 0; j < dialersPerListener; j++ {
go runDialer(listeners[i], listenerPeerIDs[i])
}
}

for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ {
select {
case <-successCh:
t.Log("completed conn: ", i)
case err := <-errCh:
t.Fatalf("failed: %s", err)
case <-time.After(300 * time.Second):
t.Fatalf("timed out")
}
}
}
2 changes: 1 addition & 1 deletion test-plans/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
github.com/pion/ice/v2 v2.3.25 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
Expand Down
4 changes: 2 additions & 2 deletions test-plans/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
Loading