Skip to content

Commit

Permalink
[Filebeat] Reduce memory usage of multiline (elastic#14068)
Browse files Browse the repository at this point in the history
The use of time.After when multiline is enabled and lines don't match
the multiline pattern increases the memory usage (from 30MB to 1GB).

This extra memory is attributed to unexpired timers allocated internally
by the Go runtime when time.After(duration) is used. According to the
docs: "If efficiency is a concern, use NewTimer instead and call Timer.Stop
if the timer is no longer needed.".

It's not clear to me why this problem only appears when many lines on
the input file don't match the pattern.
  • Loading branch information
adriansr authored Oct 15, 2019
1 parent f75a9f7 commit ce651e0
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix delay in enforcing close_renamed and close_removed options. {issue}13488[13488] {pull}13907[13907]
- Fix missing netflow fields in index template. {issue}13768[13768] {pull}13914[13914]
- Fix cisco module's asa and ftd filesets parsing of domain names where an IP address is expected. {issue}14034[14034]
- Fixed increased memory usage with large files when multiline pattern does not match. {issue}14068[14068]

*Heartbeat*

Expand Down
9 changes: 5 additions & 4 deletions libbeat/reader/readfile/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
errTimeout = errors.New("timeout")
)

// timeoutProcessor will signal some configurable timeout error if no
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
reader reader.Reader
Expand All @@ -43,7 +43,7 @@ type lineMessage struct {
err error
}

// New returns a new timeout reader from an input line reader.
// NewTimeoutReader returns a new timeout reader from an input line reader.
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
if signal == nil {
signal = errTimeout
Expand Down Expand Up @@ -75,14 +75,15 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
}
}()
}

timer := time.NewTimer(r.timeout)
select {
case msg := <-r.ch:
if msg.err != nil {
r.running = false
}
timer.Stop()
return msg.line, msg.err
case <-time.After(r.timeout):
case <-timer.C:
return reader.Message{}, r.signal
}
}

0 comments on commit ce651e0

Please sign in to comment.