Skip to content

Commit

Permalink
Ensure syncronised access to engine
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Mar 31, 2016
1 parent e7cce69 commit 75a2218
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ type Shard struct {
database string
retentionPolicy string

engine Engine
options EngineOptions

mu sync.RWMutex
measurementFields map[string]*MeasurementFields // measurement name to their fields
engine Engine

// expvar-based stats.
statMap *expvar.Map
Expand Down Expand Up @@ -217,6 +217,9 @@ type SeriesCreate struct {

// WritePoints will write the raw data points and any new metadata to the index in the shard
func (s *Shard) WritePoints(points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()

s.statMap.Add(statWriteReq, 1)

seriesToCreate, fieldsToCreate, seriesToAddShardTo, err := s.validateSeriesAndFields(points)
Expand Down Expand Up @@ -255,9 +258,7 @@ func (s *Shard) WritePoints(points []models.Point) error {
}

// This was populated earlier, don't need to validate that it's there.
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()

// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
Expand All @@ -284,6 +285,8 @@ func (s *Shard) WritePoints(points []models.Point) error {

// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.DeleteSeries(seriesKeys)
}

Expand All @@ -307,8 +310,8 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
return nil, nil
}

s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()

// add fields
measurementsToSave := make(map[string]*MeasurementFields)
Expand Down Expand Up @@ -387,10 +390,16 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
}

// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() }
func (s *Shard) SeriesCount() (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.SeriesCount()
}

// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
n, err := s.engine.WriteTo(w)
s.statMap.Add(statWriteBytes, int64(n))
return n, err
Expand All @@ -401,6 +410,8 @@ func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator,
if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.CreateIterator(opt)
}

Expand Down Expand Up @@ -472,6 +483,8 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
return []influxql.Series{{Aux: auxFields}}, nil
}

s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.SeriesKeys(opt)
}

Expand Down

0 comments on commit 75a2218

Please sign in to comment.