From fa87c389d25162639dcc8b39551ace3fbaa879ce Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 18 Jun 2020 02:52:04 +0200 Subject: [PATCH 1/3] Introduce skeleton of memlog store This changes introduces the skeleton and documentation of the memlog store. The addition of the statestore package is split up into multiple changeset to ease review. The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore). Once finalized, the libbeat/statestore package contains: - The statestore frontend and interface for use within Beats - Interfaces for the store backend - A common set of tests store backends need to support - a storetest package for testing new features that require a store. The testing helpers use map[string]interface{} that can be initialized or queried after the test run for validation purposes. - The default memlog backend + tests This change introduces the skeleton and internal documentation of the memlog store for review and discussion, but not the full implementation yet to ease review. The final implementation and unit tests will be added later. The file doc.go documents how the final implementation works. --- .../statestore/backend/memlog/diskstore.go | 169 ++++++++++++++++++ libbeat/statestore/backend/memlog/doc.go | 95 ++++++++++ libbeat/statestore/backend/memlog/error.go | 27 +++ libbeat/statestore/backend/memlog/memlog.go | 131 ++++++++++++++ libbeat/statestore/backend/memlog/op.go | 46 +++++ libbeat/statestore/backend/memlog/store.go | 100 +++++++++++ libbeat/statestore/backend/memlog/util.go | 48 +++++ .../statestore/backend/memlog/util_darwin.go | 46 +++++ .../statestore/backend/memlog/util_other.go | 36 ++++ .../statestore/backend/memlog/util_windows.go | 26 +++ 10 files changed, 724 insertions(+) create mode 100644 libbeat/statestore/backend/memlog/diskstore.go create mode 100644 libbeat/statestore/backend/memlog/doc.go create mode 100644 libbeat/statestore/backend/memlog/error.go create mode 100644 libbeat/statestore/backend/memlog/memlog.go create mode 100644 libbeat/statestore/backend/memlog/op.go create mode 100644 libbeat/statestore/backend/memlog/store.go create mode 100644 libbeat/statestore/backend/memlog/util.go create mode 100644 libbeat/statestore/backend/memlog/util_darwin.go create mode 100644 libbeat/statestore/backend/memlog/util_other.go create mode 100644 libbeat/statestore/backend/memlog/util_windows.go diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go new file mode 100644 index 000000000000..74389efc03a2 --- /dev/null +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -0,0 +1,169 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "bufio" + "os" + "path/filepath" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// diskstore manages the on-disk state of the memlog store. +type diskstore struct { + log *logp.Logger + + // store configuration + checkpointPred CheckpointPredicate + fileMode os.FileMode + bufferSize int + + // on disk file tracking information + home string // home path of the store + logFileName string // current log file + dataFiles []dataFileInfo // set of data files found + + // txid is the sequential counter that tracks + // all updates to the store. The txid is added to operation being logged + // used as name for the data files. + txid uint64 + + // log file access. The log file is updated using an in memory write buffer. + logFile *os.File + logBuf *bufio.Writer + + // internal state and metrics + logFileSize uint64 + logEntries uint + logInvalid bool + logNeedsTruncate bool +} + +// dataFileInfo is used to track and sort on disk data files. +// We should have only one data file on disk, but in case delete operations +// have failed or not finished dataFileInfo is used to detect the ordering. +type dataFileInfo struct { + path string + txid uint64 +} + +// storeEntry is used to write entries to the checkpoint file only. +type storeEntry struct { + Key string `struct:"_key"` + Fields common.MapStr `struct:",inline"` +} + +// storeMeta is read from the meta file. +type storeMeta struct { + Version string `struct:"version"` +} + +// logAction is prepended to each operation logged to the update file. +// It contains the update ID, a sequential counter to track correctness, +// and the action name. +type logAction struct { + Op string `json:"op"` + ID uint64 `json:"id"` +} + +const ( + logFileName = "log.json" + metaFileName = "meta.json" + + storeVersion = "1" + + keyField = "_key" +) + +// newDiskStorek initializes the disk store stucture only. The store must have +// been opened already. It tries to open the update log file for append +// operations. If opening the update log file fails, it is marked as +// 'corrupted', triggering a checkpoint operation on the first update to the store. +func newDiskStore( + log *logp.Logger, + home string, + dataFiles []dataFileInfo, + txid uint64, + mode os.FileMode, + entries uint, + logInvalid bool, + bufferSize uint, + checkpointPred CheckpointPredicate, +) *diskstore { + s := &diskstore{ + log: log.With("path", home), + home: home, + logFileName: filepath.Join(home, logFileName), + dataFiles: dataFiles, + txid: txid, + fileMode: mode, + bufferSize: int(bufferSize), + logFile: nil, + logBuf: nil, + logEntries: entries, + logInvalid: logInvalid, + logNeedsTruncate: false, // only truncate on next checkpoint + checkpointPred: checkpointPred, + } + + _ = s.tryOpenLog() + return s +} + +// tryOpenLog access the update log. The log file is truncated if a checkpoint operation has been +// executed last. +// The log file is marked as invalid if opening it failed. This will trigger a checkpoint operation +// and another call to tryOpenLog in the future. +func (s *diskstore) tryOpenLog() error { + panic("TODO: implement me") +} + +// mustCheckpoint returns true if the store is required to execute a checkpoint +// operation, either by predicate or by some internal state detecting a problem +// with the log file. +func (s *diskstore) mustCheckpoint() bool { + return s.logInvalid || s.checkpointPred(s.logFileSize) +} + +func (s *diskstore) Close() error { + panic("TODO: implement me") +} + +// log operation adds another entry to the update log file. +// The log file is marked as invalid if the write fails. This will trigger a +// checkpoint operation in the future. +func (s *diskstore) LogOperation(op op) error { + panic("TODO: implement me") +} + +// WriteCheckpoint serializes all state into a json file. The file contains an +// array with all states known to the memory storage. +// WriteCheckpoint first serializes all state to a temporary file, and finally +// replaces move the temporary data file into the correct location. No files +// are overwritten or replaced. Instead the change sequence number is used for +// the filename, and older data files will be deleleted after success. +// +// The active marker file is overwritten after all updates did succeed. The +// marker file contains the filename of the current valid data-file. +// NOTE: due to limitation on some Operating system or file systems, the active +// marker is not a symlink, but an actual file. +func (s *diskstore) WriteCheckpoint(state map[string]entry) error { + panic("TODO: implement me") +} diff --git a/libbeat/statestore/backend/memlog/doc.go b/libbeat/statestore/backend/memlog/doc.go new file mode 100644 index 000000000000..0facef20456e --- /dev/null +++ b/libbeat/statestore/backend/memlog/doc.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package memlog implements the memlog statestore backend. +// The store provided by memlog is a in-memory key-value store +// that logs all operations to an append only log file. +// Once the log file is considered full the store executes a checkpoint +// operation. The checkpoint operation serializes all state to a data file. +// +// The memory store in memlog holds all key-value pairs in a hashtable, with +// value represented by map[string]interface{}. As the store must be 'valid' +// based on the state of the last update operations (Set, Remove), it +// guarantees that no references into data structures passed via Set are held. +// Instead structured data is serialized/deserialized into a +// map[string]interface{}. The serialized states contain only primitive types +// like intX, uintX, float, bool, string, slices, or map[string]interface{} +// itself. As a side effect this also guarantees that the internal can always +// be serialized to disk after updating the in memory representation. +// +// On disk we have a meta file, an update log file, data files, and an active +// marker file in the store directory. +// +// The meta file only contains the store version number. +// +// Normally all operations that update the store in memory state are appended +// to the update log file. +// The file stores all entries in JSON format. Each entry starts with an action +// entry, followed by an data entry. +// The action entry has the schema: `{"op": "", id: }`. Supporter +// operations are 'set' or 'remove'. The `id` contains a sequential counter +// that must always be increased by 1. +// The data entry for the 'set' operation has the format: `{"K": "", "V": { ... }}`. +// The data entry for the 'remove' operation has the format: `{"K": ""}`. +// Updates to the log file are not synced to disk. Having all updates available +// between restarts/crashes also depends on the capabilities of the operation +// system and file system. When opening the store we read up until it is +// possible, reconstructing a last known valid state the beat can continue +// from. This can lead to duplicates if the machine/filesystem has had an +// outage with state not yet fully synchronised to disk. Ordinary restarts +// should not lead to any problems. +// If any error is encountered when reading the log file, the next updates to the store +// will trigger a checkpoint operation and reset the log file. +// +// The store might contain multiple data files, but only the last data file is +// supposed to be valid. Older data files will continiously tried to be cleaned up +// on checkpoint operations. +// The data files filenames do include the change sequence number. Which allows +// us to sort them by name. The checkpoint operation of memlog, writes the full state +// into a new data file, that consists of an JSON array with all known key-value pairs. +// All fields of an entry are store top-level in the object. The `_key` field +// is used to store the entries key. +// NOTE: Creating a new file guarantees that Beats can progress when creating a +// new checkpoint file. Some filesystems tend to block the +// delete/replace operation when the file is accessed by another process +// (e.g. common problem with AV Scanners on Windows). By creating a new +// file we circumvent this problem. Failures in deleting old files is +// ok, and we will try to delete old data files again in the future. +// +// The active marker file is not really used by the store. It is written for +// debugging purposes and contains the filepath of the last written data file +// that is supposed to be valid. +// +// When opening the store we first validate the meta file and read the "last" +// data file into the in-memory hashtable. Older data files are ignored. The +// filename with the update sequence number is used to sort data files. +// NOTE: the active marker file is not used, as the checkpoint operation is +// supposed to be an atomic operation that is finalized once the data +// file is moved to its correct location. +// +// After loading the data file we loop over all operations in the log file. +// Operations with a smaller sequence number are ignored when iterating the log +// file. If any subsequent entries in the log file have a sequence number difference != +// 1, we assume the log file to be corrupted and stop the loop. All processing +// continues from the last known accumulated state. +// +// When closing the store we make a last attempt at fsyncing the log file (just +// in case), close the log file and clear all in memory state. +// +// The store provided by memlog is threadsafe and uses a RWMutex. We allow only +// one active writer, but multiple concurrent readers. +package memlog diff --git a/libbeat/statestore/backend/memlog/error.go b/libbeat/statestore/backend/memlog/error.go new file mode 100644 index 000000000000..29f665bcacf5 --- /dev/null +++ b/libbeat/statestore/backend/memlog/error.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "errors" + +var ( + errRegClosed = errors.New("registry has been closed") + errLogInvalid = errors.New("can not add operation to log file, a checkpoint is required") + errTxIDInvalid = errors.New("invalid update sequence number") + errKeyUnknown = errors.New("key unknown") +) diff --git a/libbeat/statestore/backend/memlog/memlog.go b/libbeat/statestore/backend/memlog/memlog.go new file mode 100644 index 000000000000..265b53a3df1c --- /dev/null +++ b/libbeat/statestore/backend/memlog/memlog.go @@ -0,0 +1,131 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "path/filepath" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// Registry configures access to memlog based stores. +type Registry struct { + log *logp.Logger + + mu sync.Mutex + active bool + + settings Settings + + wg sync.WaitGroup +} + +// Settings configures a new Registry. +type Settings struct { + // Registry root directory. Stores will be single sub-directories. + Root string + + // FileMode is used to configure the file mode for new files generated by the + // regisry. File mode 0600 will be used if this field is not set. + FileMode os.FileMode + + // BufferSize configures the IO buffer size when accessing the underlying + // storage files. Defaults to 4096 if not set. + BufferSize uint + + // Checkpoint predicate that can trigger a registry file rotation. If not + // configured, memlog will automatically trigger a checkpoint every 10MB. + Checkpoint CheckpointPredicate +} + +// CheckpointPredicate is the type for configurable checkpoint checks. +// The store executes a checkpoint operation when the predicate returns true. +type CheckpointPredicate func(fileSize uint64) bool + +const defaultFileMode os.FileMode = 0600 + +const defaultBufferSize = 4096 + +func defaultCheckpoint(filesize uint64) bool { + const limit = 10 * 1 << 20 // set rotation limit to 10MB by default + return filesize >= limit +} + +// New configures a memlog Registry that can be used to open stores. +func New(log *logp.Logger, settings Settings) (*Registry, error) { + if settings.FileMode == 0 { + settings.FileMode = defaultFileMode + } + if settings.Checkpoint == nil { + settings.Checkpoint = defaultCheckpoint + } + if settings.BufferSize == 0 { + settings.BufferSize = defaultBufferSize + } + + root, err := filepath.Abs(settings.Root) + if err != nil { + return nil, err + } + + settings.Root = root + return &Registry{ + log: log, + active: true, + settings: settings, + }, nil +} + +// Access creates or opens a new store. A new sub-directory for the store if +// created, if the store does not exist. +// Returns an error is any file access fails. +func (r *Registry) Access(name string) (backend.Store, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if !r.active { + return nil, errRegClosed + } + + logger := r.log.With("store", name) + + home := filepath.Join(r.settings.Root, name) + fileMode := r.settings.FileMode + bufSz := r.settings.BufferSize + store, err := openStore(logger, home, fileMode, bufSz, r.settings.Checkpoint) + if err != nil { + return nil, err + } + + return store, nil +} + +// Close closes the registry. No new store can be accessed during close. +// Close blocks until all stores have been closed. +func (r *Registry) Close() error { + r.mu.Lock() + r.active = false + r.mu.Unlock() + + // block until all stores have been closed + r.wg.Wait() + return nil +} diff --git a/libbeat/statestore/backend/memlog/op.go b/libbeat/statestore/backend/memlog/op.go new file mode 100644 index 000000000000..d30ce9757509 --- /dev/null +++ b/libbeat/statestore/backend/memlog/op.go @@ -0,0 +1,46 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "github.com/elastic/beats/v7/libbeat/common" + +type ( + op interface { + name() string + } + + // opSet encodes the 'Set' operations in the update log. + opSet struct { + K string + V common.MapStr + } + + // opRemove encodes the 'Remove' operation in the update log. + opRemove struct { + K string + } +) + +// operation type names +const ( + opValSet = "set" + opValRemove = "remove" +) + +func (*opSet) name() string { return opValSet } +func (*opRemove) name() string { return opValRemove } diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go new file mode 100644 index 000000000000..9fcef7032ec4 --- /dev/null +++ b/libbeat/statestore/backend/memlog/store.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// store implements an actual memlog based store. +// It holds all key value pairs in memory in a memstore struct. +// All changes to the memstore are logged to the diskstore. +// The store execute a checkpoint operation if the checkpoint predicate +// triggers the operation, or if some error in the update log file has been +// detected by the diskstore. +// +// The store allows only one writer, but multiple concurrent readers. +type store struct { + lock sync.RWMutex + disk *diskstore + mem memstore +} + +// memstore is the in memory key value store +type memstore struct { + table map[string]entry +} + +type entry struct { + value map[string]interface{} +} + +// openStore opens a store from the home path. +// The directory and intermediate directories will be created if it does not exist. +// The open routine loads the full key-value store into memory by first reading the data file and finally applying all outstanding updates +// from the update log file. +// If an error in in the log file is detected, the store opening routine continues from the last known valid state and will trigger a checkpoint +// operation on subsequent writes, also truncating the log file. +// Old data files are scheduled for deletion later. +func openStore(log *logp.Logger, home string, mode os.FileMode, bufSz uint, checkpoint CheckpointPredicate) (*store, error) { + panic("TODO: implement me") +} + +// Close closes access to the update log file and clears the in memory key +// value store. Access to the store after close can lead to a panic. +func (s *store) Close() error { + panic("TODO: implement me") +} + +// Has checks if the key is known. The in memory store does not report any +// errors. +func (s *store) Has(key string) (bool, error) { + panic("TODO: implement me") +} + +// Get retrieves and decodes the key-value pair into to. +func (s *store) Get(key string, to interface{}) error { + panic("TODO: implement me") +} + +// Set inserts or overwrites a key-value pair. +// If encoding was successful the in-memory state will be updated and a +// set-operation is logged to the diskstore. +func (s *store) Set(key string, value interface{}) error { + panic("TODO: implement me") +} + +// Remove removes a key from the in memory store and logs a remove operation to +// the diskstore. The operation does not check if the key exists. +func (s *store) Remove(key string) error { + panic("TODO: implement me") +} + +// Each iterates over all key-value pairs in the store. +func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + panic("TODO: implement me") +} + +func (e entry) Decode(to interface{}) error { + return typeconv.Convert(to, e.value) +} diff --git a/libbeat/statestore/backend/memlog/util.go b/libbeat/statestore/backend/memlog/util.go new file mode 100644 index 000000000000..f4fdb8956e82 --- /dev/null +++ b/libbeat/statestore/backend/memlog/util.go @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "syscall" +) + +// isTxIDLessEqual compares two IDs by checking that their distance is < 2^63. +// It always returns true if +// - a == b +// - a < b (mod 2^63) +// - b > a after an integer rollover that is still within the distance of <2^63-1 +func isTxIDLessEqual(a, b uint64) bool { + return int64(a-b) <= 0 +} + +func isRetryErr(err error) bool { + return err == syscall.EINTR || err == syscall.EAGAIN +} + +// trySync provides a best-effort fsync on path (directory). The fsync is required by some +// filesystems, so to update the parents directory metadata to actually +// contain the new file being rotated in. +func trySyncPath(path string) { + f, err := os.Open(path) + if err != nil { + return // ignore error, sync on dir must not be necessarily supported by the FS + } + defer f.Close() + syncFile(f) +} diff --git a/libbeat/statestore/backend/memlog/util_darwin.go b/libbeat/statestore/backend/memlog/util_darwin.go new file mode 100644 index 000000000000..1aaf64ab42a8 --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_darwin.go @@ -0,0 +1,46 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + + "golang.org/x/sys/unix" +) + +// syncFile implements the fsync operation for darwin. On darwin fsync is not +// reliable, instead the fcntl syscall with F_FULLFSYNC must be used. +func syncFile(f *os.File) error { + for { + _, err := unix.FcntlInt(f.Fd(), unix.F_FULLFSYNC, 0) + err = normalizeIOError(err) + if err == nil || isIOError(err) { + return err + } + + if isRetryErr(err) { + continue + } + + err = f.Sync() + if isRetryErr(err) { + continue + } + return err + } +} diff --git a/libbeat/statestore/backend/memlog/util_other.go b/libbeat/statestore/backend/memlog/util_other.go new file mode 100644 index 000000000000..f975924853a5 --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_other.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build linux dragonfly freebsd netbsd openbsd solaris + +package memlog + +import ( + "os" +) + +// syncFile implements the fsync operation for most *nix systems. +// The call is retried if EINTR or EAGAIN is returned. +func syncFile(f *os.File) error { + // best effort. + for { + err := f.Sync() + if err == nil || !isRetryErr(err) { + return err + } + } +} diff --git a/libbeat/statestore/backend/memlog/util_windows.go b/libbeat/statestore/backend/memlog/util_windows.go new file mode 100644 index 000000000000..a59ce3c1ef39 --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_windows.go @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "os" + +// syncFile implements the fsync operation for Windows. Internally +// FlushFileBuffers will be used. +func syncFile(f *os.File) error { + return f.Sync() // stdlib already uses FlushFileBuffes, yay +} From 72e0b6d5b08105c633eea4ac3b32b1466a4e11e7 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 23 Jun 2020 20:15:05 +0200 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Andrew Kroh --- libbeat/statestore/backend/memlog/util_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/statestore/backend/memlog/util_windows.go b/libbeat/statestore/backend/memlog/util_windows.go index a59ce3c1ef39..ce0d1080b724 100644 --- a/libbeat/statestore/backend/memlog/util_windows.go +++ b/libbeat/statestore/backend/memlog/util_windows.go @@ -22,5 +22,5 @@ import "os" // syncFile implements the fsync operation for Windows. Internally // FlushFileBuffers will be used. func syncFile(f *os.File) error { - return f.Sync() // stdlib already uses FlushFileBuffes, yay + return f.Sync() // stdlib already uses FlushFileBuffers, yay } From 872eba919ecb5d15123f4b66d3454f2770d0ffce Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 25 Jun 2020 17:14:27 +0200 Subject: [PATCH 3/3] memory review --- libbeat/statestore/backend/memlog/diskstore.go | 10 +++++----- libbeat/statestore/backend/memlog/doc.go | 10 ++++++---- libbeat/statestore/backend/memlog/util.go | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index 74389efc03a2..5c2179e7bac9 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -37,7 +37,7 @@ type diskstore struct { // on disk file tracking information home string // home path of the store - logFileName string // current log file + logFilePath string // current log file dataFiles []dataFileInfo // set of data files found // txid is the sequential counter that tracks @@ -92,7 +92,7 @@ const ( keyField = "_key" ) -// newDiskStorek initializes the disk store stucture only. The store must have +// newDiskStore initializes the disk store stucture only. The store must have // been opened already. It tries to open the update log file for append // operations. If opening the update log file fails, it is marked as // 'corrupted', triggering a checkpoint operation on the first update to the store. @@ -110,7 +110,7 @@ func newDiskStore( s := &diskstore{ log: log.With("path", home), home: home, - logFileName: filepath.Join(home, logFileName), + logFilePath: filepath.Join(home, logFileName), dataFiles: dataFiles, txid: txid, fileMode: mode, @@ -156,9 +156,9 @@ func (s *diskstore) LogOperation(op op) error { // WriteCheckpoint serializes all state into a json file. The file contains an // array with all states known to the memory storage. // WriteCheckpoint first serializes all state to a temporary file, and finally -// replaces move the temporary data file into the correct location. No files +// moves the temporary data file into the correct location. No files // are overwritten or replaced. Instead the change sequence number is used for -// the filename, and older data files will be deleleted after success. +// the filename, and older data files will be deleted after success. // // The active marker file is overwritten after all updates did succeed. The // marker file contains the filename of the current valid data-file. diff --git a/libbeat/statestore/backend/memlog/doc.go b/libbeat/statestore/backend/memlog/doc.go index 0facef20456e..276daf9fd798 100644 --- a/libbeat/statestore/backend/memlog/doc.go +++ b/libbeat/statestore/backend/memlog/doc.go @@ -59,10 +59,12 @@ // supposed to be valid. Older data files will continiously tried to be cleaned up // on checkpoint operations. // The data files filenames do include the change sequence number. Which allows -// us to sort them by name. The checkpoint operation of memlog, writes the full state -// into a new data file, that consists of an JSON array with all known key-value pairs. -// All fields of an entry are store top-level in the object. The `_key` field -// is used to store the entries key. +// us to sort them by name. The checkpoint operation of memlog, writes the full +// state into a new data file, that consists of an JSON array with all known +// key-value pairs. Each JSON object in the array consists of the value +// object, with memlog private fields added. Private fields start with `_`. At +// the moment the only private field is `_key`, which is used to identify the +// key-value pair. // NOTE: Creating a new file guarantees that Beats can progress when creating a // new checkpoint file. Some filesystems tend to block the // delete/replace operation when the file is accessed by another process diff --git a/libbeat/statestore/backend/memlog/util.go b/libbeat/statestore/backend/memlog/util.go index f4fdb8956e82..af2036b1b5e7 100644 --- a/libbeat/statestore/backend/memlog/util.go +++ b/libbeat/statestore/backend/memlog/util.go @@ -35,7 +35,7 @@ func isRetryErr(err error) bool { return err == syscall.EINTR || err == syscall.EAGAIN } -// trySync provides a best-effort fsync on path (directory). The fsync is required by some +// trySyncPath provides a best-effort fsync on path (directory). The fsync is required by some // filesystems, so to update the parents directory metadata to actually // contain the new file being rotated in. func trySyncPath(path string) {