diff --git a/CHANGELOG.md b/CHANGELOG.md index cf219aef19b..537b2803e76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,8 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#6986](https://github.com/influxdata/influxdb/pull/6986): update connection settings when changing hosts in cli. - [#6965](https://github.com/influxdata/influxdb/pull/6965): Minor improvements to init script. Removes sysvinit-utils as package dependency. - [#6952](https://github.com/influxdata/influxdb/pull/6952): Fix compaction planning with large TSM files +- [#6819](https://github.com/influxdata/influxdb/issues/6819): Database unresponsive after DROP MEASUREMENT +- [#6796](https://github.com/influxdata/influxdb/issues/6796): Out of Memory Error when Dropping Measurement ## v0.13.0 [2016-05-12] diff --git a/tsdb/store.go b/tsdb/store.go index a6fd20e462f..58895776146 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -347,17 +347,8 @@ func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error { // DeleteShard removes a shard from disk. func (s *Store) DeleteShard(shardID uint64) error { - s.mu.Lock() - defer s.mu.Unlock() - return s.deleteShard(shardID) -} - -// deleteShard removes a shard from disk. Callers of deleteShard need -// to handle locks appropriately. -func (s *Store) deleteShard(shardID uint64) error { - // ensure shard exists - sh, ok := s.shards[shardID] - if !ok { + sh := s.Shard(shardID) + if sh == nil { return nil } @@ -373,7 +364,10 @@ func (s *Store) deleteShard(shardID uint64) error { return err } + s.mu.Lock() delete(s.shards, shardID) + s.mu.Unlock() + return nil } @@ -388,41 +382,22 @@ func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator { // DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. func (s *Store) DeleteDatabase(name string) error { - type resp struct { - shardID uint64 - err error - } - s.mu.RLock() - responses := make(chan resp, len(s.shards)) - var wg sync.WaitGroup - // Close and delete all shards on the database. - for shardID, sh := range s.shards { - if sh.database == name { - wg.Add(1) - shardID, sh := shardID, sh // scoped copies of loop variables - go func() { - defer wg.Done() - err := sh.Close() - responses <- resp{shardID, err} - }() - } - } + shards := s.filterShards(func(sh *Shard) bool { + return sh.database == name + }) s.mu.RUnlock() - wg.Wait() - close(responses) - for r := range responses { - if r.err != nil { - return r.err + if err := s.walkShards(shards, func(sh *Shard) error { + if sh.database != name { + return nil } - s.mu.Lock() - delete(s.shards, r.shardID) - s.mu.Unlock() + + return sh.Close() + }); err != nil { + return err } - s.mu.Lock() - defer s.mu.Unlock() if err := os.RemoveAll(filepath.Join(s.path, name)); err != nil { return err } @@ -430,7 +405,13 @@ func (s *Store) DeleteDatabase(name string) error { return err } + s.mu.Lock() + for _, sh := range shards { + delete(s.shards, sh.id) + } delete(s.databaseIndexes, name) + s.mu.Unlock() + return nil } @@ -438,18 +419,22 @@ func (s *Store) DeleteDatabase(name string) error { // provided retention policy, remove the retention policy directories on // both the DB and WAL, and remove all shard files from disk. func (s *Store) DeleteRetentionPolicy(database, name string) error { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.database == database && sh.retentionPolicy == name + }) + s.mu.RUnlock() // Close and delete all shards under the retention policy on the // database. - for shardID, sh := range s.shards { - if sh.database == database && sh.retentionPolicy == name { - // Delete the shard from disk. - if err := s.deleteShard(shardID); err != nil { - return err - } + if err := s.walkShards(shards, func(sh *Shard) error { + if sh.database != database || sh.retentionPolicy != name { + return nil } + + return sh.Close() + }); err != nil { + return err } // Remove the rentention policy folder. @@ -458,16 +443,24 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error { } // Remove the retention policy folder from the the WAL. - return os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)) + if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)); err != nil { + return err + } + + s.mu.Lock() + for _, sh := range shards { + delete(s.shards, sh.id) + } + s.mu.Unlock() + return nil } // DeleteMeasurement removes a measurement and all associated series from a database. func (s *Store) DeleteMeasurement(database, name string) error { - s.mu.Lock() - defer s.mu.Unlock() - // Find the database. + s.mu.RLock() db := s.databaseIndexes[database] + s.mu.RUnlock() if db == nil { return nil } @@ -478,21 +471,79 @@ func (s *Store) DeleteMeasurement(database, name string) error { return influxql.ErrMeasurementNotFound(name) } + seriesKeys := m.SeriesKeys() + + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.database == database + }) + s.mu.RUnlock() + + if err := s.walkShards(shards, func(sh *Shard) error { + if err := sh.DeleteMeasurement(m.Name, seriesKeys); err != nil { + return err + } + return nil + }); err != nil { + return err + } + // Remove measurement from index. db.DropMeasurement(m.Name) - // Remove underlying data. + return nil +} + +// filterShards returns a slice of shards where fn returns true +// for the shard. +func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard { + shards := make([]*Shard, 0, len(s.shards)) for _, sh := range s.shards { - if sh.database != database { - continue + if fn(sh) { + shards = append(shards, sh) } + } + return shards +} - if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil { - return err - } +// walkShards apply a function to each shard in parallel. If any of the +// functions return an error, the first error is returned. +func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error { + // struct to hold the result of opening each reader in a goroutine + type res struct { + err error } - return nil + throttle := newthrottle(runtime.GOMAXPROCS(0)) + + resC := make(chan res) + var n int + + for _, sh := range shards { + n++ + + go func(sh *Shard) { + throttle.take() + defer throttle.release() + + if err := fn(sh); err != nil { + resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)} + return + } + + resC <- res{} + }(sh) + } + + var err error + for i := 0; i < n; i++ { + res := <-resC + if res.err != nil { + err = res.err + } + } + close(resC) + return err } // ShardIDs returns a slice of all ShardIDs under management. @@ -680,9 +731,15 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6 return influxql.ErrDatabaseNotFound(database) } - for _, sh := range s.shards { + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.database == database + }) + s.mu.RUnlock() + + return s.walkShards(shards, func(sh *Shard) error { if sh.database != database { - continue + return nil } if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil { return err @@ -700,9 +757,8 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6 db.UnassignShard(k, sh.id) } } - } - - return nil + return nil + }) } // ExpandSources expands sources against all local shards. @@ -862,24 +918,16 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) // throttle is a simple channel based concurrency limiter. It uses a fixed // size channel to limit callers from proceeding until there is a value avalable // in the channel. If all are in-use, the caller blocks until one is freed. -type throttle struct { - c chan struct{} -} +type throttle chan struct{} -func newthrottle(limit int) *throttle { - t := &throttle{ - c: make(chan struct{}, limit), - } - for i := 0; i < limit; i++ { - t.c <- struct{}{} - } - return t +func newthrottle(limit int) throttle { + return make(throttle, limit) } -func (t *throttle) take() { - <-t.c +func (t throttle) take() { + t <- struct{}{} } -func (t *throttle) release() { - t.c <- struct{}{} +func (t throttle) release() { + <-t }