diff --git a/executor/aggregate.go b/executor/aggregate.go index 6c28ef688b90b..3a4c2f588e5ca 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -23,10 +23,12 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/set" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/spaolacci/murmur3" "golang.org/x/net/context" ) @@ -314,9 +316,18 @@ func (w *HashAggPartialWorker) getChildInput() bool { return true } +func recoveryHashAgg(output chan *AfFinalResult, r interface{}) { + output <- &AfFinalResult{err: errors.Errorf("%v", r)} + buf := util.GetStack() + log.Errorf("panic in the recoverable goroutine: %v, stack trace:\n%s", r, buf) +} + func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup, finalConcurrency int) { needShuffle, sc := false, ctx.GetSessionVars().StmtCtx defer func() { + if r := recover(); r != nil { + recoveryHashAgg(w.globalOutputCh, r) + } if needShuffle { w.shuffleIntermData(sc, finalConcurrency) } @@ -492,6 +503,9 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) { defer func() { + if r := recover(); r != nil { + recoveryHashAgg(w.outputCh, r) + } waitGroup.Done() }() if err := w.consumeIntermData(ctx); err != nil { @@ -521,6 +535,9 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) { err error ) defer func() { + if r := recover(); r != nil { + recoveryHashAgg(e.finalOutputCh, r) + } for i := range e.partialInputChs { close(e.partialInputChs[i]) } diff --git a/executor/projection.go b/executor/projection.go index 4b3506c999cba..90a5fcc72f840 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -18,8 +18,10 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -264,7 +266,11 @@ type projectionInputFetcher struct { // a. There is no more input from child. // b. "ProjectionExec" close the "globalFinishCh" func (f *projectionInputFetcher) run(ctx context.Context) { + var output *projectionOutput defer func() { + if r := recover(); r != nil { + recoveryProjection(output, r) + } close(f.globalOutputCh) }() @@ -275,7 +281,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) { } targetWorker := input.targetWorker - output := readProjectionOutput(f.outputCh, f.globalFinishCh) + output = readProjectionOutput(f.outputCh, f.globalFinishCh) if output == nil { return } @@ -317,13 +323,19 @@ type projectionWorker struct { // It is finished and exited once: // a. "ProjectionExec" closes the "globalFinishCh". func (w *projectionWorker) run(ctx context.Context) { + var output *projectionOutput + defer func() { + if r := recover(); r != nil { + recoveryProjection(output, r) + } + }() for { input := readProjectionInput(w.inputCh, w.globalFinishCh) if input == nil { return } - output := readProjectionOutput(w.outputCh, w.globalFinishCh) + output = readProjectionOutput(w.outputCh, w.globalFinishCh) if output == nil { return } @@ -339,6 +351,14 @@ func (w *projectionWorker) run(ctx context.Context) { } } +func recoveryProjection(output *projectionOutput, r interface{}) { + if output != nil { + output.done <- errors.Errorf("%v", r) + } + buf := util.GetStack() + log.Errorf("panic in the recoverable goroutine: %v, stack trace:\n%s", r, buf) +} + func readProjectionInput(inputCh <-chan *projectionInput, finishCh <-chan struct{}) *projectionInput { select { case <-finishCh: