-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlabel.go
159 lines (133 loc) · 4.01 KB
/
label.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package memberlist
import (
"bufio"
"fmt"
"io"
"net"
)
// General approach is to prefix all packets and streams with the same structure:
//
// magic type byte (244): uint8
// length of Label name: uint8 (because Labels can't be longer than 255 bytes)
// Label name: []uint8
// LabelMaxSize 包、流 标签的最大长度
const LabelMaxSize = 255
// AddLabelHeaderToPacket 如果Label不是空的,则用正确的标头对流出的数据包进行前缀。
func AddLabelHeaderToPacket(buf []byte, Label string) ([]byte, error) {
if Label == "" {
return buf, nil
}
if len(Label) > LabelMaxSize {
return nil, fmt.Errorf("标签 %q is 太长", Label)
}
return makeLabelHeader(Label, buf), nil
}
// RemoveLabelHeaderFromPacket 从提供的包中删除任何标签头,并将其与剩余的包内容一起返回。
func RemoveLabelHeaderFromPacket(buf []byte) (newBuf []byte, Label string, err error) {
if len(buf) == 0 {
return buf, "", nil // 不可能有标签
}
// [type:byte] [size:byte] [size bytes]
msgType := MessageType(buf[0])
if msgType != HasLabelMsg {
return buf, "", nil
}
if len(buf) < 2 {
return nil, "", fmt.Errorf("不能解码标签;数据包已被截断")
}
size := int(buf[1])
if size < 1 {
return nil, "", fmt.Errorf("标签头存在时不能为空")
}
if len(buf) < 2+size {
return nil, "", fmt.Errorf("不能解码标签;数据包已被截断")
}
Label = string(buf[2 : 2+size])
newBuf = buf[2+size:]
return newBuf, Label, nil
}
// AddLabelHeaderToStream 用正确的标头对流出的流进行前缀,如果 标签不是空的。
func AddLabelHeaderToStream(conn net.Conn, Label string) error {
if Label == "" {
return nil
}
if len(Label) > LabelMaxSize {
return fmt.Errorf("标签 %q is too long", Label)
}
header := makeLabelHeader(Label, nil)
_, err := conn.Write(header)
return err
}
// RemoveLabelHeaderFromStream
// 如果存在的话,从流的开头删除任何标签头,并将其与删除了该头的最新conn一起返回。
// 请注意,当出现错误时,关闭连接是调用者的责任。
func RemoveLabelHeaderFromStream(conn net.Conn) (net.Conn, string, error) {
br := bufio.NewReader(conn)
peeked, err := br.Peek(1)
if err != nil {
if err == io.EOF {
// 此时返回原始的 net.Conn 是安全的,因为它一开始就没有包含任何数据,
// 所以我们不需要把缓冲区拼接到 conn 中,因为两者都是空的。
return conn, "", nil
}
return nil, "", err
}
msgType := MessageType(peeked[0])
if msgType != HasLabelMsg {
conn, err = newPeekedConnFromBufferedReader(conn, br, 0)
return conn, "", err
}
peeked, err = br.Peek(2)
if err != nil {
if err == io.EOF {
return nil, "", fmt.Errorf("无法解码标签;流被截断了")
}
return nil, "", err
}
size := int(peeked[1])
if size < 1 {
return nil, "", fmt.Errorf("标签头存在时不能为空")
}
peeked, err = br.Peek(2 + size)
if err != nil {
if err == io.EOF {
return nil, "", fmt.Errorf("无法解码标签;流被截断了")
}
return nil, "", err
}
Label := string(peeked[2 : 2+size])
conn, err = newPeekedConnFromBufferedReader(conn, br, 2+size)
if err != nil {
return nil, "", err
}
return conn, Label, nil
}
// newPeekedConnFromBufferedReader 将读取到的数据拼接回conn
// 先从Peeked读取,再从Conn读
func newPeekedConnFromBufferedReader(conn net.Conn, br *bufio.Reader, offset int) (*peekedConn, error) {
peeked, err := br.Peek(br.Buffered()) // 将所有的数据读取出来
if err != nil {
return nil, err
}
return &peekedConn{
Peeked: peeked[offset:],
Conn: conn,
}, nil
}
// HasLabelMsg + len + data
func makeLabelHeader(Label string, rest []byte) []byte {
newBuf := make([]byte, 2, 2+len(Label)+len(rest))
newBuf[0] = byte(HasLabelMsg)
newBuf[1] = byte(len(Label))
newBuf = append(newBuf, []byte(Label)...)
if len(rest) > 0 {
newBuf = append(newBuf, []byte(rest)...)
}
return newBuf
}
func LabelOverhead(Label string) int {
if Label == "" {
return 0
}
return 2 + len(Label)
}