diff --git a/tsdb/engine.go b/tsdb/engine.go
index 71bff1e603b..a81144d07a2 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -74,6 +74,7 @@ type Engine interface {
 	// Statistics will return statistics relevant to this engine.
 	Statistics(tags map[string]string) []models.Statistic
 	LastModified() time.Time
+	DiskSize() int64
 	IsIdle() bool

 	io.WriterTo
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 3a5a98a93e3..cd665e5bbed 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -430,6 +430,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
 	return statistics
 }

+// DiskSize returns the total size in bytes of all TSM and WAL segments on disk.
+func (e *Engine) DiskSize() int64 {
+	return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes()
+}
+
 // Open opens and initializes the engine.
 func (e *Engine) Open() error {
 	if err := os.MkdirAll(e.path, 0777); err != nil {
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index 34a3569d86f..e3905357702 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -409,6 +409,9 @@ func (f *FileStore) Open() error {
 		f.files = append(f.files, res.r)
 		// Accumulate file store size stats
 		atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size()))
+		for _, ts := range res.r.TombstoneFiles() {
+			atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size))
+		}

 		// Re-initialize the lastModified time for the file store
 		if res.r.LastModified() > lm {
@@ -439,6 +442,10 @@ func (f *FileStore) Close() error {
 	return nil
 }

+func (f *FileStore) DiskSizeBytes() int64 {
+	return atomic.LoadInt64(&f.stats.DiskBytes)
+}
+
 // Read returns the slice of values for the given key and the given timestamp,
 // if any file matches those constraints.
 func (f *FileStore) Read(key string, t int64) ([]Value, error) {
@@ -628,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
 	var totalSize int64
 	for _, file := range f.files {
 		totalSize += int64(file.Size())
+		for _, ts := range file.TombstoneFiles() {
+			totalSize += int64(ts.Size)
+		}
+
 	}
 	atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index faff78c425f..28082629b5a 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time {
 	return l.lastWriteTime
 }

+func (l *WAL) DiskSizeBytes() int64 {
+	return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes)
+}
+
 func (l *WAL) writeToLog(entry WALEntry) (int, error) {
 	// limit how many concurrent encodings can be in flight. Since we can only
 	// write one at a time to disk, a slow disk can cause the allocations below
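Note: the engine-level changes above replace a filesystem walk with pre-aggregated
atomic counters, so reading the disk size becomes an O(1) load. The standalone
sketch below (not part of the patch; sizeTracker and its methods are illustrative
names) shows the accounting pattern the FileStore and WAL follow:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // sizeTracker mirrors the pattern used by FileStore and WAL above:
    // writers adjust an atomic byte counter as files come and go, and
    // readers load it without locks or filesystem access.
    type sizeTracker struct {
    	diskBytes int64
    }

    func (t *sizeTracker) add(n int64)    { atomic.AddInt64(&t.diskBytes, n) }
    func (t *sizeTracker) remove(n int64) { atomic.AddInt64(&t.diskBytes, -n) }

    // DiskSizeBytes is an O(1) read of the pre-aggregated counter.
    func (t *sizeTracker) DiskSizeBytes() int64 {
    	return atomic.LoadInt64(&t.diskBytes)
    }

    func main() {
    	var t sizeTracker
    	t.add(1 << 20) // a 1 MiB TSM file appears
    	t.add(4096)    // plus its tombstone file
    	t.remove(4096) // tombstone deleted after compaction
    	fmt.Println(t.DiskSizeBytes()) // 1048576
    }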
diff --git a/tsdb/shard.go b/tsdb/shard.go
index c03030093ed..294b8489398 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"os"
 	"path/filepath"
 	"regexp"
 	"sort"
@@ -207,6 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
 		return nil
 	}

+	// Refresh our disk size stat
+	_, _ = s.DiskSize()
+
 	// TODO(edd): Should statSeriesCreate be the current number of series in the
 	// shard, or the total number of series ever created?
 	sSketch, tSketch, err := s.engine.SeriesSketches()
@@ -289,9 +291,6 @@ func (s *Shard) Open() error {
 		}
 		s.engine = e

-		s.wg.Add(1)
-		go s.monitor()
-
 		return nil
 	}(); err != nil {
 		s.close(true)
@@ -355,6 +354,12 @@ func (s *Shard) close(clean bool) error {
 	return err
 }

+func (s *Shard) IndexType() string {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.index.Type()
+}
+
 // ready determines if the Shard is ready for queries or writes.
 // It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
 func (s *Shard) ready() error {
@@ -402,33 +407,9 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) {

 // DiskSize returns the size on disk of this shard
 func (s *Shard) DiskSize() (int64, error) {
-	var size int64
-	err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if !fi.IsDir() {
-			size += fi.Size()
-		}
-		return err
-	})
-	if err != nil {
-		return 0, err
-	}
-
-	err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-
-		if !fi.IsDir() {
-			size += fi.Size()
-		}
-		return err
-	})
-
-	return size, err
+	size := s.engine.DiskSize()
+	atomic.StoreInt64(&s.stats.DiskBytes, size)
+	return size, nil
 }

 // FieldCreate holds information for a field to create on a measurement.
@@ -984,63 +965,12 @@ func (s *Shard) CreateSnapshot() (string, error) {
 	return s.engine.CreateSnapshot()
 }

-func (s *Shard) monitor() {
-	defer s.wg.Done()
-
-	t := time.NewTicker(monitorStatInterval)
-	defer t.Stop()
-	t2 := time.NewTicker(time.Minute)
-	defer t2.Stop()
-	var changed time.Time
-
-	for {
-		select {
-		case <-s.closing:
-			return
-		case <-t.C:
-			// Checking DiskSize can be expensive with a lot of shards and TSM files, only
-			// check if something has changed.
-			lm := s.LastModified()
-			if lm.Equal(changed) {
-				continue
-			}
-
-			size, err := s.DiskSize()
-			if err != nil {
-				s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
-				continue
-			}
-			atomic.StoreInt64(&s.stats.DiskBytes, size)
-			changed = lm
-		case <-t2.C:
-			if s.options.Config.MaxValuesPerTag == 0 {
-				continue
-			}
-
-			names, err := s.MeasurementNamesByExpr(nil)
-			if err != nil {
-				s.logger.Warn("cannot retrieve measurement names", zap.Error(err))
-				continue
-			}
-
-			for _, name := range names {
-				s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
-					n := s.engine.TagKeyCardinality(name, k)
-					perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
-					if perc > 100 {
-						perc = 100
-					}
+func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
+	return s.engine.ForEachMeasurementTagKey(name, fn)
+}

-					// Log at 80, 85, 90-100% levels
-					if perc == 80 || perc == 85 || perc >= 90 {
-						s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
-							perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
-					}
-					return nil
-				})
-			}
-		}
-	}
+func (s *Shard) TagKeyCardinality(name, key []byte) int {
+	return s.engine.TagKeyCardinality(name, key)
 }

 type ShardGroup interface {
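Note: the threshold logic deleted from Shard.monitor (and reinstated in
Store.monitorShards below) clamps the usage percentage and only logs at the
80%, 85%, and 90-100% marks to avoid flooding the log. A standalone sketch of
that arithmetic (shouldWarn is an illustrative name, not part of the patch):

    package main

    import "fmt"

    // shouldWarn reproduces the monitor's logging gate: compute the clamped
    // percentage of the max-values-per-tag limit, and warn only at the
    // 80, 85, and 90-100 percent levels.
    func shouldWarn(n, max int) (perc int, warn bool) {
    	perc = int(float64(n) / float64(max) * 100)
    	if perc > 100 {
    		perc = 100
    	}
    	return perc, perc == 80 || perc == 85 || perc >= 90
    }

    func main() {
    	for _, n := range []int{70000, 80000, 95000, 120000} {
    		perc, warn := shouldWarn(n, 100000)
    		fmt.Printf("n=%d perc=%d%% warn=%v\n", n, perc, warn)
    	}
    	// Output: 70% is silent; 80%, 95%, and the clamped 100% all warn.
    }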
diff --git a/tsdb/store.go b/tsdb/store.go
index 3abe51f3f92..cb4aafe2725 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -1055,6 +1055,8 @@ func (s *Store) monitorShards() {
 	defer s.wg.Done()
 	t := time.NewTicker(10 * time.Second)
 	defer t.Stop()
+	t2 := time.NewTicker(time.Minute)
+	defer t2.Stop()
 	for {
 		select {
 		case <-s.closing:
@@ -1069,6 +1071,45 @@ func (s *Store) monitorShards() {
 				}
 			}
 			s.mu.RUnlock()
+		case <-t2.C:
+			if s.EngineOptions.Config.MaxValuesPerTag == 0 {
+				continue
+			}
+
+			s.mu.RLock()
+			shards := s.filterShards(func(sh *Shard) bool {
+				return sh.IndexType() == "inmem"
+			})
+			s.mu.RUnlock()
+
+			s.walkShards(shards, func(sh *Shard) error {
+				db := sh.database
+				id := sh.id
+
+				names, err := sh.MeasurementNamesByExpr(nil)
+				if err != nil {
+					s.Logger.Warn("cannot retrieve measurement names", zap.Error(err))
+					return nil
+				}
+
+				for _, name := range names {
+					sh.ForEachMeasurementTagKey(name, func(k []byte) error {
+						n := sh.TagKeyCardinality(name, k)
+						perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100)
+						if perc > 100 {
+							perc = 100
+						}
+
+						// Log at 80, 85, 90-100% levels
+						if perc == 80 || perc == 85 || perc >= 90 {
+							s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
+								perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k))
+						}
+						return nil
+					})
+				}
+				return nil
+			})
 		}
 	}
 }
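Note: with this change the per-shard monitor goroutine is gone; Store.monitorShards
runs the cardinality check once a minute in one place, selecting only shards whose
index reports "inmem" and fanning the check out across them. A rough standalone
sketch of that select-then-walk shape (the types and helpers here are illustrative,
not the actual Store API):

    package main

    import (
    	"fmt"
    	"sync"
    )

    type shard struct {
    	id        uint64
    	indexType string
    }

    // filterShards returns the subset of shards matching fn, mirroring the
    // filter step in Store.monitorShards above.
    func filterShards(shards []*shard, fn func(*shard) bool) []*shard {
    	var out []*shard
    	for _, sh := range shards {
    		if fn(sh) {
    			out = append(out, sh)
    		}
    	}
    	return out
    }

    // walkShards applies fn to each shard concurrently and waits for all of them.
    func walkShards(shards []*shard, fn func(*shard) error) {
    	var wg sync.WaitGroup
    	for _, sh := range shards {
    		wg.Add(1)
    		go func(sh *shard) {
    			defer wg.Done()
    			if err := fn(sh); err != nil {
    				fmt.Println("shard", sh.id, "error:", err)
    			}
    		}(sh)
    	}
    	wg.Wait()
    }

    func main() {
    	shards := []*shard{{1, "inmem"}, {2, "tsi1"}, {3, "inmem"}}
    	inmem := filterShards(shards, func(sh *shard) bool { return sh.indexType == "inmem" })
    	walkShards(inmem, func(sh *shard) error {
    		fmt.Println("checking tag cardinality for shard", sh.id)
    		return nil
    	})
    }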