From acd8fdbb90258bd49936aabcf7dfe9a29458ff9c Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 23 Aug 2022 16:55:51 +0800 Subject: [PATCH 01/14] polish test --- executor/aggregate_test.go | 49 +++++++++++++------------------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 8908d9d42999a..f3f971bf22156 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1495,7 +1495,7 @@ func TestAggInDisk(t *testing.T) { tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows()) } -func TestRandomPanicAggConsume(t *testing.T) { +func TestRandomPanicConsume(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1513,41 +1513,26 @@ func TestRandomPanicAggConsume(t *testing.T) { require.NoError(t, failpoint.Disable(fpName)) }() + sqls := []string{ + "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg + } + // Test 10 times panic for each AggExec. var res sqlexec.RecordSet - for i := 1; i <= 10; i++ { - var err error - for err == nil { - // Test paralleled hash agg. - res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test unparalleled hash agg. - res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) - } - } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") - - err = nil - for err == nil { - // Test stream agg. - res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t") - if err == nil { - _, err = session.GetRows4Test(context.Background(), tk.Session(), res) - require.NoError(t, res.Close()) + for _, sql := range sqls { + for i := 1; i <= 10; i++ { + var err error + for err == nil { + res, err = tk.Exec(sql) + if err == nil { + _, err = session.GetRows4Test(context.Background(), tk.Session(), res) + require.NoError(t, res.Close()) + } } + require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } } From 26ffbd6ac5a5e8eb857f0cb6a0f47fdcd08d0bfc Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 23 Aug 2022 17:09:45 +0800 Subject: [PATCH 02/14] test projection --- executor/aggregate_test.go | 1 + executor/projection.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index f3f971bf22156..d9de432d1f3ec 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1517,6 +1517,7 @@ func TestRandomPanicConsume(t *testing.T) { "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg + "select a * a, a / a, a + a , a - a from t", // Projection } // Test 10 times panic for each AggExec. diff --git a/executor/projection.go b/executor/projection.go index fc69763898260..ac060e7e4a391 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -190,6 +190,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize) mSize := e.childResult.MemoryUsage() err := Next(ctx, e.children[0], e.childResult) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(e.childResult.MemoryUsage() - mSize) if err != nil { return err @@ -219,6 +220,7 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) } mSize := output.chk.MemoryUsage() chk.SwapColumns(output.chk) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(output.chk.MemoryUsage() - mSize) e.fetcher.outputCh <- output return nil @@ -252,6 +254,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) { }) inputChk := newFirstChunk(e.children[0]) + failpoint.Inject("ConsumeRandomPanic", nil) e.memTracker.Consume(inputChk.MemoryUsage()) e.fetcher.inputCh <- &projectionInput{ chk: inputChk, @@ -379,6 +382,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) { input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize) mSize := input.chk.MemoryUsage() err := Next(ctx, f.child, input.chk) + failpoint.Inject("ConsumeRandomPanic", nil) f.proj.memTracker.Consume(input.chk.MemoryUsage() - mSize) if err != nil || input.chk.NumRows() == 0 { output.done <- err @@ -439,6 +443,7 @@ func (w *projectionWorker) run(ctx context.Context) { mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage() err := w.evaluatorSuit.Run(w.sctx, input.chk, output.chk) + failpoint.Inject("ConsumeRandomPanic", nil) w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize) output.done <- err From 421946e5e77fa330f18868a150b6864216f6f3c4 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 23 Aug 2022 17:32:04 +0800 Subject: [PATCH 03/14] test join --- executor/aggregate_test.go | 9 +++++---- executor/join.go | 4 ++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index d9de432d1f3ec..c43645b853f2c 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1514,10 +1514,11 @@ func TestRandomPanicConsume(t *testing.T) { }() sqls := []string{ - "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled - "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled - "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg - "select a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg + "select a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin } // Test 10 times panic for each AggExec. diff --git a/executor/join.go b/executor/join.go index 33cfcfafd6315..2bba7611e2d26 100644 --- a/executor/join.go +++ b/executor/join.go @@ -217,6 +217,7 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { probeSideResult.SetRequiredRows(required, e.maxChunkSize) } err := Next(ctx, e.probeSideExec, probeSideResult) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { e.joinResultCh <- &hashjoinWorkerResult{ err: err, @@ -290,6 +291,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu return } failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil) + failpoint.Inject("ConsumeRandomPanic", nil) if chk.NumRows() == 0 { return } @@ -467,6 +469,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { return case probeSideResult, ok = <-e.probeResultChs[workerID]: } + failpoint.Inject("ConsumeRandomPanic", nil) if !ok { break } @@ -810,6 +813,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu err = e.rowContainer.PutChunkSelected(chk, selected, e.isNullEQ) } } + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } From 89592162859b6df12c62d667b0a79a4f5eba819d Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 24 Aug 2022 14:14:20 +0800 Subject: [PATCH 04/14] close concurrent ddl --- executor/aggregate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index c43645b853f2c..e8409fa10f85b 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1501,6 +1501,7 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec("use test") tk.MustExec("set @@tidb_max_chunk_size=32") tk.MustExec("set @@tidb_init_chunk_size=1") + tk.MustExec("set global tidb_enable_concurrent_ddl=off;") // concurrent_ddl sql use aggregate and panic tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") for i := 0; i <= 1000; i++ { From 5ba953793cbb8b9ed18080aa7fe38d65fbce0068 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 24 Aug 2022 14:46:32 +0800 Subject: [PATCH 05/14] random concurrency --- executor/aggregate_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index e8409fa10f85b..c8c85ed4921af 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1522,10 +1522,12 @@ func TestRandomPanicConsume(t *testing.T) { "select /*+ HASH_JOIN(t) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin } - // Test 10 times panic for each AggExec. + // Test 10 times panic for each Executor. var res sqlexec.RecordSet for _, sql := range sqls { for i := 1; i <= 10; i++ { + concurrency := rand.Int31n(4) + 1 // test 1~5 concurrency randomly + tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency)) var err error for err == nil { res, err = tk.Exec(sql) From 90e2ac1d84b42d2a8781fd88cbed532643c1cf1e Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 25 Aug 2022 16:45:29 +0800 Subject: [PATCH 06/14] add some tests --- executor/aggregate_test.go | 15 +++++++++------ executor/index_lookup_join.go | 3 +++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index c8c85ed4921af..685f32ae1e4ec 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1503,7 +1503,7 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec("set @@tidb_init_chunk_size=1") tk.MustExec("set global tidb_enable_concurrent_ddl=off;") // concurrent_ddl sql use aggregate and panic tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") + tk.MustExec("create table t(a int, index idx(a))") for i := 0; i <= 1000; i++ { tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) } @@ -1515,11 +1515,14 @@ func TestRandomPanicConsume(t *testing.T) { }() sqls := []string{ - "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled - "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled - "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg - "select a * a, a / a, a + a , a - a from t", // Projection - "select /*+ HASH_JOIN(t) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg + "select a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t1) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ MERGE_JOIN(t1) */ * from t t1 join t t2 on t1.a=t2.a", // MergeJoin + "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join + "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join } // Test 10 times panic for each Executor. diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 87ab4514e52f5..3bd582e3b51f1 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -372,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { }() for { failpoint.Inject("TestIssue30211", nil) + failpoint.Inject("ConsumeRandomPanic", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err @@ -555,6 +556,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi } return nil, err } + failpoint.Inject("ConsumeRandomPanic", nil) if rowIdx == 0 { iw.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows)) } @@ -700,6 +702,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa default: } err := Next(ctx, innerExec, iw.executorChk) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } From 19b88212ce3fba44820874c66e7525e892a02008 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 25 Aug 2022 18:10:26 +0800 Subject: [PATCH 07/14] temp --- executor/aggregate_test.go | 12 ++++++++++-- executor/index_lookup_hash_join.go | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 685f32ae1e4ec..f23d638beb9cf 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1528,18 +1528,26 @@ func TestRandomPanicConsume(t *testing.T) { // Test 10 times panic for each Executor. var res sqlexec.RecordSet for _, sql := range sqls { - for i := 1; i <= 10; i++ { + for i := 1; i <= 1000; i++ { concurrency := rand.Int31n(4) + 1 // test 1~5 concurrency randomly tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency)) var err error + times := 10 for err == nil { res, err = tk.Exec(sql) if err == nil { _, err = session.GetRows4Test(context.Background(), tk.Session(), res) require.NoError(t, res.Close()) + times-- } + if times == 0 { + t.Log("All Success") + break + } + } + if times > 0 { + require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } } } diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 03be724d6573d..39d428a7356a1 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -629,10 +629,10 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH iw.wg = &sync.WaitGroup{} iw.wg.Add(1) + iw.lookup.workerWg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. go util.WithRecovery( func() { - iw.lookup.workerWg.Add(1) iw.buildHashTableForOuterResult(ctx, task, h) }, func(r interface{}) { From 3dbe8720253a177a3619d2e33ea726a557101f3e Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 26 Aug 2022 15:11:16 +0800 Subject: [PATCH 08/14] fix --- executor/aggregate_test.go | 39 ++++++++++++++++++++------------------ executor/merge_join.go | 1 + 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index f23d638beb9cf..9538958fa23fb 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1515,39 +1515,42 @@ func TestRandomPanicConsume(t *testing.T) { }() sqls := []string{ - "select /*+ HASH_AGG() */ count(a) from t group by a", // HashAgg Paralleled - "select /*+ HASH_AGG() */ count(distinct a) from t", // HashAgg Unparalleled - "select /*+ STREAM_AGG() */ count(a) from t", // StreamAgg - "select a * a, a / a, a + a , a - a from t", // Projection - "select /*+ HASH_JOIN(t1) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin - "select /*+ MERGE_JOIN(t1) */ * from t t1 join t t2 on t1.a=t2.a", // MergeJoin - "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join - "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join + // Without index + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t", // StreamAgg + "select /*+ USE_INDEX(t) */ a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // MergeJoin + + // With index + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // HashAgg Paralleled + "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(distinct a) from t", // HashAgg Unparalleled + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t", // StreamAgg + "select /*+ USE_INDEX(t,idx) */ a * a, a / a, a + a , a - a from t", // Projection + "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // MergeJoin + "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join + "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join } // Test 10 times panic for each Executor. var res sqlexec.RecordSet for _, sql := range sqls { - for i := 1; i <= 1000; i++ { + for i := 1; i <= 10; i++ { concurrency := rand.Int31n(4) + 1 // test 1~5 concurrency randomly tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_merge_join_concurrency=%v", concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%v", concurrency)) var err error - times := 10 for err == nil { res, err = tk.Exec(sql) if err == nil { _, err = session.GetRows4Test(context.Background(), tk.Session(), res) require.NoError(t, res.Close()) - times-- } - if times == 0 { - t.Log("All Success") - break - } - } - if times > 0 { - require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } + require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } } } diff --git a/executor/merge_join.go b/executor/merge_join.go index cb2a495f09765..e8d195e3085ae 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -322,6 +322,7 @@ func (e *MergeJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) innerIter := e.innerTable.groupRowsIter outerIter := e.outerTable.groupRowsIter for !req.IsFull() { + failpoint.Inject("ConsumeRandomPanic", nil) if innerIter.Current() == innerIter.End() { if err := e.innerTable.fetchNextInnerGroup(ctx, e); err != nil { return err From c938743b451129a4403fb8fefc23db0e56c9d492 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 26 Aug 2022 15:37:47 +0800 Subject: [PATCH 09/14] panic all --- util/memory/action.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/util/memory/action.go b/util/memory/action.go index af2c33351068d..d12a6180d4643 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -136,15 +136,13 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) { // Action panics when memory usage exceeds memory quota. func (a *PanicOnExceed) Action(_ *Tracker) { a.mutex.Lock() - if a.acted { - a.mutex.Unlock() - return + if !a.acted { + if a.logHook != nil { + a.logHook(a.ConnID) + } } a.acted = true a.mutex.Unlock() - if a.logHook != nil { - a.logHook(a.ConnID) - } panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID)) } From 86650b1bb0c692a1822ebfc370603a85a62d343f Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 29 Aug 2022 14:50:19 +0800 Subject: [PATCH 10/14] try fix --- executor/adapter.go | 9 +++++++-- executor/aggregate.go | 1 + executor/aggregate_test.go | 2 -- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 4a648f4029c42..88b7857e17645 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -904,9 +904,14 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { return e, nil } -func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error { +func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + } + }() start := time.Now() - err := e.Open(ctx) + err = e.Open(ctx) a.phaseOpenDurations[0] += time.Since(start) return err } diff --git a/executor/aggregate.go b/executor/aggregate.go index d33f5c8fcee5e..337ad9b5efa31 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -288,6 +288,7 @@ func (e *HashAggExec) Close() error { if e.memTracker != nil { e.memTracker.ReplaceBytesUsed(0) } + e.parallelExecInitialized = false } return e.baseExecutor.Close() } diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 9538958fa23fb..0b705d586b29e 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1501,8 +1501,6 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec("use test") tk.MustExec("set @@tidb_max_chunk_size=32") tk.MustExec("set @@tidb_init_chunk_size=1") - tk.MustExec("set global tidb_enable_concurrent_ddl=off;") // concurrent_ddl sql use aggregate and panic - tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, index idx(a))") for i := 0; i <= 1000; i++ { tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) From a8f4b1863aaf9c3fe360e9afff81276fd5dd2dfe Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Mon, 29 Aug 2022 17:40:46 +0800 Subject: [PATCH 11/14] fix --- executor/aggregate.go | 3 --- executor/aggregate_test.go | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 337ad9b5efa31..dc2b0c64606c1 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -288,7 +288,6 @@ func (e *HashAggExec) Close() error { if e.memTracker != nil { e.memTracker.ReplaceBytesUsed(0) } - e.parallelExecInitialized = false } return e.baseExecutor.Close() } @@ -304,8 +303,6 @@ func (e *HashAggExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } - // If panic here, the children executor should be closed because they are open. - defer closeBaseExecutor(&e.baseExecutor) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 0b705d586b29e..222ae5f35f2c6 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1501,6 +1501,7 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec("use test") tk.MustExec("set @@tidb_max_chunk_size=32") tk.MustExec("set @@tidb_init_chunk_size=1") + tk.MustExec("drop table if exists t;") tk.MustExec("create table t(a int, index idx(a))") for i := 0; i <= 1000; i++ { tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) From 0d742287b8b17b9a035bc06217c52aa222d3fc4b Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 30 Aug 2022 11:39:10 +0800 Subject: [PATCH 12/14] random-read-panic --- executor/aggregate_test.go | 22 ++++++++++++++++------ store/copr/coprocessor.go | 1 + 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 222ae5f35f2c6..8d21fb4d74bc9 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1502,9 +1502,10 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec("set @@tidb_max_chunk_size=32") tk.MustExec("set @@tidb_init_chunk_size=1") tk.MustExec("drop table if exists t;") - tk.MustExec("create table t(a int, index idx(a))") + tk.MustExec("create table t(pk bigint primary key auto_random,a int, index idx(a));") + tk.MustExec("SPLIT TABLE t BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests for i := 0; i <= 1000; i++ { - tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i)) + tk.MustExec(fmt.Sprintf("insert into t(a) values(%v),(%v),(%v)", i, i, i)) } fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic" @@ -1512,25 +1513,32 @@ func TestRandomPanicConsume(t *testing.T) { defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/store/copr/ConsumeRandomPanic" + require.NoError(t, failpoint.Enable(fpName2, "3%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")")) + defer func() { + require.NoError(t, failpoint.Disable(fpName2)) + }() sqls := []string{ // Without index "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // HashAgg Paralleled "select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(distinct a) from t", // HashAgg Unparalleled - "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t", // StreamAgg + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // Shuffle+StreamAgg "select /*+ USE_INDEX(t) */ a * a, a / a, a + a , a - a from t", // Projection "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // HashJoin - "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // MergeJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin + "select /*+ USE_INDEX(t) */ * from t", // TableScan // With index "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // HashAgg Paralleled "select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(distinct a) from t", // HashAgg Unparalleled - "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t", // StreamAgg + "select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // Shuffle+StreamAgg "select /*+ USE_INDEX(t,idx) */ a * a, a / a, a + a , a - a from t", // Projection "select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin - "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // MergeJoin + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join + "select /*+ USE_INDEX(t, idx) */ * from t", // IndexScan } // Test 10 times panic for each Executor. @@ -1541,6 +1549,8 @@ func TestRandomPanicConsume(t *testing.T) { tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency)) tk.MustExec(fmt.Sprintf("set @@tidb_merge_join_concurrency=%v", concurrency)) tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%v", concurrency)) + distConcurrency := rand.Int31n(15) + 1 + tk.MustExec(fmt.Sprintf("set @@tidb_distsql_scan_concurrency=%v", distConcurrency)) var err error for err == nil { res, err = tk.Exec(sql) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 6b9721f36504f..cb76751fabb18 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -576,6 +576,7 @@ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- * } } }) + failpoint.Inject("ConsumeRandomPanic", nil) worker.memTracker.Consume(consumed) } select { From c0aa95bc0ed4c511199946817e204084459eae3b Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 31 Aug 2022 17:44:17 +0800 Subject: [PATCH 13/14] inject panic for index hash join --- executor/index_lookup_hash_join.go | 2 ++ executor/index_lookup_join.go | 1 + 2 files changed, 3 insertions(+) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 39d428a7356a1..6e88651d80708 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -544,6 +544,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil) + failpoint.Inject("ConsumeRandomPanic", nil) if iw.stats != nil { start := time.Now() defer func() { @@ -650,6 +651,7 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() { err = errors.New("IndexHashJoinFetchInnerResultsErr") }) + failpoint.Inject("ConsumeRandomPanic", nil) if err != nil { return err } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 3bd582e3b51f1..cf2722275dbe5 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -413,6 +413,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { task.memTracker = memory.NewTracker(-1, -1) task.outerResult.GetMemTracker().AttachTo(task.memTracker) task.memTracker.AttachTo(ow.parentMemTracker) + failpoint.Inject("ConsumeRandomPanic", nil) ow.increaseBatchSize() requiredRows := ow.batchSize From c608664c2984730b012d85464d173f0ef3921002 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Thu, 1 Sep 2022 16:26:11 +0800 Subject: [PATCH 14/14] add indexlookup --- executor/aggregate_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 8d21fb4d74bc9..d29fa86d961fe 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -1507,6 +1507,12 @@ func TestRandomPanicConsume(t *testing.T) { for i := 0; i <= 1000; i++ { tk.MustExec(fmt.Sprintf("insert into t(a) values(%v),(%v),(%v)", i, i, i)) } + tk.MustExec("drop table if exists s;") + tk.MustExec("create table s(pk bigint primary key auto_random,a int, b int, index idx(a));") + tk.MustExec("SPLIT TABLE s BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests + for i := 0; i <= 1000; i++ { + tk.MustExec(fmt.Sprintf("insert into s(a,b) values(%v,%v),(%v,%v),(%v,%v)", i, i, i, i, i, i)) + } fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic" require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")")) @@ -1539,6 +1545,12 @@ func TestRandomPanicConsume(t *testing.T) { "select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join "select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join "select /*+ USE_INDEX(t, idx) */ * from t", // IndexScan + + // With IndexLookUp + "select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from s t1 join s t2 on t1.a=t2.a", // Shuffle+MergeJoin + "select /*+ INL_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Join + "select /*+ INL_HASH_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Hash Join + "select /*+ USE_INDEX(s, idx) */ * from s", // IndexLookUp } // Test 10 times panic for each Executor.