Speed up delete/drop statements #7015

Merged · 8 commits · Jul 15, 2016
Changes from 6 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -92,6 +92,8 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6986](https://github.com/influxdata/influxdb/pull/6986): update connection settings when changing hosts in cli.
- [#6965](https://github.com/influxdata/influxdb/pull/6965): Minor improvements to init script. Removes sysvinit-utils as package dependency.
- [#6952](https://github.com/influxdata/influxdb/pull/6952): Fix compaction planning with large TSM files
- [#6819](https://github.com/influxdata/influxdb/issues/6819): Database unresponsive after DROP MEASUREMENT
- [#6796](https://github.com/influxdata/influxdb/issues/6796): Out of Memory Error when Dropping Measurement

## v0.13.0 [2016-05-12]

181 changes: 117 additions & 64 deletions tsdb/store.go
@@ -347,17 +347,8 @@ func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error {

// DeleteShard removes a shard from disk.
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.deleteShard(shardID)
}

// deleteShard removes a shard from disk. Callers of deleteShard need
// to handle locks appropriately.
func (s *Store) deleteShard(shardID uint64) error {
// ensure shard exists
sh, ok := s.shards[shardID]
if !ok {
sh := s.Shard(shardID)
if sh == nil {
return nil
}

@@ -373,7 +364,10 @@ func (s *Store) deleteShard(shardID uint64) error {
return err
}

s.mu.Lock()
delete(s.shards, shardID)
s.mu.Unlock()

return nil
}

@@ -388,68 +382,59 @@ func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator {

// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
type resp struct {
shardID uint64
err error
}

s.mu.RLock()
responses := make(chan resp, len(s.shards))
var wg sync.WaitGroup
// Close and delete all shards on the database.
for shardID, sh := range s.shards {
if sh.database == name {
wg.Add(1)
shardID, sh := shardID, sh // scoped copies of loop variables
go func() {
defer wg.Done()
err := sh.Close()
responses <- resp{shardID, err}
}()
}
}
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == name
})
s.mu.RUnlock()
wg.Wait()
close(responses)

for r := range responses {
if r.err != nil {
return r.err
if err := s.walkShards(shards, func(sh *Shard) error {
if sh.database != name {
return nil
}
s.mu.Lock()
delete(s.shards, r.shardID)
s.mu.Unlock()

return sh.Close()
}); err != nil {
return err
}

s.mu.Lock()
defer s.mu.Unlock()
if err := os.RemoveAll(filepath.Join(s.path, name)); err != nil {
return err
}
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, name)); err != nil {
return err
}

s.mu.Lock()
for _, sh := range shards {
delete(s.shards, sh.id)
}
delete(s.databaseIndexes, name)
s.mu.Unlock()

return nil
}

// DeleteRetentionPolicy will close all shards associated with the
// provided retention policy, remove the retention policy directories on
// both the DB and WAL, and remove all shard files from disk.
func (s *Store) DeleteRetentionPolicy(database, name string) error {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database && sh.retentionPolicy == name
})
s.mu.RUnlock()

// Close and delete all shards under the retention policy on the
// database.
for shardID, sh := range s.shards {
if sh.database == database && sh.retentionPolicy == name {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
}
if err := s.walkShards(shards, func(sh *Shard) error {
if sh.database != database || sh.retentionPolicy != name {
return nil
Contributor: This should be impossible right? Is this just being extra defensive?

Contributor Author: Yes

}

return sh.Close()
}); err != nil {
return err
}

// Remove the retention policy folder.
@@ -458,16 +443,24 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
}

// Remove the retention policy folder from the WAL.
return os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name))
if err := os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)); err != nil {
Member: does the RP get removed from the metadata before this is called? Would this maybe cause a problem if it isn't?

Contributor Author: No. The statement executor drops the data from the tsdb.Store and only removes it from the meta store after that succeeds. We previously deleted it from the meta store and then dropped the shard data, but that caused problems if the data deletion failed (orphaned data, data re-appearing after startup, etc.). A minimal sketch of that ordering is included after this function.

return err
}

s.mu.Lock()
for _, sh := range shards {
delete(s.shards, sh.id)
}
s.mu.Unlock()
return nil
}
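
As a sketch of the ordering described in the thread above — drop the shard data first, update the metadata only after that succeeds — assuming the surrounding tsdb package; MetaStore and dropRetentionPolicy are hypothetical names, not influxdb's actual statement-executor API:

```go
// Hypothetical sketch only: MetaStore and dropRetentionPolicy are
// illustrative names, not influxdb's real API.
type MetaStore interface {
	DropRetentionPolicy(database, name string) error
}

func dropRetentionPolicy(s *Store, meta MetaStore, database, name string) error {
	// Delete the on-disk shard data first. If this fails, the retention
	// policy is still registered in the meta store, so nothing is orphaned
	// and the statement can be retried.
	if err := s.DeleteRetentionPolicy(database, name); err != nil {
		return err
	}
	// Only after the data is gone is the policy removed from the meta
	// store, which avoids data "re-appearing" after a restart.
	return meta.DropRetentionPolicy(database, name)
}
```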

// DeleteMeasurement removes a measurement and all associated series from a database.
func (s *Store) DeleteMeasurement(database, name string) error {
s.mu.Lock()
defer s.mu.Unlock()

// Find the database.
s.mu.RLock()
db := s.databaseIndexes[database]
s.mu.RUnlock()
if db == nil {
return nil
}
@@ -478,21 +471,78 @@ func (s *Store) DeleteMeasurement(database, name string) error {
return influxql.ErrMeasurementNotFound(name)
}

seriesKeys := m.SeriesKeys()

shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database
})

if err := s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteMeasurement(m.Name, seriesKeys); err != nil {
return err
}
return nil
}); err != nil {
return err
}

// Remove measurement from index.
db.DropMeasurement(m.Name)

// Remove underlying data.
return nil
}

// filterShards returns a slice of shards where fn returns true
// for the shard.
func (s *Store) filterShards(fn func(sh *Shard) bool) []*Shard {
shards := make([]*Shard, 0, len(s.shards))
Member: should this filter function dup s.shards while it has a read lock and then walk? Might this cause a problem if a shard is deleted (either through retention or user) while this runs?

Contributor Author: The caller is required to acquire a read lock on the tsdb.Store. Looks like I missed two spots. Generally, the exported functions acquire locks and the unexported ones assume the caller has acquired them, which makes the unexported functions easier to reuse.
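
A minimal self-contained sketch of that convention (the store type and method names below are illustrative, not the real tsdb.Store API): the exported method acquires the lock, the unexported helper assumes the caller already holds it.

```go
package main

import "sync"

// store illustrates the convention: exported methods acquire the lock,
// unexported helpers assume the caller already holds it, so the helpers
// can be composed freely without double-locking.
type store struct {
	mu     sync.RWMutex
	shards map[uint64]string
}

// Databases is exported, so it takes the read lock itself.
func (s *store) Databases() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.databases()
}

// databases is unexported and assumes s.mu is held by the caller.
func (s *store) databases() []string {
	out := make([]string, 0, len(s.shards))
	for _, db := range s.shards {
		out = append(out, db)
	}
	return out
}

func main() {
	s := &store{shards: map[uint64]string{1: "db0", 2: "db1"}}
	_ = s.Databases()
}
```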

for _, sh := range s.shards {
if sh.database != database {
continue
if fn(sh) {
shards = append(shards, sh)
}
}
return shards
}

if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
return err
}
// walkShards applies a function to each shard in parallel. If any of the
// functions return an error, the first error is returned.
func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
// struct to hold the result of opening each reader in a goroutine
type res struct {
s *Shard
err error
}

return nil
throttle := newthrottle(runtime.GOMAXPROCS(0))
Contributor: Not directly related to this change, but throttle isn't needed. We can just use an empty buffered channel as a semaphore: https://golang.org/ref/mem#tmp_7

Contributor Author: Yes. The throttle type just wraps an empty buffered channel to make it easier to reuse in other places without having to know that trick and embed it everywhere. I think it's also a little more clear that it's trying to limit things when reading the code.

Contributor: throttle actually fills the channel and uses receive as the "acquire": https://github.com/influxdata/influxdb/blob/master/tsdb/store.go#L869
It's also a struct containing a channel, which you then get a pointer to, rather than just being an aliased type with some methods. It works, but it's just a little messy.

Contributor Author: Got it. Updated with a revised throttle type.

Contributor: could use runtime.NumCPU here instead as well.

Contributor: There's a small chance, but someone could manually override GOMAXPROCS to a smaller value than NumCPU, in which case we would have increased contention for resources. Reading the current value of GOMAXPROCS is probably the better solution, especially since Go 1.5, where it defaults to the number of available CPU cores.
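
For reference, a minimal self-contained sketch of the pattern discussed above: an empty struct{} channel, buffered to the current GOMAXPROCS value, used as a counting semaphore (illustrative only; not the PR's throttle type):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Buffered channel as a counting semaphore: send to acquire a slot,
	// receive to release it. The capacity bounds the number of concurrent
	// workers to the current GOMAXPROCS value.
	sem := make(chan struct{}, runtime.GOMAXPROCS(0))

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire
			defer func() { <-sem }() // release
			fmt.Println("processing shard", i)
		}(i)
	}
	wg.Wait()
}
```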


resC := make(chan *res)
var n int

for _, sh := range shards {
n++

go func(sh *Shard) {
throttle.take()
defer throttle.release()

if err := fn(sh); err != nil {
resC <- &res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
return
}

resC <- &res{s: sh}
Contributor: The shard field is never used, so this send doesn't really do anything other than signal the work is done. What do you think about using a WaitGroup for the synchronization aspect, rather than counting? resC can just be errC := make(chan error), and go func() { wg.Wait(); close(errC) } will handle closing the channel so that for err := range errC can be used to read the errors.

Contributor Author: Ah right, the shard is not used here. It was adapted from the Open code, which does use it. I'll remove it.

I tend to prefer using channels over WaitGroup.
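
For illustration, a self-contained sketch of the shape suggested here — a WaitGroup for completion, a goroutine that closes the error channel once all workers return, and for err := range errC on the consumer side (walkSketch and the integer IDs are stand-ins, not the PR's walkShards):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// walkSketch applies fn to each id in parallel, collects errors on errC,
// and returns the first error it received (nil if none).
func walkSketch(ids []int, fn func(id int) error) error {
	errC := make(chan error)
	var wg sync.WaitGroup

	for _, id := range ids {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := fn(id); err != nil {
				errC <- fmt.Errorf("shard %d: %s", id, err)
			}
		}(id)
	}

	// Close errC once every worker has returned so the range below ends.
	go func() {
		wg.Wait()
		close(errC)
	}()

	var first error
	for err := range errC {
		if first == nil {
			first = err
		}
	}
	return first
}

func main() {
	err := walkSketch([]int{1, 2, 3}, func(id int) error {
		if id == 2 {
			return errors.New("boom")
		}
		return nil
	})
	fmt.Println(err)
}
```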

}(sh)
}

var err error
for i := 0; i < n; i++ {
res := <-resC
if res.err != nil {
err = res.err
}
}
close(resC)
return err
}

// ShardIDs returns a slice of all ShardIDs under management.
@@ -680,9 +730,13 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6
return influxql.ErrDatabaseNotFound(database)
}

for _, sh := range s.shards {
shards := s.filterShards(func(sh *Shard) bool {
return sh.database == database
})

return s.walkShards(shards, func(sh *Shard) error {
if sh.database != database {
continue
return nil
Contributor: If this ever executes, it either means that filterShards doesn't work, or sh.database changed since filterShards ran, making this check a race...

Contributor Author: Yes, I left it in to be a little more paranoid since it's deleting data.

}
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
@@ -700,9 +754,8 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6
db.UnassignShard(k, sh.id)
}
}
}

return nil
return nil
})
}

// ExpandSources expands sources against all local shards.