Skip to content

Commit

Permalink
executor: exit all goroutines immediately when exceeded mem-quota (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Sep 14, 2022
1 parent a063659 commit 5a8e1b2
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 47 deletions.
9 changes: 7 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,9 +936,14 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return e, nil
}

func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.New(fmt.Sprint(r))
}
}()
start := time.Now()
err := e.Open(ctx)
err = e.Open(ctx)
a.phaseOpenDurations[0] += time.Since(start)
return err
}
Expand Down
2 changes: 0 additions & 2 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,6 @@ func (e *HashAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
// If panic here, the children executor should be closed because they are open.
defer closeBaseExecutor(&e.baseExecutor)
e.prepared = false

e.memTracker = memory.NewTracker(e.id, -1)
Expand Down
97 changes: 61 additions & 36 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,59 +1495,84 @@ func TestAggInDisk(t *testing.T) {
tk.MustQuery("select /*+ HASH_AGG() */ count(c) from t group by c1;").Check(testkit.Rows())
}

func TestRandomPanicAggConsume(t *testing.T) {
func TestRandomPanicConsume(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_max_chunk_size=32")
tk.MustExec("set @@tidb_init_chunk_size=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(pk bigint primary key auto_random,a int, index idx(a));")
tk.MustExec("SPLIT TABLE t BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests
for i := 0; i <= 1000; i++ {
tk.MustExec(fmt.Sprintf("insert into t(a) values(%v),(%v),(%v)", i, i, i))
}
tk.MustExec("drop table if exists s;")
tk.MustExec("create table s(pk bigint primary key auto_random,a int, b int, index idx(a));")
tk.MustExec("SPLIT TABLE s BETWEEN (-9223372036854775808) AND (9223372036854775807) REGIONS 50;") // Split 50 regions to simulate many requests
for i := 0; i <= 1000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%v),(%v),(%v)", i, i, i))
tk.MustExec(fmt.Sprintf("insert into s(a,b) values(%v,%v),(%v,%v),(%v,%v)", i, i, i, i, i, i))
}

fpName := "github.com/pingcap/tidb/executor/ConsumeRandomPanic"
require.NoError(t, failpoint.Enable(fpName, "5%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
fpName2 := "github.com/pingcap/tidb/store/copr/ConsumeRandomPanic"
require.NoError(t, failpoint.Enable(fpName2, "3%panic(\"ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]\")"))
defer func() {
require.NoError(t, failpoint.Disable(fpName2))
}()

// Test 10 times panic for each AggExec.
sqls := []string{
// Without index
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // HashAgg Paralleled
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t) */ count(distinct a) from t", // HashAgg Unparalleled
"select /*+ STREAM_AGG() */ /*+ USE_INDEX(t) */ count(a) from t group by a", // Shuffle+StreamAgg
"select /*+ USE_INDEX(t) */ a * a, a / a, a + a , a - a from t", // Projection
"select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // HashJoin
"select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1) */ /*+ USE_INDEX(t2) */* from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin
"select /*+ USE_INDEX(t) */ * from t", // TableScan

// With index
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // HashAgg Paralleled
"select /*+ HASH_AGG() */ /*+ USE_INDEX(t,idx) */ count(distinct a) from t", // HashAgg Unparalleled
"select /*+ STREAM_AGG() */ /*+ USE_INDEX(t,idx) */ count(a) from t group by a", // Shuffle+StreamAgg
"select /*+ USE_INDEX(t,idx) */ a * a, a / a, a + a , a - a from t", // Projection
"select /*+ HASH_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // HashJoin
"select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from t t1 join t t2 on t1.a=t2.a", // Shuffle+MergeJoin
"select /*+ INL_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Join
"select /*+ INL_HASH_JOIN(t2) */ * from t t1 join t t2 on t1.a=t2.a;", // Index Hash Join
"select /*+ USE_INDEX(t, idx) */ * from t", // IndexScan

// With IndexLookUp
"select /*+ MERGE_JOIN(t1) */ /*+ USE_INDEX(t1,idx) */ /*+ USE_INDEX(t2,idx) */ * from s t1 join s t2 on t1.a=t2.a", // Shuffle+MergeJoin
"select /*+ INL_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Join
"select /*+ INL_HASH_JOIN(t2) */ * from s t1 join s t2 on t1.a=t2.a;", // Index Hash Join
"select /*+ USE_INDEX(s, idx) */ * from s", // IndexLookUp
}

// Test 10 times panic for each Executor.
var res sqlexec.RecordSet
for i := 1; i <= 10; i++ {
var err error
for err == nil {
// Test paralleled hash agg.
res, err = tk.Exec("select /*+ HASH_AGG() */ count(a) from t group by a")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")

err = nil
for err == nil {
// Test unparalleled hash agg.
res, err = tk.Exec("select /*+ HASH_AGG() */ count(distinct a) from t")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")

err = nil
for err == nil {
// Test stream agg.
res, err = tk.Exec("select /*+ STREAM_AGG() */ count(a) from t")
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
for _, sql := range sqls {
for i := 1; i <= 10; i++ {
concurrency := rand.Int31n(4) + 1 // test 1~5 concurrency randomly
tk.MustExec(fmt.Sprintf("set @@tidb_executor_concurrency=%v", concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_merge_join_concurrency=%v", concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_streamagg_concurrency=%v", concurrency))
distConcurrency := rand.Int31n(15) + 1
tk.MustExec(fmt.Sprintf("set @@tidb_distsql_scan_concurrency=%v", distConcurrency))
var err error
for err == nil {
res, err = tk.Exec(sql)
if err == nil {
_, err = session.GetRows4Test(context.Background(), tk.Session(), res)
require.NoError(t, res.Close())
}
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
}

Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde

func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) {
failpoint.Inject("IndexHashJoinBuildHashTablePanic", nil)
failpoint.Inject("ConsumeRandomPanic", nil)
if iw.stats != nil {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -628,10 +629,10 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH

iw.wg = &sync.WaitGroup{}
iw.wg.Add(1)
iw.lookup.workerWg.Add(1)
// TODO(XuHuaiyu): we may always use the smaller side to build the hashtable.
go util.WithRecovery(
func() {
iw.lookup.workerWg.Add(1)
iw.buildHashTableForOuterResult(ctx, task, h)
},
func(r interface{}) {
Expand All @@ -649,6 +650,7 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
failpoint.Inject("IndexHashJoinFetchInnerResultsErr", func() {
err = errors.New("IndexHashJoinFetchInnerResultsErr")
})
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
}()
for {
failpoint.Inject("TestIssue30211", nil)
failpoint.Inject("ConsumeRandomPanic", nil)
task, err := ow.buildTask(ctx)
if err != nil {
task.doneCh <- err
Expand Down Expand Up @@ -412,6 +413,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
task.memTracker = memory.NewTracker(-1, -1)
task.outerResult.GetMemTracker().AttachTo(task.memTracker)
task.memTracker.AttachTo(ow.parentMemTracker)
failpoint.Inject("ConsumeRandomPanic", nil)

ow.increaseBatchSize()
requiredRows := ow.batchSize
Expand Down Expand Up @@ -555,6 +557,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi
}
return nil, err
}
failpoint.Inject("ConsumeRandomPanic", nil)
if rowIdx == 0 {
iw.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows))
}
Expand Down Expand Up @@ -700,6 +703,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
default:
}
err := Next(ctx, innerExec, iw.executorChk)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
probeSideResult.SetRequiredRows(required, e.maxChunkSize)
}
err := Next(ctx, e.probeSideExec, probeSideResult)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
Expand Down Expand Up @@ -286,6 +287,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
return
}
failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil)
failpoint.Inject("ConsumeRandomPanic", nil)
if chk.NumRows() == 0 {
return
}
Expand Down Expand Up @@ -463,6 +465,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
return
case probeSideResult, ok = <-e.probeResultChs[workerID]:
}
failpoint.Inject("ConsumeRandomPanic", nil)
if !ok {
break
}
Expand Down Expand Up @@ -805,6 +808,7 @@ func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chu
err = e.rowContainer.PutChunkSelected(chk, selected, e.isNullEQ)
}
}
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func (e *MergeJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
innerIter := e.innerTable.groupRowsIter
outerIter := e.outerTable.groupRowsIter
for !req.IsFull() {
failpoint.Inject("ConsumeRandomPanic", nil)
if innerIter.Current() == innerIter.End() {
if err := e.innerTable.fetchNextInnerGroup(ctx, e); err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
mSize := e.childResult.MemoryUsage()
err := Next(ctx, e.children[0], e.childResult)
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(e.childResult.MemoryUsage() - mSize)
if err != nil {
return err
Expand Down Expand Up @@ -219,6 +220,7 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk)
}
mSize := output.chk.MemoryUsage()
chk.SwapColumns(output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(output.chk.MemoryUsage() - mSize)
e.fetcher.outputCh <- output
return nil
Expand Down Expand Up @@ -252,6 +254,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
})

inputChk := newFirstChunk(e.children[0])
failpoint.Inject("ConsumeRandomPanic", nil)
e.memTracker.Consume(inputChk.MemoryUsage())
e.fetcher.inputCh <- &projectionInput{
chk: inputChk,
Expand Down Expand Up @@ -379,6 +382,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
mSize := input.chk.MemoryUsage()
err := Next(ctx, f.child, input.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
f.proj.memTracker.Consume(input.chk.MemoryUsage() - mSize)
if err != nil || input.chk.NumRows() == 0 {
output.done <- err
Expand Down Expand Up @@ -439,6 +443,7 @@ func (w *projectionWorker) run(ctx context.Context) {

mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage()
err := w.evaluatorSuit.Run(w.sctx, input.chk, output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize)
output.done <- err

Expand Down
1 change: 1 addition & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *
}
}
})
failpoint.Inject("ConsumeRandomPanic", nil)
worker.memTracker.Consume(consumed)
}
select {
Expand Down
10 changes: 4 additions & 6 deletions util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,13 @@ func (a *PanicOnExceed) SetLogHook(hook func(uint64)) {
// Action panics when memory usage exceeds memory quota.
func (a *PanicOnExceed) Action(_ *Tracker) {
a.mutex.Lock()
if a.acted {
a.mutex.Unlock()
return
if !a.acted {
if a.logHook != nil {
a.logHook(a.ConnID)
}
}
a.acted = true
a.mutex.Unlock()
if a.logHook != nil {
a.logHook(a.ConnID)
}
panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID))
}

Expand Down

0 comments on commit 5a8e1b2

Please sign in to comment.