diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index bc8629e9b62..97f9f5fe1df 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -269,8 +269,8 @@ func (i *Index) Open() (rErr error) { i.partitions = make([]*Partition, i.PartitionN) for j := 0; j < len(i.partitions); j++ { p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j))) - p.MaxLogFileSize = i.maxLogFileSize - p.MaxLogFileAge = i.maxLogFileAge + p.maxLogFileSize = i.maxLogFileSize + p.maxLogFileAge = i.maxLogFileAge p.nosync = i.disableFsync p.logbufferSize = i.logfileBufferSize p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index af328763c45..1fad7edf817 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -40,8 +40,7 @@ const ManifestFileName = "MANIFEST" // Partition represents a collection of layered index files and WAL. type Partition struct { - // exported for tests - Mu sync.RWMutex + mu sync.RWMutex opened bool sfile *tsdb.SeriesFile // series lookup file @@ -71,8 +70,9 @@ type Partition struct { id string // id portion of path. // Log file compaction thresholds. - MaxLogFileSize int64 - MaxLogFileAge time.Duration + // Should be read/changed under the lock after a partition is opened. + maxLogFileSize int64 + maxLogFileAge time.Duration nosync bool // when true, flushing and syncing of LogFile will be disabled. logbufferSize int // the LogFile's buffer is set to this value. 
@@ -99,7 +99,7 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { sfile: sfile, seriesIDSet: tsdb.NewSeriesIDSet(), fileSet: &FileSet{}, - MaxLogFileSize: tsdb.DefaultMaxIndexLogFileSize, + maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize, // compactionEnabled: true, compactionInterrupt: make(chan struct{}), @@ -111,6 +111,16 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition { return p } +// SetMaxLogFileSize provides a setter for the partition setting of maxLogFileSize +// that is otherwise only available at creation time. Returns the previous value. +// Only for tests! +func (p *Partition) SetMaxLogFileSize(size int64) (old int64) { + p.mu.Lock() + old, p.maxLogFileSize = p.maxLogFileSize, size + p.mu.Unlock() + return old +} + // bytes estimates the memory footprint of this Partition, in bytes. func (p *Partition) bytes() int { var b int @@ -135,8 +145,8 @@ func (p *Partition) bytes() int { b += int(unsafe.Sizeof(p.fieldset)) + p.fieldset.Bytes() b += int(unsafe.Sizeof(p.path)) + len(p.path) b += int(unsafe.Sizeof(p.id)) + len(p.id) - b += int(unsafe.Sizeof(p.MaxLogFileSize)) - b += int(unsafe.Sizeof(p.MaxLogFileAge)) + b += int(unsafe.Sizeof(p.maxLogFileSize)) + b += int(unsafe.Sizeof(p.maxLogFileAge)) b += int(unsafe.Sizeof(p.compactionInterrupt)) b += int(unsafe.Sizeof(p.compactionsDisabled)) b += int(unsafe.Sizeof(p.logger)) @@ -151,8 +161,8 @@ var ErrIncompatibleVersion = errors.New("incompatible tsi1 index MANIFEST") // Open opens the partition. func (p *Partition) Open() (rErr error) { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() p.closing = make(chan struct{}) @@ -214,7 +224,7 @@ func (p *Partition) Open() (rErr error) { // Make first log file active, if within threshold. 
sz, _ := f.Stat() - if p.activeLogFile == nil && sz < p.MaxLogFileSize { + if p.activeLogFile == nil && sz < p.maxLogFileSize { p.activeLogFile = f } @@ -340,8 +350,8 @@ func (p *Partition) buildSeriesSet() error { // CurrentCompactionN returns the number of compactions currently running. func (p *Partition) CurrentCompactionN() int { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.currentCompactionN } @@ -368,23 +378,22 @@ func (p *Partition) Close() error { p.Wait() // Lock index and close remaining - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() if p.fileSet == nil { return nil } // Close log files. - var err error + var errs []error for _, f := range p.fileSet.files { - if localErr := f.Close(); localErr != nil { - err = localErr - } + localErr := f.Close() + errs = append(errs, localErr) } p.fileSet.files = nil - return err + return errors.Join(errs...) } // closing returns true if the partition is currently closing. It does not require @@ -406,8 +415,8 @@ func (p *Partition) SeriesFile() *tsdb.SeriesFile { return p.sfile } // NextSequence returns the next file identifier. func (p *Partition) NextSequence() int { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() return p.nextSequence() } @@ -449,8 +458,8 @@ func (p *Partition) manifest(newFileSet *FileSet) *Manifest { // SetManifestPathForTest is only to force a bad path in testing func (p *Partition) SetManifestPathForTest(path string) { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() p.manifestPathFn = func() string { return path } } @@ -461,16 +470,16 @@ func (p *Partition) WithLogger(logger *zap.Logger) { // SetFieldSet sets a shared field set from the engine. func (p *Partition) SetFieldSet(fs *tsdb.MeasurementFieldSet) { - p.Mu.Lock() + p.mu.Lock() p.fieldset = fs - p.Mu.Unlock() + p.mu.Unlock() } // FieldSet returns the fieldset. 
func (p *Partition) FieldSet() *tsdb.MeasurementFieldSet { - p.Mu.Lock() + p.mu.Lock() fs := p.fieldset - p.Mu.Unlock() + p.mu.Unlock() return fs } @@ -480,8 +489,8 @@ func (p *Partition) RetainFileSet() (*FileSet, error) { case <-p.closing: return nil, tsdb.ErrIndexClosing default: - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.retainFileSet(), nil } } @@ -494,8 +503,8 @@ func (p *Partition) retainFileSet() *FileSet { // FileN returns the active files in the file set. func (p *Partition) FileN() int { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return len(p.fileSet.files) } @@ -641,8 +650,8 @@ func (p *Partition) DropMeasurement(name []byte) error { // Delete key if not already deleted. if !k.Deleted() { if err := func() error { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.activeLogFile.DeleteTagKey(name, k.Key()) }(); err != nil { return err @@ -654,8 +663,8 @@ func (p *Partition) DropMeasurement(name []byte) error { for v := vitr.Next(); v != nil; v = vitr.Next() { if !v.Deleted() { if err := func() error { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value()) }(); err != nil { return err @@ -677,8 +686,8 @@ func (p *Partition) DropMeasurement(name []byte) error { break } if err := func() error { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.activeLogFile.DeleteSeriesID(elem.SeriesID) }(); err != nil { return err @@ -691,8 +700,8 @@ func (p *Partition) DropMeasurement(name []byte) error { // Mark measurement as deleted. 
if err := func() error { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.activeLogFile.DeleteMeasurement(name) }(); err != nil { return err @@ -724,14 +733,14 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode defer fs.Release() // Ensure fileset cannot change during insert. - p.Mu.RLock() + p.mu.RLock() // Insert series into log file. ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, names, tagsSlice) if err != nil { - p.Mu.RUnlock() + p.mu.RUnlock() return nil, err } - p.Mu.RUnlock() + p.mu.RUnlock() if err := p.CheckLogFile(); err != nil { return nil, err @@ -742,8 +751,8 @@ func (p *Partition) createSeriesListIfNotExists(names [][]byte, tagsSlice []mode func (p *Partition) DropSeries(seriesID uint64) error { // Delete series from index. if err := func() error { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.activeLogFile.DeleteSeriesID(seriesID) }(); err != nil { return err @@ -908,14 +917,14 @@ func (p *Partition) AssignShard(k string, shardID uint64) {} // Compact requests a compaction of log files. func (p *Partition) Compact() { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() p.compact() } func (p *Partition) DisableCompactions() { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() p.compactionsDisabled++ select { @@ -931,8 +940,8 @@ func (p *Partition) DisableCompactions() { } func (p *Partition) EnableCompactions() { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() // Already enabled? 
if p.compactionsEnabled() { @@ -950,9 +959,9 @@ func (p *Partition) runPeriodicCompaction() { p.Compact() // Avoid a race when using Reopen in tests - p.Mu.RLock() + p.mu.RLock() closing := p.closing - p.Mu.RUnlock() + p.mu.RUnlock() // check for compactions once an hour (usually not necessary but a nice safety check) t := time.NewTicker(1 * time.Hour) @@ -975,8 +984,8 @@ func (p *Partition) runPeriodicCompaction() { // If checkRunning = true, only count as needing a compaction if there is not a compaction already // in progress for the level that would be compacted func (p *Partition) NeedsCompaction(checkRunning bool) bool { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() if p.needsLogCompaction() { return true } @@ -1029,10 +1038,10 @@ func (p *Partition) compact() { p.currentCompactionN++ go func() { p.compactLogFile(logFile) - p.Mu.Lock() + p.mu.Lock() p.currentCompactionN-- p.levelCompacting[0] = false - p.Mu.Unlock() + p.mu.Unlock() p.Compact() }() } @@ -1072,10 +1081,10 @@ func (p *Partition) compact() { p.compactToLevel(files, level+1, interrupt) // Ensure compaction lock for the level is released. - p.Mu.Lock() + p.mu.Lock() p.levelCompacting[level] = false p.currentCompactionN-- - p.Mu.Unlock() + p.mu.Unlock() // Check for new compactions p.Compact() @@ -1153,8 +1162,8 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch // Obtain lock to swap in index file and write manifest. if err := func() (rErr error) { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() // Replace previous files with new index file. 
newFileSet := p.fileSet.MustReplace(IndexFiles(files).Files(), file) @@ -1212,14 +1221,14 @@ func (p *Partition) Rebuild() {} func (p *Partition) needsLogCompaction() bool { size := p.activeLogFile.Size() modTime := p.activeLogFile.ModTime() - return size >= p.MaxLogFileSize || (size > 0 && modTime.Before(time.Now().Add(-p.MaxLogFileAge))) + return size >= p.maxLogFileSize || (size > 0 && modTime.Before(time.Now().Add(-p.maxLogFileAge))) } func (p *Partition) CheckLogFile() error { // Check log file under read lock. needsCompaction := func() bool { - p.Mu.RLock() - defer p.Mu.RUnlock() + p.mu.RLock() + defer p.mu.RUnlock() return p.needsLogCompaction() }() if !needsCompaction { @@ -1227,8 +1236,8 @@ func (p *Partition) CheckLogFile() error { } // If file size exceeded then recheck under write lock and swap files. - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() return p.checkLogFile() } @@ -1258,9 +1267,9 @@ func (p *Partition) compactLogFile(logFile *LogFile) { return } - p.Mu.Lock() + p.mu.Lock() interrupt := p.compactionInterrupt - p.Mu.Unlock() + p.mu.Unlock() start := time.Now() @@ -1310,8 +1319,8 @@ func (p *Partition) compactLogFile(logFile *LogFile) { // Obtain lock to swap in index file and write manifest. if err := func() (rErr error) { - p.Mu.Lock() - defer p.Mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() // Replace previous log file with index file. 
newFileSet := p.fileSet.MustReplace([]File{logFile}, file) diff --git a/tsdb/index/tsi1/partition_test.go b/tsdb/index/tsi1/partition_test.go index f53c67908c4..b79c460e8bc 100644 --- a/tsdb/index/tsi1/partition_test.go +++ b/tsdb/index/tsi1/partition_test.go @@ -108,7 +108,7 @@ func TestPartition_PrependLogFile_Write_Fail(t *testing.T) { t.Fatalf("error closing partition: %v", err) } }) - p.Partition.MaxLogFileSize = -1 + p.SetMaxLogFileSize(-1) fileN := p.FileN() p.CheckLogFile() if fileN >= p.FileN() { @@ -134,9 +134,7 @@ func TestPartition_Compact_Write_Fail(t *testing.T) { t.Fatalf("error closing partition: %v", err) } }) - p.Partition.Mu.Lock() - p.Partition.MaxLogFileSize = -1 - p.Partition.Mu.Unlock() + p.SetMaxLogFileSize(-1) fileN := p.FileN() p.Compact() if (1 + fileN) != p.FileN() {