Skip to content

Commit

Permalink
Load shards concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
jwilder committed Mar 29, 2016
1 parent 819e7a8 commit 03ced4c
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 74 deletions.
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,12 @@ func (c *Cache) Store() map[string]*entry {
return c.store
}

func (c *Cache) Lock() {
c.mu.Lock()
func (c *Cache) RLock() {
c.mu.RLock()
}

func (c *Cache) Unlock() {
c.mu.Unlock()
func (c *Cache) RUnlock() {
c.mu.RUnlock()
}

// values returns the values for the key. It doesn't lock and assumes the data is
Expand Down
34 changes: 18 additions & 16 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,17 @@ func (e *Engine) Close() error {
func (e *Engine) SetLogOutput(w io.Writer) {}

// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
// Save reference to index for iterator creation.
e.index = index
e.measurementFields = measurementFields

start := time.Now()
keys := e.FileStore.Keys()

keysLoaded := make(map[string]bool)

for _, k := range keys {
typ, err := e.FileStore.Type(k)
if err != nil {
return err
}
for k, typ := range keys {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
Expand All @@ -214,8 +211,8 @@ func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, mea
}

// load metadata from the Cache
e.Cache.Lock() // shouldn't need the lock, but just to be safe
defer e.Cache.Unlock()
e.Cache.RLock() // shouldn't need the lock, but just to be safe
defer e.Cache.RUnlock()

for key, entry := range e.Cache.Store() {
if keysLoaded[key] {
Expand All @@ -224,7 +221,7 @@ func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, mea

fieldType, err := entry.values.InfluxQLType()
if err != nil {
log.Printf("error getting the data type of values for key %s: %s", key, err.Error())
e.logger.Printf("error getting the data type of values for key %s: %s", key, err.Error())
continue
}

Expand All @@ -233,6 +230,10 @@ func (e *Engine) LoadMetadataIndex(_ *tsdb.Shard, index *tsdb.DatabaseIndex, mea
}
}

// sh may be nil in tests
if sh != nil {
e.logger.Printf("%s database index loaded in %s", sh.Path(), time.Now().Sub(start))
}
return nil
}

Expand Down Expand Up @@ -370,31 +371,32 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error {

var deleteKeys []string
// go through the keys in the file store
for _, k := range e.FileStore.Keys() {
for k := range e.FileStore.Keys() {
seriesKey, _ := seriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok {
deleteKeys = append(deleteKeys, k)
}
}
e.FileStore.Delete(deleteKeys)
if err := e.FileStore.Delete(deleteKeys); err != nil {
return err
}

// find the keys in the cache and remove them
walKeys := make([]string, 0)
e.Cache.Lock()
defer e.Cache.Unlock()

e.Cache.RLock()
s := e.Cache.Store()
for k, _ := range s {
seriesKey, _ := seriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok {
walKeys = append(walKeys, k)
delete(s, k)
}
}
e.Cache.RUnlock()

e.Cache.Delete(walKeys)

// delete from the WAL
_, err := e.WAL.Delete(walKeys)

return err
}

Expand Down
23 changes: 13 additions & 10 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type TSMFile interface {
// Keys returns all keys contained in the file.
Keys() []string

// KeyCount returns the number of distinct keys in the file.
KeyCount() int

// KeyAt returns the key located at index position idx, along with its block type.
KeyAt(idx int) (string, byte)

// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
// an error is returned.
Expand Down Expand Up @@ -204,23 +210,20 @@ func (f *FileStore) Remove(paths ...string) {
sort.Sort(tsmReaders(f.files))
}

func (f *FileStore) Keys() []string {
// Keys returns all keys and types for all files
func (f *FileStore) Keys() map[string]byte {
f.mu.RLock()
defer f.mu.RUnlock()

uniqueKeys := map[string]struct{}{}
uniqueKeys := map[string]byte{}
for _, f := range f.files {
for _, key := range f.Keys() {
uniqueKeys[key] = struct{}{}
for i := 0; i < f.KeyCount(); i++ {
key, typ := f.KeyAt(i)
uniqueKeys[key] = typ
}
}

var keys []string
for key := range uniqueKeys {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
return uniqueKeys
}

func (f *FileStore) Type(key string) (byte, error) {
Expand Down
12 changes: 6 additions & 6 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (b *BlockIterator) PeekNext() string {
if len(b.entries) > 1 {
return b.key
} else if b.n-b.i > 1 {
key := b.r.KeyAt(b.i + 1)
key, _ := b.r.KeyAt(b.i + 1)
return key
}
return ""
Expand Down Expand Up @@ -197,7 +197,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) {
return t.index.Key(index)
}

func (t *TSMReader) KeyAt(idx int) string {
func (t *TSMReader) KeyAt(idx int) (string, byte) {
return t.index.KeyAt(idx)
}

Expand Down Expand Up @@ -557,15 +557,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
return string(key), entries.entries
}

func (d *indirectIndex) KeyAt(idx int) string {
func (d *indirectIndex) KeyAt(idx int) (string, byte) {
d.mu.RLock()
defer d.mu.RUnlock()

if idx < 0 || idx >= len(d.offsets) {
return ""
return "", 0
}
_, key, _ := readKey(d.b[d.offsets[idx]:])
return string(key)
n, key, _ := readKey(d.b[d.offsets[idx]:])
return string(key), d.b[d.offsets[idx]+int32(n)]
}

func (d *indirectIndex) KeyCount() int {
Expand Down
11 changes: 7 additions & 4 deletions tsdb/engine/tsm1/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type TSMIndex interface {
Key(index int) (string, []IndexEntry)

// KeyAt returns the key in the index at the given position, along with its block type.
KeyAt(index int) string
KeyAt(index int) (string, byte)

// KeyCount returns the count of unique keys in the index.
KeyCount() int
Expand Down Expand Up @@ -367,11 +367,14 @@ func (d *directIndex) Key(idx int) (string, []IndexEntry) {
return k, d.blocks[k].entries
}

func (d *directIndex) KeyAt(idx int) string {
func (d *directIndex) KeyAt(idx int) (string, byte) {
if idx < 0 || idx >= len(d.blocks) {
return ""
return "", 0
}
return d.Keys()[idx]
key := d.Keys()[idx]
entries := d.blocks[key]
return key, entries.Type

}

func (d *directIndex) KeyCount() int {
Expand Down
15 changes: 14 additions & 1 deletion tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries in

// CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
d.mu.Lock()
defer d.mu.Unlock()
// if there is already a series for this key, it's already been added
ss := d.series[series.Key]
if ss != nil {
return ss
}

// get or create the measurement index
m := d.CreateMeasurementIndexIfNotExists(measurementName)
m := d.createMeasurementIndexIfNotExists(measurementName)

// set the in memory ID for query processing on this shard
series.id = d.lastID + 1
Expand All @@ -121,6 +123,13 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser

// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement.
// It holds d.mu for writing for the duration of the call and delegates the actual
// work to the unexported createMeasurementIndexIfNotExists. Code that already holds
// d.mu (such as CreateSeriesIndexIfNotExists) calls the unexported variant directly
// instead of this one.
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement {
d.mu.Lock()
defer d.mu.Unlock()
return d.createMeasurementIndexIfNotExists(name)
}

// createMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
func (d *DatabaseIndex) createMeasurementIndexIfNotExists(name string) *Measurement {
name = escape.UnescapeString(name)
m := d.measurements[name]
if m == nil {
Expand Down Expand Up @@ -1295,6 +1304,10 @@ func NewSeries(key string, tags map[string]string) *Series {
}
}

// AssignShard records that the series has data in the shard identified by shardID
// by setting s.shardIDs[shardID] to true.
// NOTE(review): no lock is taken here; confirm that callers serialize access to
// shardIDs now that shards are loaded concurrently.
func (s *Series) AssignShard(shardID uint64) {
s.shardIDs[shardID] = true
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
var pb internal.Series
Expand Down
19 changes: 3 additions & 16 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ func (s *Shard) Open() error {
s.mu.Lock()
defer s.mu.Unlock()

s.index.mu.Lock()
defer s.index.mu.Unlock()

// Return if the shard is already open
if s.engine != nil {
return nil
Expand Down Expand Up @@ -231,22 +228,18 @@ func (s *Shard) WritePoints(points []models.Point) error {

// add any new series to the in-memory index
if len(seriesToCreate) > 0 {
s.index.mu.Lock()
for _, ss := range seriesToCreate {
s.index.CreateSeriesIndexIfNotExists(ss.Measurement, ss.Series)
}
s.index.mu.Unlock()
}

if len(seriesToAddShardTo) > 0 {
s.index.mu.Lock()
for _, k := range seriesToAddShardTo {
ss := s.index.series[k]
ss := s.index.Series(k)
if ss != nil {
ss.shardIDs[s.id] = true
ss.AssignShard(s.id)
}
}
s.index.mu.Unlock()
}

// add any new fields and keep track of what needs to be saved
Expand Down Expand Up @@ -317,9 +310,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
return nil, nil
}

s.index.mu.Lock()
s.mu.Lock()
defer s.index.mu.Unlock()
defer s.mu.Unlock()

// add fields
Expand Down Expand Up @@ -355,17 +346,13 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
var fieldsToCreate []*FieldCreate
var seriesToAddShardTo []string

// get the mutex for the in memory index, which is shared across shards
s.index.mu.RLock()
defer s.index.mu.RUnlock()

// get the shard mutex for locally defined fields
s.mu.RLock()
defer s.mu.RUnlock()

for _, p := range points {
// see if the series should be added to the index
if ss := s.index.series[string(p.Key())]; ss == nil {
if ss := s.index.Series(string(p.Key())); ss == nil {
series := NewSeries(string(p.Key()), p.Tags())
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
seriesToAddShardTo = append(seriesToAddShardTo, series.Key)
Expand Down
Loading

0 comments on commit 03ced4c

Please sign in to comment.