Skip to content

Commit

Permalink
executor: recovery panic in parallel hashagg and projection (#8185) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and zz-jason committed Nov 6, 2018
1 parent 49b7dc9 commit a89b6fe
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
17 changes: 17 additions & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/set"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spaolacci/murmur3"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -314,9 +316,18 @@ func (w *HashAggPartialWorker) getChildInput() bool {
return true
}

func recoveryHashAgg(output chan *AfFinalResult, r interface{}) {
output <- &AfFinalResult{err: errors.Errorf("%v", r)}
buf := util.GetStack()
log.Errorf("panic in the recoverable goroutine: %v, stack trace:\n%s", r, buf)
}

func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup, finalConcurrency int) {
needShuffle, sc := false, ctx.GetSessionVars().StmtCtx
defer func() {
if r := recover(); r != nil {
recoveryHashAgg(w.globalOutputCh, r)
}
if needShuffle {
w.shuffleIntermData(sc, finalConcurrency)
}
Expand Down Expand Up @@ -492,6 +503,9 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {

func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) {
defer func() {
if r := recover(); r != nil {
recoveryHashAgg(w.outputCh, r)
}
waitGroup.Done()
}()
if err := w.consumeIntermData(ctx); err != nil {
Expand Down Expand Up @@ -521,6 +535,9 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
err error
)
defer func() {
if r := recover(); r != nil {
recoveryHashAgg(e.finalOutputCh, r)
}
for i := range e.partialInputChs {
close(e.partialInputChs[i])
}
Expand Down
24 changes: 22 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -264,7 +266,11 @@ type projectionInputFetcher struct {
// a. There is no more input from child.
// b. "ProjectionExec" close the "globalFinishCh"
func (f *projectionInputFetcher) run(ctx context.Context) {
var output *projectionOutput
defer func() {
if r := recover(); r != nil {
recoveryProjection(output, r)
}
close(f.globalOutputCh)
}()

Expand All @@ -275,7 +281,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {
}
targetWorker := input.targetWorker

output := readProjectionOutput(f.outputCh, f.globalFinishCh)
output = readProjectionOutput(f.outputCh, f.globalFinishCh)
if output == nil {
return
}
Expand Down Expand Up @@ -317,13 +323,19 @@ type projectionWorker struct {
// It is finished and exited once:
// a. "ProjectionExec" closes the "globalFinishCh".
func (w *projectionWorker) run(ctx context.Context) {
var output *projectionOutput
defer func() {
if r := recover(); r != nil {
recoveryProjection(output, r)
}
}()
for {
input := readProjectionInput(w.inputCh, w.globalFinishCh)
if input == nil {
return
}

output := readProjectionOutput(w.outputCh, w.globalFinishCh)
output = readProjectionOutput(w.outputCh, w.globalFinishCh)
if output == nil {
return
}
Expand All @@ -339,6 +351,14 @@ func (w *projectionWorker) run(ctx context.Context) {
}
}

func recoveryProjection(output *projectionOutput, r interface{}) {
if output != nil {
output.done <- errors.Errorf("%v", r)
}
buf := util.GetStack()
log.Errorf("panic in the recoverable goroutine: %v, stack trace:\n%s", r, buf)
}

func readProjectionInput(inputCh <-chan *projectionInput, finishCh <-chan struct{}) *projectionInput {
select {
case <-finishCh:
Expand Down

0 comments on commit a89b6fe

Please sign in to comment.