Commit f250b64

Merge pull request #9204 from influxdata/jw-tsm-sync
Fix higher disk utilization regression
jwilder authored Dec 7, 2017
2 parents fd11e20 + 0b929fe commit f250b64
Showing 8 changed files with 103 additions and 31 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,18 @@
- [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls are allowable
- [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement

## v1.4.3 [unreleased]

### Configuration Changes

#### `[data]` Section

The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.

### Bugfixes

- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization

## v1.4.2 [2017-11-15]

Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
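
For orientation before the file-by-file changes: `cache-snapshot-memory-size` sets the point at which the in-memory cache is snapshotted to a TSM file. A hedged sketch of the check this setting feeds into, simplified from the engine's `ShouldCompactCache` (which appears later in this diff); the function and parameter names here are illustrative, not the engine's real signature:

```go
package sketch

import "time"

// shouldSnapshotCache is an illustrative simplification: an empty cache is
// never snapshotted; otherwise it is snapshotted once it outgrows the
// configured threshold (now 256MB by default) or has gone write-cold for
// longer than CacheSnapshotWriteColdDuration.
func shouldSnapshotCache(size, threshold uint64, lastWrite time.Time, coldDur time.Duration) bool {
	if size == 0 {
		return false
	}
	return size > threshold || time.Since(lastWrite) > coldDur
}
```

Raising the threshold tenfold means snapshots fire less often and each one writes a larger TSM file, which in turn gives the compactor less churn to merge.
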
2 changes: 1 addition & 1 deletion etc/config.sample.toml
@@ -79,7 +79,7 @@
# snapshot the cache and write it to a TSM file, freeing up memory
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
# Values without a size suffix are in bytes.
# cache-snapshot-memory-size = "25m"
# cache-snapshot-memory-size = "256m"

# CacheSnapshotWriteColdDuration is the length of time at
# which the engine will snapshot the cache and write it to
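
The comment above documents the size-suffix convention for this setting. A small standalone sketch of that convention (the real parsing lives in influxdb's config machinery; `parseSize` here is a hypothetical helper):

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// parseSize mimics the documented rules: k, m and g are powers of 1024,
// case insensitive, and a bare number is taken as bytes.
func parseSize(s string) (uint64, error) {
	if s == "" {
		return 0, errors.New("empty size")
	}
	mult := uint64(1)
	switch strings.ToLower(s[len(s)-1:]) {
	case "k":
		mult, s = 1<<10, s[:len(s)-1]
	case "m":
		mult, s = 1<<20, s[:len(s)-1]
	case "g":
		mult, s = 1<<30, s[:len(s)-1]
	}
	n, err := strconv.ParseUint(s, 10, 64)
	return n * mult, err
}

func main() {
	n, _ := parseSize("256m")
	fmt.Println(n) // 268435456, i.e. 256 * 1024 * 1024 bytes
}
```
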
2 changes: 1 addition & 1 deletion tsdb/config.go
@@ -26,7 +26,7 @@ const (

// DefaultCacheSnapshotMemorySize is the size at which the engine will
// snapshot the cache and write it to a TSM file, freeing up memory
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB

// DefaultCacheSnapshotWriteColdDuration is the length of time at which
// the engine will snapshot the cache and write it to a new TSM file if
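
Deployments that want the old, more aggressive snapshotting can still configure the threshold explicitly. A minimal sketch assuming the 1.4-era Go API (`tsdb.NewConfig` and the `CacheSnapshotMemorySize` field are from this codebase; the program around them is illustrative):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	cfg := tsdb.NewConfig()
	// Pin the pre-1.4.3 default of 25MB rather than the new 256MB default.
	cfg.CacheSnapshotMemorySize = 25 * 1024 * 1024
	fmt.Println(cfg.CacheSnapshotMemorySize)
}
```
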
46 changes: 34 additions & 12 deletions tsdb/engine/tsm1/compact.go
@@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
minGenerations = level + 1
}

// Each compaction group should run against 4 generations. For level 1, since these
// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
groupSize := 4
if level == 1 {
groupSize = 8
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
for _, chunk := range group.chunk(groupSize) {
for _, chunk := range group.chunk(4) {
var cGroup CompactionGroup
var hasTombstones bool
for _, gen := range chunk {
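
The deleted lines above revert level 1's special case: every level now compacts in fixed groups of 4 generations. A sketch of the chunking behavior, where `chunkGenerations` is a hypothetical stand-in for the unexported `chunk` helper called above:

```go
package sketch

// chunkGenerations splits ordered generations into groups of at most size;
// the planner turns each group into one compaction group.
func chunkGenerations(gens []int, size int) [][]int {
	var groups [][]int
	for len(gens) > size {
		groups = append(groups, gens[:size])
		gens = gens[size:]
	}
	if len(gens) > 0 {
		groups = append(groups, gens)
	}
	return groups
}
```

With the 8 level-1 files in the updated tests below, this yields two groups of 4, which is exactly what the revised `expFiles1` and `expFiles2` expectations assert.
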
@@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
}

func (c *Compactor) write(path string, iter KeyIterator) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}

// Create the writer for the new TSM file.
w, err := NewTSMWriter(fd)
if err != nil {
return err
var w TSMWriter

// Use a disk based TSM buffer if it looks like we might create a big index
// in memory.
if iter.EstimatedIndexSize() > 64*1024*1024 {
w, err = NewTSMWriterWithDiskBuffer(fd)
if err != nil {
return err
}
} else {
w, err = NewTSMWriter(fd)
if err != nil {
return err
}
}

defer func() {
closeErr := w.Close()
if err == nil {
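
The branch above picks the writer implementation from the iterator's index estimate. Factored out as a sketch, as if written inside package tsm1 (`NewTSMWriter`, `NewTSMWriterWithDiskBuffer` and `EstimatedIndexSize` are the real names from this diff, and 64MB is the committed cutoff; the helper itself is hypothetical):

```go
// newWriterFor chooses a TSM writer for compaction output: when the merged
// index is expected to be large, buffer it on disk so a huge compaction does
// not hold the entire index in memory.
func newWriterFor(fd *os.File, iter KeyIterator) (TSMWriter, error) {
	if iter.EstimatedIndexSize() > 64*1024*1024 {
		return NewTSMWriterWithDiskBuffer(fd)
	}
	return NewTSMWriter(fd)
}
```
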
@@ -1145,6 +1150,10 @@ type KeyIterator interface {

// Err returns any errors encountered during iteration.
Err() error

// EstimatedIndexSize returns the estimated size of the index that would
// be required to store all the series and entries in the KeyIterator.
EstimatedIndexSize() int
}

// tsmKeyIterator implements the KeyIterator for a set of TSMReaders. Iteration produces
@@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
len(k.mergedBooleanValues) > 0
}

func (k *tsmKeyIterator) EstimatedIndexSize() int {
var size uint32
for _, r := range k.readers {
size += r.IndexSize()
}
return int(size) / len(k.readers)
}
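
The estimate above is the mean of the inputs' index sizes, which assumes the files being merged mostly contain the same series keys. A worked illustration:

```go
package main

import "fmt"

func main() {
	// Three input readers with 10MB, 20MB and 30MB indexes estimate a ~20MB
	// merged index, well under the compactor's 64MB disk-buffer cutoff.
	sizes := []uint32{10 << 20, 20 << 20, 30 << 20}
	var total uint32
	for _, s := range sizes {
		total += s
	}
	fmt.Println(int(total) / len(sizes)) // 20971520 (20MB)
}
```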

// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
RETRY:
@@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
return cki
}

func (c *cacheKeyIterator) EstimatedIndexSize() int {
// We return 0 here since we already have all the entries in memory to write an index.
return 0
}

func (c *cacheKeyIterator) encode() {
concurrency := runtime.GOMAXPROCS(0)
n := len(c.ready)
16 changes: 4 additions & 12 deletions tsdb/engine/tsm1/compact_test.go
@@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
Path: "08-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "09-01.tsm1",
Size: 1 * 1024 * 1024,
},
tsm1.FileStat{
Path: "10-01.tsm1",
Size: 1 * 1024 * 1024,
},
}

cp := tsm1.NewDefaultPlanner(
@@ -1810,8 +1802,8 @@ }, tsdb.DefaultCompactFullWriteColdDuration,
}, tsdb.DefaultCompactFullWriteColdDuration,
)

expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
expFiles2 := []tsm1.FileStat{data[8], data[9]}
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}

tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
@@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
}, tsdb.DefaultCompactFullWriteColdDuration,
)

expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
expFiles2 := []tsm1.FileStat{data[8], data[9]}
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}

tsm := cp.PlanLevel(1)
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
@@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
}

func (e *Engine) compact(quit <-chan struct{}) {
t := time.NewTicker(time.Second)
t := time.NewTicker(5 * time.Second)
defer t.Stop()

for {
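
The scheduler's wakeup interval grows from 1s to 5s, so compaction planning runs a fifth as often. A sketch of the loop shape inside `Engine.compact` as shown above, with the planning body elided:

```go
t := time.NewTicker(5 * time.Second)
defer t.Stop()

for {
	select {
	case <-quit:
		return
	case <-t.C:
		// Plan compactions and hand the resulting groups to the
		// compaction workers (elided).
	}
}
```
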
48 changes: 44 additions & 4 deletions tsdb/engine/tsm1/writer.go
@@ -97,6 +97,10 @@

// max length of a key in an index entry (measurement + tags)
maxKeyLength = (1 << (2 * 8)) - 1

// The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
// long pauses due to very large fsyncs at the end of writing a TSM file.
fsyncEvery = 512 * 1024 * 1024
)

var (
@@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {

// NewIndexWriter returns a new IndexWriter.
func NewIndexWriter() IndexWriter {
buf := bytes.NewBuffer(make([]byte, 0, 4096))
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
}

@@ -253,6 +257,9 @@ type indexBlock struct {
type directIndex struct {
keyCount int
size uint32

// The number of bytes written when we last fsync'd
lastSync uint32
fd *os.File
buf *bytes.Buffer

@@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
return 0, err
}

return io.Copy(w, bufio.NewReader(d.fd))
return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
}

func (d *directIndex) flush(w io.Writer) (int64, error) {
@@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
d.indexEntries.Type = 0
d.indexEntries.entries = d.indexEntries.entries[:0]

// If this is a disk based index and we've written more than the fsync threshold,
// fsync the data to avoid long pauses later on.
if d.fd != nil && d.size-d.lastSync > fsyncEvery {
if err := d.fd.Sync(); err != nil {
return N, err
}
d.lastSync = d.size
}

return N, nil

}
@@ -486,18 +502,30 @@ type tsmWriter struct {
w *bufio.Writer
index IndexWriter
n int64

// The number of bytes written when we last fsync'd
lastSync int64
}

// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
index := NewIndexWriter()
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
}

// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
// based buffer for the TSM index if possible.
func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
var index IndexWriter
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
// Make sure w is a File so we can write the temp index alongside it.
if fw, ok := w.(*os.File); ok {
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return nil, err
}
index = NewDiskIndexWriter(f)
} else {
// w is not a file, just use an inmem index
index = NewIndexWriter()
}

@@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
// Increment file position pointer (checksum + block len)
t.n += int64(n)

// fsync the file periodically to avoid long pauses with very big files.
if t.n-t.lastSync > fsyncEvery {
if err := t.sync(); err != nil {
return err
}
t.lastSync = t.n
}

if len(t.index.Entries(key)) >= maxIndexEntries {
return ErrMaxBlocksExceeded
}
@@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error {
return err
}

return t.sync()
}

func (t *tsmWriter) sync() error {
if f, ok := t.wrapped.(*os.File); ok {
if err := f.Sync(); err != nil {
return err
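
All of the fsync changes in this file follow one pattern: drop `O_SYNC` from `OpenFile` (which made every write synchronous), track bytes written instead, and call `Sync` whenever another `fsyncEvery` (512MB) bytes have gone out, so the final sync in `Flush` has little left to flush. A self-contained sketch of the pattern; `syncingWriter` is illustrative, not the writer's real API:

```go
package sketch

import "os"

const fsyncEvery = 512 * 1024 * 1024 // matches the threshold added above

// syncingWriter mirrors the bookkeeping added to tsmWriter and directIndex:
// count bytes written, remember the count at the last fsync, and sync again
// once the gap exceeds the threshold.
type syncingWriter struct {
	f        *os.File
	n        int64 // total bytes written
	lastSync int64 // value of n at the last fsync
}

func (w *syncingWriter) Write(p []byte) (int, error) {
	m, err := w.f.Write(p)
	w.n += int64(m)
	if err != nil {
		return m, err
	}
	if w.n-w.lastSync > fsyncEvery {
		if err := w.f.Sync(); err != nil {
			return m, err
		}
		w.lastSync = w.n
	}
	return m, nil
}
```

This bounds how much dirty data the OS can accumulate for the file, trading a few extra medium-sized syncs for the removal of constant synchronous writes and of one enormous sync at the end.
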
6 changes: 6 additions & 0 deletions tsdb/store.go
@@ -169,6 +169,12 @@ func (s *Store) loadShards() error {
lim := s.EngineOptions.Config.MaxConcurrentCompactions
if lim == 0 {
lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions

// On systems with more cores, cap at 4 to reduce disk utilization
if lim > 4 {
lim = 4
}

if lim < 1 {
lim = 1
}
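
Finally, the default compaction concurrency is capped. A sketch of the resulting rule as a hypothetical pure function mirroring the logic above: an explicit `MaxConcurrentCompactions` wins; otherwise use half the cores, clamped to at most 4 and at least 1.

```go
package sketch

// defaultCompactionLimit mirrors loadShards above: configured values are
// used as-is; a zero value means "derive the limit from the core count".
func defaultCompactionLimit(configured, numCPU int) int {
	lim := configured
	if lim == 0 {
		lim = numCPU / 2 // default to 50% of cores
		if lim > 4 {
			lim = 4 // cap to reduce disk utilization on many-core hosts
		}
		if lim < 1 {
			lim = 1
		}
	}
	return lim
}
```

On a 32-core host the old default allowed 16 concurrent compactions; after this change it allows 4, which, together with the slower compaction ticker and the larger cache snapshots, is what brings disk utilization back down.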
