// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
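
// EmitFunc is called for each token (e.g. a log line) read from a file,
// along with the attributes of the file it was read from.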
type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)
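
// Manager polls the filesystem for files matching the configured patterns,
// tracks them by fingerprint, and reads new log entries from them.
//
// A minimal usage sketch (construction is not shown in this file; buildManager
// below is a hypothetical stand-in for whatever assembles a fully initialized
// Manager):
//
//	m := buildManager() // hypothetical constructor
//	if err := m.Start(persister); err != nil {
//	    return err
//	}
//	defer m.Stop()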
type Manager struct {
	*zap.SugaredLogger
	wg     sync.WaitGroup
	cancel context.CancelFunc

	readerFactory readerFactory
	finder        Finder
	roller        roller
	persister     operator.Persister

	pollInterval    time.Duration
	maxBatches      int
	maxBatchFiles   int
	deleteAfterRead bool

	knownFiles []*Reader
	seenPaths  map[string]struct{}
	currentFps []*Fingerprint
}
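
// Start restores any previously persisted file offsets and then begins
// polling the filesystem for files matching the configured patterns.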
func (m *Manager) Start(persister operator.Persister) error {
	ctx, cancel := context.WithCancel(context.Background())
	m.cancel = cancel
	m.persister = persister

	// Load offsets from disk
	if err := m.loadLastPollFiles(ctx); err != nil {
		return fmt.Errorf("read known files from database: %w", err)
	}

	if len(m.finder.FindFiles()) == 0 {
		m.Warnw("no files match the configured include patterns",
			"include", m.finder.Include,
			"exclude", m.finder.Exclude)
	}

	// Start polling goroutine
	m.startPoller(ctx)

	return nil
}

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
	m.cancel()
	m.wg.Wait()
	m.roller.cleanup()
	for _, reader := range m.knownFiles {
		reader.Close()
	}
	m.knownFiles = nil
	m.cancel = nil
	return nil
}

// startPoller kicks off a goroutine that will poll the filesystem periodically,
// checking if there are new files or new logs in the watched files
func (m *Manager) startPoller(ctx context.Context) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		globTicker := time.NewTicker(m.pollInterval)
		defer globTicker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-globTicker.C:
			}

			m.poll(ctx)
		}
	}()
}

// poll checks all the watched paths for new entries
func (m *Manager) poll(ctx context.Context) {
	// Increment the generation on all known readers
	// This is done here because the next generation is about to start
	for i := 0; i < len(m.knownFiles); i++ {
		m.knownFiles[i].generation++
	}

	// Used to keep track of the number of batches processed in this poll cycle
	batchesProcessed := 0

	// Get the list of paths on disk
	matches := m.finder.FindFiles()
	for len(matches) > m.maxBatchFiles {
		m.consume(ctx, matches[:m.maxBatchFiles])

		// If maxBatches is set, check if we have hit the limit
		if m.maxBatches != 0 {
			batchesProcessed++
			if batchesProcessed >= m.maxBatches {
				return
			}
		}

		matches = matches[m.maxBatchFiles:]
	}
	m.consume(ctx, matches)
}
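
// consume creates a reader for each of the given paths and reads each file
// to its end, after first draining any files lost since the last poll cycle.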
func (m *Manager) consume(ctx context.Context, paths []string) {
	m.Debug("Consuming files")
	readers := make([]*Reader, 0, len(paths))
	for _, path := range paths {
		r := m.makeReader(path)
		if r != nil {
			readers = append(readers, r)
		}
	}

	// take care of files which disappeared from the pattern since the last poll cycle
	// this can mean either files which were removed, or rotated into a name not matching the pattern
	// we do this before reading existing files to ensure we emit older log lines before newer ones
	m.roller.readLostFiles(ctx, readers)

	var wg sync.WaitGroup
	for _, reader := range readers {
		wg.Add(1)
		go func(r *Reader) {
			defer wg.Done()
			r.ReadToEnd(ctx)
			// Delete a file if deleteAfterRead is enabled and we reached the end of the file
			if m.deleteAfterRead && r.eof {
				r.Close()
				if err := os.Remove(r.file.Name()); err != nil {
					m.Errorf("could not delete %s", r.file.Name())
				}
			}
		}(reader)
	}
	wg.Wait()

	// Save off any files that were not fully read
	if m.deleteAfterRead {
		unfinished := make([]*Reader, 0, len(readers))
		for _, r := range readers {
			if !r.eof {
				unfinished = append(unfinished, r)
			}
		}
		readers = unfinished

		// If all files were read and deleted then no need to do bookkeeping on readers
		if len(readers) == 0 {
			return
		}
	}

	// Any new files that appear should be consumed entirely
	m.readerFactory.fromBeginning = true

	m.roller.roll(ctx, readers)
	m.saveCurrent(readers)
	m.syncLastPollFiles(ctx)
	m.clearCurrentFingerprints()
}
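
// makeFingerprint opens the file at the given path and computes its
// fingerprint. It returns nil if the file cannot be opened or fingerprinted,
// or if the file is still empty.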
func (m *Manager) makeFingerprint(path string) (*Fingerprint, *os.File) {
	if _, ok := m.seenPaths[path]; !ok {
		if m.readerFactory.fromBeginning {
			m.Infow("Started watching file", "path", path)
		} else {
			m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
		}
		m.seenPaths[path] = struct{}{}
	}
	file, err := os.Open(path) // #nosec - operator must read in files defined by user
	if err != nil {
		m.Debugw("Failed to open file", zap.Error(err))
		return nil, nil
	}

	fp, err := m.readerFactory.newFingerprint(file)
	if err != nil {
		if err = file.Close(); err != nil {
			m.Errorf("problem closing file %s", file.Name())
		}
		return nil, nil
	}

	if len(fp.FirstBytes) == 0 {
		// Empty file, don't read it until we can compare its fingerprint
		if err = file.Close(); err != nil {
			m.Errorf("problem closing file %s", file.Name())
		}
		return nil, nil
	}
	return fp, file
}
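
// checkDuplicates reports whether fp overlaps with a fingerprint already
// collected during the current poll cycle.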
func (m *Manager) checkDuplicates(fp *Fingerprint) bool {
	for i := 0; i < len(m.currentFps); i++ {
		fp2 := m.currentFps[i]
		if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
			return true
		}
	}
	return false
}

// makeReader takes a file path and creates a reader for it, discarding any
// file whose fingerprint duplicates that of another file already read during
// this polling interval.
func (m *Manager) makeReader(path string) *Reader {
	// Open the files first to minimize the time between listing and opening
	fp, file := m.makeFingerprint(path)
	if fp == nil {
		return nil
	}

	// Exclude duplicate fingerprints to avoid doubling up on copy-truncate files
	if m.checkDuplicates(fp) {
		if err := file.Close(); err != nil {
			m.Errorw("problem closing file", "file", file.Name())
		}
		return nil
	}
	m.currentFps = append(m.currentFps, fp)

	reader, err := m.newReader(file, fp)
	if err != nil {
		m.Errorw("Failed to create reader", zap.Error(err))
		return nil
	}
	return reader
}
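
// clearCurrentFingerprints resets the set of fingerprints collected during
// the current poll cycle.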
func (m *Manager) clearCurrentFingerprints() {
	m.currentFps = make([]*Fingerprint, 0)
}

// saveCurrent adds the readers from this polling interval to the list of
// known files, then clears out any readers that have existed for more than
// 3 generations. (Generations are incremented in poll.)
func (m *Manager) saveCurrent(readers []*Reader) {
	// Add readers from the current, completed poll interval to the list of known files
	m.knownFiles = append(m.knownFiles, readers...)

	// Clear out old readers. They are sorted such that they are oldest first,
	// so we can just find the first reader whose generation is no greater
	// than our max, and keep every reader after that
	for i := 0; i < len(m.knownFiles); i++ {
		reader := m.knownFiles[i]
		if reader.generation <= 3 {
			m.knownFiles = m.knownFiles[i:]
			break
		}
	}
}
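
// newReader creates a reader for the given file, resuming from a previously
// known reader when the file's fingerprint matches one seen before.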
func (m *Manager) newReader(file *os.File, fp *Fingerprint) (*Reader, error) {
	// Check if the new path has the same fingerprint as an old path
	if oldReader, ok := m.findFingerprintMatch(fp); ok {
		return m.readerFactory.copy(oldReader, file)
	}

	// If we don't match any previously known files, create a new reader from scratch
	return m.readerFactory.newReader(file, fp)
}
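
// findFingerprintMatch returns the most recently known reader whose
// fingerprint is a prefix of fp, removing it from the list of known files.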
func (m *Manager) findFingerprintMatch(fp *Fingerprint) (*Reader, bool) {
	// Iterate backwards to match newest first
	for i := len(m.knownFiles) - 1; i >= 0; i-- {
		oldReader := m.knownFiles[i]
		if fp.StartsWith(oldReader.Fingerprint) {
			// Remove the old reader from the list of known files. We will
			// add it back in saveCurrent if it is still alive.
			m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
			return oldReader, true
		}
	}
	return nil, false
}

const knownFilesKey = "knownFiles"

// syncLastPollFiles syncs the most recent set of files to the database
func (m *Manager) syncLastPollFiles(ctx context.Context) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)

	// Encode the number of known files
	if err := enc.Encode(len(m.knownFiles)); err != nil {
		m.Errorw("Failed to encode known files", zap.Error(err))
		return
	}

	// Encode each known file
	for _, fileReader := range m.knownFiles {
		if err := enc.Encode(fileReader); err != nil {
			m.Errorw("Failed to encode known file", zap.Error(err))
		}
	}

	if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
		m.Errorw("Failed to sync to database", zap.Error(err))
	}
}

// loadLastPollFiles loads the most recent set of known files from the database
func (m *Manager) loadLastPollFiles(ctx context.Context) error {
	encoded, err := m.persister.Get(ctx, knownFilesKey)
	if err != nil {
		return err
	}

	if encoded == nil {
		m.knownFiles = make([]*Reader, 0, 10)
		return nil
	}

	dec := json.NewDecoder(bytes.NewReader(encoded))

	// Decode the number of entries
	var knownFileCount int
	if err := dec.Decode(&knownFileCount); err != nil {
		return fmt.Errorf("decoding file count: %w", err)
	}

	if knownFileCount > 0 {
		m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
		m.readerFactory.fromBeginning = true
	}

	// Decode each of the known files
	m.knownFiles = make([]*Reader, 0, knownFileCount)
	for i := 0; i < knownFileCount; i++ {
		// Only the offset, fingerprint, and splitter
		// will be used before this reader is discarded
		unsafeReader, err := m.readerFactory.unsafeReader()
		if err != nil {
			return err
		}
		if err = dec.Decode(unsafeReader); err != nil {
			return err
		}
		m.knownFiles = append(m.knownFiles, unsafeReader)
	}

	return nil
}