
Commit 365ccdd

[libbeat] Disk queue implementation (elastic#21176)
Initial implementation of the new libbeat disk queue.
(cherry picked from commit 2b8fd7c)
1 parent 1c94c6d commit 365ccdd

17 files changed, +2600 -0 lines

libbeat/publisher/includes/includes.go (+1)

@@ -27,6 +27,7 @@ import (
 	_ "github.com/elastic/beats/v7/libbeat/outputs/kafka"
 	_ "github.com/elastic/beats/v7/libbeat/outputs/logstash"
 	_ "github.com/elastic/beats/v7/libbeat/outputs/redis"
+	_ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
 	_ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
 	_ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool"
 )
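
This one-line change is how the new queue becomes available: the blank import triggers the diskqueue package's init-time self-registration, the same mechanism the memqueue and spool imports rely on. Below is a minimal, self-contained sketch of that blank-import registration pattern; the registry and names are illustrative stand-ins, not beats' actual queue API.

package main

import "fmt"

// A toy queue-type registry illustrating the pattern. In beats the real
// registration API lives in libbeat/publisher/queue.
var queueTypes = map[string]func() string{}

func registerQueueType(name string, factory func() string) {
	queueTypes[name] = factory
}

// The package registers itself when imported, so a blank import
// (`_ "…/diskqueue"`) is enough to make the type available by name.
func init() {
	registerQueueType("disk", func() string { return "disk queue instance" })
}

func main() {
	fmt.Println(queueTypes["disk"]()) // prints: disk queue instance
}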
New file (+146 lines):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
	"os"
	"sync"

	"github.com/elastic/beats/v7/libbeat/logp"
)

// queuePosition represents a logical position within the queue buffer.
type queuePosition struct {
	segmentID segmentID
	offset    segmentOffset
}

type diskQueueACKs struct {
	logger *logp.Logger

	// This lock must be held to access diskQueueACKs fields (except for
	// diskQueueACKs.done, which is always safe).
	lock sync.Mutex

	// The id and position of the first unacknowledged frame.
	nextFrameID  frameID
	nextPosition queuePosition

	// If a frame has been ACKed, then frameSize[frameID] contains its size on
	// disk. The size is used to track the queuePosition of the oldest
	// remaining frame, which is written to disk as ACKs are received. (We do
	// this to avoid duplicating events if the beat terminates without a clean
	// shutdown.)
	frameSize map[frameID]uint64

	// segmentBoundaries maps the first frameID of each segment to its
	// corresponding segment ID.
	segmentBoundaries map[frameID]segmentID

	// When a segment has been completely acknowledged by a consumer, it sends
	// the segment ID to this channel, where it is read by the core loop and
	// scheduled for deletion.
	segmentACKChan chan segmentID

	// An open writable file handle to the file that stores the queue position.
	// This position is advanced as we receive ACKs, confirming it is safe
	// to move forward, so the acking code is responsible for updating this
	// file.
	positionFile *os.File

	// When the queue is closed, diskQueueACKs.done is closed to signal that
	// the core loop will not accept any more acked segments and any future
	// ACKs should be ignored.
	done chan struct{}
}

func newDiskQueueACKs(
	logger *logp.Logger, position queuePosition, positionFile *os.File,
) *diskQueueACKs {
	return &diskQueueACKs{
		logger:            logger,
		nextFrameID:       0,
		nextPosition:      position,
		frameSize:         make(map[frameID]uint64),
		segmentBoundaries: make(map[frameID]segmentID),
		segmentACKChan:    make(chan segmentID),
		positionFile:      positionFile,
		done:              make(chan struct{}),
	}
}

func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
	dqa.lock.Lock()
	defer dqa.lock.Unlock()
	select {
	case <-dqa.done:
		// We are already done and should ignore any leftover ACKs we receive.
		return
	default:
	}
	for _, frame := range frames {
		segment := frame.segment
		if frame.id != 0 && frame.id == segment.firstFrameID {
			// This is the first frame in its segment, mark it so we know when
			// we're starting a new segment.
			//
			// Subtlety: we don't count the very first frame as a "boundary" even
			// though it is the first frame we read from its segment. This prevents
			// us from resetting our segment offset to zero, in case the initial
			// offset was restored from a previous session instead of starting at
			// the beginning of the first file.
			dqa.segmentBoundaries[frame.id] = segment.id
		}
		dqa.frameSize[frame.id] = frame.bytesOnDisk
	}
	oldSegmentID := dqa.nextPosition.segmentID
	if dqa.frameSize[dqa.nextFrameID] != 0 {
		for ; dqa.frameSize[dqa.nextFrameID] != 0; dqa.nextFrameID++ {
			newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
			if ok {
				// This is the start of a new segment. Remove this frame from the
				// segment boundary list and set the position to the start of the
				// new segment.
				delete(dqa.segmentBoundaries, dqa.nextFrameID)
				dqa.nextPosition = queuePosition{
					segmentID: newSegment,
					offset:    0,
				}
			}
			dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
			delete(dqa.frameSize, dqa.nextFrameID)
		}
		// We advanced the ACK position at least somewhat, so write its
		// new value.
		err := writeQueuePositionToHandle(dqa.positionFile, dqa.nextPosition)
		if err != nil {
			// TODO: Don't spam this warning on every ACK if it's a permanent error.
			dqa.logger.Warnf("Couldn't save queue position: %v", err)
		}
	}
	if oldSegmentID != dqa.nextPosition.segmentID {
		// We crossed at least one segment boundary, inform the listener that
		// everything before the current segment has been acknowledged (but bail
		// out if our done channel has been closed, since that means there is no
		// listener on the other end.)
		select {
		case dqa.segmentACKChan <- dqa.nextPosition.segmentID - 1:
		case <-dqa.done:
		}
	}
}
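
The core of addFrames is the loop that advances nextFrameID across every contiguously ACKed frame, resetting the position whenever it crosses a recorded segment boundary. Here is a stripped-down, self-contained sketch of just that walk, with plain integers standing in for the package's frameID / segmentID / segmentOffset types:

package main

import "fmt"

// Simplified stand-in for queuePosition.
type position struct {
	segment uint64
	offset  uint64
}

// advance mirrors the loop in diskQueueACKs.addFrames: frameSize holds
// the on-disk sizes of ACKed frames, boundaries maps the first frame of
// each segment to that segment's ID. The walk stops at the first frame
// that has not been ACKed yet.
func advance(next uint64, pos position, frameSize, boundaries map[uint64]uint64) (uint64, position) {
	for ; frameSize[next] != 0; next++ {
		if seg, ok := boundaries[next]; ok {
			// Crossing into a new segment: restart the offset at zero.
			delete(boundaries, next)
			pos = position{segment: seg, offset: 0}
		}
		pos.offset += frameSize[next]
		delete(frameSize, next)
	}
	return next, pos
}

func main() {
	// Frames 0-2 have been ACKed; frame 2 is the first frame of segment 1.
	sizes := map[uint64]uint64{0: 100, 1: 150, 2: 80}
	bounds := map[uint64]uint64{2: 1}
	next, pos := advance(0, position{}, sizes, bounds)
	fmt.Println(next, pos) // 3 {1 80}: frame 3 is next, position is segment 1, offset 80
}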
New file (+33 lines):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
	"encoding/binary"
	"hash/crc32"
)

// Computes the checksum that should be written / read in a frame footer
// based on the raw content of that frame (excluding header / footer).
func computeChecksum(data []byte) uint32 {
	hash := crc32.NewIEEE()
	frameLength := uint32(len(data) + frameMetadataSize)
	binary.Write(hash, binary.LittleEndian, &frameLength)
	hash.Write(data)
	return hash.Sum32()
}
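
Note that the checksum covers the on-disk frame length (payload plus metadata) as well as the payload itself, so a corrupted length field is detected too. A self-contained sketch of write-side and read-side use follows; the frameMetadataSize value below is an assumed placeholder, since the real constant is defined elsewhere in the package and doesn't appear in this diff.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// Assumed placeholder: the real frameMetadataSize constant is defined
// elsewhere in the diskqueue package and isn't shown in this diff.
const frameMetadataSize = 12

// Same computation as computeChecksum above: CRC32 (IEEE) over the
// little-endian on-disk frame length followed by the payload bytes.
func checksum(data []byte) uint32 {
	hash := crc32.NewIEEE()
	frameLength := uint32(len(data) + frameMetadataSize)
	binary.Write(hash, binary.LittleEndian, &frameLength)
	hash.Write(data)
	return hash.Sum32()
}

func main() {
	payload := []byte("example event payload")
	footer := checksum(payload) // what a writer would store in the frame footer
	// A reader recomputes the checksum over the payload and compares.
	fmt.Println(checksum(payload) == footer) // true
}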
New file (+158 lines):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
	"errors"
	"fmt"
	"path/filepath"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/common/cfgtype"
	"github.com/elastic/beats/v7/libbeat/paths"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// Settings contains the configuration fields to create a new disk queue
// or open an existing one.
type Settings struct {
	// The path on disk of the queue's containing directory, which will be
	// created if it doesn't exist. Within the directory, the queue's state
	// is stored in state.dat and each segment's data is stored in
	// {segmentIndex}.seg
	// If blank, the default directory is "diskqueue" within the beat's data
	// directory.
	Path string

	// MaxBufferSize is the maximum number of bytes that the queue should
	// ever occupy on disk. A value of 0 means the queue can grow until the
	// disk is full (this is not recommended on a primary system disk).
	MaxBufferSize uint64

	// MaxSegmentSize is the maximum number of bytes that should be written
	// to a single segment file before creating a new one.
	MaxSegmentSize uint64

	// How many events will be read from disk while waiting for a consumer
	// request.
	ReadAheadLimit int

	// How many events will be queued in memory waiting to be written to disk.
	// This setting should rarely matter in practice, but if data is coming
	// in faster than it can be written to disk for an extended period,
	// this limit can keep it from overflowing memory.
	WriteAheadLimit int

	// A listener that should be sent ACKs when an event is successfully
	// written to disk.
	WriteToDiskListener queue.ACKListener
}

// userConfig holds the parameters for a disk queue that are configurable
// by the end user in the beats yml file.
type userConfig struct {
	Path            string            `config:"path"`
	MaxSize         cfgtype.ByteSize  `config:"max_size" validate:"required"`
	SegmentSize     *cfgtype.ByteSize `config:"segment_size"`
	ReadAheadLimit  *int              `config:"read_ahead"`
	WriteAheadLimit *int              `config:"write_ahead"`
}

func (c *userConfig) Validate() error {
	// If the segment size is explicitly specified, the total queue size must
	// be at least twice as large.
	if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 {
		return errors.New(
			"Disk queue max_size must be at least twice as big as segment_size")
	}

	// We require a total queue size of at least 10MB, and a segment size of
	// at least 1MB. The queue can support lower thresholds, but it will perform
	// terribly, so we give an explicit error in that case.
	// These bounds are still extremely low for Beats ingestion, but if all you
	// need is for a low-volume stream on a tiny device to persist between
	// restarts, it will work fine.
	if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 {
		return fmt.Errorf(
			"Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
	}
	if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 {
		return fmt.Errorf(
			"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
	}

	return nil
}

// DefaultSettings returns a Settings object with reasonable default values
// for all important fields.
func DefaultSettings() Settings {
	return Settings{
		MaxSegmentSize: 100 * (1 << 20), // 100MiB
		MaxBufferSize:  (1 << 30),       // 1GiB

		ReadAheadLimit:  256,
		WriteAheadLimit: 1024,
	}
}

// SettingsForUserConfig returns a Settings struct initialized with the
// end-user-configurable settings in the given config tree.
func SettingsForUserConfig(config *common.Config) (Settings, error) {
	userConfig := userConfig{}
	if err := config.Unpack(&userConfig); err != nil {
		return Settings{}, fmt.Errorf("parsing user config: %w", err)
	}
	settings := DefaultSettings()
	settings.Path = userConfig.Path

	settings.MaxBufferSize = uint64(userConfig.MaxSize)
	if userConfig.SegmentSize != nil {
		settings.MaxSegmentSize = uint64(*userConfig.SegmentSize)
	} else {
		// If no value is specified, default segment size is total queue size
		// divided by 10.
		settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10
	}
	return settings, nil
}

//
// bookkeeping helpers
//

func (settings Settings) directoryPath() string {
	if settings.Path == "" {
		return paths.Resolve(paths.Data, "diskqueue")
	}
	return settings.Path
}

func (settings Settings) stateFilePath() string {
	return filepath.Join(settings.directoryPath(), "state.dat")
}

func (settings Settings) segmentPath(segmentID segmentID) string {
	return filepath.Join(
		settings.directoryPath(),
		fmt.Sprintf("%v.seg", segmentID))
}

func (settings Settings) maxSegmentOffset() segmentOffset {
	return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}
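
A hedged usage sketch of the exported entry point: build a config tree with common.MustNewConfigFrom (an existing libbeat helper) and hand it to SettingsForUserConfig, which applies the defaults above and derives the segment size when none is given. The path and size values are examples only, and the exact byte counts depend on how cfgtype.ByteSize parses the size string.

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
)

func main() {
	// Example values only; max_size is required and must be at least 10MB.
	cfg := common.MustNewConfigFrom(map[string]interface{}{
		"path":     "/var/lib/mybeat/diskqueue",
		"max_size": "1GB",
		// segment_size omitted: it defaults to max_size / 10.
	})

	settings, err := diskqueue.SettingsForUserConfig(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(settings.MaxBufferSize, settings.MaxSegmentSize)
}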
