handle EOF on single line content (#33568)
* handle EOF on single line content

* changelog

* fallback to encode_eof if no events in aws-s3 input

* lint

* lint

* collect on EOF in line reader

* remove encode eof

* remove iterN

* fix test

* increase test coverage

* linting

* more linting

* increase coverage

(cherry picked from commit 7b45320)

# Conflicts:
#	libbeat/reader/readfile/line.go
#	libbeat/reader/readfile/line_test.go
#	x-pack/filebeat/input/awss3/s3_objects.go
Andrea Spacca authored and mergify[bot] committed Apr 1, 2023
1 parent d081b43 commit 8ad19c9
Showing 9 changed files with 283 additions and 90 deletions.
41 changes: 41 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -32,6 +32,47 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*
- Fix panics when a processor is closed twice {pull}34647[34647]

- Re-enable build optimizations to reduce binary size and improve performance. {pull}33620[33620]
- Fix namespacing for agent self-monitoring, CPU no longer reports as zero. {pull}32336[32336]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix in AWS related services initialisation relying on custom endpoint resolver. {issue}32888[32888] {pull}32921[32921]
- Keep `orchestrator.cluster.name` if `kubeconfig` is not returned in GKE metadata. {pull}33418[33418]
- Fix Windows service install/uninstall when Win32_Service returns error, add logic to wait until the Windows Service is stopped before proceeding. {pull}33322[33322]
- Support for multiline zookeeper logs {issue}2496[2496]
- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792]

*Auditbeat*


*Filebeat*

- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400]
- Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789]
- Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995]
- Fix handling of empty array in httpjson input. {pull}32001[32001]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
- Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597]
- Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609]
- Fix Google workspace pagination and document ID generation. {pull}33666[33666]

*Heartbeat*
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
- Fix bug affecting let's encrypt and other users of cross-signed certs, where cert expiration was incorrectly calculated. {issue}33215[33215]
- Fix broken disable feature for kibana configured monitors. {pull}33293[33293]
- Fix states client support for output options. {pull}33405[33405]
- Fix states client reloader under managed mode. {pull}33405[33405]
- Fix bug where states.duration_ms was incorrect type. {pull}33563[33563]
- Fix handling of long UDP messages in UDP input. {issue}33836[33836] {pull}33837[33837]

*Auditbeat*


*Filebeat*


*Auditbeat*


5 changes: 3 additions & 2 deletions libbeat/reader/readfile/bench_test.go
@@ -20,6 +20,7 @@ package readfile
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for bN := 0; bN < b.N; bN++ {
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit})
reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false})
if err != nil {
b.Fatal("failed to initialize reader:", err)
}
Expand All @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) {
for i := 0; ; i++ {
msg, err := reader.Next()
if err != nil {
if err == io.EOF {
if errors.Is(err, io.EOF) {
b.ReportMetric(float64(i), "processed_lines")
break
} else {
8 changes: 7 additions & 1 deletion libbeat/reader/readfile/encode.go
@@ -40,9 +40,15 @@ type Config struct {
BufferSize int
Terminator LineTerminator
MaxBytes int
// If CollectOnEOF is set to true (default false) the line reader will return the buffered content when EOF is
// reached: this ensures the full content, including a last line without a trailing EOL, is returned for content
// that has been fully retrieved and is no longer appended to between reads.
// If CollectOnEOF is set to false the line reader will return no content on EOF and keep the buffer, so it can
// continue appending data after a temporary EOF.
CollectOnEOF bool
}

// New creates a new Encode reader from input reader by applying
// NewEncodeReader creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
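
As an aside, here is a minimal, hypothetical sketch (not part of this commit) of how a caller might enable the new CollectOnEOF option. The import paths, the Nop codec import, and the Message.Content field are assumptions based on the repository layout and the types shown in this diff; the Config fields follow the struct above.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"

	"golang.org/x/text/encoding"

	"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

func main() {
	// Content whose last line has no trailing newline.
	content := []byte("first line\nlast line without newline")

	// CollectOnEOF: true asks the reader to hand back the trailing line once
	// EOF is reached instead of waiting for a line terminator that never comes.
	r, err := readfile.NewEncodeReader(
		ioutil.NopCloser(bytes.NewReader(content)),
		readfile.Config{
			Codec:        encoding.Nop, // pass-through codec, as in bench_test.go
			BufferSize:   1024,
			Terminator:   readfile.LineFeed,
			MaxBytes:     1024 * 1024,
			CollectOnEOF: true,
		},
	)
	if err != nil {
		panic(err)
	}

	for {
		msg, err := r.Next()
		if len(msg.Content) > 0 {
			fmt.Printf("event: %q\n", msg.Content)
		}
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}

With CollectOnEOF left at its default of false, the second line above would not be emitted until a newline eventually arrived.
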
106 changes: 79 additions & 27 deletions libbeat/reader/readfile/line.go
@@ -19,6 +19,7 @@ package readfile

import (
"bytes"
"errors"
"fmt"
"io"

@@ -33,18 +34,22 @@ const unlimited = 0
// LineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
// If collectOnEOF is set to true (default false) it will return the buffered content when EOF is reached.
// If collectOnEOF is set to false it will return no content on EOF and keep the buffer, so it can continue
// appending data after a temporary EOF.
type LineReader struct {
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
reader io.ReadCloser
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
nl []byte
decodedNl []byte
collectOnEOF bool
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
tempBuffer []byte
logger *logp.Logger
}

// NewLineReader creates a new reader object
@@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}

return &LineReader{
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
reader: input,
maxBytes: config.MaxBytes,
decoder: config.Codec.NewDecoder(),
nl: nl,
decodedNl: terminator,
collectOnEOF: config.CollectOnEOF,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
tempBuffer: make([]byte, config.BufferSize),
logger: logp.NewLogger("reader_line"),
}, nil
}

@@ -88,9 +94,50 @@ func (r *LineReader) Next() (b []byte, n int, err error) {
// read next 'potential' line from input buffer/reader
err := r.advance()
if err != nil {
if errors.Is(err, io.EOF) && r.collectOnEOF {
// Found EOF and collectOnEOF is true
// -> decode input sequence into outBuffer
// use the whole buffer length, minus len(nl) if the buffer ends with the line terminator
end := r.inBuffer.Len()
if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) {
end -= len(r.nl)
}

sz, err := r.decode(end)
if err != nil {
r.logger.Errorf("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.inBuffer.Len()
}

// Consume transformed bytes from input buffer
_ = r.inBuffer.Advance(sz)
r.inBuffer.Reset()

// output buffer contains the content up to EOF. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
if err != nil {
// This should never happen as otherwise we have a broken state
panic(err)
}

// return and reset consumed bytes count
sz = r.byteCount
r.byteCount = 0
return bytes, sz, io.EOF
}

// return and reset consumed bytes count
sz := r.byteCount
r.byteCount = 0
return nil, sz, err
}
// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()
@@ -141,13 +188,13 @@ func (r *LineReader) advance() error {
// Try to read more bytes into buffer
n, err := r.reader.Read(r.tempBuffer)

if err == io.EOF && n > 0 {
if errors.Is(err, io.EOF) && n > 0 {
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
err = nil
}

// Write to buffer also in case of err
r.inBuffer.Write(r.tempBuffer[:n])
_, _ = r.inBuffer.Write(r.tempBuffer[:n])

if err != nil {
return err
Expand All @@ -166,7 +213,7 @@ func (r *LineReader) advance() error {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
_ = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
@@ -234,8 +281,13 @@ func (r *LineReader) skipUntilNewLine() (int, error) {
idx = bytes.Index(r.tempBuffer[:n], r.nl)

if idx != -1 {
_, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n])
skipped += idx + len(r.nl)
} else {
skipped += n
}
@@ -264,8 +316,8 @@ func (r *LineReader) decode(end int) (int, error) {
nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false)
if err != nil {
// Check if error is different from destination buffer too short
if err != transform.ErrShortDst {
r.outBuffer.Write(inBytes[0:end])
if !errors.Is(err, transform.ErrShortDst) {
_, _ = r.outBuffer.Write(inBytes[0:end])
start = end
break
}
Expand All @@ -275,7 +327,7 @@ func (r *LineReader) decode(end int) (int, error) {
}

start += nSrc
r.outBuffer.Write(r.tempBuffer[:nDst])
_, _ = r.outBuffer.Write(r.tempBuffer[:nDst])
}

r.byteCount += start
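
To make the behavior change concrete, here is a hypothetical, self-contained sketch (not part of this commit) contrasting CollectOnEOF true and false on a single line with no trailing newline. The import paths and the Nop codec are assumptions; the LineReader constructor and Next signature follow the code shown above.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"golang.org/x/text/encoding"

	"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

// readOnce drains a single-line input with the given collectOnEOF setting and
// prints what the first Next() call returns.
func readOnce(collectOnEOF bool) {
	r, err := readfile.NewLineReader(
		ioutil.NopCloser(bytes.NewReader([]byte("single line without newline"))),
		readfile.Config{
			Codec:        encoding.Nop,
			BufferSize:   1024,
			Terminator:   readfile.LineFeed,
			MaxBytes:     1024 * 1024,
			CollectOnEOF: collectOnEOF,
		},
	)
	if err != nil {
		panic(err)
	}

	line, n, err := r.Next()
	fmt.Printf("collectOnEOF=%v content=%q consumed=%d err=%v\n", collectOnEOF, line, n, err)
}

func main() {
	readOnce(true)  // expected: the line is returned together with io.EOF
	readOnce(false) // expected: no content is returned, only io.EOF
}
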