-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathassembler.go
161 lines (127 loc) · 2.93 KB
/
assembler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// SPDX-FileCopyrightText: 2025 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package wrpssp
import (
"context"
"io"
"strings"
"sync"
"github.com/xmidt-org/wrp-go/v3"
)
// Assembler is a struct that reads from a stream of WRP messages and assembles
// them into a single stream.
//
// The Assembler implements the io.ReadCloser interface, as well as the wrp.Processor
// interface.
type Assembler struct {
closed bool
current uint64
final string
offset int
blocks map[uint64]block
m sync.Mutex
}
var _ io.ReadCloser = (*Assembler)(nil)
type block struct {
headers headers
payload []byte
}
// Read implements an io.Reader method.
func (a *Assembler) Read(p []byte) (int, error) {
a.m.Lock()
defer a.m.Unlock()
block, found := a.getBlock(a.current)
if !found {
err := a.getFinalState()
if err != nil {
a.close()
}
return 0, err
}
if block.headers.finalPacket != "" {
a.final = strings.TrimSpace(block.headers.finalPacket)
}
n := copy(p, block.payload[a.offset:])
a.offset += n
if a.offset >= len(block.payload) {
delete(a.blocks, a.current)
a.current++
a.offset = 0
}
err := a.getFinalState()
if err != nil {
a.close()
}
return n, err
}
func (a *Assembler) getBlock(n uint64) (block, bool) {
if a.blocks == nil {
return block{}, false
}
b, found := a.blocks[n]
return b, found
}
func (a *Assembler) getFinalState() error {
if a.final == "" {
return nil
}
if strings.ToLower(a.final) == "eof" {
return io.EOF
}
return &unexpectedEOF{message: a.final}
}
// Close closes the Assembler and implements the io.Closer interface.
func (a *Assembler) Close() error {
a.m.Lock()
defer a.m.Unlock()
a.close()
return nil
}
func (a *Assembler) close() {
a.blocks = nil
a.closed = true
}
// ProcessWRP takes a WRP message and processes it. If the message is not an SSP
// message, it is ignored. If the message is an SSP message, it is processed.
// The context is not used, but is required by the wrp.Processor interface.
func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error {
if !isSSP(&msg) {
return wrp.ErrNotHandled
}
h, err := get(&msg)
if err != nil {
return err
}
a.m.Lock()
defer a.m.Unlock()
if a.closed {
return ErrClosed
}
// We're past the current packet, so it can be dropped.
if a.current > uint64(h.currentPacketNumber) {
return nil
}
if a.blocks == nil {
a.blocks = make(map[uint64]block)
}
// We have the current packet already, so it can be dropped.
if _, found := a.blocks[uint64(h.currentPacketNumber)]; found {
return nil
}
a.blocks[uint64(h.currentPacketNumber)] = block{
headers: h,
payload: msg.Payload,
}
return nil
}
// GetStreamID returns the stream ID of the message if it is an SSP message.
func GetStreamID(msg wrp.Message) (string, error) {
if !isSSP(&msg) {
return "", wrp.ErrNotHandled
}
h, err := get(&msg)
if err != nil {
return "", err
}
return h.id, nil
}