diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 83351636223..51b98f6c252 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -508,7 +508,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ // We've hit the max file limit and there is more to write. Create a new file // and continue. - if err == errMaxFileExceeded { + if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded { files = append(files, fileName) continue } else if err == ErrNoValues { @@ -573,7 +573,16 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) { } // Write the key and value - if err := w.WriteBlock(key, minTime, maxTime, block); err != nil { + err = w.WriteBlock(key, minTime, maxTime, block) + if err == ErrMaxBlocksExceeded { + if err := w.WriteIndex(); err != nil { + return err + } + + return ErrMaxBlocksExceeded + } + + if err != nil { return err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 5084ac847b8..d6a60501e4c 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -562,6 +562,85 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { } } +// Ensures that a compaction will properly rollover to a new file when the +// max keys per blocks is exceeded +func TestCompactor_CompactFull_MaxKeys(t *testing.T) { + if testing.Short() { + t.Skip("Skipping max keys compaction test") + } + dir := MustTempDir() + defer os.RemoveAll(dir) + + // write two files where the first contains a single key with the maximum + // number of full blocks that can fit in a TSM file + f1, f1Name := MustTSMWriter(dir, 1) + values := make([]tsm1.Value, 1000) + for i := 0; i < 65535; i++ { + values = values[:0] + for j := 0; j < 1000; j++ { + values = append(values, tsm1.NewValue(int64(i*1000+j), float64(j))) + } + if err := f1.Write("cpu,host=A#!~#value", values); err != nil { + t.Fatalf("write tsm f1: %v", err) + } + } + if err := f1.WriteIndex(); err != nil { + t.Fatalf("write index f1: %v", err) + } + f1.Close() + + // Write a new file with 1 block that when compacted would exceed the max + // blocks + lastTimeStamp := values[len(values)-1].UnixNano() + values = values[:0] + f2, f2Name := MustTSMWriter(dir, 2) + for j := lastTimeStamp; j < lastTimeStamp+1000; j++ { + values = append(values, tsm1.NewValue(int64(j), float64(j))) + } + if err := f2.Write("cpu,host=A#!~#value", values); err != nil { + t.Fatalf("write tsm f1: %v", err) + } + + if err := f2.WriteIndex(); err != nil { + t.Fatalf("write index f2: %v", err) + } + f2.Close() + + compactor := &tsm1.Compactor{ + Dir: dir, + FileStore: &fakeFileStore{}, + } + + // Compact both files, should get 2 files back + files, err := compactor.CompactFull([]string{f1Name, f2Name}) + if err != nil { + t.Fatalf("unexpected error writing snapshot: %v", err) + } + + if got, exp := len(files), 2; got != exp { + t.Fatalf("files length mismatch: got %v, exp %v", got, exp) + } + + expGen, expSeq, err := tsm1.ParseTSMFileName(f2Name) + if err != nil { + t.Fatalf("unexpected error parsing file name: %v", err) + } + expSeq = expSeq + 1 + + gotGen, gotSeq, err := tsm1.ParseTSMFileName(files[0]) + if err != nil { + t.Fatalf("unexpected error parsing file name: %v", err) + } + + if gotGen != expGen { + t.Fatalf("wrong generation for new file: got %v, exp %v", gotGen, expGen) + } + + if gotSeq != expSeq { + t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) + } +} + // Tests that a single TSM file can be read and iterated over func TestTSMKeyIterator_Single(t *testing.T) { dir := MustTempDir() @@ -1399,7 +1478,7 @@ func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader return tsm1.NewWALSegmentReader(f) } -func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string { +func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) { f := MustTempFile(dir) oldName := f.Name() @@ -1425,6 +1504,12 @@ func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string { panic(fmt.Sprintf("create TSM writer: %v", err)) } + return w, newName +} + +func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string { + w, name := MustTSMWriter(dir, gen) + for k, v := range values { if err := w.Write(k, v); err != nil { panic(fmt.Sprintf("write TSM value: %v", err)) @@ -1439,7 +1524,7 @@ func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string { panic(fmt.Sprintf("write TSM close: %v", err)) } - return newName + return name } func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader { diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index 5b488b07c88..4474c217764 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -102,6 +102,7 @@ var ( ErrNoValues = fmt.Errorf("no values written") ErrTSMClosed = fmt.Errorf("tsm file closed") ErrMaxKeyLengthExceeded = fmt.Errorf("max key length exceeded") + ErrMaxBlocksExceeded = fmt.Errorf("max blocks exceeded") ) // TSMWriter writes TSM formatted key and values. @@ -471,6 +472,9 @@ func (t *tsmWriter) Write(key string, values Values) error { return nil } +// WriteBlock writes block for the given key and time range to the TSM file. If the write +// exceeds max entries for a given key, ErrMaxBlocksExceeded is returned. This indicates +// that the index is now full for this key and no future writes to this key will succeed. func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte) error { // Nothing to write if len(block) == 0 { @@ -509,6 +513,10 @@ func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte) // Increment file position pointer (checksum + block len) t.n += int64(n) + if len(t.index.Entries(key)) >= maxIndexEntries { + return ErrMaxBlocksExceeded + } + return nil }