Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wait reused while disabling compactions #9141

Merged
merged 1 commit into from
Nov 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
- [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned

## v1.4.2 [2017-11-15]

Expand Down
54 changes: 50 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,44 @@ func (e *Engine) disableLevelCompactions(wait bool) {
e.levelWorkers += 1
}

// Hold onto the current done channel so we can wait on it if necessary
waitCh := e.done

if old == 0 && e.done != nil {
// It's possible we have closed the done channel and released the lock and another
// goroutine has attempted to disable compactions. We're currently in the process of
// disabling them so check for this and wait until the original completes.
select {
case <-e.done:
e.mu.Unlock()
return
default:
}

// Prevent new compactions from starting
e.Compactor.DisableCompactions()

// Stop all background compaction goroutines
close(e.done)
e.mu.Unlock()
e.wg.Wait()

// Signal that all goroutines have exited.
e.mu.Lock()
e.done = nil
e.mu.Unlock()
return
}
e.mu.Unlock()

// Compactions were already disabled.
if waitCh == nil {
return
}

e.mu.Unlock()
// We were not the first caller to disable compactions and they were in the process
// of being disabled. Wait for them to complete before returning.
<-waitCh
e.wg.Wait()
}

Expand Down Expand Up @@ -323,15 +350,34 @@ func (e *Engine) enableSnapshotCompactions() {

func (e *Engine) disableSnapshotCompactions() {
e.mu.Lock()

var wait bool
if e.snapDone != nil {
// We may be in the process of stopping snapshots. See if the channel
// was closed.
select {
case <-e.snapDone:
e.mu.Unlock()
return
default:
}

close(e.snapDone)
e.snapDone = nil
e.Compactor.DisableSnapshots()
wait = true

}
e.mu.Unlock()

// Wait for the snapshot goroutine to exit.
if wait {
e.snapWG.Wait()
}

// Signal that the goroutines have exited and everything is stopped by setting
// snapDone to nil.
e.mu.Lock()
e.snapDone = nil
e.mu.Unlock()
e.snapWG.Wait()

// If the cache is empty, free up its resources as well.
if e.Cache.Size() == 0 {
Expand Down
29 changes: 29 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,35 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}

// TestEngine_DisableEnableCompactions_Concurrent flips compactions on and off
// from two goroutines at the same time, each starting from the opposite state,
// to ensure that "sync: WaitGroup is reused before previous Wait has returned"
// is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
	t.Parallel()

	e := MustOpenDefaultEngine()
	defer e.Close()

	const rounds = 1000

	var wg sync.WaitGroup

	// toggle sets compactions to the given state and then to its opposite,
	// repeating for the configured number of rounds.
	toggle := func(first bool) {
		defer wg.Done()
		for n := 0; n < rounds; n++ {
			e.SetCompactionsEnabled(first)
			e.SetCompactionsEnabled(!first)
		}
	}

	wg.Add(2)
	go toggle(true)
	go toggle(false)
	wg.Wait()
}

// BenchmarkEngine_CreateIterator_Count_1K runs the shared create-iterator
// count benchmark with a size argument of 1000.
// NOTE(review): presumably the argument is the number of points written
// before iterating; confirm against benchmarkEngineCreateIteratorCount.
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
	const size = 1000
	benchmarkEngineCreateIteratorCount(b, size)
}
Expand Down