Skip to content

Commit

Permalink
Remove per shard monitor goroutine
Browse files Browse the repository at this point in the history
The monitor goroutine ran for each shard and updated disk stats
as well as logged cardinality warnings.  This goroutine has been
removed by making the disk stats more lightweight and callable
directly from Statistics, and by moving the logging to the tsdb.Store.  The
latter allows one goroutine to handle all shards.
  • Loading branch information
jwilder committed May 3, 2017
1 parent 7c90c78 commit c057bd4
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 87 deletions.
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Engine interface {
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool

io.WriterTo
Expand Down
5 changes: 5 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
return statistics
}

// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
func (e *Engine) DiskSize() int64 {
	// Both underlying stats are maintained atomically by their owners,
	// so this is a cheap read rather than a filesystem walk.
	walBytes := e.WAL.DiskSizeBytes()
	tsmBytes := e.FileStore.DiskSizeBytes()
	return tsmBytes + walBytes
}

// Open opens and initializes the engine.
func (e *Engine) Open() error {
if err := os.MkdirAll(e.path, 0777); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,9 @@ func (f *FileStore) Open() error {
f.files = append(f.files, res.r)
// Accumulate file store size stats
atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
for _, ts := range res.r.TombstoneFiles() {
atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
}

// Re-initialize the lastModified time for the file store
if res.r.LastModified() > lm {
Expand Down Expand Up @@ -439,6 +442,10 @@ func (f *FileStore) Close() error {
return nil
}

// DiskSizeBytes returns the total size on disk of all files tracked by the
// file store, in bytes. The value is read atomically from the stats counter
// maintained by Open and Replace, so this call does not touch the filesystem.
func (f *FileStore) DiskSizeBytes() int64 {
	return atomic.LoadInt64(&f.stats.DiskBytes)
}

// Read returns the slice of values for the given key and the given timestamp,
// if any file matches those constraints.
func (f *FileStore) Read(key string, t int64) ([]Value, error) {
Expand Down Expand Up @@ -628,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
var totalSize int64
for _, file := range f.files {
totalSize += int64(file.Size())
for _, ts := range file.TombstoneFiles() {
totalSize += int64(ts.Size)
}

}
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)

Expand Down
4 changes: 4 additions & 0 deletions tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time {
return l.lastWriteTime
}

// DiskSizeBytes returns the on-disk size of the WAL in bytes: the sum of the
// bytes in old (closed) segments and the current segment, read atomically
// from the WAL's stats counters.
func (l *WAL) DiskSizeBytes() int64 {
	return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes)
}

func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// limit how many concurrent encodings can be in flight. Since we can only
// write one at a time to disk, a slow disk can cause the allocations below
Expand Down
104 changes: 17 additions & 87 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"math"
"os"
"path/filepath"
"regexp"
"sort"
Expand Down Expand Up @@ -207,6 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}

// Refresh our disk size stat
_, _ = s.DiskSize()

// TODO(edd): Should statSeriesCreate be the current number of series in the
// shard, or the total number of series ever created?
sSketch, tSketch, err := s.engine.SeriesSketches()
Expand Down Expand Up @@ -289,9 +291,6 @@ func (s *Shard) Open() error {
}
s.engine = e

s.wg.Add(1)
go s.monitor()

return nil
}(); err != nil {
s.close(true)
Expand Down Expand Up @@ -355,6 +354,12 @@ func (s *Shard) close(clean bool) error {
return err
}

// IndexType returns the type name of the index backing this shard
// (e.g. "inmem", as checked by the store's shard monitor). The shard's
// read lock is held while accessing the index.
func (s *Shard) IndexType() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.index.Type()
}

// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func (s *Shard) ready() error {
Expand Down Expand Up @@ -402,33 +407,9 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) {

// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}

if !fi.IsDir() {
size += fi.Size()
}
return err
})
if err != nil {
return 0, err
}

err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}

if !fi.IsDir() {
size += fi.Size()
}
return err
})

return size, err
size := s.engine.DiskSize()
atomic.StoreInt64(&s.stats.DiskBytes, size)
return size, nil
}

// FieldCreate holds information for a field to create on a measurement.
Expand Down Expand Up @@ -984,63 +965,12 @@ func (s *Shard) CreateSnapshot() (string, error) {
return s.engine.CreateSnapshot()
}

func (s *Shard) monitor() {
defer s.wg.Done()

t := time.NewTicker(monitorStatInterval)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
var changed time.Time

for {
select {
case <-s.closing:
return
case <-t.C:
// Checking DiskSize can be expensive with a lot of shards and TSM files, only
// check if something has changed.
lm := s.LastModified()
if lm.Equal(changed) {
continue
}

size, err := s.DiskSize()
if err != nil {
s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
continue
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
changed = lm
case <-t2.C:
if s.options.Config.MaxValuesPerTag == 0 {
continue
}

names, err := s.MeasurementNamesByExpr(nil)
if err != nil {
s.logger.Warn("cannot retrieve measurement names", zap.Error(err))
continue
}

for _, name := range names {
s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
n := s.engine.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
// ForEachMeasurementTagKey invokes fn for each tag key of the measurement
// named name, delegating directly to the underlying engine. Any error
// returned by the engine (or by fn, via the engine) is passed through.
func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
	return s.engine.ForEachMeasurementTagKey(name, fn)
}

// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
}
return nil
})
}
}
}
// TagKeyCardinality returns the number of distinct values for the tag key
// on the given measurement, as reported by the underlying engine. Used by
// the store's monitor to warn when max-values-per-tag is approached.
func (s *Shard) TagKeyCardinality(name, key []byte) int {
	return s.engine.TagKeyCardinality(name, key)
}

type ShardGroup interface {
Expand Down
41 changes: 41 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,8 @@ func (s *Store) monitorShards() {
defer s.wg.Done()
t := time.NewTicker(10 * time.Second)
defer t.Stop()
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
for {
select {
case <-s.closing:
Expand All @@ -1069,6 +1071,45 @@ func (s *Store) monitorShards() {
}
}
s.mu.RUnlock()
case <-t2.C:
if s.EngineOptions.Config.MaxValuesPerTag == 0 {
continue
}

s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.IndexType() == "inmem"
})
s.mu.RUnlock()

s.walkShards(shards, func(sh *Shard) error {
db := sh.database
id := sh.id

names, err := sh.MeasurementNamesByExpr(nil)
if err != nil {
s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
return nil
}

for _, name := range names {
sh.ForEachMeasurementTagKey(name, func(k []byte) error {
n := sh.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}

// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k))
}
return nil
})
}
return nil
})
}
}
}
Expand Down

0 comments on commit c057bd4

Please sign in to comment.