-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathreader.go
144 lines (126 loc) · 3.8 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package stream
import (
"errors"
"io"
"sync"
)
// Reader is a concurrent-safe Stream Reader.
type Reader struct {
s *Stream
file File
fileMu sync.RWMutex
readMu sync.Mutex
readOff int64
closeOnce onceWithErr
}
// Name returns the name of the underlying File in the FileSystem.
func (r *Reader) Name() string { return r.file.Name() }
// ReadAt lets you Read from specific offsets in the Stream.
// ReadAt blocks while waiting for the requested section of the Stream to be written,
// unless the Stream is closed in which case it will always return immediately.
func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
return r.read(p, &off)
}
// Read reads from the Stream. If the end of an open Stream is reached, Read
// blocks until more data is written or the Stream is Closed.
func (r *Reader) Read(p []byte) (n int, err error) {
r.readMu.Lock()
defer r.readMu.Unlock()
return r.read(p, &r.readOff)
}
func (r *Reader) read(p []byte, off *int64) (n int, err error) {
for {
var m int
m, err = r.s.b.UseHandle(func() (int, error) {
r.fileMu.RLock()
defer r.fileMu.RUnlock()
return r.file.ReadAt(p[n:], *off)
})
n += m
*off += int64(m)
switch {
case n != 0 && (err == nil || err == io.EOF):
return n, nil
case err == io.EOF:
if err := r.s.b.Wait(r, *off); err != nil {
return n, r.checkErr(err)
}
case err != nil:
return n, r.checkErr(err)
}
}
}
func (r *Reader) checkErr(err error) error {
switch err {
case ErrCanceled:
r.Close()
}
return err
}
// Close closes this Reader on the Stream. This must be called when done with the
// Reader or else the Stream cannot be Removed.
func (r *Reader) Close() error {
return r.closeOnce.Do(func() (err error) {
r.fileMu.Lock()
err = r.file.Close()
r.fileMu.Unlock()
r.s.b.DropReader(r)
return err
})
}
// Size returns the current size of the entire stream (not the remaining bytes to be read),
// and true iff the size is valid (not canceled), and final (won't change).
// Can be safely called concurrently with all other methods.
func (r *Reader) Size() (int64, bool) {
return r.s.b.Size()
}
// Seek changes the offset of the next Read in the stream.
// Seeking to Start/Current does not block for the stream to reach that position,
// so it cannot guarantee that position exists.
// Seeking to End will block until the stream is closed and then seek to that position,
// UNLESS Stream.SetSeekEnd has specified the size, in which case Seek End will be relative
// that that size. Reads will still block if reading from unwritten portions of the stream.
// Seek is safe to call concurrently with all other methods, though calling it
// concurrently with Read will lead to an undefined order of the calls
// (ex. may Seek then Read or Read than Seek, changing which bytes are Read).
// Similarly, calling SetSeekEnd concurrently with calls to Seek may lead to
// either SeekEnd blocking OR using the SetSeekEnd.
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
r.readMu.Lock()
defer r.readMu.Unlock()
switch whence {
default:
return 0, errWhence
case io.SeekStart:
case io.SeekCurrent:
offset += r.readOff
case io.SeekEnd:
size, err := r.seekEnd()
if err != nil {
return 0, err
}
offset += size
}
if offset < 0 {
return 0, errOffset
}
r.readOff = offset
return r.readOff, nil
}
func (r *Reader) seekEnd() (int64, error) {
// Check if end was specified:
if size := r.s.seekEnd.read(); size >= 0 {
return size, nil
}
// Block until closed so we know the true size:
if err := r.s.b.Wait(r, maxInt64); err != nil && err != io.EOF {
return 0, err
}
size, _ := r.s.b.Size() // we most be closed to reach here due to ^
return size, nil
}
var (
errWhence = errors.New("Seek: invalid whence")
errOffset = errors.New("Seek: invalid offset")
)
const maxInt64 = 1<<63 - 1