diff --git a/CHANGELOG.md b/CHANGELOG.md
index d550dd72520..153ddd75fba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,18 @@
 - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable
 - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
 
+## v1.4.3 [unreleased]
+
+### Configuration Changes
+
+#### `[data]` Section
+
+The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.
+
+### Bugfixes
+
+- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization
+
 ## v1.4.2 [2017-11-15]
 
 Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index c56eb1ca636..5f9a4e8f78a 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -79,7 +79,7 @@
   # snapshot the cache and write it to a TSM file, freeing up memory
   # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
   # Values without a size suffix are in bytes.
-  # cache-snapshot-memory-size = "25m"
+  # cache-snapshot-memory-size = "256m"
 
   # CacheSnapshotWriteColdDuration is the length of time at
   # which the engine will snapshot the cache and write it to
diff --git a/tsdb/config.go b/tsdb/config.go
index 6ab91feaadb..5041a44cae2 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -26,7 +26,7 @@ const (
 
 	// DefaultCacheSnapshotMemorySize is the size at which the engine will
 	// snapshot the cache and write it to a TSM file, freeing up memory
-	DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
+	DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
 
 	// DefaultCacheSnapshotWriteColdDuration is the length of time at which
 	// the engine will snapshot the cache and write it to a new TSM file if
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index acc888bc2cf..57d938a993b 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 		minGenerations = level + 1
 	}
 
-	// Each compaction group should run against 4 generations. For level 1, since these
-	// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
-	groupSize := 4
-	if level == 1 {
-		groupSize = 8
-	}
-
 	var cGroups []CompactionGroup
 	for _, group := range levelGroups {
-		for _, chunk := range group.chunk(groupSize) {
+		for _, chunk := range group.chunk(4) {
 			var cGroup CompactionGroup
 			var hasTombstones bool
 			for _, gen := range chunk {
@@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 }
 
 func (c *Compactor) write(path string, iter KeyIterator) (err error) {
-	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
 		return errCompactionInProgress{err: err}
 	}
 
 	// Create the writer for the new TSM file.
-	w, err := NewTSMWriter(fd)
-	if err != nil {
-		return err
+	var w TSMWriter
+
+	// Use a disk-based TSM buffer if it looks like we might create a big index
+	// in memory.
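+	// The 64 MB cutoff below is a heuristic: EstimatedIndexSize reports the
+	// average index size of the source readers, so a larger estimate suggests
+	// the merged index would be too expensive to buffer entirely in memory.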
+	if iter.EstimatedIndexSize() > 64*1024*1024 {
+		w, err = NewTSMWriterWithDiskBuffer(fd)
+		if err != nil {
+			return err
+		}
+	} else {
+		w, err = NewTSMWriter(fd)
+		if err != nil {
+			return err
+		}
 	}
+
 	defer func() {
 		closeErr := w.Close()
 		if err == nil {
@@ -1145,6 +1150,10 @@ type KeyIterator interface {
 
 	// Err returns any errors encountered during iteration.
 	Err() error
+
+	// EstimatedIndexSize returns the estimated size of the index that would
+	// be required to store all the series and entries in the KeyIterator.
+	EstimatedIndexSize() int
 }
 
 // tsmKeyIterator implements the KeyIterator for a set of TSMReaders. Iteration produces
@@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
 		len(k.mergedBooleanValues) > 0
 }
 
+func (k *tsmKeyIterator) EstimatedIndexSize() int {
+	var size uint32
+	for _, r := range k.readers {
+		size += r.IndexSize()
+	}
+	return int(size) / len(k.readers)
+}
+
 // Next returns true if there are any values remaining in the iterator.
 func (k *tsmKeyIterator) Next() bool {
 RETRY:
@@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
 	return cki
 }
 
+func (c *cacheKeyIterator) EstimatedIndexSize() int {
+	// We return 0 here since we already have all the entries in memory to write an index.
+	return 0
+}
+
 func (c *cacheKeyIterator) encode() {
 	concurrency := runtime.GOMAXPROCS(0)
 	n := len(c.ready)
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index e3608f9837c..69e5b0c392b 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
 			Path: "08-01.tsm1",
 			Size: 1 * 1024 * 1024,
 		},
-		tsm1.FileStat{
-			Path: "09-01.tsm1",
-			Size: 1 * 1024 * 1024,
-		},
-		tsm1.FileStat{
-			Path: "10-01.tsm1",
-			Size: 1 * 1024 * 1024,
-		},
 	}
 
 	cp := tsm1.NewDefaultPlanner(
@@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
 		},
 		tsdb.DefaultCompactFullWriteColdDuration,
 	)
-	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-	expFiles2 := []tsm1.FileStat{data[8], data[9]}
+	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+	expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
 	tsm := cp.PlanLevel(1)
 	if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
 		},
 		tsdb.DefaultCompactFullWriteColdDuration,
 	)
-	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-	expFiles2 := []tsm1.FileStat{data[8], data[9]}
+	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+	expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
 	tsm := cp.PlanLevel(1)
 	if exp, got := len(expFiles1), len(tsm[0]); got != exp {
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 4d2773af66e..63b6a72584a 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
 }
 
 func (e *Engine) compact(quit <-chan struct{}) {
-	t := time.NewTicker(time.Second)
+	t := time.NewTicker(5 * time.Second)
 	defer t.Stop()
 
 	for {
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index b00ec3c4af3..eda8985f2e3 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -97,6 +97,10 @@ const (
 	// max length of a key in an index entry (measurement + tags)
 	maxKeyLength = (1 << (2 * 8)) - 1
+
+	// The threshold amount of data written before we periodically fsync a TSM file. This
+	// helps avoid long pauses due to very large fsyncs at the end of writing a TSM file.
+	fsyncEvery = 512 * 1024 * 1024
 )
 
 var (
@@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {
 
 // NewIndexWriter returns a new IndexWriter.
 func NewIndexWriter() IndexWriter {
-	buf := bytes.NewBuffer(make([]byte, 0, 4096))
+	buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
 	return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
 }
 
@@ -253,6 +257,9 @@ type indexBlock struct {
 type directIndex struct {
 	keyCount int
 	size     uint32
+
+	// The number of bytes written when we last fsync'd
+	lastSync uint32
 
 	fd  *os.File
 	buf *bytes.Buffer
@@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
 		return 0, err
 	}
 
-	return io.Copy(w, bufio.NewReader(d.fd))
+	return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
 }
 
 func (d *directIndex) flush(w io.Writer) (int64, error) {
@@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
 	d.indexEntries.Type = 0
 	d.indexEntries.entries = d.indexEntries.entries[:0]
 
+	// If this is a disk-based index and we've written more than the fsync threshold,
+	// fsync the data to avoid long pauses later on.
+	if d.fd != nil && d.size-d.lastSync > fsyncEvery {
+		if err := d.fd.Sync(); err != nil {
+			return N, err
+		}
+		d.lastSync = d.size
+	}
+
 	return N, nil
 }
 
@@ -486,18 +502,30 @@ type tsmWriter struct {
 	w     *bufio.Writer
 	index IndexWriter
 	n     int64
+
+	// The number of bytes written when we last fsync'd
+	lastSync int64
 }
 
 // NewTSMWriter returns a new TSMWriter writing to w.
 func NewTSMWriter(w io.Writer) (TSMWriter, error) {
+	index := NewIndexWriter()
+	return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
+}
+
+// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w. It uses a disk-based
+// buffer for the TSM index when possible.
+func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
 	var index IndexWriter
-	if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
-		f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+	// Make sure w is a file so we can write the temp index alongside it.
+	if fw, ok := w.(*os.File); ok {
+		f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 		if err != nil {
 			return nil, err
 		}
 		index = NewDiskIndexWriter(f)
 	} else {
+		// w is not a file, so just use an in-memory index.
 		index = NewIndexWriter()
 	}
 
@@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
 	// Increment file position pointer (checksum + block len)
 	t.n += int64(n)
 
+	// fsync the file periodically to avoid long pauses with very big files.
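+	// Syncing every fsyncEvery (512 MB) bytes bounds how much dirty data the
+	// OS must write back at once, keeping the final sync on close short.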
+	if t.n-t.lastSync > fsyncEvery {
+		if err := t.sync(); err != nil {
+			return err
+		}
+		t.lastSync = t.n
+	}
+
 	if len(t.index.Entries(key)) >= maxIndexEntries {
 		return ErrMaxBlocksExceeded
 	}
@@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error {
 		return err
 	}
 
+	return t.sync()
+}
+
+func (t *tsmWriter) sync() error {
 	if f, ok := t.wrapped.(*os.File); ok {
 		if err := f.Sync(); err != nil {
 			return err
diff --git a/tsdb/store.go b/tsdb/store.go
index 14e4e876208..ae889c46cf7 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -169,6 +169,12 @@ func (s *Store) loadShards() error {
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
 		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+
+		// On systems with more cores, cap at 4 to reduce disk utilization.
+		if lim > 4 {
+			lim = 4
+		}
+
 		if lim < 1 {
 			lim = 1
 		}
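Taken together, the writer changes replace `O_SYNC` (which forces a sync on every write) with periodic fsyncs at a fixed byte threshold. Below is a minimal sketch of that pattern, assuming only the Go standard library; the `syncingWriter` type and its field names are hypothetical and not part of this PR.

```go
package sketch

import (
	"bufio"
	"os"
)

// fsyncEvery mirrors the 512 MB threshold in writer.go: sync after every
// fsyncEvery bytes so the final sync never has gigabytes of dirty pages.
const fsyncEvery = 512 * 1024 * 1024

// syncingWriter buffers writes to a file and fsyncs periodically.
type syncingWriter struct {
	f        *os.File
	w        *bufio.Writer
	n        int64 // total bytes written
	lastSync int64 // bytes written as of the last fsync
}

func newSyncingWriter(f *os.File) *syncingWriter {
	return &syncingWriter{f: f, w: bufio.NewWriterSize(f, 1024*1024)}
}

func (s *syncingWriter) Write(p []byte) (int, error) {
	n, err := s.w.Write(p)
	s.n += int64(n)
	if err != nil {
		return n, err
	}
	// Flush and fsync in bounded chunks instead of one large burst at close.
	if s.n-s.lastSync > fsyncEvery {
		if err := s.w.Flush(); err != nil {
			return n, err
		}
		if err := s.f.Sync(); err != nil {
			return n, err
		}
		s.lastSync = s.n
	}
	return n, nil
}

// Close flushes remaining buffered data and performs the final, now small, fsync.
func (s *syncingWriter) Close() error {
	if err := s.w.Flush(); err != nil {
		return err
	}
	if err := s.f.Sync(); err != nil {
		return err
	}
	return s.f.Close()
}
```

With this shape, dirty pages are written back in bounded chunks as the file grows, so neither individual writes nor the final sync stalls for long; the same reasoning motivates the `fsyncEvery` logic in both `tsmWriter.WriteBlock` and `directIndex.flush` above.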