From 03ced4cc9037868b103510cf110e492ade1d8861 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 11 Jan 2016 11:00:25 -0700
Subject: [PATCH] Load shards concurrently

---
 tsdb/engine/tsm1/cache.go      |  8 ++---
 tsdb/engine/tsm1/engine.go     | 34 ++++++++++----------
 tsdb/engine/tsm1/file_store.go | 23 ++++++++------
 tsdb/engine/tsm1/reader.go     | 12 +++----
 tsdb/engine/tsm1/writer.go     | 11 ++++---
 tsdb/meta.go                   | 15 ++++++++-
 tsdb/shard.go                  | 19 ++---------
 tsdb/store.go                  | 58 ++++++++++++++++++++++++----------
 8 files changed, 106 insertions(+), 74 deletions(-)

diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go
index f94619f4e36..5ce2b3fb648 100644
--- a/tsdb/engine/tsm1/cache.go
+++ b/tsdb/engine/tsm1/cache.go
@@ -373,12 +373,12 @@ func (c *Cache) Store() map[string]*entry {
     return c.store
 }
 
-func (c *Cache) Lock() {
-    c.mu.Lock()
+func (c *Cache) RLock() {
+    c.mu.RLock()
 }
 
-func (c *Cache) Unlock() {
-    c.mu.Unlock()
+func (c *Cache) RUnlock() {
+    c.mu.RUnlock()
 }
 
 // values returns the values for the key. It doesn't lock and assumes the data is
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 82228b95eef..8072b13c5bc 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -187,20 +187,17 @@ func (e *Engine) Close() error {
 func (e *Engine) SetLogOutput(w io.Writer) {}
 
 // LoadMetadataIndex loads the shard metadata into memory.
-func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
+func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
     // Save reference to index for iterator creation.
     e.index = index
     e.measurementFields = measurementFields
 
+    start := time.Now()
     keys := e.FileStore.Keys()
 
     keysLoaded := make(map[string]bool)
 
-    for _, k := range keys {
-        typ, err := e.FileStore.Type(k)
-        if err != nil {
-            return err
-        }
+    for k, typ := range keys {
         fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
         if err != nil {
             return err
@@ -214,8 +211,8 @@ func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, mea
     }
 
     // load metadata from the Cache
-    e.Cache.Lock() // shouldn't need the lock, but just to be safe
-    defer e.Cache.Unlock()
+    e.Cache.RLock() // shouldn't need the lock, but just to be safe
+    defer e.Cache.RUnlock()
 
     for key, entry := range e.Cache.Store() {
         if keysLoaded[key] {
@@ -224,7 +221,7 @@ func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, mea
 
         fieldType, err := entry.values.InfluxQLType()
         if err != nil {
-            log.Printf("error getting the data type of values for key %s: %s", key, err.Error())
+            e.logger.Printf("error getting the data type of values for key %s: %s", key, err.Error())
             continue
         }
 
@@ -233,6 +230,10 @@
         }
     }
 
+    // sh may be nil in tests
+    if sh != nil {
+        e.logger.Printf("%s database index loaded in %s", sh.Path(), time.Now().Sub(start))
+    }
     return nil
 }
 
@@ -370,31 +371,32 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error {
 
     var deleteKeys []string
 
     // go through the keys in the file store
-    for _, k := range e.FileStore.Keys() {
+    for k := range e.FileStore.Keys() {
         seriesKey, _ := seriesAndFieldFromCompositeKey(k)
         if _, ok := keyMap[seriesKey]; ok {
             deleteKeys = append(deleteKeys, k)
         }
     }
-    e.FileStore.Delete(deleteKeys)
+    if err := e.FileStore.Delete(deleteKeys); err != nil {
+        return err
+    }
 
     // find the keys in the cache and remove them
     walKeys := make([]string, 0)
-    e.Cache.Lock()
-    defer e.Cache.Unlock()
-
+    e.Cache.RLock()
     s := e.Cache.Store()
     for k, _ := range s {
         seriesKey, _ := seriesAndFieldFromCompositeKey(k)
         if _, ok := keyMap[seriesKey]; ok {
             walKeys = append(walKeys, k)
-            delete(s, k)
         }
     }
+    e.Cache.RUnlock()
+
+    e.Cache.Delete(walKeys)
 
     // delete from the WAL
     _, err := e.WAL.Delete(walKeys)
-
     return err
 }
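The DeleteSeries rework above is worth calling out: keys are now collected from the cache under a read lock and removed afterwards through e.Cache.Delete, instead of deleting entries from the store map while iterating it under an exclusive lock. Below is a minimal sketch of that collect-under-RLock, mutate-through-a-write-locked-method pattern; the cache type, its fields, and the keysWithPrefix helper are illustrative stand-ins, not the real tsm1.Cache API (only the Delete call mirrors the patch).

package main

import (
	"fmt"
	"strings"
	"sync"
)

// cache is a stand-in for tsm1.Cache: a key/value store guarded by an RWMutex.
type cache struct {
	mu    sync.RWMutex
	store map[string][]float64
}

// keysWithPrefix collects matching keys while holding only the read lock,
// so concurrent readers (such as shards loading metadata) are not blocked.
func (c *cache) keysWithPrefix(prefix string) []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	var keys []string
	for k := range c.store {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	return keys
}

// delete performs the actual mutation under the write lock, mirroring the
// patch's e.Cache.Delete(walKeys) call.
func (c *cache) delete(keys []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, k := range keys {
		delete(c.store, k)
	}
}

func main() {
	c := &cache{store: map[string][]float64{
		"cpu,host=a#!~#value": {1, 2},
		"mem,host=a#!~#used":  {3},
	}}

	// Collect under RLock, then mutate through the write-locked method,
	// rather than deleting from the map while ranging over it.
	doomed := c.keysWithPrefix("cpu")
	c.delete(doomed)
	fmt.Println(len(c.store)) // 1
}

Holding only the read lock on the collection pass keeps series deletion from serializing against the concurrent metadata loads this patch introduces.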
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index 7796ceab369..46a53cff19e 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -52,6 +52,12 @@ type TSMFile interface {
     // Keys returns all keys contained in the file.
     Keys() []string
 
+    // KeyCount returns the number of distinct keys in the file.
+    KeyCount() int
+
+    // KeyAt returns the key located at index position idx
+    KeyAt(idx int) (string, byte)
+
     // Type returns the block type of the values stored for the key. Returns one of
     // BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
     // an error is returned.
@@ -204,23 +210,20 @@ func (f *FileStore) Remove(paths ...string) {
     sort.Sort(tsmReaders(f.files))
 }
 
-func (f *FileStore) Keys() []string {
+// Keys returns all keys and types for all files
+func (f *FileStore) Keys() map[string]byte {
     f.mu.RLock()
     defer f.mu.RUnlock()
 
-    uniqueKeys := map[string]struct{}{}
+    uniqueKeys := map[string]byte{}
     for _, f := range f.files {
-        for _, key := range f.Keys() {
-            uniqueKeys[key] = struct{}{}
+        for i := 0; i < f.KeyCount(); i++ {
+            key, typ := f.KeyAt(i)
+            uniqueKeys[key] = typ
         }
     }
 
-    var keys []string
-    for key := range uniqueKeys {
-        keys = append(keys, key)
-    }
-    sort.Strings(keys)
-    return keys
+    return uniqueKeys
 }
 
 func (f *FileStore) Type(key string) (byte, error) {
diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go
index f76f8c92c86..256f7f0e44e 100644
--- a/tsdb/engine/tsm1/reader.go
+++ b/tsdb/engine/tsm1/reader.go
@@ -50,7 +50,7 @@ func (b *BlockIterator) PeekNext() string {
     if len(b.entries) > 1 {
         return b.key
     } else if b.n-b.i > 1 {
-        key := b.r.KeyAt(b.i + 1)
+        key, _ := b.r.KeyAt(b.i + 1)
         return key
     }
     return ""
@@ -197,7 +197,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) {
     return t.index.Key(index)
 }
 
-func (t *TSMReader) KeyAt(idx int) string {
+func (t *TSMReader) KeyAt(idx int) (string, byte) {
     return t.index.KeyAt(idx)
 }
 
@@ -557,15 +557,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
     return string(key), entries.entries
 }
 
-func (d *indirectIndex) KeyAt(idx int) string {
+func (d *indirectIndex) KeyAt(idx int) (string, byte) {
     d.mu.RLock()
     defer d.mu.RUnlock()
 
     if idx < 0 || idx >= len(d.offsets) {
-        return ""
+        return "", 0
     }
-    _, key, _ := readKey(d.b[d.offsets[idx]:])
-    return string(key)
+    n, key, _ := readKey(d.b[d.offsets[idx]:])
+    return string(key), d.b[d.offsets[idx]+int32(n)]
 }
 
 func (d *indirectIndex) KeyCount() int {
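KeyAt now returns the block type along with the key. In indirectIndex.KeyAt the type comes straight out of the raw index bytes: readKey reports how many bytes the length-prefixed key occupies, and the type is the byte immediately after it (d.b[d.offsets[idx]+int32(n)]). The sketch below walks through that offset arithmetic under the layout the diff implies (a 2-byte big-endian key length, the key bytes, then a 1-byte block type); the real readKey also returns an error, and a real index entry carries further fields after the type byte.

package main

import (
	"encoding/binary"
	"fmt"
)

// readKey mimics what the patch implies about the index layout: a 2-byte
// big-endian key length followed by the key bytes. It returns the number of
// bytes consumed so the caller can read the block type at b[n], the same
// trick indirectIndex.KeyAt uses with d.b[d.offsets[idx]+int32(n)].
func readKey(b []byte) (n int, key []byte) {
	keyLen := int(binary.BigEndian.Uint16(b[:2]))
	n = 2 + keyLen
	return n, b[2:n]
}

func main() {
	// One index entry: length=3, key="cpu", then the block type byte.
	entry := []byte{0x00, 0x03, 'c', 'p', 'u', 0x01}

	n, key := readKey(entry)
	typ := entry[n] // the byte immediately after the key
	fmt.Printf("key=%s type=%d\n", key, typ) // key=cpu type=1
}

That single extra byte is what lets the reworked FileStore.Keys build its key-to-type map in one pass over KeyCount/KeyAt instead of calling Type for every key, which the new LoadMetadataIndex loop in engine.go relies on.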
diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go
index a9c0b3f1962..690d8978eee 100644
--- a/tsdb/engine/tsm1/writer.go
+++ b/tsdb/engine/tsm1/writer.go
@@ -162,7 +162,7 @@ type TSMIndex interface {
     Key(index int) (string, []IndexEntry)
 
     // KeyAt returns the key in the index at the given postion.
-    KeyAt(index int) string
+    KeyAt(index int) (string, byte)
 
     // KeyCount returns the count of unique keys in the index.
     KeyCount() int
@@ -367,11 +367,14 @@ func (d *directIndex) Key(idx int) (string, []IndexEntry) {
     return k, d.blocks[k].entries
 }
 
-func (d *directIndex) KeyAt(idx int) string {
+func (d *directIndex) KeyAt(idx int) (string, byte) {
     if idx < 0 || idx >= len(d.blocks) {
-        return ""
+        return "", 0
     }
-    return d.Keys()[idx]
+    key := d.Keys()[idx]
+    entries := d.blocks[key]
+    return key, entries.Type
+
 }
 
 func (d *directIndex) KeyCount() int {
diff --git a/tsdb/meta.go b/tsdb/meta.go
index 07960079381..549281b7b7e 100644
--- a/tsdb/meta.go
+++ b/tsdb/meta.go
@@ -96,6 +96,8 @@ func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries in
 
 // CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
 func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
+    d.mu.Lock()
+    defer d.mu.Unlock()
     // if there is a measurement for this id, it's already been added
     ss := d.series[series.Key]
     if ss != nil {
@@ -103,7 +105,7 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
     }
 
     // get or create the measurement index
-    m := d.CreateMeasurementIndexIfNotExists(measurementName)
+    m := d.createMeasurementIndexIfNotExists(measurementName)
 
     // set the in memory ID for query processing on this shard
     series.id = d.lastID + 1
@@ -121,6 +123,13 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
 
 // CreateMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
 func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement {
+    d.mu.Lock()
+    defer d.mu.Unlock()
+    return d.createMeasurementIndexIfNotExists(name)
+}
+
+// createMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
+func (d *DatabaseIndex) createMeasurementIndexIfNotExists(name string) *Measurement {
     name = escape.UnescapeString(name)
     m := d.measurements[name]
     if m == nil {
@@ -1295,6 +1304,10 @@ func NewSeries(key string, tags map[string]string) *Series {
     }
 }
 
+func (s *Series) AssignShard(shardID uint64) {
+    s.shardIDs[shardID] = true
+}
+
 // MarshalBinary encodes the object to a binary format.
 func (s *Series) MarshalBinary() ([]byte, error) {
     var pb internal.Series
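The meta.go changes push locking into DatabaseIndex itself: the exported CreateSeriesIndexIfNotExists and CreateMeasurementIndexIfNotExists now take d.mu, and the real work lives in an unexported createMeasurementIndexIfNotExists that assumes the lock is already held, so one locked method can reuse another without deadlocking on Go's non-reentrant mutex. A minimal sketch of that exported-wrapper/unexported-helper pattern follows; the index and measurement types here are simplified stand-ins, not the real DatabaseIndex.

package main

import (
	"fmt"
	"sync"
)

type measurement struct{ name string }

// index is a stand-in for DatabaseIndex: the exported method owns the
// locking, the unexported helper assumes the caller already holds mu.
type index struct {
	mu           sync.RWMutex
	measurements map[string]*measurement
}

// CreateMeasurementIfNotExists is the public entry point: lock, then delegate.
func (d *index) CreateMeasurementIfNotExists(name string) *measurement {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.createMeasurementIfNotExists(name)
}

// createMeasurementIfNotExists does the work and must only be called with
// d.mu held. Other write-locked paths (like series creation) can call it
// directly without trying to acquire the mutex a second time.
func (d *index) createMeasurementIfNotExists(name string) *measurement {
	m := d.measurements[name]
	if m == nil {
		m = &measurement{name: name}
		d.measurements[name] = m
	}
	return m
}

func main() {
	d := &index{measurements: map[string]*measurement{}}
	fmt.Println(d.CreateMeasurementIfNotExists("cpu").name)
}

Because callers never touch the mutex directly, several shards can populate the shared in-memory index at the same time, which is what the concurrent loadShards further down depends on.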
diff --git a/tsdb/shard.go b/tsdb/shard.go
index 8ca85b408ff..c57ba953ed2 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -128,9 +128,6 @@ func (s *Shard) Open() error {
     s.mu.Lock()
     defer s.mu.Unlock()
 
-    s.index.mu.Lock()
-    defer s.index.mu.Unlock()
-
     // Return if the shard is already open
     if s.engine != nil {
         return nil
@@ -231,22 +228,18 @@ func (s *Shard) WritePoints(points []models.Point) error {
 
     // add any new series to the in-memory index
     if len(seriesToCreate) > 0 {
-        s.index.mu.Lock()
         for _, ss := range seriesToCreate {
             s.index.CreateSeriesIndexIfNotExists(ss.Measurement, ss.Series)
         }
-        s.index.mu.Unlock()
     }
 
     if len(seriesToAddShardTo) > 0 {
-        s.index.mu.Lock()
         for _, k := range seriesToAddShardTo {
-            ss := s.index.series[k]
+            ss := s.index.Series(k)
             if ss != nil {
-                ss.shardIDs[s.id] = true
+                ss.AssignShard(s.id)
             }
         }
-        s.index.mu.Unlock()
     }
 
     // add any new fields and keep track of what needs to be saved
@@ -317,9 +310,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
         return nil, nil
     }
 
-    s.index.mu.Lock()
     s.mu.Lock()
-    defer s.index.mu.Unlock()
     defer s.mu.Unlock()
 
     // add fields
@@ -355,17 +346,13 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
     var fieldsToCreate []*FieldCreate
     var seriesToAddShardTo []string
 
-    // get the mutex for the in memory index, which is shared across shards
-    s.index.mu.RLock()
-    defer s.index.mu.RUnlock()
-
     // get the shard mutex for locally defined fields
     s.mu.RLock()
     defer s.mu.RUnlock()
 
     for _, p := range points {
         // see if the series should be added to the index
-        if ss := s.index.series[string(p.Key())]; ss == nil {
+        if ss := s.index.Series(string(p.Key())); ss == nil {
             series := NewSeries(string(p.Key()), p.Tags())
             seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
             seriesToAddShardTo = append(seriesToAddShardTo, series.Key)
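With the index now doing its own locking, shard.go stops reaching into s.index.mu and the s.index.series map; lookups go through s.index.Series and shard membership is recorded with the new Series.AssignShard helper. The sketch below shows the accessor shape this relies on, assuming (it is not shown in the diff) that DatabaseIndex.Series takes a read lock internally; the types are simplified stand-ins.

package main

import (
	"fmt"
	"sync"
)

type series struct {
	Key      string
	shardIDs map[uint64]bool
}

// AssignShard mirrors the new Series.AssignShard helper from the patch.
func (s *series) AssignShard(shardID uint64) {
	s.shardIDs[shardID] = true
}

// seriesIndex stands in for DatabaseIndex: callers use the Series accessor
// instead of grabbing the index mutex and reading the map themselves.
type seriesIndex struct {
	mu     sync.RWMutex
	series map[string]*series
}

// Series resolves a key under a read lock, so many shards can look up
// series concurrently while writers still get exclusive access.
func (d *seriesIndex) Series(key string) *series {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.series[key]
}

func main() {
	idx := &seriesIndex{series: map[string]*series{
		"cpu,host=a": {Key: "cpu,host=a", shardIDs: map[uint64]bool{}},
	}}

	if ss := idx.Series("cpu,host=a"); ss != nil {
		ss.AssignShard(1)
	}
	fmt.Println(idx.Series("cpu,host=a").shardIDs[1]) // true
}

Encapsulating the mutex this way is what makes it safe for the shard-opening goroutines in store.go to hit the shared index at the same time.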
diff --git a/tsdb/store.go b/tsdb/store.go
index eb67902e95d..bb63a1c7aea 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -111,6 +111,14 @@ func (s *Store) loadIndexes() error {
 }
 
 func (s *Store) loadShards() error {
+    // struct to hold the result of opening each reader in a goroutine
+    type res struct {
+        s   *Shard
+        err error
+    }
+    resC := make(chan *res)
+    var n int
+
     // loop through the current database indexes
     for db := range s.databaseIndexes {
         rps, err := ioutil.ReadDir(filepath.Join(s.path, db))
@@ -130,27 +138,43 @@
                 return err
             }
 
             for _, sh := range shards {
-                path := filepath.Join(s.path, db, rp.Name(), sh.Name())
-                walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp.Name(), sh.Name())
-
-                // Shard file names are numeric shardIDs
-                shardID, err := strconv.ParseUint(sh.Name(), 10, 64)
-                if err != nil {
-                    s.Logger.Printf("%s is not a valid ID. Skipping shard.", sh.Name())
-                    continue
-                }
-
-                shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
-                err = shard.Open()
-                if err != nil {
-                    return err
-                }
-
-                s.shards[shardID] = shard
+                n++
+                go func(index *DatabaseIndex, db, rp, sh string) {
+                    start := time.Now()
+                    path := filepath.Join(s.path, db, rp, sh)
+                    walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)
+
+                    // Shard file names are numeric shardIDs
+                    shardID, err := strconv.ParseUint(sh, 10, 64)
+                    if err != nil {
+                        resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
+                        return
+                    }
+
+                    shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
+
+                    err = shard.Open()
+                    if err != nil {
+                        resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
+                        return
+                    }
+
+                    resC <- &res{s: shard}
+                    s.Logger.Printf("%s opened in %s", path, time.Now().Sub(start))
+                }(s.databaseIndexes[db], db, rp.Name(), sh.Name())
             }
         }
     }
 
+    for i := 0; i < n; i++ {
+        res := <-resC
+        if res.err != nil {
+            s.Logger.Println(res.err)
+            continue
+        }
+        s.shards[res.s.id] = res.s
+    }
+    close(resC)
     return nil
 }
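The heart of the patch is the reworked Store.loadShards: it fans out one goroutine per shard, each goroutine sends a *res (an opened shard or an error) on an unbuffered channel, and the parent loop then receives exactly n results, logging failures instead of aborting the whole load. Here is a self-contained sketch of that fan-out/fan-in shape; openShard, the result type, and the shard IDs are hypothetical placeholders for Shard.Open and the real bookkeeping.

package main

import (
	"fmt"
	"time"
)

// result mirrors the res struct in loadShards: either an opened shard's ID
// or an error, never both.
type result struct {
	id  uint64
	err error
}

// openShard is a placeholder for Shard.Open; it pretends to do some I/O and
// fails for one shard to show that errors are logged, not fatal.
func openShard(id uint64) error {
	time.Sleep(10 * time.Millisecond)
	if id == 3 {
		return fmt.Errorf("failed to open shard: %d: corrupt index", id)
	}
	return nil
}

func main() {
	ids := []uint64{1, 2, 3, 4, 5}

	resC := make(chan *result)
	var n int

	// Fan out: one goroutine per shard, like the patched loadShards.
	for _, id := range ids {
		n++
		go func(id uint64) {
			start := time.Now()
			if err := openShard(id); err != nil {
				resC <- &result{err: err}
				return
			}
			resC <- &result{id: id}
			fmt.Printf("shard %d opened in %s\n", id, time.Since(start))
		}(id)
	}

	// Fan in: receive exactly n results so no sender is left blocked,
	// recording successes and logging failures.
	opened := make(map[uint64]bool)
	for i := 0; i < n; i++ {
		res := <-resC
		if res.err != nil {
			fmt.Println(res.err)
			continue
		}
		opened[res.id] = true
	}
	close(resC)

	fmt.Println("opened shards:", len(opened))
}

Counting sends with n and receiving that many results keeps the unbuffered channel from leaving goroutines blocked, and timing each open (as the new s.Logger.Printf call does) makes slow shards visible at startup.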