From 4edf3cfa4c6dc9e4f3effb748d797471493ef132 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 2 Nov 2016 23:35:03 -0700 Subject: [PATCH 1/3] move protocol methods down into peerstream --- p2p/net/swarm/swarm.go | 13 ++++--------- p2p/net/swarm/swarm_conn.go | 6 +++++- p2p/net/swarm/swarm_stream.go | 33 ++++++++------------------------- 3 files changed, 17 insertions(+), 35 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index bb179bbc69..ec156d4e4b 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -256,7 +256,7 @@ func (s *Swarm) SetConnHandler(handler ConnHandler) { // See peerstream. func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) { s.swarm.SetStreamHandler(func(s *ps.Stream) { - handler(wrapStream(s)) + handler((*Stream)(s)) }) } @@ -273,12 +273,7 @@ func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, erro // TODO: think about passing a context down to NewStreamWithGroup st, err := s.swarm.NewStreamWithGroup(p) - return wrapStream(st), err -} - -// StreamsWithPeer returns all the live Streams to p -func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream { - return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams())) + return (*Stream)(st), err } // ConnectionsToPeer returns all the live connections to p @@ -387,9 +382,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) { } func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { - n.not.OpenedStream(n.net, &Stream{stream: s}) + n.not.OpenedStream(n.net, (*Stream)(s)) } func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { - n.not.ClosedStream(n.net, &Stream{stream: s}) + n.not.ClosedStream(n.net, (*Stream)(s)) } diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index 72609f0f32..bc5b69e4ad 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -77,7 +77,7 @@ func (c *Conn) RemotePublicKey() ic.PubKey { // NewSwarmStream returns a new Stream from this connection func (c *Conn) NewSwarmStream() (*Stream, error) { s, err := c.StreamConn().NewStream() - return wrapStream(s), err + return (*Stream)(s), err } // NewStream returns a new Stream from this connection @@ -91,6 +91,10 @@ func (c *Conn) Close() error { return c.StreamConn().Close() } +func (c *Conn) GetStreams() ([]inet.Stream, error) { + return nil, fmt.Errorf("GetStreams() not yet implemented") +} + func wrapConn(psc *ps.Conn) (*Conn, error) { // grab the underlying connection. if _, ok := psc.NetConn().(iconn.Conn); !ok { diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go index ef3ab3c8fd..e3b12563bb 100644 --- a/p2p/net/swarm/swarm_stream.go +++ b/p2p/net/swarm/swarm_stream.go @@ -9,14 +9,11 @@ import ( // Stream is a wrapper around a ps.Stream that exposes a way to get // our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) -type Stream struct { - stream *ps.Stream - protocol protocol.ID -} +type Stream ps.Stream // Stream returns the underlying peerstream.Stream func (s *Stream) Stream() *ps.Stream { - return s.stream + return (*ps.Stream)(s) } // Conn returns the Conn associated with this Stream, as an inet.Conn @@ -26,43 +23,29 @@ func (s *Stream) Conn() inet.Conn { // SwarmConn returns the Conn associated with this Stream, as a *Conn func (s *Stream) SwarmConn() *Conn { - return (*Conn)(s.stream.Conn()) + return (*Conn)(s.Stream().Conn()) } // Read reads bytes from a stream. func (s *Stream) Read(p []byte) (n int, err error) { - return s.stream.Read(p) + return s.Stream().Read(p) } // Write writes bytes to a stream, flushing for each call. func (s *Stream) Write(p []byte) (n int, err error) { - return s.stream.Write(p) + return s.Stream().Write(p) } // Close closes the stream, indicating this side is finished // with the stream. func (s *Stream) Close() error { - return s.stream.Close() + return s.Stream().Close() } func (s *Stream) Protocol() protocol.ID { - return s.protocol + return (*ps.Stream)(s).Protocol() } func (s *Stream) SetProtocol(p protocol.ID) { - s.protocol = p -} - -func wrapStream(pss *ps.Stream) *Stream { - return &Stream{ - stream: pss, - } -} - -func wrapStreams(st []*ps.Stream) []*Stream { - out := make([]*Stream, len(st)) - for i, s := range st { - out[i] = wrapStream(s) - } - return out + (*ps.Stream)(s).SetProtocol(p) } From 0ac5cdf74458835bbcc18d9bfb703666f912bd1c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 3 Nov 2016 13:15:48 -0700 Subject: [PATCH 2/3] bubble up peerstream modifications --- p2p/net/swarm/swarm.go | 2 +- p2p/net/swarm/swarm_conn.go | 2 +- p2p/net/swarm/swarm_listen.go | 2 +- p2p/net/swarm/swarm_stream.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index ec156d4e4b..3a59517c34 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -12,7 +12,6 @@ import ( "time" logging "github.com/ipfs/go-log" - ps "github.com/jbenet/go-peerstream" pst "github.com/jbenet/go-stream-muxer" "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" @@ -26,6 +25,7 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" transport "github.com/libp2p/go-libp2p-transport" filter "github.com/libp2p/go-maddr-filter" + ps "github.com/libp2p/go-peerstream" tcpt "github.com/libp2p/go-tcp-transport" ma "github.com/multiformats/go-multiaddr" psmss "github.com/whyrusleeping/go-smux-multistream" diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index bc5b69e4ad..4b13be54f1 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -4,11 +4,11 @@ import ( "context" "fmt" - ps "github.com/jbenet/go-peerstream" ic "github.com/libp2p/go-libp2p-crypto" iconn "github.com/libp2p/go-libp2p-interface-conn" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" + ps "github.com/libp2p/go-peerstream" ma "github.com/multiformats/go-multiaddr" ) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index d76d1e640e..571a305e3a 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -4,13 +4,13 @@ import ( "context" "fmt" - ps "github.com/jbenet/go-peerstream" conn "github.com/libp2p/go-libp2p-conn" iconn "github.com/libp2p/go-libp2p-interface-conn" lgbl "github.com/libp2p/go-libp2p-loggables" mconn "github.com/libp2p/go-libp2p-metrics/conn" inet "github.com/libp2p/go-libp2p-net" transport "github.com/libp2p/go-libp2p-transport" + ps "github.com/libp2p/go-peerstream" ma "github.com/multiformats/go-multiaddr" ) diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go index e3b12563bb..771038621f 100644 --- a/p2p/net/swarm/swarm_stream.go +++ b/p2p/net/swarm/swarm_stream.go @@ -4,7 +4,7 @@ import ( inet "github.com/libp2p/go-libp2p-net" protocol "github.com/libp2p/go-libp2p-protocol" - ps "github.com/jbenet/go-peerstream" + ps "github.com/libp2p/go-peerstream" ) // Stream is a wrapper around a ps.Stream that exposes a way to get From 23bb096d3ebaf22a7490507d73b76824a0c570c8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 3 Nov 2016 14:01:18 -0700 Subject: [PATCH 3/3] implement GetStreams --- p2p/net/swarm/swarm_conn.go | 8 +++++++- p2p/net/swarm/swarm_net_test.go | 9 +++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index 4b13be54f1..0b0c626cea 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -92,7 +92,13 @@ func (c *Conn) Close() error { } func (c *Conn) GetStreams() ([]inet.Stream, error) { - return nil, fmt.Errorf("GetStreams() not yet implemented") + ss := c.StreamConn().Streams() + out := make([]inet.Stream, len(ss)) + + for i, s := range ss { + out[i] = (*Stream)(s) + } + return out, nil } func wrapConn(psc *ps.Conn) (*Conn, error) { diff --git a/p2p/net/swarm/swarm_net_test.go b/p2p/net/swarm/swarm_net_test.go index ff13934088..95bc268307 100644 --- a/p2p/net/swarm/swarm_net_test.go +++ b/p2p/net/swarm/swarm_net_test.go @@ -165,6 +165,15 @@ func TestNetworkOpenStream(t *testing.T) { t.Fatal(err) } + streams, err := nets[0].ConnsToPeer(nets[1].LocalPeer())[0].GetStreams() + if err != nil { + t.Fatal(err) + } + + if len(streams) != 1 { + t.Fatal("should only have one stream there") + } + _, err = s.Write([]byte("hello ipfs")) if err != nil { t.Fatal(err)