-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathstream.go
103 lines (84 loc) · 2.48 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package peerstream
import (
"fmt"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
smux "gx/ipfs/Qmb1US8uyZeEpMyc56wVZy2cDFdQjNFojAUYVCoo9ieTqp/go-stream-muxer"
)
// StreamHandler is a function which receives a Stream. It
// allows clients to set a function to receive newly created
// streams, and decide whether to continue adding them.
// It works sort of like a http.HandleFunc.
// Note: the StreamHandler is called sequentially, so spawn
// goroutines or pass the Stream. See EchoHandler.
type StreamHandler func(s *Stream)
// Stream is an io.{Read,Write,Close}r to a remote counterpart.
// It wraps a spdystream.Stream, and links it to a Conn and groups
type Stream struct {
smuxStream smux.Stream
conn *Conn
groups groupSet
protocol protocol.ID
}
func newStream(ss smux.Stream, c *Conn) *Stream {
s := &Stream{
conn: c,
smuxStream: ss,
groups: groupSet{m: make(map[Group]struct{})},
}
s.groups.AddSet(&c.groups) // inherit groups
return s
}
// String returns a string representation of the Stream
func (s *Stream) String() string {
f := "<peerstream.Stream %s <--> %s>"
return fmt.Sprintf(f, s.conn.NetConn().LocalAddr(), s.conn.NetConn().RemoteAddr())
}
// SPDYStream returns the underlying *spdystream.Stream
func (s *Stream) Stream() smux.Stream {
return s.smuxStream
}
// Conn returns the Conn associated with this Stream
func (s *Stream) Conn() *Conn {
return s.conn
}
// Swarm returns the Swarm asociated with this Stream
func (s *Stream) Swarm() *Swarm {
return s.conn.swarm
}
// Groups returns the Groups this Stream belongs to
func (s *Stream) Groups() []Group {
return s.groups.Groups()
}
// InGroup returns whether this stream belongs to a Group
func (s *Stream) InGroup(g Group) bool {
return s.groups.Has(g)
}
// AddGroup assigns given Group to Stream
func (s *Stream) AddGroup(g Group) {
s.groups.Add(g)
}
func (s *Stream) Read(p []byte) (n int, err error) {
return s.smuxStream.Read(p)
}
func (s *Stream) Write(p []byte) (n int, err error) {
return s.smuxStream.Write(p)
}
func (s *Stream) Close() error {
return s.conn.swarm.removeStream(s)
}
func (s *Stream) Protocol() protocol.ID {
return s.protocol
}
func (s *Stream) SetProtocol(p protocol.ID) {
s.protocol = p
}
// StreamsWithGroup narrows down a set of streams to those in given group.
func StreamsWithGroup(g Group, streams []*Stream) []*Stream {
var out []*Stream
for _, s := range streams {
if s.InGroup(g) {
out = append(out, s)
}
}
return out
}