From 65b7ce9691f12ca72593db6b149cbd6ffe3da292 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 3 Sep 2020 16:16:32 +0700 Subject: [PATCH] use a stream wrapper when using a version that doesn't use the new stream interface --- .github/workflows/interop.yml | 12 ++++++-- integrationtests/main.go | 12 +++++--- integrationtests/stream/stream.go | 30 +++++++++++++++++++ .../stream/stream_old_interface.go | 29 ++++++++++++++++++ 4 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 integrationtests/stream/stream.go create mode 100644 integrationtests/stream/stream_old_interface.go diff --git a/.github/workflows/interop.yml b/.github/workflows/interop.yml index a6d44c5..cf92e84 100644 --- a/.github/workflows/interop.yml +++ b/.github/workflows/interop.yml @@ -50,12 +50,18 @@ jobs: with: go-version: ${{ matrix.cfg.go }} - run: go version - - name: build transport + - name: Build transport run: | - mkdir builder && cp integrationtests/main.go builder/ + cp -r integrationtests builder git checkout ${{ matrix.cfg.commit }} + rm -rf integrationtests || true + mv builder integrationtests git reflog --decorate -1 - go build -o transport-go${{ matrix.cfg.go }}-${{ matrix.cfg.commit }} builder/main.go + if [[ `git merge-base --is-ancestor HEAD 126c64772ba0aef0b2b6d58ff36e55a93f9253a7; echo $?` == "1" ]]; then + go build -o transport-go${{ matrix.cfg.go }}-${{ matrix.cfg.commit }} integrationtests/main.go + else + go build -tags oldstream -o transport-go${{ matrix.cfg.go }}-${{ matrix.cfg.commit }} integrationtests/main.go + fi - name: Upload binary uses: actions/upload-artifact@v2 with: diff --git a/integrationtests/main.go b/integrationtests/main.go index bb40e1b..f398e6c 100644 --- a/integrationtests/main.go +++ b/integrationtests/main.go @@ -15,6 +15,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" libp2pquic "github.com/libp2p/go-libp2p-quic-transport" ma "github.com/multiformats/go-multiaddr" + + "github.com/libp2p/go-libp2p-quic-transport/integrationtests/stream" ) func main() { @@ -95,10 +97,11 @@ func runServer(hostKey crypto.PrivKey, peerKey crypto.PubKey, addr ma.Multiaddr) return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), clientPeerID.Pretty()) } for { - str, err := conn.AcceptStream() + st, err := conn.AcceptStream() if err != nil { return nil } + str := stream.WrapStream(st) defer str.Close() data, err := ioutil.ReadAll(str) if err != nil { @@ -107,7 +110,7 @@ func runServer(hostKey crypto.PrivKey, peerKey crypto.PubKey, addr ma.Multiaddr) if _, err := str.Write(data); err != nil { return err } - if err := str.Close(); err != nil { + if err := str.CloseWrite(); err != nil { return err } } @@ -135,16 +138,17 @@ func runClient(hostKey crypto.PrivKey, peerKey crypto.PubKey, addr ma.Multiaddr) if conn.RemotePeer() != serverPeerID { return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), serverPeerID.Pretty()) } - str, err := conn.OpenStream() + st, err := conn.OpenStream() if err != nil { return err } + str := stream.WrapStream(st) data := make([]byte, 1<<15) rand.Read(data) if _, err := str.Write(data); err != nil { return err } - if err := str.Close(); err != nil { + if err := str.CloseWrite(); err != nil { return err } echoed, err := ioutil.ReadAll(str) diff --git a/integrationtests/stream/stream.go b/integrationtests/stream/stream.go new file mode 100644 index 0000000..6ca158a --- /dev/null +++ b/integrationtests/stream/stream.go @@ -0,0 +1,30 @@ +package stream + +import ( + "io" + "time" + + "github.com/libp2p/go-libp2p-core/mux" +) + +type Stream interface { + io.Reader + io.Writer + io.Closer + + CloseWrite() error + CloseRead() error + Reset() error + + SetDeadline(time.Time) error + SetReadDeadline(time.Time) error + SetWriteDeadline(time.Time) error +} + +type stream struct { + mux.MuxedStream +} + +func WrapStream(str mux.MuxedStream) *stream { + return &stream{MuxedStream: str} +} diff --git a/integrationtests/stream/stream_old_interface.go b/integrationtests/stream/stream_old_interface.go new file mode 100644 index 0000000..fb8dc14 --- /dev/null +++ b/integrationtests/stream/stream_old_interface.go @@ -0,0 +1,29 @@ +// +build oldstream + +package stream + +import ( + "log" + + "github.com/lucas-clemente/quic-go" +) + +func init() { + log.Println("Using old stream interface wrapper.") +} + +const reset quic.ErrorCode = 0 + +func (s *stream) CloseWrite() error { + return s.MuxedStream.Close() +} + +func (s *stream) CloseRead() error { + s.MuxedStream.(quic.Stream).CancelRead(reset) + return nil +} + +func (s *stream) Close() error { + s.MuxedStream.(quic.Stream).CancelRead(reset) + return s.MuxedStream.Close() +}