Skip to content

Commit

Permalink
Merge pull request #9141 from influxdata/jw-waitgroup
Browse files Browse the repository at this point in the history
Fix wait reused while disabling compactions
  • Loading branch information
jwilder authored Nov 22, 2017
2 parents e9cbc6d + 50b6ace commit d526b82
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
- [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned

## v1.4.2 [2017-11-15]

Expand Down
54 changes: 50 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,44 @@ func (e *Engine) disableLevelCompactions(wait bool) {
e.levelWorkers += 1
}

// Hold onto the current done channel so we can wait on it if necessary
waitCh := e.done

if old == 0 && e.done != nil {
// It's possible we have closed the done channel and released the lock and another
// goroutine has attempted to disable compactions. We're currently in the process of
// disabling them so check for this and wait until the original completes.
select {
case <-e.done:
e.mu.Unlock()
return
default:
}

// Prevent new compactions from starting
e.Compactor.DisableCompactions()

// Stop all background compaction goroutines
close(e.done)
e.mu.Unlock()
e.wg.Wait()

// Signal that all goroutines have exited.
e.mu.Lock()
e.done = nil
e.mu.Unlock()
return
}
e.mu.Unlock()

// Compactions were already disabled.
if waitCh == nil {
return
}

e.mu.Unlock()
// We were not the first caller to disable compactions and they were in the process
// of being disabled. Wait for them to complete before returning.
<-waitCh
e.wg.Wait()
}

Expand Down Expand Up @@ -323,15 +350,34 @@ func (e *Engine) enableSnapshotCompactions() {

func (e *Engine) disableSnapshotCompactions() {
e.mu.Lock()

var wait bool
if e.snapDone != nil {
// We may be in the process of stopping snapshots. See if the channel
// was closed.
select {
case <-e.snapDone:
e.mu.Unlock()
return
default:
}

close(e.snapDone)
e.snapDone = nil
e.Compactor.DisableSnapshots()
wait = true

}
e.mu.Unlock()

// Wait for the snapshot goroutine to exit.
if wait {
e.snapWG.Wait()
}

// Signal that the goroutines have exited and everything is stopped by setting
// snapDone to nil.
e.mu.Lock()
e.snapDone = nil
e.mu.Unlock()
e.snapWG.Wait()

// If the cache is empty, free up its resources as well.
if e.Cache.Size() == 0 {
Expand Down
29 changes: 29 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,35 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}

// TestEngine_DisableEnableCompactions_Concurrent toggles compactions on and
// off from two goroutines at the same time, in opposite order, to ensure that
// "sync: WaitGroup is reused before previous Wait has returned" is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
	t.Parallel()

	engine := MustOpenDefaultEngine()
	defer engine.Close()

	const iterations = 1000

	var pending sync.WaitGroup

	// toggle repeatedly sets the compaction state to first and then to its
	// opposite, signalling pending when all iterations are complete.
	toggle := func(first bool) {
		defer pending.Done()
		for n := 0; n < iterations; n++ {
			engine.SetCompactionsEnabled(first)
			engine.SetCompactionsEnabled(!first)
		}
	}

	pending.Add(2)
	go toggle(true)  // enable, then disable
	go toggle(false) // disable, then enable
	pending.Wait()
}

// BenchmarkEngine_CreateIterator_Count_1K runs benchmarkEngineCreateIteratorCount
// with an argument of 1000.
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
	const size = 1000
	benchmarkEngineCreateIteratorCount(b, size)
}
Expand Down

0 comments on commit d526b82

Please sign in to comment.