|
| 1 | +// Licensed to Elasticsearch B.V. under one or more contributor |
| 2 | +// license agreements. See the NOTICE file distributed with |
| 3 | +// this work for additional information regarding copyright |
| 4 | +// ownership. Elasticsearch B.V. licenses this file to you under |
| 5 | +// the Apache License, Version 2.0 (the "License"); you may |
| 6 | +// not use this file except in compliance with the License. |
| 7 | +// You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +package diskqueue |
| 19 | + |
| 20 | +import ( |
| 21 | + "errors" |
| 22 | + "fmt" |
| 23 | + "path/filepath" |
| 24 | + |
| 25 | + "github.com/elastic/beats/v7/libbeat/common" |
| 26 | + "github.com/elastic/beats/v7/libbeat/common/cfgtype" |
| 27 | + "github.com/elastic/beats/v7/libbeat/paths" |
| 28 | + "github.com/elastic/beats/v7/libbeat/publisher/queue" |
| 29 | +) |
| 30 | + |
| 31 | +// Settings contains the configuration fields to create a new disk queue |
| 32 | +// or open an existing one. |
| 33 | +type Settings struct { |
| 34 | + // The path on disk of the queue's containing directory, which will be |
| 35 | + // created if it doesn't exist. Within the directory, the queue's state |
| 36 | + // is stored in state.dat and each segment's data is stored in |
| 37 | + // {segmentIndex}.seg |
| 38 | + // If blank, the default directory is "diskqueue" within the beat's data |
| 39 | + // directory. |
| 40 | + Path string |
| 41 | + |
| 42 | + // MaxBufferSize is the maximum number of bytes that the queue should |
| 43 | + // ever occupy on disk. A value of 0 means the queue can grow until the |
| 44 | + // disk is full (this is not recommended on a primary system disk). |
| 45 | + MaxBufferSize uint64 |
| 46 | + |
| 47 | + // MaxSegmentSize is the maximum number of bytes that should be written |
| 48 | + // to a single segment file before creating a new one. |
| 49 | + MaxSegmentSize uint64 |
| 50 | + |
| 51 | + // How many events will be read from disk while waiting for a consumer |
| 52 | + // request. |
| 53 | + ReadAheadLimit int |
| 54 | + |
| 55 | + // How many events will be queued in memory waiting to be written to disk. |
| 56 | + // This setting should rarely matter in practice, but if data is coming |
| 57 | + // in faster than it can be written to disk for an extended period, |
| 58 | + // this limit can keep it from overflowing memory. |
| 59 | + WriteAheadLimit int |
| 60 | + |
| 61 | + // A listener that should be sent ACKs when an event is successfully |
| 62 | + // written to disk. |
| 63 | + WriteToDiskListener queue.ACKListener |
| 64 | +} |
| 65 | + |
| 66 | +// userConfig holds the parameters for a disk queue that are configurable |
| 67 | +// by the end user in the beats yml file. |
| 68 | +type userConfig struct { |
| 69 | + Path string `config:"path"` |
| 70 | + MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"` |
| 71 | + SegmentSize *cfgtype.ByteSize `config:"segment_size"` |
| 72 | + ReadAheadLimit *int `config:"read_ahead"` |
| 73 | + WriteAheadLimit *int `config:"write_ahead"` |
| 74 | +} |
| 75 | + |
| 76 | +func (c *userConfig) Validate() error { |
| 77 | + // If the segment size is explicitly specified, the total queue size must |
| 78 | + // be at least twice as large. |
| 79 | + if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 { |
| 80 | + return errors.New( |
| 81 | + "Disk queue max_size must be at least twice as big as segment_size") |
| 82 | + } |
| 83 | + |
| 84 | + // We require a total queue size of at least 10MB, and a segment size of |
| 85 | + // at least 1MB. The queue can support lower thresholds, but it will perform |
| 86 | + // terribly, so we give an explicit error in that case. |
| 87 | + // These bounds are still extremely low for Beats ingestion, but if all you |
| 88 | + // need is for a low-volume stream on a tiny device to persist between |
| 89 | + // restarts, it will work fine. |
| 90 | + if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 { |
| 91 | + return fmt.Errorf( |
| 92 | + "Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize) |
| 93 | + } |
| 94 | + if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 { |
| 95 | + return fmt.Errorf( |
| 96 | + "Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize) |
| 97 | + } |
| 98 | + |
| 99 | + return nil |
| 100 | +} |
| 101 | + |
| 102 | +// DefaultSettings returns a Settings object with reasonable default values |
| 103 | +// for all important fields. |
| 104 | +func DefaultSettings() Settings { |
| 105 | + return Settings{ |
| 106 | + MaxSegmentSize: 100 * (1 << 20), // 100MiB |
| 107 | + MaxBufferSize: (1 << 30), // 1GiB |
| 108 | + |
| 109 | + ReadAheadLimit: 256, |
| 110 | + WriteAheadLimit: 1024, |
| 111 | + } |
| 112 | +} |
| 113 | + |
| 114 | +// SettingsForUserConfig returns a Settings struct initialized with the |
| 115 | +// end-user-configurable settings in the given config tree. |
| 116 | +func SettingsForUserConfig(config *common.Config) (Settings, error) { |
| 117 | + userConfig := userConfig{} |
| 118 | + if err := config.Unpack(&userConfig); err != nil { |
| 119 | + return Settings{}, fmt.Errorf("parsing user config: %w", err) |
| 120 | + } |
| 121 | + settings := DefaultSettings() |
| 122 | + settings.Path = userConfig.Path |
| 123 | + |
| 124 | + settings.MaxBufferSize = uint64(userConfig.MaxSize) |
| 125 | + if userConfig.SegmentSize != nil { |
| 126 | + settings.MaxSegmentSize = uint64(*userConfig.SegmentSize) |
| 127 | + } else { |
| 128 | + // If no value is specified, default segment size is total queue size |
| 129 | + // divided by 10. |
| 130 | + settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10 |
| 131 | + } |
| 132 | + return settings, nil |
| 133 | +} |
| 134 | + |
| 135 | +// |
| 136 | +// bookkeeping helpers |
| 137 | +// |
| 138 | + |
| 139 | +func (settings Settings) directoryPath() string { |
| 140 | + if settings.Path == "" { |
| 141 | + return paths.Resolve(paths.Data, "diskqueue") |
| 142 | + } |
| 143 | + return settings.Path |
| 144 | +} |
| 145 | + |
| 146 | +func (settings Settings) stateFilePath() string { |
| 147 | + return filepath.Join(settings.directoryPath(), "state.dat") |
| 148 | +} |
| 149 | + |
| 150 | +func (settings Settings) segmentPath(segmentID segmentID) string { |
| 151 | + return filepath.Join( |
| 152 | + settings.directoryPath(), |
| 153 | + fmt.Sprintf("%v.seg", segmentID)) |
| 154 | +} |
| 155 | + |
| 156 | +func (settings Settings) maxSegmentOffset() segmentOffset { |
| 157 | + return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize) |
| 158 | +} |
0 commit comments