From 789263e45bc978b09d92156c7253f558039d4589 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 12 Jun 2024 16:41:15 +0530 Subject: [PATCH] webrtc: add a test for establishing many connections (#2801) Update pion/ice to include the fix for out of order ConnectionState update callbacks --- go.mod | 2 +- go.sum | 4 +- p2p/transport/webrtc/listener.go | 2 +- p2p/transport/webrtc/transport_test.go | 117 +++++++++++++++++++++++++ test-plans/go.mod | 2 +- test-plans/go.sum | 4 +- 6 files changed, 124 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index bff7698536..00808fdc58 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.6 - github.com/pion/ice/v2 v2.3.24 + github.com/pion/ice/v2 v2.3.25 github.com/pion/logging v0.2.2 github.com/pion/sctp v1.8.16 github.com/pion/stun v0.6.1 diff --git a/go.sum b/go.sum index 5455f12e29..a2a327b99b 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 1834fc812b..3f465b34fc 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -330,7 +330,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error errC := make(chan error, 1) var once sync.Once pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - switch state { + switch pc.ConnectionState() { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) case webrtc.PeerConnectionStateFailed: diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index 5831a257a7..b618ec85df 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "errors" "fmt" "io" "net" @@ -17,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + tpt "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multibase" @@ -867,3 +869,118 @@ func TestGenUfrag(t *testing.T) { require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/")) } } + +func TestManyConnections(t *testing.T) { + var listeners []tpt.Listener + var listenerPeerIDs []peer.ID + + const numListeners = 5 + const dialersPerListener = 5 + const connsPerDialer = 10 + errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer) + successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer) + + for i := 0; i < numListeners; i++ { + tr, lp := getTransport(t) + listenerPeerIDs = append(listenerPeerIDs, lp) + ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct")) + require.NoError(t, err) + defer ln.Close() + listeners = append(listeners, ln) + } + + runListenConn := func(conn tpt.CapableConn) { + defer conn.Close() + s, err := conn.AcceptStream() + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for listener: %s", err) + errCh <- err + return + } + s.Write(b[:]) + _, err = s.Read(b[:]) // peer will close the connection after read + if !assert.Error(t, err) { + err = errors.New("invalid read: expected conn to close") + errCh <- err + return + } + successCh <- struct{}{} + } + + runDialConn := func(conn tpt.CapableConn) { + defer conn.Close() + + s, err := conn.OpenStream(context.Background()) + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Write(b[:]); err != nil { + t.Errorf("write stream failed for dialer: %s", err) + errCh <- err + return + } + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for dialer: %s", err) + errCh <- err + return + } + s.Close() + } + + runListener := func(ln tpt.Listener) { + for i := 0; i < dialersPerListener*connsPerDialer; i++ { + conn, err := ln.Accept() + if err != nil { + t.Errorf("listener failed to accept conneciton: %s", err) + return + } + go runListenConn(conn) + } + } + + runDialer := func(ln tpt.Listener, lp peer.ID) { + tp, _ := getTransport(t) + for i := 0; i < connsPerDialer; i++ { + // We want to test for deadlocks, set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + conn, err := tp.Dial(ctx, ln.Multiaddr(), lp) + if err != nil { + t.Errorf("dial failed: %s", err) + errCh <- err + cancel() + return + } + runDialConn(conn) + cancel() + } + } + + for i := 0; i < numListeners; i++ { + go runListener(listeners[i]) + } + for i := 0; i < numListeners; i++ { + for j := 0; j < dialersPerListener; j++ { + go runDialer(listeners[i], listenerPeerIDs[i]) + } + } + + for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ { + select { + case <-successCh: + t.Log("completed conn: ", i) + case err := <-errCh: + t.Fatalf("failed: %s", err) + case <-time.After(300 * time.Second): + t.Fatalf("timed out") + } + } +} diff --git a/test-plans/go.mod b/test-plans/go.mod index 64361ed334..6e4ff51d3d 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -66,7 +66,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/ice/v2 v2.3.24 // indirect + github.com/pion/ice/v2 v2.3.25 // indirect github.com/pion/interceptor v0.1.29 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 63eac7a98b..fdd19bbb02 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -226,8 +226,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=