-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrcv.go
114 lines (91 loc) · 2.3 KB
/
rcv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package tomtp
import (
"sync"
)
type RcvInsertStatus int
const (
RcvInsertOk RcvInsertStatus = iota
RcvInsertDuplicate
RcvInsertBufferFull
)
type RcvSegment struct {
offset uint64
data []byte
}
type ReceiveBuffer struct {
segments *SortedHashMap[PacketKey, *RcvSegment] // Store out-of-order segments
nextOffset uint64 // Next expected offset
capacity int // Max buffer size
size int // Current size
mu *sync.Mutex
acks []Ack
}
func NewReceiveBuffer(capacity int) *ReceiveBuffer {
return &ReceiveBuffer{
segments: NewSortedHashMap[PacketKey, *RcvSegment](func(a, b PacketKey) bool { return a.less(b) }),
capacity: capacity,
mu: &sync.Mutex{},
}
}
func (rb *ReceiveBuffer) Insert(segment *RcvSegment) RcvInsertStatus {
rb.mu.Lock()
defer rb.mu.Unlock()
dataLen := len(segment.data)
if segment.offset+uint64(dataLen) < rb.nextOffset {
return RcvInsertDuplicate
}
key := createPacketKey(segment.offset, uint16(dataLen))
if rb.segments.Contains(key) {
return RcvInsertDuplicate
}
if rb.size+dataLen > rb.capacity {
return RcvInsertBufferFull
}
rb.segments.Put(key, segment)
rb.acks = append(rb.acks, Ack{
StreamOffset: segment.offset,
Len: uint16(dataLen),
})
rb.size += dataLen
return RcvInsertOk
}
func (rb *ReceiveBuffer) RemoveOldestInOrder() *RcvSegment {
rb.mu.Lock()
defer rb.mu.Unlock()
// Get the oldest segment, check if we have data in order
oldest := rb.segments.Min()
if oldest == nil || oldest.Value.offset > rb.nextOffset {
return nil
}
rb.segments.Remove(oldest.Key)
rb.size -= int(oldest.Key.length())
segment := oldest.Value
if segment.offset < rb.nextOffset {
diff := rb.nextOffset - segment.offset
segment.data = segment.data[diff:]
segment.offset = rb.nextOffset
}
rb.nextOffset = segment.offset + uint64(len(segment.data))
return segment
}
func (rb *ReceiveBuffer) Size() int {
rb.mu.Lock()
defer rb.mu.Unlock()
return rb.size
}
func (rb *ReceiveBuffer) GetAcks() []Ack {
rb.mu.Lock()
defer rb.mu.Unlock()
numAcks := len(rb.acks)
if numAcks == 0 {
return nil
}
if numAcks <= 15 {
acks := rb.acks
rb.acks = nil
return acks
}
acks := rb.acks[:15]
rb.acks = rb.acks[15:]
return acks
}