diff --git a/executor/adapter.go b/executor/adapter.go index 301183e98548c..1ba91891673ff 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -936,9 +936,14 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { return e, nil } -func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error { +func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + } + }() start := time.Now() - err := e.Open(ctx) + err = e.Open(ctx) a.phaseOpenDurations[0] += time.Since(start) return err } diff --git a/executor/aggregate.go b/executor/aggregate.go index cbcd2c8ec3793..d6ec79412b7f6 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -300,8 +300,6 @@ func (e *HashAggExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } - // If panic here, the children executor should be closed because they are open. - defer closeBaseExecutor(&e.baseExecutor) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 8908d9d42999a..d29fa86d961fe 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1495,16 +1495,23 @@ func TestAggInDisk(t *testing.T) { tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows()) } -func TestRandomPanicAggConsume(t *testing.T) { +func TestRandomPanicConsume(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("set @@tidb_max_chunk_size=32") tk.MustExec("set @@tidb_init_chunk_size=1") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(pk bigint primary key auto_random,a int, index idx(a));") + tk.MustExec("SPLIT TABLE t BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 
50;") // Split 50 regions to simulate many requests + for i := 0; i <= 1000; i++ { + tk.MustExec(fmt.Sprintf("insert into t(a) values(%v),(%v),(%v)", i, i, i)) + } + tk.MustExec("drop table if exists s;") + tk.MustExec("create table s(pk bigint primary key auto_random,a int, b int, index idx(a));") + tk.MustExec("SPLIT TABLE s BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests for i := 0; i <= 1000; i++ { - tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) + tk.MustExec(fmt.Sprintf("insert into s(a,b) values(%v,%v),(%v,%v),(%v,%v)", i, i, i, i, i, i)) } fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic" @@ -1512,42 +1519,60 @@ func TestRandomPanicAggConsume(t *testing.T) { defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/store/copr/ConsumeRandomPanic" + require.NoError(t, failpoint.Enable(fpName2, "3%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")")) + defer func() { + require.NoError(t, failpoint.Disable(fpName2)) + }() - // Test 10 times panic for each AggExec. 
+ sqls := []string{ + // Without index + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // Shuffle+StreamAgg + "select /*+ USE_INDEX(t) */ a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin + "select /*+ USE_INDEX(t) */ * from t", // TableScan + + // With index + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // Shuffle+StreamAgg + "select /*+ USE_INDEX(t,idx) */ a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin + "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join + "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join + "select /*+ USE_INDEX(t, idx) */ * from t", // IndexScan + + // With IndexLookUp + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from s t1 join s t2 on t1.a=t2.a", // Shuffle+MergeJoin + "select /*+ INL_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Join + "select /*+ INL_HASH_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Hash Join + "select /*+ 
USE_INDEX(s, idx) */ * from s", // IndexLookUp + } + + // Test 10 times panic for each Executor. var res sqlexec.RecordSet - for i := 1; i <= 10; i++ { - var err error - for err == nil { - // Test paralleled hash agg. - res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test unparalleled hash agg. - res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test stream agg. - res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) + for _, sql := range sqls { + for i := 1; i <= 10; i++ { + concurrency := rand.Int31n(4) + 1 // test 1~4 concurrency randomly + tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_merge_join_concurrency=%v", concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%v", concurrency)) + distConcurrency := rand.Int31n(15) + 1 + tk.MustExec(fmt.Sprintf("set @@tidb_distsql_scan_concurrency=%v", distConcurrency)) + var err error + for err == nil { + res, err = tk.Exec(sql) + if err == nil { + _, err = session.GetRows4Test(context.Background(), tk.Session(), res) + require.NoError(t, res.Close()) + } } + require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } - require.EqualError(t, err, "failpoint panic: ERROR 1105 
(HY000): Out Of Memory Quota![conn_id=1]") } } diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 73a3df2ff146c..d7e20c33bf94c 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -543,6 +543,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil) + failpoint.Inject("ConsumeRandomPanic", nil) if iw.stats != nil { start := time.Now() defer func() { @@ -628,10 +629,10 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH iw.wg = &sync.WaitGroup{} iw.wg.Add(1) + iw.lookup.workerWg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. go util.WithRecovery( func() { - iw.lookup.workerWg.Add(1) iw.buildHashTableForOuterResult(ctx, task, h) }, func(r interface{}) { @@ -649,6 +650,7 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() { err = errors.New("IndexHashJoinFetchInnerResultsErr") }) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 87ab4514e52f5..cf2722275dbe5 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -372,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { }() for { failpoint.Inject("TestIssue30211", nil) + failpoint.Inject("ConsumeRandomPanic", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err @@ -412,6 +413,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { task.memTracker = memory.NewTracker(-1, -1) task.outerResult.GetMemTracker().AttachTo(task.memTracker) 
task.memTracker.AttachTo(ow.parentMemTracker) + failpoint.Inject("ConsumeRandomPanic", nil) ow.increaseBatchSize() requiredRows := ow.batchSize @@ -555,6 +557,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi } return nil, err } + failpoint.Inject("ConsumeRandomPanic", nil) if rowIdx == 0 { iw.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows)) } @@ -700,6 +703,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa default: } err := Next(ctx, innerExec, iw.executorChk) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } diff --git a/executor/join.go b/executor/join.go index 160bab9c4e330..b65817fc2e1f4 100644 --- a/executor/join.go +++ b/executor/join.go @@ -213,6 +213,7 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { probeSideResult.SetRequiredRows(required, e.maxChunkSize) } err := Next(ctx, e.probeSideExec, probeSideResult) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { e.joinResultCh <- &hashjoinWorkerResult{ err: err, @@ -286,6 +287,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu return } failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil) + failpoint.Inject("ConsumeRandomPanic", nil) if chk.NumRows() == 0 { return } @@ -463,6 +465,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { return case probeSideResult, ok = <-e.probeResultChs[workerID]: } + failpoint.Inject("ConsumeRandomPanic", nil) if !ok { break } @@ -805,6 +808,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu err = e.rowContainer.PutChunkSelected(chk, selected, e.isNullEQ) } } + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } diff --git a/executor/merge_join.go b/executor/merge_join.go index cb2a495f09765..e8d195e3085ae 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -322,6 +322,7 @@ func (e 
*MergeJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) innerIter := e.innerTable.groupRowsIter outerIter := e.outerTable.groupRowsIter for !req.IsFull() { + failpoint.Inject("ConsumeRandomPanic", nil) if innerIter.Current() == innerIter.End() { if err := e.innerTable.fetchNextInnerGroup(ctx, e); err != nil { return err diff --git a/executor/projection.go b/executor/projection.go index fc69763898260..ac060e7e4a391 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -190,6 +190,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize) mSize := e.childResult.MemoryUsage() err := Next(ctx, e.children[0], e.childResult) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(e.childResult.MemoryUsage() - mSize) if err != nil { return err @@ -219,6 +220,7 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) } mSize := output.chk.MemoryUsage() chk.SwapColumns(output.chk) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(output.chk.MemoryUsage() - mSize) e.fetcher.outputCh <- output return nil @@ -252,6 +254,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) { }) inputChk := newFirstChunk(e.children[0]) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(inputChk.MemoryUsage()) e.fetcher.inputCh <- &projectionInput{ chk: inputChk, @@ -379,6 +382,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) { input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize) mSize := input.chk.MemoryUsage() err := Next(ctx, f.child, input.chk) + failpoint.Inject("ConsumeRandomPanic", nil) f.proj.memTracker.Consume(input.chk.MemoryUsage() - mSize) if err != nil || input.chk.NumRows() == 0 { output.done <- err @@ -439,6 +443,7 @@ func (w *projectionWorker) run(ctx context.Context) { mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage() err := 
w.evaluatorSuit.Run(w.sctx, input.chk, output.chk) + failpoint.Inject("ConsumeRandomPanic", nil) w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize) output.done <- err diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 6cc367930ebce..c46ec0e6f9e1b 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -581,6 +581,7 @@ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- * } } }) + failpoint.Inject("ConsumeRandomPanic", nil) worker.memTracker.Consume(consumed) } select { diff --git a/util/memory/action.go b/util/memory/action.go index 9056f33189f60..2ad4f76dcb695 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -136,15 +136,13 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) { // Action panics when memory usage exceeds memory quota. func (a *PanicOnExceed) Action(_ *Tracker) { a.mutex.Lock() - if a.acted { - a.mutex.Unlock() - return + if !a.acted { + if a.logHook != nil { + a.logHook(a.ConnID) + } } a.acted = true a.mutex.Unlock() - if a.logHook != nil { - a.logHook(a.ConnID) - } panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID)) }