From e972c1f002105423758b8ed6ad19d1d899c3691f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 8 May 2019 15:19:06 -0700 Subject: [PATCH] call Stream.Reset instead of Stream.Close May fix https://github.com/ipfs/go-ipfs/issues/6237 Basically, 1. We hang while closing a stream (because `Close` waits). 2. This blocks the connection manager because it assumes that close _doesn't_ wait. This may also fix a stream leak. --- conn.go | 35 ++++++++++++++++++++++++++++++----- go.sum | 4 ++++ listen.go | 4 ++-- relay.go | 4 ++-- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/conn.go b/conn.go index df39f11..9c7f11e 100644 --- a/conn.go +++ b/conn.go @@ -3,6 +3,7 @@ package relay import ( "fmt" "net" + "time" inet "github.com/libp2p/go-libp2p-net" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -11,7 +12,7 @@ import ( ) type Conn struct { - inet.Stream + stream inet.Stream remote pstore.PeerInfo } @@ -28,9 +29,33 @@ func (n *NetAddr) String() string { return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay) } +func (c *Conn) Close() error { + return c.stream.Reset() +} + +func (c *Conn) Read(buf []byte) (int, error) { + return c.stream.Read(buf) +} + +func (c *Conn) Write(buf []byte) (int, error) { + return c.stream.Write(buf) +} + +func (c *Conn) SetDeadline(t time.Time) error { + return c.stream.SetDeadline(t) +} + +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.stream.SetReadDeadline(t) +} + +func (c *Conn) SetWriteDeadline(t time.Time) error { + return c.stream.SetReadDeadline(t) +} + func (c *Conn) RemoteAddr() net.Addr { return &NetAddr{ - Relay: c.Conn().RemotePeer().Pretty(), + Relay: c.stream.Conn().RemotePeer().Pretty(), Remote: c.remote.ID.Pretty(), } } @@ -38,7 +63,7 @@ func (c *Conn) RemoteAddr() net.Addr { // TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input" func (c *Conn) RemoteMultiaddr() ma.Multiaddr { proto := ma.ProtocolWithCode(ma.P_P2P).Name - peerid := c.Conn().RemotePeer().Pretty() + peerid := c.stream.Conn().RemotePeer().Pretty() p2paddr := ma.StringCast(fmt.Sprintf("/%s/%s", proto, peerid)) circaddr := ma.Cast(ma.CodeToVarint(P_CIRCUIT)) @@ -46,11 +71,11 @@ func (c *Conn) RemoteMultiaddr() ma.Multiaddr { } func (c *Conn) LocalMultiaddr() ma.Multiaddr { - return c.Conn().LocalMultiaddr() + return c.stream.Conn().LocalMultiaddr() } func (c *Conn) LocalAddr() net.Addr { - na, err := manet.ToNetAddr(c.Conn().LocalMultiaddr()) + na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr()) if err != nil { log.Error("failed to convert local multiaddr to net addr:", err) return nil diff --git a/go.sum b/go.sum index 2068fcf..33e53ef 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,10 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/listen.go b/listen.go index 35661cb..0de4120 100644 --- a/listen.go +++ b/listen.go @@ -25,10 +25,10 @@ func (r *Relay) Listener() *RelayListener { func (l *RelayListener) Accept() (manet.Conn, error) { select { case c := <-l.incoming: - err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS) + err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS) if err != nil { log.Debugf("error writing relay response: %s", err.Error()) - c.Stream.Reset() + c.stream.Reset() return nil, err } diff --git a/relay.go b/relay.go index 0afcf04..8ce0447 100644 --- a/relay.go +++ b/relay.go @@ -180,7 +180,7 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore return nil, RelayError{msg.GetCode()} } - return &Conn{Stream: s, remote: dest}, nil + return &Conn{stream: s, remote: dest}, nil } func (r *Relay) Matches(addr ma.Multiaddr) bool { @@ -438,7 +438,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { } select { - case r.incoming <- &Conn{Stream: s, remote: src}: + case r.incoming <- &Conn{stream: s, remote: src}: case <-time.After(RelayAcceptTimeout): r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED) }