Skip to content

Commit

Permalink
executor: recover panic in executor.Close() (pingcap#49222)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Dec 12, 2023
1 parent 24606a4 commit 80c6fc1
Show file tree
Hide file tree
Showing 22 changed files with 93 additions and 40 deletions.
16 changes: 8 additions & 8 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
func (a *recordSet) Finish() error {
var err error
a.once.Do(func() {
err = a.executor.Close()
err = exec.Close(a.executor)
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
Expand Down Expand Up @@ -353,7 +353,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}

if err = exec.Open(ctx, pointExecutor); err != nil {
terror.Call(pointExecutor.Close)
terror.Log(exec.Close(pointExecutor))
return nil, err
}

Expand Down Expand Up @@ -565,7 +565,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
if err = a.openExecutor(ctx, e); err != nil {
terror.Call(e.Close)
terror.Log(exec.Close(e))
return nil, err
}

Expand Down Expand Up @@ -714,14 +714,14 @@ func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeEx
return err
}
if err := exec.Open(ctx, e); err != nil {
terror.Call(e.Close)
terror.Log(exec.Close(e))
return err
}
err = exec.Next(ctx, e, exec.NewFirstChunk(e))
if err != nil {
return err
}
err = e.Close()
err = exec.Close(e)
if err != nil {
return err
}
Expand Down Expand Up @@ -897,7 +897,7 @@ func (c *chunkRowRecordSet) Close() error {

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (_ sqlexec.RecordSet, retErr error) {
if snapshotTS := a.Ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
terror.Log(e.Close())
terror.Log(exec.Close(e))
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}

Expand Down Expand Up @@ -941,7 +941,7 @@ func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.

func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (sqlexec.RecordSet, error) {
defer func() {
terror.Log(e.Close())
terror.Log(exec.Close(e))
}()
var rows []chunk.Row
var err error
Expand Down Expand Up @@ -971,7 +971,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) (

var err error
defer func() {
terror.Log(e.Close())
terror.Log(exec.Close(e))
a.logAudit()
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces
}
totalChunks = append(totalChunks, partChunks...)
}
if err := e.Close(); err != nil {
if err := exec.Close(e); err != nil {
return h.buildErrorResponse(err)
}
return h.buildUnaryResponse(totalChunks)
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
}

func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
if err = exec.Close(p.seedExec); err != nil {
return err
}
if p.recursiveExec != nil {
if err = p.recursiveExec.Close(); err != nil {
if err = exec.Close(p.recursiveExec); err != nil {
return err
}
// `iterInTbl` and `resTbl` are shared by multiple operators,
Expand Down Expand Up @@ -423,7 +423,7 @@ func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) {
}
// Make sure iterInTbl is setup before Close/Open,
// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
if err = p.recursiveExec.Close(); err != nil {
if err = exec.Close(p.recursiveExec); err != nil {
return
}
if err = exec.Open(ctx, p.recursiveExec); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ func TestCTEIssue49096(t *testing.T) {
sql := "insert into t2 with cte1 as ( " +
"select c1 from t1) " +
"select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;"
err := tk.ExecToErr(sql)
require.NotNil(t, err)
require.Equal(t, "[executor:8175]Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again.[conn=%d]", err.Error())
tk.MustExec(sql) // No deadlock
}

func TestSpillToDisk(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func onRemoveRowForFK(ctx sessionctx.Context, data []types.Datum, fkChecks []*FK
// Close implements the Executor Close interface.
func (e *DeleteExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
return e.Children(0).Close()
return exec.Close(e.Children(0))
}

// Open implements the Executor Open interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
}
defer terror.Call(tableReader.Close)
defer func() { terror.Log(exec.Close(tableReader)) }()

if w.checkIndexValue != nil {
return w.compareData(ctx, task, tableReader)
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ func (e *CheckTableExec) Close() error {
var firstErr error
close(e.exitCh)
for _, src := range e.srcs {
if err := src.Close(); err != nil && firstErr == nil {
if err := exec.Close(src); err != nil && firstErr == nil {
firstErr = err
}
}
Expand Down Expand Up @@ -1473,7 +1473,7 @@ func init() {
return nil, e.err
}
err := exec.Open(ctx, executor)
defer terror.Call(executor.Close)
defer func() { terror.Log(exec.Close(executor)) }()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1943,7 +1943,7 @@ func (e *UnionExec) Close() error {
// promised to exit when reaching here (e.childIDChan been closed).
var firstErr error
for i := 0; i <= e.mu.maxOpenedChildID; i++ {
if err := e.Children(i).Close(); err != nil && firstErr == nil {
if err := exec.Close(e.Children(i)); err != nil && firstErr == nil {
firstErr = err
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *ExplainExec) Close() error {
e.rows = nil
if e.analyzeExec != nil && !e.executed {
// Open(), but Next() is not called.
return e.analyzeExec.Close()
return exec.Close(e.analyzeExec)
}
return nil
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
if e.analyzeExec != nil && !e.executed {
defer func() {
err1 := e.analyzeExec.Close()
err1 := exec.Close(e.analyzeExec)
if err1 != nil {
if err != nil {
err = errors.New(err.Error() + ", " + err1.Error())
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
}
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true, iw.memTracker, iw.lookup.finished)
if innerExec != nil {
defer terror.Call(innerExec.Close)
defer func() { terror.Log(exec.Close(innerExec)) }()
}
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
}
imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false, nil, nil)
if imw.innerExec != nil {
defer terror.Call(imw.innerExec.Close)
defer func() { terror.Log(exec.Close(imw.innerExec)) }()
}
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
defer func() {
// To make sure SelectResult.Close() is called even got panic in fetchHandles().
if !tableReaderClosed {
terror.Call(worker.tableReader.Close)
terror.Log(exec.Close(worker.tableReader))
}
}()
for parTblIdx, tbl := range tbls {
Expand Down Expand Up @@ -555,7 +555,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
// release related resources
cancel()
tableReaderClosed = true
if err = worker.tableReader.Close(); err != nil {
if err = exec.Close(worker.tableReader); err != nil {
logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err))
}
// this error is reported in fetchHandles(), so ignore it here.
Expand Down Expand Up @@ -1899,7 +1899,7 @@ func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *index
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
}
defer terror.Call(tableReader.Close)
defer func() { terror.Log(exec.Close(tableReader)) }()
task.memTracker = w.memTracker
memUsage := int64(cap(task.handles) * 8)
task.memUsage = memUsage
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (e *InsertExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
e.setMessage()
if e.SelectExec != nil {
return e.SelectExec.Close()
return exec.Close(e.SelectExec)
}
return nil
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (e *BaseExecutor) Open(ctx context.Context) error {
func (e *BaseExecutor) Close() error {
var firstErr error
for _, src := range e.children {
if err := src.Close(); err != nil && firstErr == nil {
if err := Close(src); err != nil && firstErr == nil {
firstErr = err
}
}
Expand Down Expand Up @@ -295,6 +295,16 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
return sessVars.SQLKiller.HandleSignal()
}

// Close is a wrapper function on e.Close(), it handles some common codes.
func Close(e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
return e.Close()
}

// RegisterSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution.
// This uses to catch the running SQL when Top SQL is enabled in execution.
func RegisterSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ func (e *NestedLoopApplyExec) Close() error {
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
}
return e.outerExec.Close()
return exec.Close(e.outerExec)
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -1426,7 +1426,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
// fetchAllInners reads all data from the inner table and stores them in a List.
func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
err := exec.Open(ctx, e.innerExec)
defer terror.Call(e.innerExec.Close)
defer func() { terror.Log(exec.Close(e.innerExec)) }()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (e *ParallelNestedLoopApplyExec) Close() error {
}
// Wait all workers to finish before Close() is called.
// Otherwise we may got data race.
err := e.outerExec.Close()
err := exec.Close(e.outerExec)

if e.RuntimeStats() != nil {
runtimeStats := newJoinRuntimeStats()
Expand Down Expand Up @@ -304,7 +304,7 @@ func (e *ParallelNestedLoopApplyExec) fetchAllInners(ctx context.Context, id int
}

err = exec.Open(ctx, e.innerExecs[id])
defer terror.Call(e.innerExecs[id].Close)
defer func() { terror.Log(exec.Close(e.innerExecs[id])) }()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e *ReplaceExec) Close() error {
defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
}
if e.SelectExec != nil {
return e.SelectExec.Close()
return exec.Close(e.SelectExec)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (e *ShuffleExec) Close() error {
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
if err := exec.Close(w.childExec); err != nil && firstErr == nil {
firstErr = err
}
}
Expand All @@ -192,7 +192,7 @@ func (e *ShuffleExec) Close() error {

// close dataSources
for _, dataSource := range e.dataSources {
if err := dataSource.Close(); err != nil && firstErr == nil {
if err := exec.Close(dataSource); err != nil && firstErr == nil {
firstErr = err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/sortexec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (e *SortExec) Close() error {
e.spillAction.SetFinished()
}
e.spillAction = nil
return e.Children(0).Close()
return exec.Close(e.Children(0))
}

// Open implements the Executor Open interface.
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/test/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/executor",
"//pkg/executor/internal/exec",
"//pkg/expression",
"//pkg/infoschema",
"//pkg/kv",
Expand Down
Loading

0 comments on commit 80c6fc1

Please sign in to comment.