Skip to content

Commit

Permalink
Check if engine closed. Fixes #6140
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Mar 31, 2016
1 parent 75a2218 commit 8e2d1e4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- [#6092](https://github.com/influxdata/influxdb/issues/6092): Upgrading directly from 0.9.6.1 to 0.11.0 fails
- [#6061](https://github.com/influxdata/influxdb/issues/6061): [0.12 / master] POST to /write does not write points if request has header 'Content-Type: application/x-www-form-urlencoded'
- [#6121](https://github.com/influxdata/influxdb/issues/6121): Fix panic: slice index out of bounds in TSM index
- [#6140](https://github.com/influxdata/influxdb/issues/6140): Ensure Shard engine not accessed when closed.

## v0.11.0 [2016-03-22]

Expand Down
64 changes: 44 additions & 20 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var (
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// there is no mapping for.
ErrFieldUnmappedID = errors.New("field ID not mapped")

// ErrEngineClosed is returned when a caller attempts indirectly to
// access the shard's underlying engine.
ErrEngineClosed = errors.New("engine is closed")
)

// A ShardError implements the error interface, and contains extra
Expand Down Expand Up @@ -181,6 +185,14 @@ func (s *Shard) close() error {
return err
}

// closed determines if the Shard is closed.
func (s *Shard) closed() bool {
s.mu.RLock()
closed := s.engine == nil
s.mu.RUnlock()
return closed
}

// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
stats, err := os.Stat(s.path)
Expand Down Expand Up @@ -217,9 +229,9 @@ type SeriesCreate struct {

// WritePoints will write the raw data points and any new metadata to the index in the shard
func (s *Shard) WritePoints(points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()

if s.closed() {
return ErrEngineClosed
}
s.statMap.Add(statWriteReq, 1)

seriesToCreate, fieldsToCreate, seriesToAddShardTo, err := s.validateSeriesAndFields(points)
Expand Down Expand Up @@ -258,7 +270,9 @@ func (s *Shard) WritePoints(points []models.Point) error {
}

// This was populated earlier, don't need to validate that it's there.
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()

// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
Expand All @@ -285,22 +299,26 @@ func (s *Shard) WritePoints(points []models.Point) error {

// DeleteSeries deletes a list of series.
func (s *Shard) DeleteSeries(seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
if s.closed() {
return ErrEngineClosed
}
return s.engine.DeleteSeries(seriesKeys)
}

// DeleteMeasurement deletes a measurement and all underlying series.
func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed() {
return ErrEngineClosed
}

if err := s.engine.DeleteMeasurement(name, seriesKeys); err != nil {
return err
}

// Remove entry from shard index.
s.mu.Lock()
delete(s.measurementFields, name)
s.mu.Unlock()

return nil
}
Expand All @@ -310,8 +328,8 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[
return nil, nil
}

s.mu.RLock()
defer s.mu.RUnlock()
s.mu.Lock()
defer s.mu.Unlock()

// add fields
measurementsToSave := make(map[string]*MeasurementFields)
Expand Down Expand Up @@ -347,9 +365,6 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
var seriesToAddShardTo []string

// get the shard mutex for locally defined fields
s.mu.RLock()
defer s.mu.RUnlock()

for _, p := range points {
// see if the series should be added to the index
if ss := s.index.Series(string(p.Key())); ss == nil {
Expand All @@ -363,7 +378,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,
}

// see if the field definitions need to be saved to the shard
s.mu.RLock()
mf := s.measurementFields[p.Name()]
s.mu.RUnlock()

if mf == nil {
for name, value := range p.Fields() {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: name, Type: influxql.InspectDataType(value)}})
Expand Down Expand Up @@ -391,27 +409,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate,

// SeriesCount returns the number of series buckets on the shard.
func (s *Shard) SeriesCount() (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.closed() {
return 0, ErrEngineClosed
}
return s.engine.SeriesCount()
}

// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.closed() {
return 0, ErrEngineClosed
}
n, err := s.engine.WriteTo(w)
s.statMap.Add(statWriteBytes, int64(n))
return n, err
}

// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if s.closed() {
return nil, ErrEngineClosed
}

if influxql.Sources(opt.Sources).HasSystemSource() {
return s.createSystemIterator(opt)
}
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.CreateIterator(opt)
}

Expand Down Expand Up @@ -468,6 +490,10 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions ma

// SeriesKeys returns a list of series in the shard.
func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) {
if s.closed() {
return nil, ErrEngineClosed
}

if influxql.Sources(opt.Sources).HasSystemSource() {
// Only support a single system source.
if len(opt.Sources) > 1 {
Expand All @@ -483,8 +509,6 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e
return []influxql.Series{{Aux: auxFields}}, nil
}

s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.SeriesKeys(opt)
}

Expand Down
24 changes: 19 additions & 5 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
const DefaultPrecision = "s"

func TestShardWriteAndIndex(t *testing.T) {
t.Skip("pending tsm1 iterator impl")

tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
Expand All @@ -34,6 +32,13 @@ func TestShardWriteAndIndex(t *testing.T) {
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)

// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
if got, exp := sh.WritePoints(nil), tsdb.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
Expand Down Expand Up @@ -92,8 +97,6 @@ func TestShardWriteAndIndex(t *testing.T) {
}

func TestShardWriteAddNewField(t *testing.T) {
t.Skip("pending tsm1 iterator impl")

tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
Expand Down Expand Up @@ -151,7 +154,18 @@ func TestShardWriteAddNewField(t *testing.T) {

// Ensure a shard can create iterators for its underlying data.
func TestShard_CreateIterator(t *testing.T) {
sh := MustOpenShard()
sh := NewShard()

// Calling CreateIterator when the engine is not open will return
// ErrEngineClosed.
_, got := sh.CreateIterator(influxql.IteratorOptions{})
if exp := tsdb.ErrEngineClosed; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

if err := sh.Open(); err != nil {
t.Fatal(err)
}
defer sh.Close()

sh.MustWritePointsString(`
Expand Down

0 comments on commit 8e2d1e4

Please sign in to comment.