From 9c1d7d00a9fa89ea667e233e21e26aba91f5a04b Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 09:35:24 -0700
Subject: [PATCH 1/6] Switch O_SYNC to periodic fsync

O_SYNC was added when writing TSM files to fix an issue where the final
fsync at the end caused the process to stall. That ends up increasing
disk utilization too much, so this change switches to using multiple
fsyncs while writing the TSM file instead of O_SYNC or one large fsync
at the end.
---
 tsdb/engine/tsm1/compact.go |  2 +-
 tsdb/engine/tsm1/writer.go  | 37 ++++++++++++++++++++++++++++++++++---
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index acc888bc2cf..67cab267ce7 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -1027,7 +1027,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 }
 
 func (c *Compactor) write(path string, iter KeyIterator) (err error) {
-    fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+    fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
     if err != nil {
         return errCompactionInProgress{err: err}
     }
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index b00ec3c4af3..146a9fea172 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -97,6 +97,10 @@ const (
 
     // max length of a key in an index entry (measurement + tags)
     maxKeyLength = (1 << (2 * 8)) - 1
+
+    // The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
+    // long pauses due to very large fsyncs at the end of writing a TSM file.
+    fsyncEvery = 512 * 1024 * 1024
 )
 
 var (
@@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {
 
 // NewIndexWriter returns a new IndexWriter.
 func NewIndexWriter() IndexWriter {
-    buf := bytes.NewBuffer(make([]byte, 0, 4096))
+    buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
     return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
 }
 
@@ -253,6 +257,9 @@ type indexBlock struct {
 type directIndex struct {
     keyCount int
     size     uint32
+
+    // The bytes written count of when we last fsync'd
+    lastSync uint32
 
     fd  *os.File
     buf *bytes.Buffer
@@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
         return 0, err
     }
 
-    return io.Copy(w, bufio.NewReader(d.fd))
+    return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
 }
 
 func (d *directIndex) flush(w io.Writer) (int64, error) {
@@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
     d.indexEntries.Type = 0
     d.indexEntries.entries = d.indexEntries.entries[:0]
 
+    // If this is a disk based index and we've written more than the fsync threshold,
+    // fsync the data to avoid long pauses later on.
+    if d.fd != nil && d.size-d.lastSync > fsyncEvery {
+        if err := d.fd.Sync(); err != nil {
+            return N, err
+        }
+        d.lastSync = d.size
+    }
+
     return N, nil
 }
 
@@ -486,13 +502,16 @@ type tsmWriter struct {
     w       *bufio.Writer
     index   IndexWriter
     n       int64
+
+    // The bytes written count of when we last fsync'd
+    lastSync int64
 }
 
 // NewTSMWriter returns a new TSMWriter writing to w.
 func NewTSMWriter(w io.Writer) (TSMWriter, error) {
     var index IndexWriter
     if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
-        f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+        f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
         if err != nil {
             return nil, err
         }
@@ -612,6 +631,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
 
     // Increment file position pointer (checksum + block len)
     t.n += int64(n)
+
+    // fsync the file periodically to avoid long pauses with very big files.
+    if t.n-t.lastSync > fsyncEvery {
+        if err := t.sync(); err != nil {
+            return err
+        }
+        t.lastSync = t.n
+    }
+
     if len(t.index.Entries(key)) >= maxIndexEntries {
         return ErrMaxBlocksExceeded
     }
@@ -646,6 +673,10 @@ func (t *tsmWriter) Flush() error {
         return err
     }
 
+    return t.sync()
+}
+
+func (t *tsmWriter) sync() error {
     if f, ok := t.wrapped.(*os.File); ok {
         if err := f.Sync(); err != nil {
             return err

From e584cb6842b89f9c3e166fc3226b89a31101dcac Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 09:39:03 -0700
Subject: [PATCH 2/6] Increase cache-snapshot-memory-size default

With the recent changes to compactions and snapshotting, the current
default can create lots of small level 1 TSM files. This increases the
default in order to create larger level 1 files and reduce disk
utilization.
---
 etc/config.sample.toml | 2 +-
 tsdb/config.go         | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index c56eb1ca636..5f9a4e8f78a 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -79,7 +79,7 @@
   # snapshot the cache and write it to a TSM file, freeing up memory
   # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
   # Values without a size suffix are in bytes.
-  # cache-snapshot-memory-size = "25m"
+  # cache-snapshot-memory-size = "256m"
 
   # CacheSnapshotWriteColdDuration is the length of time at
   # which the engine will snapshot the cache and write it to
diff --git a/tsdb/config.go b/tsdb/config.go
index 6ab91feaadb..5041a44cae2 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -26,7 +26,7 @@ const (
 
     // DefaultCacheSnapshotMemorySize is the size at which the engine will
     // snapshot the cache and write it to a TSM file, freeing up memory
-    DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
+    DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
 
     // DefaultCacheSnapshotWriteColdDuration is the length of time at which
     // the engine will snapshot the cache and write it to a new TSM file if

From 56d8f05f1211467d9228f0f5ce748b8fa81b4eee Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 09:40:49 -0700
Subject: [PATCH 3/6] Cap concurrent compactions when large number of cores exists

The default max-concurrent-compactions setting allows up to 50% of
cores to be used for compactions. When the number of cores is high
(>8), this can lead to high disk utilization. Capping it at 4, combined
with the higher snapshot sizes, seems to keep the compaction backlog
reasonable without taxing the disks as much. Systems with lots of IOPS,
RAM and CPU cores may want to increase these.
---
 tsdb/store.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tsdb/store.go b/tsdb/store.go
index 14e4e876208..ae889c46cf7 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -169,6 +169,12 @@ func (s *Store) loadShards() error {
     lim := s.EngineOptions.Config.MaxConcurrentCompactions
     if lim == 0 {
         lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+
+        // On systems with more cores, cap at 4 to reduce disk utilization
+        if lim > 4 {
+            lim = 4
+        }
+
         if lim < 1 {
             lim = 1
         }

From 0a85ce2b73552f95e5f3152f11573f842932b638 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 09:45:01 -0700
Subject: [PATCH 4/6] Schedule compactions less aggressively

This runs the scheduler every 5s instead of every 1s and reduces the
scope of a level 1 plan.
---
 tsdb/engine/tsm1/compact.go      |  9 +--------
 tsdb/engine/tsm1/compact_test.go | 16 ++++------------
 tsdb/engine/tsm1/engine.go       |  2 +-
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 67cab267ce7..39759be3a96 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
         minGenerations = level + 1
     }
 
-    // Each compaction group should run against 4 generations. For level 1, since these
-    // can get created much more quickly, bump the grouping to 8 to keep file counts lower.
-    groupSize := 4
-    if level == 1 {
-        groupSize = 8
-    }
-
     var cGroups []CompactionGroup
     for _, group := range levelGroups {
-        for _, chunk := range group.chunk(groupSize) {
+        for _, chunk := range group.chunk(4) {
             var cGroup CompactionGroup
             var hasTombstones bool
             for _, gen := range chunk {
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index e3608f9837c..69e5b0c392b 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
             Path: "08-01.tsm1",
             Size: 1 * 1024 * 1024,
         },
-        tsm1.FileStat{
-            Path: "09-01.tsm1",
-            Size: 1 * 1024 * 1024,
-        },
-        tsm1.FileStat{
-            Path: "10-01.tsm1",
-            Size: 1 * 1024 * 1024,
-        },
     }
 
     cp := tsm1.NewDefaultPlanner(
@@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-    expFiles2 := []tsm1.FileStat{data[8], data[9]}
+    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+    expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
     tsm := cp.PlanLevel(1)
     if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
         },
         tsdb.DefaultCompactFullWriteColdDuration,
     )
-    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-    expFiles2 := []tsm1.FileStat{data[8], data[9]}
+    expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+    expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
     tsm := cp.PlanLevel(1)
     if exp, got := len(expFiles1), len(tsm[0]); got != exp {
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 4d2773af66e..63b6a72584a 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
 }
 
 func (e *Engine) compact(quit <-chan struct{}) {
-    t := time.NewTicker(time.Second)
+    t := time.NewTicker(5 * time.Second)
     defer t.Stop()
 
     for {

From 9f2a422039a3a1d3f536724c51cb73bbeeccbb23 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 13:29:04 -0700
Subject: [PATCH 5/6] Use disk based TSM index more selectively

The disk based temp index for writing a TSM file was used for all
compactions other than snapshot compactions. That meant it was used
even for smaller compactions that would not use much memory. An
unintended side-effect of this is higher disk IO when copying the index
to the final file. This switches when to use the disk based index based
on the estimated size of the new index that will be written. This isn't
exact, but it seems to kick in at higher cardinality and larger
compactions, where it is necessary to avoid OOMs.
---
 tsdb/engine/tsm1/compact.go | 35 ++++++++++++++++++++++++++++++++---
 tsdb/engine/tsm1/writer.go  | 11 ++++++++++-
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 39759be3a96..57d938a993b 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -1026,10 +1026,22 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
     }
 
     // Create the write for the new TSM file.
-    w, err := NewTSMWriter(fd)
-    if err != nil {
-        return err
+    var w TSMWriter
+
+    // Use a disk based TSM buffer if it looks like we might create a big index
+    // in memory.
+    if iter.EstimatedIndexSize() > 64*1024*1024 {
+        w, err = NewTSMWriterWithDiskBuffer(fd)
+        if err != nil {
+            return err
+        }
+    } else {
+        w, err = NewTSMWriter(fd)
+        if err != nil {
+            return err
+        }
     }
+
     defer func() {
         closeErr := w.Close()
         if err == nil {
@@ -1138,6 +1150,10 @@ type KeyIterator interface {
 
     // Err returns any errors encountered during iteration.
     Err() error
+
+    // EstimatedIndexSize returns the estimated size of the index that would
+    // be required to store all the series and entries in the KeyIterator.
+    EstimatedIndexSize() int
 }
 
 // tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
@@ -1271,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
         len(k.mergedBooleanValues) > 0
 }
 
+func (k *tsmKeyIterator) EstimatedIndexSize() int {
+    var size uint32
+    for _, r := range k.readers {
+        size += r.IndexSize()
+    }
+    return int(size) / len(k.readers)
+}
+
 // Next returns true if there are any values remaining in the iterator.
 func (k *tsmKeyIterator) Next() bool {
 RETRY:
@@ -1509,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
     return cki
 }
 
+func (c *cacheKeyIterator) EstimatedIndexSize() int {
+    // We return 0 here since we already have all the entries in memory to write an index.
+    return 0
+}
+
 func (c *cacheKeyIterator) encode() {
     concurrency := runtime.GOMAXPROCS(0)
     n := len(c.ready)
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index 146a9fea172..eda8985f2e3 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -509,14 +509,23 @@ type tsmWriter struct {
 
 // NewTSMWriter returns a new TSMWriter writing to w.
 func NewTSMWriter(w io.Writer) (TSMWriter, error) {
+    index := NewIndexWriter()
+    return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
+}
+
+// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
+// based buffer for the TSM index if possible.
+func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
     var index IndexWriter
-    if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
+    // Make sure w is a File so we can write the temp index alongside it.
+    if fw, ok := w.(*os.File); ok {
         f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
         if err != nil {
             return nil, err
         }
         index = NewDiskIndexWriter(f)
     } else {
+        // w is not a file, just use an inmem index
         index = NewIndexWriter()
     }
 

From 0b929fe669cfaa6ddf243bb5337e1b2f77c9fb34 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Wed, 6 Dec 2017 13:39:38 -0700
Subject: [PATCH 6/6] Update changelog

---
 CHANGELOG.md | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d550dd72520..153ddd75fba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,18 @@
 - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable
 - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
 
+## v1.4.3 [unreleased]
+
+### Configuration Changes
+
+#### `[data]` Section
+
+The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.
+
+### Bugfixes
+
+- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization
+
 ## v1.4.2 [2017-11-15]
 
 Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
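
Note (illustrative sketch, not part of the patches above): the core pattern in PATCH 1/6 is to track how many bytes have been written since the last fsync and flush once a fixed threshold is crossed, instead of opening the file with O_SYNC or paying for one very large fsync at the end. The standalone Go sketch below shows that pattern in isolation; the syncingWriter type and the example package name are hypothetical, while the real changes live in tsmWriter.WriteBlock and directIndex.flush as shown in the diffs.

package example

import "os"

// fsyncEvery mirrors the 512MB threshold used in the patch; the exact value
// is a tuning knob, not part of the pattern itself.
const fsyncEvery = 512 * 1024 * 1024

// syncingWriter is a hypothetical wrapper that fsyncs periodically while data
// is written, spreading the flush cost across the write instead of paying for
// one very large fsync (or per-write O_SYNC overhead) when the file is done.
type syncingWriter struct {
    f        *os.File
    n        int64 // total bytes written so far
    lastSync int64 // value of n at the last fsync
}

func (w *syncingWriter) Write(p []byte) (int, error) {
    n, err := w.f.Write(p)
    w.n += int64(n)
    if err != nil {
        return n, err
    }
    // Once enough unsynced bytes accumulate, flush now so the final sync at
    // close time stays small.
    if w.n-w.lastSync > fsyncEvery {
        if err := w.f.Sync(); err != nil {
            return n, err
        }
        w.lastSync = w.n
    }
    return n, nil
}

// Close performs the final, now much smaller, fsync before closing the file.
func (w *syncingWriter) Close() error {
    if err := w.f.Sync(); err != nil {
        w.f.Close()
        return err
    }
    return w.f.Close()
}

The same byte-counting pair appears twice in the series: t.n/t.lastSync in the TSM writer and d.size/d.lastSync in the disk-based index writer.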