Fix higher disk utilization regression #9204

Merged · 6 commits · Dec 7, 2017
Changes from all commits
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,18 @@
 - [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls is allowable
 - [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
 
+## v1.4.3 [unreleased]
+
+### Configuration Changes
+
+#### `[data]` Section
+
+The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.
+
+### Bugfixes
+
+- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization
+
 ## v1.4.2 [2017-11-15]
 
 Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
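For operators, a minimal sketch of how this setting is applied in the `[data]` section of the InfluxDB config file (the paths and surrounding keys are illustrative; only `cache-snapshot-memory-size` is changed by this PR):

```toml
[data]
  dir = "/var/lib/influxdb/data"
  wal-dir = "/var/lib/influxdb/wal"
  # Snapshot the in-memory cache to a new TSM file once it reaches this size.
  # "256m" is the new default; set "25m" explicitly to keep the old behavior.
  cache-snapshot-memory-size = "256m"
```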
2 changes: 1 addition & 1 deletion etc/config.sample.toml
@@ -79,7 +79,7 @@
 # snapshot the cache and write it to a TSM file, freeing up memory
 # Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
 # Values without a size suffix are in bytes.
-# cache-snapshot-memory-size = "25m"
+# cache-snapshot-memory-size = "256m"
 
 # CacheSnapshotWriteColdDuration is the length of time at
 # which the engine will snapshot the cache and write it to
2 changes: 1 addition & 1 deletion tsdb/config.go
@@ -26,7 +26,7 @@ const (

 	// DefaultCacheSnapshotMemorySize is the size at which the engine will
 	// snapshot the cache and write it to a TSM file, freeing up memory
-	DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
+	DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
 
 	// DefaultCacheSnapshotWriteColdDuration is the length of time at which
 	// the engine will snapshot the cache and write it to a new TSM file if
46 changes: 34 additions & 12 deletions tsdb/engine/tsm1/compact.go
@@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
 		minGenerations = level + 1
 	}
 
-	// Each compaction group should run against 4 generations. For level 1, since these
-	// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
-	groupSize := 4
-	if level == 1 {
-		groupSize = 8
-	}
-
 	var cGroups []CompactionGroup
 	for _, group := range levelGroups {
-		for _, chunk := range group.chunk(groupSize) {
+		for _, chunk := range group.chunk(4) {
 			var cGroup CompactionGroup
 			var hasTombstones bool
 			for _, gen := range chunk {
@@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
 	}
 
 func (c *Compactor) write(path string, iter KeyIterator) (err error) {
-	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
 		return errCompactionInProgress{err: err}
 	}
 
-	// Create the write for the new TSM file.
-	w, err := NewTSMWriter(fd)
-	if err != nil {
-		return err
+	var w TSMWriter
+
+	// Use a disk based TSM buffer if it looks like we might create a big index
+	// in memory.
+	if iter.EstimatedIndexSize() > 64*1024*1024 {
+		w, err = NewTSMWriterWithDiskBuffer(fd)
+		if err != nil {
+			return err
+		}
+	} else {
+		w, err = NewTSMWriter(fd)
+		if err != nil {
+			return err
+		}
 	}
 
 	defer func() {
 		closeErr := w.Close()
 		if err == nil {
@@ -1145,6 +1150,10 @@ type KeyIterator interface {

 	// Err returns any errors encountered during iteration.
 	Err() error
+
+	// EstimatedIndexSize returns the estimated size of the index that would
+	// be required to store all the series and entries in the KeyIterator.
+	EstimatedIndexSize() int
 }
 
 // tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
@@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
 		len(k.mergedBooleanValues) > 0
 }
 
+func (k *tsmKeyIterator) EstimatedIndexSize() int {
+	var size uint32
+	for _, r := range k.readers {
+		size += r.IndexSize()
+	}
+	return int(size) / len(k.readers)
+}
+
 // Next returns true if there are any values remaining in the iterator.
 func (k *tsmKeyIterator) Next() bool {
 RETRY:
@@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
 	return cki
 }
 
+func (c *cacheKeyIterator) EstimatedIndexSize() int {
+	// We return 0 here since we already have all the entries in memory to write an index.
+	return 0
+}
+
 func (c *cacheKeyIterator) encode() {
 	concurrency := runtime.GOMAXPROCS(0)
 	n := len(c.ready)
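To make the heuristic above concrete: the compactor now estimates the merged index size by averaging the index sizes of the source readers (compaction deduplicates keys, so a plain sum would overestimate), and only switches to a disk-backed index buffer above 64MB. A self-contained sketch of that selection logic, using toy stand-ins rather than the real influxdb types:

```go
package main

import "fmt"

// indexSizer is a toy stand-in for the IndexSize() method on TSM readers.
type indexSizer interface {
	IndexSize() uint32
}

type fakeReader uint32

func (r fakeReader) IndexSize() uint32 { return uint32(r) }

// estimatedIndexSize mirrors tsmKeyIterator.EstimatedIndexSize: the merged
// index is approximated by the mean of the source index sizes. Assumes a
// non-empty reader slice.
func estimatedIndexSize(readers []indexSizer) int {
	var size uint32
	for _, r := range readers {
		size += r.IndexSize()
	}
	return int(size) / len(readers)
}

const diskBufferThreshold = 64 * 1024 * 1024 // same 64MB cutoff as the PR

func main() {
	readers := []indexSizer{fakeReader(80 << 20), fakeReader(96 << 20), fakeReader(40 << 20)}
	if est := estimatedIndexSize(readers); est > diskBufferThreshold {
		fmt.Printf("estimated index %d bytes: buffer it on disk\n", est)
	} else {
		fmt.Printf("estimated index %d bytes: keep it in memory\n", est)
	}
}
```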
16 changes: 4 additions & 12 deletions tsdb/engine/tsm1/compact_test.go
@@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
 			Path: "08-01.tsm1",
 			Size: 1 * 1024 * 1024,
 		},
-		tsm1.FileStat{
-			Path: "09-01.tsm1",
-			Size: 1 * 1024 * 1024,
-		},
-		tsm1.FileStat{
-			Path: "10-01.tsm1",
-			Size: 1 * 1024 * 1024,
-		},
 	}
 
 	cp := tsm1.NewDefaultPlanner(
@@ -1810,8 +1802,8 @@
 		}, tsdb.DefaultCompactFullWriteColdDuration,
 	)
 
-	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-	expFiles2 := []tsm1.FileStat{data[8], data[9]}
+	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+	expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
 	tsm := cp.PlanLevel(1)
 	if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
 		}, tsdb.DefaultCompactFullWriteColdDuration,
 	)
 
-	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
-	expFiles2 := []tsm1.FileStat{data[8], data[9]}
+	expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
+	expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
 
 	tsm := cp.PlanLevel(1)
 	if exp, got := len(expFiles1), len(tsm[0]); got != exp {
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
@@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
 }
 
 func (e *Engine) compact(quit <-chan struct{}) {
-	t := time.NewTicker(time.Second)
+	t := time.NewTicker(5 * time.Second)
 	defer t.Stop()
 
 	for {
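The one-line engine.go change means the compaction planner now wakes every 5 seconds instead of every second, so idle shards touch the filesystem far less often. A minimal sketch of that loop shape, with a hypothetical doCompact callback standing in for the planning logic:

```go
package main

import (
	"fmt"
	"time"
)

// compactLoop mirrors the Engine.compact shape: re-plan on each tick and
// exit when quit is closed. doCompact is a hypothetical stand-in.
func compactLoop(quit <-chan struct{}, doCompact func()) {
	t := time.NewTicker(5 * time.Second) // was time.Second before this change
	defer t.Stop()

	for {
		select {
		case <-quit:
			return
		case <-t.C:
			doCompact()
		}
	}
}

func main() {
	quit := make(chan struct{})
	go compactLoop(quit, func() { fmt.Println("plan and run compactions") })
	time.Sleep(11 * time.Second) // let the ticker fire twice
	close(quit)
}
```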
48 changes: 44 additions & 4 deletions tsdb/engine/tsm1/writer.go
@@ -97,6 +97,10 @@ const (

 	// max length of a key in an index entry (measurement + tags)
 	maxKeyLength = (1 << (2 * 8)) - 1
+
+	// The threshold amount data written before we periodically fsync a TSM file. This helps avoid
+	// long pauses due to very large fsyncs at the end of writing a TSM file.
+	fsyncEvery = 512 * 1024 * 1024
 )
 
 var (
@@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {

 // NewIndexWriter returns a new IndexWriter.
 func NewIndexWriter() IndexWriter {
-	buf := bytes.NewBuffer(make([]byte, 0, 4096))
+	buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
 	return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
 }

@@ -253,6 +257,9 @@ type indexBlock struct {
 type directIndex struct {
 	keyCount int
 	size     uint32
+
+	// The bytes written count of when we last fsync'd
+	lastSync uint32
 	fd  *os.File
 	buf *bytes.Buffer

@@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
 		return 0, err
 	}
 
-	return io.Copy(w, bufio.NewReader(d.fd))
+	return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
 }
 
 func (d *directIndex) flush(w io.Writer) (int64, error) {
@@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
 	d.indexEntries.Type = 0
 	d.indexEntries.entries = d.indexEntries.entries[:0]
+
+	// If this is a disk based index and we've written more than the fsync threshold,
+	// fsync the data to avoid long pauses later on.
+	if d.fd != nil && d.size-d.lastSync > fsyncEvery {
+		if err := d.fd.Sync(); err != nil {
+			return N, err
+		}
+		d.lastSync = d.size
+	}
 
 	return N, nil
 
 }
@@ -486,18 +502,30 @@ type tsmWriter struct {
 	w     *bufio.Writer
 	index IndexWriter
 	n     int64
+
+	// The bytes written count of when we last fsync'd
+	lastSync int64
 }
 
 // NewTSMWriter returns a new TSMWriter writing to w.
 func NewTSMWriter(w io.Writer) (TSMWriter, error) {
+	index := NewIndexWriter()
 	return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
 }
 
+// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
+// based buffer for the TSM index if possible.
+func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
 	var index IndexWriter
-	if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
-		f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
+	// Make sure is a File so we can write the temp index alongside it.
+	if fw, ok := w.(*os.File); ok {
+		f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 		if err != nil {
 			return nil, err
 		}
 		index = NewDiskIndexWriter(f)
+	} else {
+		// w is not a file, just use an inmem index
+		index = NewIndexWriter()
 	}
 
@@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
 	// Increment file position pointer (checksum + block len)
 	t.n += int64(n)
+
+	// fsync the file periodically to avoid long pauses with very big files.
+	if t.n-t.lastSync > fsyncEvery {
+		if err := t.sync(); err != nil {
+			return err
+		}
+		t.lastSync = t.n
+	}
 
 	if len(t.index.Entries(key)) >= maxIndexEntries {
 		return ErrMaxBlocksExceeded
 	}
@@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error {
 		return err
 	}
 
+	return t.sync()
+}
+
+func (t *tsmWriter) sync() error {
 	if f, ok := t.wrapped.(*os.File); ok {
 		if err := f.Sync(); err != nil {
 			return err
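The writer.go changes share one idea: instead of opening files with O_SYNC (which turns every write into a synchronous disk flush), write normally and call Sync() once every fsyncEvery bytes, so dirty pages are flushed in bounded chunks rather than in one very large fsync at the end. A self-contained sketch of the pattern with a small threshold so it is easy to observe (the PR uses 512MB):

```go
package main

import (
	"log"
	"os"
)

const fsyncEvery = 4 * 1024 * 1024 // 4MB for the sketch; the PR uses 512MB

func main() {
	f, err := os.CreateTemp("", "tsm-sketch-*.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	var n, lastSync int64
	block := make([]byte, 256*1024) // stand-in for an encoded TSM block

	for i := 0; i < 64; i++ {
		w, err := f.Write(block)
		if err != nil {
			log.Fatal(err)
		}
		n += int64(w)

		// Sync periodically so dirty pages never accumulate into one
		// multi-second fsync when the file is finalized.
		if n-lastSync > fsyncEvery {
			if err := f.Sync(); err != nil {
				log.Fatal(err)
			}
			lastSync = n
		}
	}

	// One final sync before closing, as tsmWriter.Flush does.
	if err := f.Sync(); err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote %d bytes with periodic fsync", n)
}
```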
6 changes: 6 additions & 0 deletions tsdb/store.go
@@ -169,6 +169,12 @@ func (s *Store) loadShards() error {
 	lim := s.EngineOptions.Config.MaxConcurrentCompactions
 	if lim == 0 {
 		lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
+
+		// On systems with more cores, cap at 4 to reduce disk utilization
+		if lim > 4 {
+			lim = 4
+		}
+
 		if lim < 1 {
 			lim = 1
 		}
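The store.go change only affects the default: half the cores, clamped to the range [1, 4]. A quick sketch of the resulting defaults across machine sizes (the helper name is illustrative, not the influxdb API):

```go
package main

import "fmt"

// defaultCompactionLimit mirrors the clamping added above.
func defaultCompactionLimit(numCPU int) int {
	lim := numCPU / 2
	if lim > 4 {
		lim = 4 // cap to reduce disk utilization on many-core hosts
	}
	if lim < 1 {
		lim = 1
	}
	return lim
}

func main() {
	for _, cores := range []int{1, 2, 4, 8, 16, 32} {
		fmt.Printf("%2d cores -> %d concurrent compactions\n", cores, defaultCompactionLimit(cores))
	}
}
```

An explicitly configured `max-concurrent-compactions` still takes precedence; the cap only applies when the setting is left at 0.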