Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle EOF on single line content #33568

Merged
merged 24 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
- Fix handling of empty array in httpjson input. {pull}32001[32001]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]
Expand Down
5 changes: 3 additions & 2 deletions libbeat/reader/readfile/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package readfile
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for bN := 0; bN < b.N; bN++ {
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false})
if err != nil {
b.Fatal("failed to initialize reader:", err)
}
Expand All @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) {
for i := 0; ; i++ {
msg, err := reader.Next()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
b.ReportMetric(float64(i), "processed_lines")
break
} else {
Expand Down
8 changes: 7 additions & 1 deletion libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ type Config struct {
BufferSize int
Terminator LineTerminator
MaxBytes int
// If CollectOnEOF is set to true (default false) the line reader will return the buffer if EOF reached: this
// will ensure full content including last line with no EOL will be returned for fully retrieved content that's
// not appended anymore between reads.
// If CollectOnEOF is set to false the line reader will return 0 content and keep the buffer at the current
// state of appending data after temporarily EOF.
CollectOnEOF bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth a comment on how behavior changes with this flag.

}

// New creates a new Encode reader from input reader by applying
// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
Expand Down
96 changes: 68 additions & 28 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"errors"
"fmt"
"io"

Expand All @@ -33,18 +34,22 @@ const unlimited = 0
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
// If collectOnEOF is set to true (default false) it will return the buffer if EOF reached.
// If collectOnEOF is set to false it will return 0 content and keep the buffer at the current
// state of appending data after temporarily EOF.
type LineReader struct {
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
collectOnEOF bool
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// NewLineReader creates a new reader object
Expand All @@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}

return &LineReader{
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
collectOnEOF: config.CollectOnEOF,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}

Expand All @@ -88,12 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) {
// read next 'potential' line from input buffer/reader
err := r.advance()
if err != nil {
if errors.Is(err, io.EOF) && r.collectOnEOF {
// Found EOF and collectOnEOF is true
// -> decode input sequence into outBuffer
// let's take whole buffer len without len(nl) if it ends with it
end := r.inBuffer.Len()
if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) {
end -= len(r.nl)
}

sz, err := r.decode(end)
if err != nil {
r.logger.Errorf("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.inBuffer.Len()
}

// Consume transformed bytes from input buffer
_ = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// output buffer contains untile EOF. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
if err != nil {
// This should never happen as otherwise we have a broken state
panic(err)
}

// return and reset consumed bytes count
sz = r.byteCount
r.byteCount = 0
return bytes, sz, io.EOF
}

// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}

// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()
Expand Down Expand Up @@ -144,13 +184,13 @@ func (r *LineReader) advance() error {
// Try to read more bytes into buffer
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
if errors.Is(err, io.EOF) && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])
_, _ = r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
Expand All @@ -169,7 +209,7 @@ func (r *LineReader) advance() error {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
_ = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
Expand Down Expand Up @@ -237,7 +277,7 @@ func (r *LineReader) skipUntilNewLine() (int, error) {
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
_, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx + len(r.nl)
} else {
skipped += n
Expand Down Expand Up @@ -267,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) {
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
r.outBuffer.Write(inBytes[0:end])
if !errors.Is(err, transform.ErrShortDst) {
_, _ = r.outBuffer.Write(inBytes[0:end])
start = end
break
}
Expand All @@ -278,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(r.tempBuffer[:nDst])
_, _ = r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
Expand Down
Loading