From c057bd464fe45acb45e56a44829e081f5cbb5f9b Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 2 May 2017 22:42:09 -0600 Subject: [PATCH] Remove per shard monitor goroutine The monitor goroutine ran for each shard and updated disk stats as well as logged cardinality warnings. This goroutine has been removed by making the disk stats more lightweight and callable directly from Statistics and moving the logging to the tsdb.Store. The latter allows one goroutine to handle all shards. --- tsdb/engine.go | 1 + tsdb/engine/tsm1/engine.go | 5 ++ tsdb/engine/tsm1/file_store.go | 11 ++++ tsdb/engine/tsm1/wal.go | 4 ++ tsdb/shard.go | 104 ++++++--------------------------- tsdb/store.go | 41 +++++++++++++ 6 files changed, 79 insertions(+), 87 deletions(-) diff --git a/tsdb/engine.go b/tsdb/engine.go index 71bff1e603b..a81144d07a2 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -74,6 +74,7 @@ type Engine interface { // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + DiskSize() int64 IsIdle() bool io.WriterTo diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 3a5a98a93e3..cd665e5bbed 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -430,6 +430,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { return statistics } +// DiskSize returns the total size in bytes of all TSM and WAL segments on disk. +func (e *Engine) DiskSize() int64 { + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() +} + // Open opens and initializes the engine. 
func (e *Engine) Open() error { if err := os.MkdirAll(e.path, 0777); err != nil { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 34a3569d86f..e3905357702 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -409,6 +409,9 @@ func (f *FileStore) Open() error { f.files = append(f.files, res.r) // Accumulate file store size stats atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + for _, ts := range res.r.TombstoneFiles() { + atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size)) + } // Re-initialize the lastModified time for the file store if res.r.LastModified() > lm { @@ -439,6 +442,10 @@ func (f *FileStore) Close() error { return nil } +func (f *FileStore) DiskSizeBytes() int64 { + return atomic.LoadInt64(&f.stats.DiskBytes) +} + // Read returns the slice of values for the given key and the given timestamp, // if any file matches those constraints. func (f *FileStore) Read(key string, t int64) ([]Value, error) { @@ -628,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { var totalSize int64 for _, file := range f.files { totalSize += int64(file.Size()) + for _, ts := range file.TombstoneFiles() { + totalSize += int64(ts.Size) + } + } atomic.StoreInt64(&f.stats.DiskBytes, totalSize) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index faff78c425f..28082629b5a 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time { return l.lastWriteTime } +func (l *WAL) DiskSizeBytes() int64 { + return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes) +} + func (l *WAL) writeToLog(entry WALEntry) (int, error) { // limit how many concurrent encodings can be in flight. 
Since we can only // write one at a time to disk, a slow disk can cause the allocations below diff --git a/tsdb/shard.go b/tsdb/shard.go index c03030093ed..294b8489398 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "os" "path/filepath" "regexp" "sort" @@ -207,6 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } + // Refresh our disk size stat + _, _ = s.DiskSize() + // TODO(edd): Should statSeriesCreate be the current number of series in the // shard, or the total number of series ever created? sSketch, tSketch, err := s.engine.SeriesSketches() @@ -289,9 +291,6 @@ func (s *Shard) Open() error { } s.engine = e - s.wg.Add(1) - go s.monitor() - return nil }(); err != nil { s.close(true) @@ -355,6 +354,12 @@ func (s *Shard) close(clean bool) error { return err } +func (s *Shard) IndexType() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.index.Type() +} + // ready determines if the Shard is ready for queries or writes. // It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled func (s *Shard) ready() error { @@ -402,33 +407,9 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) { // DiskSize returns the size on disk of this shard func (s *Shard) DiskSize() (int64, error) { - var size int64 - err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if !fi.IsDir() { - size += fi.Size() - } - return err - }) - if err != nil { - return 0, err - } - - err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if !fi.IsDir() { - size += fi.Size() - } - return err - }) - - return size, err + size := s.engine.DiskSize() + atomic.StoreInt64(&s.stats.DiskBytes, size) + return size, nil } // FieldCreate holds information for a field to create on a measurement. 
@@ -984,63 +965,12 @@ func (s *Shard) CreateSnapshot() (string, error) { return s.engine.CreateSnapshot() } -func (s *Shard) monitor() { - defer s.wg.Done() - - t := time.NewTicker(monitorStatInterval) - defer t.Stop() - t2 := time.NewTicker(time.Minute) - defer t2.Stop() - var changed time.Time - - for { - select { - case <-s.closing: - return - case <-t.C: - // Checking DiskSize can be expensive with a lot of shards and TSM files, only - // check if something has changed. - lm := s.LastModified() - if lm.Equal(changed) { - continue - } - - size, err := s.DiskSize() - if err != nil { - s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err)) - continue - } - atomic.StoreInt64(&s.stats.DiskBytes, size) - changed = lm - case <-t2.C: - if s.options.Config.MaxValuesPerTag == 0 { - continue - } - - names, err := s.MeasurementNamesByExpr(nil) - if err != nil { - s.logger.Warn("cannot retrieve measurement names", zap.Error(err)) - continue - } - - for _, name := range names { - s.engine.ForEachMeasurementTagKey(name, func(k []byte) error { - n := s.engine.TagKeyCardinality(name, k) - perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100) - if perc > 100 { - perc = 100 - } +func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + return s.engine.ForEachMeasurementTagKey(name, fn) +} - // Log at 80, 85, 90-100% levels - if perc == 80 || perc == 85 || perc >= 90 { - s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", - perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k)) - } - return nil - }) - } - } - } +func (s *Shard) TagKeyCardinality(name, key []byte) int { + return s.engine.TagKeyCardinality(name, key) } type ShardGroup interface { diff --git a/tsdb/store.go b/tsdb/store.go index 3abe51f3f92..cb4aafe2725 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1055,6 +1055,8 @@ func (s *Store) monitorShards() { defer 
s.wg.Done() t := time.NewTicker(10 * time.Second) defer t.Stop() + t2 := time.NewTicker(time.Minute) + defer t2.Stop() for { select { case <-s.closing: @@ -1069,6 +1071,45 @@ func (s *Store) monitorShards() { } } s.mu.RUnlock() + case <-t2.C: + if s.EngineOptions.Config.MaxValuesPerTag == 0 { + continue + } + + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.IndexType() == "inmem" + }) + s.mu.RUnlock() + + s.walkShards(shards, func(sh *Shard) error { + db := sh.database + id := sh.id + + names, err := sh.MeasurementNamesByExpr(nil) + if err != nil { + s.Logger.Warn("cannot retrieve measurement names", zap.Error(err)) + return nil + } + + for _, name := range names { + sh.ForEachMeasurementTagKey(name, func(k []byte) error { + n := sh.TagKeyCardinality(name, k) + perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100) + if perc > 100 { + perc = 100 + } + + // Log at 80, 85, 90-100% levels + if perc == 80 || perc == 85 || perc >= 90 { + s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", + perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k)) + } + return nil + }) + } + return nil + }) } } }