From 80c6fc1fbd547c37dcb90518a21a3ee9a01f034a Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 12 Dec 2023 21:09:19 +0800 Subject: [PATCH] executor: recover panic in executor.Close() (#49222) close pingcap/tidb#49223 --- pkg/executor/adapter.go | 16 ++++---- pkg/executor/coprocessor.go | 2 +- pkg/executor/cte.go | 6 +-- pkg/executor/cte_test.go | 4 +- pkg/executor/delete.go | 2 +- pkg/executor/distsql.go | 2 +- pkg/executor/executor.go | 6 +-- pkg/executor/explain.go | 4 +- pkg/executor/index_lookup_join.go | 2 +- pkg/executor/index_lookup_merge_join.go | 2 +- pkg/executor/index_merge_reader.go | 6 +-- pkg/executor/insert.go | 2 +- pkg/executor/internal/exec/executor.go | 12 +++++- pkg/executor/join.go | 4 +- pkg/executor/parallel_apply.go | 4 +- pkg/executor/replace.go | 2 +- pkg/executor/shuffle.go | 4 +- pkg/executor/sortexec/sort.go | 2 +- pkg/executor/test/executor/BUILD.bazel | 3 +- pkg/executor/test/executor/executor_test.go | 44 +++++++++++++++++++++ pkg/executor/union_scan.go | 2 +- pkg/executor/update.go | 2 +- 22 files changed, 93 insertions(+), 40 deletions(-) diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index d9c1b3752848a..6876be64cdb3c 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -184,7 +184,7 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { func (a *recordSet) Finish() error { var err error a.once.Do(func() { - err = a.executor.Close() + err = exec.Close(a.executor) cteErr := resetCTEStorageMap(a.stmt.Ctx) if cteErr != nil { logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr)) @@ -353,7 +353,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { } if err = exec.Open(ctx, pointExecutor); err != nil { - terror.Call(pointExecutor.Close) + terror.Log(exec.Close(pointExecutor)) return nil, err } @@ -565,7 +565,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) if err = a.openExecutor(ctx, e); err != nil { - terror.Call(e.Close) + terror.Log(exec.Close(e)) return nil, err } @@ -714,14 +714,14 @@ func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeEx return err } if err := exec.Open(ctx, e); err != nil { - terror.Call(e.Close) + terror.Log(exec.Close(e)) return err } err = exec.Next(ctx, e, exec.NewFirstChunk(e)) if err != nil { return err } - err = e.Close() + err = exec.Close(e) if err != nil { return err } @@ -897,7 +897,7 @@ func (c *chunkRowRecordSet) Close() error { func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (_ sqlexec.RecordSet, retErr error) { if snapshotTS := a.Ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { - terror.Log(e.Close()) + terror.Log(exec.Close(e)) return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set") } @@ -941,7 +941,7 @@ func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec. func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (sqlexec.RecordSet, error) { defer func() { - terror.Log(e.Close()) + terror.Log(exec.Close(e)) }() var rows []chunk.Row var err error @@ -971,7 +971,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) ( var err error defer func() { - terror.Log(e.Close()) + terror.Log(exec.Close(e)) a.logAudit() }() diff --git a/pkg/executor/coprocessor.go b/pkg/executor/coprocessor.go index 644c92e8fad15..fd563965eb461 100644 --- a/pkg/executor/coprocessor.go +++ b/pkg/executor/coprocessor.go @@ -103,7 +103,7 @@ func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coproces } totalChunks = append(totalChunks, partChunks...) } - if err := e.Close(); err != nil { + if err := exec.Close(e); err != nil { return h.buildErrorResponse(err) } return h.buildUnaryResponse(totalChunks) diff --git a/pkg/executor/cte.go b/pkg/executor/cte.go index 2a97eaf78de6c..97fe2bb2c49cd 100644 --- a/pkg/executor/cte.go +++ b/pkg/executor/cte.go @@ -227,11 +227,11 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e } func (p *cteProducer) closeProducer() (err error) { - if err = p.seedExec.Close(); err != nil { + if err = exec.Close(p.seedExec); err != nil { return err } if p.recursiveExec != nil { - if err = p.recursiveExec.Close(); err != nil { + if err = exec.Close(p.recursiveExec); err != nil { return err } // `iterInTbl` and `resTbl` are shared by multiple operators, @@ -423,7 +423,7 @@ func (p *cteProducer) computeRecursivePart(ctx context.Context) (err error) { } // Make sure iterInTbl is setup before Close/Open, // because some executors will read iterInTbl in Open() (like IndexLookupJoin). - if err = p.recursiveExec.Close(); err != nil { + if err = exec.Close(p.recursiveExec); err != nil { return } if err = exec.Open(ctx, p.recursiveExec); err != nil { diff --git a/pkg/executor/cte_test.go b/pkg/executor/cte_test.go index e94fae83759b1..cff82c0a51ed7 100644 --- a/pkg/executor/cte_test.go +++ b/pkg/executor/cte_test.go @@ -53,9 +53,7 @@ func TestCTEIssue49096(t *testing.T) { sql := "insert into t2 with cte1 as ( " + "select c1 from t1) " + "select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;" - err := tk.ExecToErr(sql) - require.NotNil(t, err) - require.Equal(t, "[executor:8175]Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again.[conn=%d]", err.Error()) + tk.MustExec(sql) // No deadlock } func TestSpillToDisk(t *testing.T) { diff --git a/pkg/executor/delete.go b/pkg/executor/delete.go index 1d331625512c4..63bcc882fbdcc 100644 --- a/pkg/executor/delete.go +++ b/pkg/executor/delete.go @@ -276,7 +276,7 @@ func onRemoveRowForFK(ctx sessionctx.Context, data []types.Datum, fkChecks []*FK // Close implements the Executor Close interface. func (e *DeleteExec) Close() error { defer e.memTracker.ReplaceBytesUsed(0) - return e.Children(0).Close() + return exec.Close(e.Children(0)) } // Open implements the Executor Open interface. diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 9beede5aa2ad0..f7414264ff64b 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -1455,7 +1455,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) return err } - defer terror.Call(tableReader.Close) + defer func() { terror.Log(exec.Close(tableReader)) }() if w.checkIndexValue != nil { return w.compareData(ctx, task, tableReader) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 05fc8887765f9..3d0178a815fc0 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -873,7 +873,7 @@ func (e *CheckTableExec) Close() error { var firstErr error close(e.exitCh) for _, src := range e.srcs { - if err := src.Close(); err != nil && firstErr == nil { + if err := exec.Close(src); err != nil && firstErr == nil { firstErr = err } } @@ -1473,7 +1473,7 @@ func init() { return nil, e.err } err := exec.Open(ctx, executor) - defer terror.Call(executor.Close) + defer func() { terror.Log(exec.Close(executor)) }() if err != nil { return nil, err } @@ -1943,7 +1943,7 @@ func (e *UnionExec) Close() error { // promised to exit when reaching here (e.childIDChan been closed). var firstErr error for i := 0; i <= e.mu.maxOpenedChildID; i++ { - if err := e.Children(i).Close(); err != nil && firstErr == nil { + if err := exec.Close(e.Children(i)); err != nil && firstErr == nil { firstErr = err } } diff --git a/pkg/executor/explain.go b/pkg/executor/explain.go index ea635b5d0d090..0cbbc1a192d64 100644 --- a/pkg/executor/explain.go +++ b/pkg/executor/explain.go @@ -65,7 +65,7 @@ func (e *ExplainExec) Close() error { e.rows = nil if e.analyzeExec != nil && !e.executed { // Open(), but Next() is not called. - return e.analyzeExec.Close() + return exec.Close(e.analyzeExec) } return nil } @@ -98,7 +98,7 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) { if e.analyzeExec != nil && !e.executed { defer func() { - err1 := e.analyzeExec.Close() + err1 := exec.Close(e.analyzeExec) if err1 != nil { if err != nil { err = errors.New(err.Error() + ", " + err1.Error()) diff --git a/pkg/executor/index_lookup_join.go b/pkg/executor/index_lookup_join.go index 93ac46d962328..f178e99f88080 100644 --- a/pkg/executor/index_lookup_join.go +++ b/pkg/executor/index_lookup_join.go @@ -697,7 +697,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa } innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true, iw.memTracker, iw.lookup.finished) if innerExec != nil { - defer terror.Call(innerExec.Close) + defer func() { terror.Log(exec.Close(innerExec)) }() } if err != nil { return err diff --git a/pkg/executor/index_lookup_merge_join.go b/pkg/executor/index_lookup_merge_join.go index 01fc927897c73..c28a61eddb1a2 100644 --- a/pkg/executor/index_lookup_merge_join.go +++ b/pkg/executor/index_lookup_merge_join.go @@ -489,7 +489,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo } imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false, nil, nil) if imw.innerExec != nil { - defer terror.Call(imw.innerExec.Close) + defer func() { terror.Log(exec.Close(imw.innerExec)) }() } if err != nil { return err diff --git a/pkg/executor/index_merge_reader.go b/pkg/executor/index_merge_reader.go index 20062fb1a719b..2c22a983d7b5e 100644 --- a/pkg/executor/index_merge_reader.go +++ b/pkg/executor/index_merge_reader.go @@ -522,7 +522,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer func() { // To make sure SelectResult.Close() is called even got panic in fetchHandles(). if !tableReaderClosed { - terror.Call(worker.tableReader.Close) + terror.Log(exec.Close(worker.tableReader)) } }() for parTblIdx, tbl := range tbls { @@ -555,7 +555,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // release related resources cancel() tableReaderClosed = true - if err = worker.tableReader.Close(); err != nil { + if err = exec.Close(worker.tableReader); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } // this error is reported in fetchHandles(), so ignore it here. @@ -1899,7 +1899,7 @@ func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *index logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) return err } - defer terror.Call(tableReader.Close) + defer func() { terror.Log(exec.Close(tableReader)) }() task.memTracker = w.memTracker memUsage := int64(cap(task.handles) * 8) task.memUsage = memUsage diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index a1a5e8836c370..f3e304b3324c7 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -327,7 +327,7 @@ func (e *InsertExec) Close() error { defer e.memTracker.ReplaceBytesUsed(0) e.setMessage() if e.SelectExec != nil { - return e.SelectExec.Close() + return exec.Close(e.SelectExec) } return nil } diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index 2f9c7f37dfb72..6295523263e4a 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -176,7 +176,7 @@ func (e *BaseExecutor) Open(ctx context.Context) error { func (e *BaseExecutor) Close() error { var firstErr error for _, src := range e.children { - if err := src.Close(); err != nil && firstErr == nil { + if err := Close(src); err != nil && firstErr == nil { firstErr = err } } @@ -295,6 +295,16 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error { return sessVars.SQLKiller.HandleSignal() } +// Close is a wrapper function on e.Close(), it handles some common codes. +func Close(e Executor) (err error) { + defer func() { + if r := recover(); r != nil { + err = util.GetRecoverError(r) + } + }() + return e.Close() +} + // RegisterSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution. // This uses to catch the running SQL when Top SQL is enabled in execution. func RegisterSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) { diff --git a/pkg/executor/join.go b/pkg/executor/join.go index af8bbfbe66a98..0495e22d6b5f0 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -1319,7 +1319,7 @@ func (e *NestedLoopApplyExec) Close() error { runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats) } - return e.outerExec.Close() + return exec.Close(e.outerExec) } // Open implements the Executor interface. @@ -1426,7 +1426,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch // fetchAllInners reads all data from the inner table and stores them in a List. func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error { err := exec.Open(ctx, e.innerExec) - defer terror.Call(e.innerExec.Close) + defer func() { terror.Log(exec.Close(e.innerExec)) }() if err != nil { return err } diff --git a/pkg/executor/parallel_apply.go b/pkg/executor/parallel_apply.go index a35ca4b3306fc..d697b4f4cf0f5 100644 --- a/pkg/executor/parallel_apply.go +++ b/pkg/executor/parallel_apply.go @@ -174,7 +174,7 @@ func (e *ParallelNestedLoopApplyExec) Close() error { } // Wait all workers to finish before Close() is called. // Otherwise we may got data race. - err := e.outerExec.Close() + err := exec.Close(e.outerExec) if e.RuntimeStats() != nil { runtimeStats := newJoinRuntimeStats() @@ -304,7 +304,7 @@ func (e *ParallelNestedLoopApplyExec) fetchAllInners(ctx context.Context, id int } err = exec.Open(ctx, e.innerExecs[id]) - defer terror.Call(e.innerExecs[id].Close) + defer func() { terror.Log(exec.Close(e.innerExecs[id])) }() if err != nil { return err } diff --git a/pkg/executor/replace.go b/pkg/executor/replace.go index 992ac06d74b2e..d2f30e4333c8c 100644 --- a/pkg/executor/replace.go +++ b/pkg/executor/replace.go @@ -44,7 +44,7 @@ func (e *ReplaceExec) Close() error { defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } if e.SelectExec != nil { - return e.SelectExec.Close() + return exec.Close(e.SelectExec) } return nil } diff --git a/pkg/executor/shuffle.go b/pkg/executor/shuffle.go index c7309cd62133f..cb7b02b6eac8b 100644 --- a/pkg/executor/shuffle.go +++ b/pkg/executor/shuffle.go @@ -175,7 +175,7 @@ func (e *ShuffleExec) Close() error { } } // close child executor of each worker - if err := w.childExec.Close(); err != nil && firstErr == nil { + if err := exec.Close(w.childExec); err != nil && firstErr == nil { firstErr = err } } @@ -192,7 +192,7 @@ func (e *ShuffleExec) Close() error { // close dataSources for _, dataSource := range e.dataSources { - if err := dataSource.Close(); err != nil && firstErr == nil { + if err := exec.Close(dataSource); err != nil && firstErr == nil { firstErr = err } } diff --git a/pkg/executor/sortexec/sort.go b/pkg/executor/sortexec/sort.go index ce26bac26b802..c2b07d3dc4508 100644 --- a/pkg/executor/sortexec/sort.go +++ b/pkg/executor/sortexec/sort.go @@ -80,7 +80,7 @@ func (e *SortExec) Close() error { e.spillAction.SetFinished() } e.spillAction = nil - return e.Children(0).Close() + return exec.Close(e.Children(0)) } // Open implements the Executor Open interface. diff --git a/pkg/executor/test/executor/BUILD.bazel b/pkg/executor/test/executor/BUILD.bazel index 6d62a8cf04d75..7443ac982b576 100644 --- a/pkg/executor/test/executor/BUILD.bazel +++ b/pkg/executor/test/executor/BUILD.bazel @@ -8,13 +8,14 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 45, + shard_count = 46, deps = [ "//pkg/config", "//pkg/ddl", "//pkg/domain", "//pkg/domain/infosync", "//pkg/executor", + "//pkg/executor/internal/exec", "//pkg/expression", "//pkg/infoschema", "//pkg/kv", diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index 6a3947590f25e..6e4d3376e5ead 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -847,6 +848,49 @@ func TestUnreasonablyClose(t *testing.T) { require.Equal(t, opsNeedsCoveredMask, opsAlreadyCoveredMask, fmt.Sprintf("these operators are not covered %s", commentBuf.String())) } +func TestTwiceCloseUnionExec(t *testing.T) { + store := testkit.CreateMockStore(t) + + is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_cost_model_version=2") + // To enable the shuffleExec operator. + tk.MustExec("set @@tidb_merge_join_concurrency=4") + + p := parser.New() + for i, tc := range []string{ + "select /*+ stream_agg()*/ sum(a+1) from (select /*+ stream_agg()*/ sum(a+1) as a from t t1 union all select a from t t2) t1 union all select a from t t2", + } { + comment := fmt.Sprintf("case:%v sql:%s", i, tc) + stmt, err := p.ParseOneStmt(tc, "", "") + require.NoError(t, err, comment) + err = sessiontxn.NewTxn(context.Background(), tk.Session()) + require.NoError(t, err, comment) + + err = sessiontxn.GetTxnManager(tk.Session()).OnStmtStart(context.TODO(), stmt) + require.NoError(t, err, comment) + + executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil) + p, _, _ := planner.Optimize(context.TODO(), tk.Session(), stmt, is) + e := executorBuilder.Build(p) + chk := exec.NewFirstChunk(e) + require.NoError(t, exec.Open(context.Background(), e), comment) + require.NoError(t, e.Next(context.Background(), chk), comment) + require.NoError(t, exec.Close(e), comment) + chk.Reset() + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/aggregate/mockStreamAggExecBaseExecutorOpenReturnedError", `return(true)`)) + require.NoError(t, exec.Open(context.Background(), e), comment) + err = e.Next(context.Background(), chk) + require.EqualError(t, err, "mock StreamAggExec.baseExecutor.Open returned error") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/aggregate/mockStreamAggExecBaseExecutorOpenReturnedError")) + + exec.Close(e) + // No leak. + } +} + func TestPointGetPreparedPlan(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/pkg/executor/union_scan.go b/pkg/executor/union_scan.go index e32112dbe443f..ce7e853c0d46a 100644 --- a/pkg/executor/union_scan.go +++ b/pkg/executor/union_scan.go @@ -188,7 +188,7 @@ func (us *UnionScanExec) Close() error { us.cursor4AddRows = nil us.cursor4SnapshotRows = 0 us.snapshotRows = us.snapshotRows[:0] - return us.Children(0).Close() + return exec.Close(us.Children(0)) } // getOneRow gets one result row from dirty table or child. diff --git a/pkg/executor/update.go b/pkg/executor/update.go index 01a995d45f5aa..a4aa978ff3203 100644 --- a/pkg/executor/update.go +++ b/pkg/executor/update.go @@ -437,7 +437,7 @@ func (e *UpdateExec) Close() error { } defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } - return e.Children(0).Close() + return exec.Close(e.Children(0)) } // Open implements the Executor Open interface.