-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer.go
152 lines (135 loc) · 3.12 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package ssh
import (
"io"
"os"
"sync"
"time"
)
// buffer provides a linked list buffer for data exchange
// between producer and consumer. Theoretically the buffer is
// of unlimited capacity as it does no allocation of its own.
type buffer struct {
// protects concurrent access to head, tail and closed
*sync.Cond
head *element // the buffer that will be read first
tail *element // the buffer that will be read last
closed bool
deadlineReached bool
timer *time.Timer
}
// An element represents a single link in a linked list.
type element struct {
buf []byte
next *element
}
// newBuffer returns an empty buffer that is not closed.
func newBuffer() *buffer {
e := new(element)
b := &buffer{
Cond: newCond(),
head: e,
tail: e,
}
return b
}
// write makes buf available for Read to receive.
// buf must not be modified after the call to write.
func (b *buffer) write(buf []byte) {
b.Cond.L.Lock()
e := &element{buf: buf}
b.tail.next = e
b.tail = e
b.Cond.Broadcast()
b.Cond.L.Unlock()
}
// eof closes the buffer. Reads from the buffer once all
// the data has been consumed will receive io.EOF.
func (b *buffer) eof() {
b.Cond.L.Lock()
b.closed = true
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.Cond.Broadcast()
b.Cond.L.Unlock()
}
func (b *buffer) setDeadline(deadline time.Time) {
if !deadline.IsZero() && deadline.Before(time.Now()) {
b.Cond.L.Lock()
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.Cond.L.Unlock()
// Unblock read, if any.
b.deadline()
return
}
b.Cond.L.Lock()
defer b.Cond.L.Unlock()
b.deadlineReached = false
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
if deadline.IsZero() {
return
}
b.timer = time.AfterFunc(time.Until(deadline), func() {
b.deadline()
})
}
func (b *buffer) deadline() {
b.Cond.L.Lock()
b.deadlineReached = true
b.Cond.Broadcast()
b.Cond.L.Unlock()
}
// Read reads data from the internal buffer in buf. Reads will block
// if no data is available, or until the buffer is closed.
func (b *buffer) Read(buf []byte) (n int, err error) {
b.Cond.L.Lock()
defer b.Cond.L.Unlock()
if b.deadlineReached {
// NOTE to reviewers: should be also try to read outstanding data if
// any?
err = os.ErrDeadlineExceeded
return
}
for len(buf) > 0 {
// if there is data in b.head, copy it
if len(b.head.buf) > 0 {
r := copy(buf, b.head.buf)
buf, b.head.buf = buf[r:], b.head.buf[r:]
n += r
continue
}
// if there is a next buffer, make it the head
if len(b.head.buf) == 0 && b.head != b.tail {
b.head = b.head.next
continue
}
// if at least one byte has been copied, return
if n > 0 {
break
}
// if nothing was read, and there is nothing outstanding
// check to see if the buffer is closed.
if b.closed {
err = io.EOF
break
}
// check if the deadline was reached.
if b.deadlineReached {
err = os.ErrDeadlineExceeded
break
}
// out of buffers, wait for producer
b.Cond.Wait()
}
return
}