From 2ba6f6c0ac2e9e5dd16a18b04954d1aaf56c9d5b Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Thu, 16 Dec 2021 13:04:35 +0800 Subject: [PATCH 1/5] cherry pick #30696 to release-5.0 Signed-off-by: ti-srebot --- executor/builder.go | 17 ++ executor/cte.go | 578 ++++++++++++++++++++++++++++++++++++++ executor/distsql.go | 11 + executor/executor_test.go | 343 ++++++++++++++++++++++ executor/join.go | 7 +- executor/merge_join.go | 5 + 6 files changed, 960 insertions(+), 1 deletion(-) create mode 100644 executor/cte.go diff --git a/executor/builder.go b/executor/builder.go index d3793cac94f22..9972c99759d91 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -96,6 +96,23 @@ type MockPhysicalPlan interface { GetExecutor() Executor } +// MockExecutorBuilder is a wrapper for executorBuilder. +// ONLY used in test. +type MockExecutorBuilder struct { + *executorBuilder +} + +// NewMockExecutorBuilderForTest is ONLY used in test. +func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder { + return &MockExecutorBuilder{ + executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)} +} + +// Build builds an executor tree according to `p`. +func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor { + return b.build(p) +} + func (b *executorBuilder) build(p plannercore.Plan) Executor { switch v := p.(type) { case nil: diff --git a/executor/cte.go b/executor/cte.go new file mode 100644 index 0000000000000..8345bf5e57f5d --- /dev/null +++ b/executor/cte.go @@ -0,0 +1,578 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/cteutil" + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/memory" +) + +var _ Executor = &CTEExec{} + +// CTEExec implements CTE. +// Following diagram describes how CTEExec works. +// +// `iterInTbl` is shared by `CTEExec` and `CTETableReaderExec`. +// `CTETableReaderExec` reads data from `iterInTbl`, +// and its output will be stored `iterOutTbl` by `CTEExec`. +// +// When an iteration ends, `CTEExec` will move all data from `iterOutTbl` into `iterInTbl`, +// which will be the input for new iteration. +// At the end of each iteration, data in `iterOutTbl` will also be added into `resTbl`. +// `resTbl` stores data of all iteration. +// +----------+ +// write |iterOutTbl| +// CTEExec ------------------->| | +// | +----+-----+ +// ------------- | write +// | | v +// other op other op +----------+ +// (seed) (recursive) | resTbl | +// ^ | | +// | +----------+ +// CTETableReaderExec +// ^ +// | read +----------+ +// +---------------+iterInTbl | +// | | +// +----------+ +type CTEExec struct { + baseExecutor + + seedExec Executor + recursiveExec Executor + + // `resTbl` and `iterInTbl` are shared by all CTEExec which reference to same the CTE. + // `iterInTbl` is also shared by CTETableReaderExec. + resTbl cteutil.Storage + iterInTbl cteutil.Storage + iterOutTbl cteutil.Storage + + hashTbl baseHashTable + + // Index of chunk to read from `resTbl`. + chkIdx int + + // UNION ALL or UNION DISTINCT. + isDistinct bool + curIter int + hCtx *hashContext + sel []int + + // Limit related info. + hasLimit bool + limitBeg uint64 + limitEnd uint64 + cursor uint64 + meetFirstBatch bool + + memTracker *memory.Tracker + diskTracker *disk.Tracker +} + +// Open implements the Executor interface. +func (e *CTEExec) Open(ctx context.Context) (err error) { + e.reset() + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + + if e.seedExec == nil { + return errors.New("seedExec for CTEExec is nil") + } + if err = e.seedExec.Open(ctx); err != nil { + return err + } + + e.memTracker = memory.NewTracker(e.id, -1) + e.diskTracker = disk.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + + if e.recursiveExec != nil { + if err = e.recursiveExec.Open(ctx); err != nil { + return err + } + recursiveTypes := e.recursiveExec.base().retFieldTypes + e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize) + if err = e.iterOutTbl.OpenAndRef(); err != nil { + return err + } + } + + if e.isDistinct { + e.hashTbl = newConcurrentMapHashTable() + e.hCtx = &hashContext{ + allTypes: e.base().retFieldTypes, + } + // We use all columns to compute hash. + e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes)) + for i := range e.hCtx.keyColIdx { + e.hCtx.keyColIdx[i] = i + } + } + return nil +} + +// Next implements the Executor interface. +func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { + req.Reset() + e.resTbl.Lock() + defer e.resTbl.Unlock() + if !e.resTbl.Done() { + resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) + iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) + var iterOutAction *chunk.SpillDiskAction + if e.iterOutTbl != nil { + iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) + } + + failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { + if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { + defer resAction.WaitForTest() + defer iterInAction.WaitForTest() + if iterOutAction != nil { + defer iterOutAction.WaitForTest() + } + } + }) + + if err = e.computeSeedPart(ctx); err != nil { + // Don't put it in defer. + // Because it should be called only when the filling process is not completed. + if err1 := e.reopenTbls(); err1 != nil { + return err1 + } + return err + } + if err = e.computeRecursivePart(ctx); err != nil { + if err1 := e.reopenTbls(); err1 != nil { + return err1 + } + return err + } + e.resTbl.SetDone() + } + + if e.hasLimit { + return e.nextChunkLimit(req) + } + if e.chkIdx < e.resTbl.NumChunks() { + res, err := e.resTbl.GetChunk(e.chkIdx) + if err != nil { + return err + } + // Need to copy chunk to make sure upper operator will not change chunk in resTbl. + // Also we ignore copying rows not selected, because some operators like Projection + // doesn't support swap column if chunk.sel is no nil. + req.SwapColumns(res.CopyConstructSel()) + e.chkIdx++ + } + return nil +} + +// Close implements the Executor interface. +func (e *CTEExec) Close() (err error) { + e.reset() + if err = e.seedExec.Close(); err != nil { + return err + } + if e.recursiveExec != nil { + if err = e.recursiveExec.Close(); err != nil { + return err + } + // `iterInTbl` and `resTbl` are shared by multiple operators, + // so will be closed when the SQL finishes. + if e.iterOutTbl != nil { + if err = e.iterOutTbl.DerefAndClose(); err != nil { + return err + } + } + } + + return e.baseExecutor.Close() +} + +func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { + e.curIter = 0 + e.iterInTbl.SetIter(e.curIter) + // This means iterInTbl's can be read. + defer close(e.iterInTbl.GetBegCh()) + chks := make([]*chunk.Chunk, 0, 10) + for { + if e.limitDone(e.iterInTbl) { + break + } + chk := newFirstChunk(e.seedExec) + if err = Next(ctx, e.seedExec, chk); err != nil { + return err + } + if chk.NumRows() == 0 { + break + } + if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { + return err + } + chks = append(chks, chk) + } + // Initial resTbl is empty, so no need to deduplicate chk using resTbl. + // Just adding is ok. + for _, chk := range chks { + if err = e.resTbl.Add(chk); err != nil { + return err + } + } + e.curIter++ + e.iterInTbl.SetIter(e.curIter) + + return nil +} + +func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { + if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { + return nil + } + + if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { + return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + } + + if e.limitDone(e.resTbl) { + return nil + } + + for { + chk := newFirstChunk(e.recursiveExec) + if err = Next(ctx, e.recursiveExec, chk); err != nil { + return err + } + if chk.NumRows() == 0 { + if err = e.setupTblsForNewIteration(); err != nil { + return err + } + if e.limitDone(e.resTbl) { + break + } + if e.iterInTbl.NumChunks() == 0 { + break + } + // Next iteration begins. Need use iterOutTbl as input of next iteration. + e.curIter++ + e.iterInTbl.SetIter(e.curIter) + if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { + return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) + } + // Make sure iterInTbl is setup before Close/Open, + // because some executors will read iterInTbl in Open() (like IndexLookupJoin). + if err = e.recursiveExec.Close(); err != nil { + return err + } + if err = e.recursiveExec.Open(ctx); err != nil { + return err + } + } else { + if err = e.iterOutTbl.Add(chk); err != nil { + return err + } + } + } + return nil +} + +// Get next chunk from resTbl for limit. +func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { + if !e.meetFirstBatch { + for e.chkIdx < e.resTbl.NumChunks() { + res, err := e.resTbl.GetChunk(e.chkIdx) + if err != nil { + return err + } + e.chkIdx++ + numRows := uint64(res.NumRows()) + if newCursor := e.cursor + numRows; newCursor >= e.limitBeg { + e.meetFirstBatch = true + begInChk, endInChk := e.limitBeg-e.cursor, numRows + if newCursor > e.limitEnd { + endInChk = e.limitEnd - e.cursor + } + e.cursor += endInChk + if begInChk == endInChk { + break + } + tmpChk := res.CopyConstructSel() + req.Append(tmpChk, int(begInChk), int(endInChk)) + return nil + } + e.cursor += numRows + } + } + if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd { + res, err := e.resTbl.GetChunk(e.chkIdx) + if err != nil { + return err + } + e.chkIdx++ + numRows := uint64(res.NumRows()) + if e.cursor+numRows > e.limitEnd { + numRows = e.limitEnd - e.cursor + req.Append(res.CopyConstructSel(), 0, int(numRows)) + } else { + req.SwapColumns(res.CopyConstructSel()) + } + e.cursor += numRows + } + return nil +} + +func (e *CTEExec) setupTblsForNewIteration() (err error) { + num := e.iterOutTbl.NumChunks() + chks := make([]*chunk.Chunk, 0, num) + // Setup resTbl's data. + for i := 0; i < num; i++ { + chk, err := e.iterOutTbl.GetChunk(i) + if err != nil { + return err + } + // Data should be copied in UNION DISTINCT. + // Because deduplicate() will change data in iterOutTbl, + // which will cause panic when spilling data into disk concurrently. + if e.isDistinct { + chk = chk.CopyConstruct() + } + chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl) + if err != nil { + return err + } + chks = append(chks, chk) + } + + // Setup new iteration data in iterInTbl. + if err = e.iterInTbl.Reopen(); err != nil { + return err + } + defer close(e.iterInTbl.GetBegCh()) + if e.isDistinct { + // Already deduplicated by resTbl, adding directly is ok. + for _, chk := range chks { + if err = e.iterInTbl.Add(chk); err != nil { + return err + } + } + } else { + if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil { + return err + } + } + + // Clear data in iterOutTbl. + return e.iterOutTbl.Reopen() +} + +func (e *CTEExec) reset() { + e.curIter = 0 + e.chkIdx = 0 + e.hashTbl = nil + e.cursor = 0 + e.meetFirstBatch = false +} + +func (e *CTEExec) reopenTbls() (err error) { + e.hashTbl = newConcurrentMapHashTable() + if err := e.resTbl.Reopen(); err != nil { + return err + } + return e.iterInTbl.Reopen() +} + +// Check if tbl meets the requirement of limit. +func (e *CTEExec) limitDone(tbl cteutil.Storage) bool { + return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd +} + +func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, + parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { + memTracker := tbl.GetMemTracker() + memTracker.SetLabel(memory.LabelForCTEStorage) + memTracker.AttachTo(parentMemTracker) + + diskTracker := tbl.GetDiskTracker() + diskTracker.SetLabel(memory.LabelForCTEStorage) + diskTracker.AttachTo(parentDiskTracker) + + if config.GetGlobalConfig().OOMUseTmpStorage { + actionSpill = tbl.ActionSpill() + failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { + if val.(bool) { + actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest() + } + }) + ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) + } + return actionSpill +} + +func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, + storage cteutil.Storage, + hashTbl baseHashTable) (res *chunk.Chunk, err error) { + if e.isDistinct { + if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil { + return nil, err + } + } + return chk, storage.Add(chk) +} + +// Compute hash values in chk and put it in hCtx.hashVals. +// Use the returned sel to choose the computed hash values. +func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { + numRows := chk.NumRows() + e.hCtx.initHash(numRows) + // Continue to reset to make sure all hasher is new. + for i := numRows; i < len(e.hCtx.hashVals); i++ { + e.hCtx.hashVals[i].Reset() + } + sel = chk.Sel() + var hashBitMap []bool + if sel != nil { + hashBitMap = make([]bool, chk.Capacity()) + for _, val := range sel { + hashBitMap[val] = true + } + } else { + // All rows is selected, sel will be [0....numRows). + // e.sel is setup when building executor. + sel = e.sel + } + + for i := 0; i < chk.NumCols(); i++ { + if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals, + chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull, + hashBitMap, false); err != nil { + return nil, err + } + } + return sel, nil +} + +// Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl. +// Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted. +func (e *CTEExec) deduplicate(chk *chunk.Chunk, + storage cteutil.Storage, + hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) { + numRows := chk.NumRows() + if numRows == 0 { + return chk, nil + } + + // 1. Compute hash values for chunk. + chkHashTbl := newConcurrentMapHashTable() + selOri, err := e.computeChunkHash(chk) + if err != nil { + return nil, err + } + + // 2. Filter rows duplicated in input chunk. + // This sel is for filtering rows duplicated in cur chk. + selChk := make([]int, 0, numRows) + for i := 0; i < numRows; i++ { + key := e.hCtx.hashVals[selOri[i]].Sum64() + row := chk.GetRow(i) + + hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl) + if err != nil { + return nil, err + } + if hasDup { + continue + } + + selChk = append(selChk, selOri[i]) + + rowPtr := chunk.RowPtr{ChkIdx: uint32(0), RowIdx: uint32(i)} + chkHashTbl.Put(key, rowPtr) + } + chk.SetSel(selChk) + chkIdx := storage.NumChunks() + + // 3. Filter rows duplicated in RowContainer. + // This sel is for filtering rows duplicated in cteutil.Storage. + selStorage := make([]int, 0, len(selChk)) + for i := 0; i < len(selChk); i++ { + key := e.hCtx.hashVals[selChk[i]].Sum64() + row := chk.GetRow(i) + + hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl) + if err != nil { + return nil, err + } + if hasDup { + continue + } + + rowIdx := len(selStorage) + selStorage = append(selStorage, selChk[i]) + + rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} + hashTbl.Put(key, rowPtr) + } + + chk.SetSel(selStorage) + return chk, nil +} + +// Use the row's probe key to check if it already exists in chk or storage. +// We also need to compare the row's real encoding value to avoid hash collision. +func (e *CTEExec) checkHasDup(probeKey uint64, + row chunk.Row, + curChk *chunk.Chunk, + storage cteutil.Storage, + hashTbl baseHashTable) (hasDup bool, err error) { + ptrs := hashTbl.Get(probeKey) + + if len(ptrs) == 0 { + return false, nil + } + + for _, ptr := range ptrs { + var matchedRow chunk.Row + if curChk != nil { + matchedRow = curChk.GetRow(int(ptr.RowIdx)) + } else { + matchedRow, err = storage.GetRow(ptr) + } + if err != nil { + return false, err + } + isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx, + row, e.hCtx.allTypes, e.hCtx.keyColIdx, + matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx) + if err != nil { + return false, err + } + if isEqual { + return true, nil + } + } + return false, nil +} diff --git a/executor/distsql.go b/executor/distsql.go index 27251d77886ad..a9a5bb41777fe 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -200,8 +200,19 @@ type IndexReaderExecutor struct { } // Close clears all resources hold by current object. +<<<<<<< HEAD func (e *IndexReaderExecutor) Close() error { err := e.result.Close() +======= +func (e *IndexReaderExecutor) Close() (err error) { + if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { + return nil + } + + if e.result != nil { + err = e.result.Close() + } +>>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.result = nil e.ctx.StoreQueryFeedback(e.feedback) return err diff --git a/executor/executor_test.go b/executor/executor_test.go index 27e12ee99f717..178dd27ab8dac 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -21,6 +21,12 @@ import ( "math" "net" "os" +<<<<<<< HEAD +======= + "path/filepath" + "reflect" + "runtime" +>>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) "strconv" "strings" "sync" @@ -8279,3 +8285,340 @@ func (s *testSuite) TestIssue26532(c *C) { tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "")) tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "")) } +<<<<<<< HEAD +======= + +func (s *testSuite) TestIssue25447(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(a int, b varchar(8))") + tk.MustExec("insert into t1 values(1,'1')") + tk.MustExec("create table t2(a int , b varchar(8) GENERATED ALWAYS AS (c) VIRTUAL, c varchar(8), PRIMARY KEY (a))") + tk.MustExec("insert into t2(a) values(1)") + tk.MustQuery("select /*+ tidb_inlj(t2) */ t2.b, t1.b from t1 join t2 ON t2.a=t1.a").Check(testkit.Rows(" 1")) +} + +func (s *testSuite) TestIssue23602(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("USE test") + tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn + tk.MustExec("CREATE TABLE t (a bigint unsigned PRIMARY KEY)") + defer tk.MustExec("DROP TABLE t") + tk.MustExec("INSERT INTO t VALUES (0),(1),(2),(3),(18446744073709551600),(18446744073709551605),(18446744073709551610),(18446744073709551615)") + tk.MustExec("ANALYZE TABLE t") + tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a >= 0x1 AND a <= 0x2`).Check(testkit.Rows( + "TableReader 2.00 root data:TableRangeScan]\n" + + "[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false")) + tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a BETWEEN 0x1 AND 0x2`).Check(testkit.Rows( + "TableReader 2.00 root data:TableRangeScan]\n" + + "[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false")) + tk.MustQuery("SELECT a FROM t WHERE a BETWEEN 0xFFFFFFFFFFFFFFF5 AND X'FFFFFFFFFFFFFFFA'").Check(testkit.Rows("18446744073709551605", "18446744073709551610")) +} + +func (s *testSuite) TestCTEWithIndexLookupJoinDeadLock(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t (a int(11) default null,b int(11) default null,key b (b),key ba (b))") + tk.MustExec("create table t1 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))") + tk.MustExec("create table t2 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))") + // It's easy to reproduce this problem in 30 times execution of IndexLookUpJoin. + for i := 0; i < 30; i++ { + tk.MustExec("with cte as (with cte1 as (select * from t2 use index(idx_ab) where a > 1 and b > 1) select * from cte1) select /*+use_index(t1 idx_ab)*/ * from cte join t1 on t1.a=cte.a;") + } +} + +func (s *testSuite) TestGetResultRowsCount(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + for i := 1; i <= 10; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%v)", i)) + } + cases := []struct { + sql string + row int64 + }{ + {"select * from t", 10}, + {"select * from t where a < 0", 0}, + {"select * from t where a <= 3", 3}, + {"insert into t values (11)", 0}, + {"replace into t values (12)", 0}, + {"update t set a=13 where a=12", 0}, + } + + for _, ca := range cases { + if strings.HasPrefix(ca.sql, "select") { + tk.MustQuery(ca.sql) + } else { + tk.MustExec(ca.sql) + } + info := tk.Se.ShowProcess() + c.Assert(info, NotNil) + p, ok := info.Plan.(plannercore.Plan) + c.Assert(ok, IsTrue) + cnt := executor.GetResultRowsCount(tk.Se, p) + c.Assert(ca.row, Equals, cnt, Commentf("sql: %v", ca.sql)) + } +} + +func checkFileName(s string) bool { + files := []string{ + "config.toml", + "meta.txt", + "stats/test.t_dump_single.json", + "schema/test.t_dump_single.schema.txt", + "variables.toml", + "sqls.sql", + "session_bindings.sql", + "global_bindings.sql", + "explain.txt", + } + for _, f := range files { + if strings.Compare(f, s) == 0 { + return true + } + } + return false +} + +func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t_dump_single") + tk.MustExec("create table t_dump_single(a int)") + res := tk.MustQuery("plan replayer dump explain select * from t_dump_single") + path := s.testData.ConvertRowsToStrings(res.Rows()) + + reader, err := zip.OpenReader(filepath.Join(domain.GetPlanReplayerDirName(), path[0])) + c.Assert(err, IsNil) + defer reader.Close() + for _, file := range reader.File { + c.Assert(checkFileName(file.Name), IsTrue) + } +} + +func (s *testSuiteP1) TestIssue28935(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@tidb_enable_vectorized_expression=true") + tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a")) + tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows(" ")) + tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("")) + + tk.MustExec("set @@tidb_enable_vectorized_expression=false") + tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a")) + tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows(" ")) + tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("")) +} + +func (s *testSuiteP1) TestIssue29412(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t29142_1") + tk.MustExec("drop table if exists t29142_2") + tk.MustExec("create table t29142_1(a int);") + tk.MustExec("create table t29142_2(a double);") + tk.MustExec("insert into t29142_1 value(20);") + tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil) +} + +func (s *testSerialSuite) TestIssue28650(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, c int, b char(50), index(a,c,b));") + tk.MustExec("set tidb_enable_rate_limit_action=off;") + + wg := &sync.WaitGroup{} + sql := `explain analyze + select /*+ stream_agg(@sel_1) stream_agg(@sel_3) %s(@sel_2 t2)*/ count(1) from + ( + SELECT t2.a AS t2_external_user_ext_id, t2.b AS t2_t1_ext_id FROM t2 INNER JOIN (SELECT t1.a AS d_t1_ext_id FROM t1 GROUP BY t1.a) AS anon_1 ON anon_1.d_t1_ext_id = t2.a WHERE t2.c = 123 AND t2.b + IN ("%s") ) tmp` + + wg.Add(1) + sqls := make([]string, 2) + go func() { + defer wg.Done() + inElems := make([]string, 1000) + for i := 0; i < len(inElems); i++ { + inElems[i] = fmt.Sprintf("wm_%dbDgAAwCD-v1QB%dxky-g_dxxQCw", rand.Intn(100), rand.Intn(100)) + } + sqls[0] = fmt.Sprintf(sql, "inl_join", strings.Join(inElems, "\",\"")) + sqls[1] = fmt.Sprintf(sql, "inl_hash_join", strings.Join(inElems, "\",\"")) + }() + + tk.MustExec("insert into t1 select rand()*400;") + for i := 0; i < 10; i++ { + tk.MustExec("insert into t1 select rand()*400 from t1;") + } + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + wg.Wait() + for _, sql := range sqls { + tk.MustExec("set @@tidb_mem_quota_query = 1073741824") // 1GB + c.Assert(tk.QueryToErr(sql), IsNil) + tk.MustExec("set @@tidb_mem_quota_query = 33554432") // 32MB, out of memory during executing + c.Assert(strings.Contains(tk.QueryToErr(sql).Error(), "Out Of Memory Quota!"), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query = 65536") // 64KB, out of memory during building the plan + func() { + defer func() { + r := recover() + c.Assert(r, NotNil) + err := errors.Errorf("%v", r) + c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) + }() + tk.MustExec(sql) + }() + } +} + +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} + +// Test invoke Close without invoking Open before for each operators. +func (s *testSerialSuite) TestUnreasonablyClose(c *C) { + defer testleak.AfterTest(c)() + + is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + // To enable the shuffleExec operator. + _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") + c.Assert(err, IsNil) + + var opsNeedsCovered = []plannercore.PhysicalPlan{ + &plannercore.PhysicalHashJoin{}, + &plannercore.PhysicalMergeJoin{}, + &plannercore.PhysicalIndexJoin{}, + &plannercore.PhysicalIndexHashJoin{}, + &plannercore.PhysicalTableReader{}, + &plannercore.PhysicalIndexReader{}, + &plannercore.PhysicalIndexLookUpReader{}, + &plannercore.PhysicalIndexMergeReader{}, + &plannercore.PhysicalApply{}, + &plannercore.PhysicalHashAgg{}, + &plannercore.PhysicalStreamAgg{}, + &plannercore.PhysicalLimit{}, + &plannercore.PhysicalSort{}, + &plannercore.PhysicalTopN{}, + &plannercore.PhysicalCTE{}, + &plannercore.PhysicalCTETable{}, + &plannercore.PhysicalMaxOneRow{}, + &plannercore.PhysicalProjection{}, + &plannercore.PhysicalSelection{}, + &plannercore.PhysicalTableDual{}, + &plannercore.PhysicalWindow{}, + &plannercore.PhysicalShuffle{}, + &plannercore.PhysicalUnionAll{}, + } + executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") + + var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", + "select /*+ hash_agg() */ count(f) from t group by a", + "select /*+ stream_agg() */ count(f) from t group by a", + "select * from t order by a, f", + "select * from t order by a, f limit 1", + "select * from t limit 1", + "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", + "select a + 1 from t", + "select count(*) a from t having a > 1", + "select * from t where a = 1.1", + "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", + "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", + "select sum(f) over (partition by f) from t", + "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", + "select a from t union all select a from t", + } { + comment := Commentf("case:%v sql:%s", i, tc) + c.Assert(err, IsNil, comment) + stmt, err := s.ParseOneStmt(tc, "", "") + c.Assert(err, IsNil, comment) + + err = se.NewTxn(context.Background()) + c.Assert(err, IsNil, comment) + p, _, err := planner.Optimize(context.TODO(), se, stmt, is) + c.Assert(err, IsNil, comment) + // This for loop level traverses the plan tree to get which operators are covered. + for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { + newChild := make([]plannercore.PhysicalPlan, 0, len(child)) + for _, ch := range child { + found := false + for k, t := range opsNeedsCovered { + if reflect.TypeOf(t) == reflect.TypeOf(ch) { + opsAlreadyCoveredMask |= 1 << k + found = true + break + } + } + c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) + switch x := ch.(type) { + case *plannercore.PhysicalCTE: + newChild = append(newChild, x.RecurPlan) + newChild = append(newChild, x.SeedPlan) + continue + case *plannercore.PhysicalShuffle: + newChild = append(newChild, x.DataSources...) + newChild = append(newChild, x.Tails...) + continue + } + newChild = append(newChild, ch.Children()...) + } + child = newChild + } + + e := executorBuilder.Build(p) + + func() { + defer func() { + r := recover() + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) + }() + c.Assert(e.Close(), IsNil, comment) + }() + } + // The following code is used to make sure all the operators registered + // in opsNeedsCoveredMask are covered. + commentBuf := strings.Builder{} + if opsAlreadyCoveredMask != opsNeedsCoveredMask { + for i := range opsNeedsCovered { + if opsAlreadyCoveredMask&(1<>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) diff --git a/executor/join.go b/executor/join.go index 472df41ded441..4b09d6e787e81 100644 --- a/executor/join.go +++ b/executor/join.go @@ -114,7 +114,9 @@ type hashjoinWorkerResult struct { // Close implements the Executor Close interface. func (e *HashJoinExec) Close() error { - close(e.closeCh) + if e.closeCh != nil { + close(e.closeCh) + } e.finished.Store(true) if e.prepared { if e.buildFinished != nil { @@ -157,7 +159,10 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } +<<<<<<< HEAD +======= +>>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/executor/merge_join.go b/executor/merge_join.go index d6374910a53b9..57e02b6e7a753 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -52,6 +52,7 @@ type MergeJoinExec struct { } type mergeJoinTable struct { + inited bool isInner bool childIndex int joinKeys []*expression.Column @@ -107,10 +108,14 @@ func (t *mergeJoinTable) init(exec *MergeJoinExec) { } t.memTracker.AttachTo(exec.memTracker) + t.inited = true t.memTracker.Consume(t.childChunk.MemoryUsage()) } func (t *mergeJoinTable) finish() error { + if !t.inited { + return nil + } t.memTracker.Consume(-t.childChunk.MemoryUsage()) if t.isInner { From 1088852335b32a1b1ad489c638e09cf29afd1893 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 16 Dec 2021 16:41:00 +0800 Subject: [PATCH 2/5] resolve conflicts --- cmd/ddltest/index_test.go | 2 +- executor/cte.go | 578 -------------------------------------- executor/distsql.go | 9 - executor/executor_test.go | 214 -------------- executor/join.go | 3 - 5 files changed, 1 insertion(+), 805 deletions(-) delete mode 100644 executor/cte.go diff --git a/cmd/ddltest/index_test.go b/cmd/ddltest/index_test.go index 1101486de37b2..dcb2280f231e9 100644 --- a/cmd/ddltest/index_test.go +++ b/cmd/ddltest/index_test.go @@ -104,7 +104,7 @@ func (s *TestDDLSuite) checkDropIndex(c *C, indexInfo *model.IndexInfo) { c.Assert(err, IsNil) txn, err := ctx.Txn(false) c.Assert(err, IsNil) - defer func(){ + defer func() { err := txn.Rollback() c.Assert(err, IsNil) }() diff --git a/executor/cte.go b/executor/cte.go deleted file mode 100644 index 8345bf5e57f5d..0000000000000 --- a/executor/cte.go +++ /dev/null @@ -1,578 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package executor - -import ( - "context" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/cteutil" - "github.com/pingcap/tidb/util/disk" - "github.com/pingcap/tidb/util/memory" -) - -var _ Executor = &CTEExec{} - -// CTEExec implements CTE. -// Following diagram describes how CTEExec works. -// -// `iterInTbl` is shared by `CTEExec` and `CTETableReaderExec`. -// `CTETableReaderExec` reads data from `iterInTbl`, -// and its output will be stored `iterOutTbl` by `CTEExec`. -// -// When an iteration ends, `CTEExec` will move all data from `iterOutTbl` into `iterInTbl`, -// which will be the input for new iteration. -// At the end of each iteration, data in `iterOutTbl` will also be added into `resTbl`. -// `resTbl` stores data of all iteration. -// +----------+ -// write |iterOutTbl| -// CTEExec ------------------->| | -// | +----+-----+ -// ------------- | write -// | | v -// other op other op +----------+ -// (seed) (recursive) | resTbl | -// ^ | | -// | +----------+ -// CTETableReaderExec -// ^ -// | read +----------+ -// +---------------+iterInTbl | -// | | -// +----------+ -type CTEExec struct { - baseExecutor - - seedExec Executor - recursiveExec Executor - - // `resTbl` and `iterInTbl` are shared by all CTEExec which reference to same the CTE. - // `iterInTbl` is also shared by CTETableReaderExec. - resTbl cteutil.Storage - iterInTbl cteutil.Storage - iterOutTbl cteutil.Storage - - hashTbl baseHashTable - - // Index of chunk to read from `resTbl`. - chkIdx int - - // UNION ALL or UNION DISTINCT. - isDistinct bool - curIter int - hCtx *hashContext - sel []int - - // Limit related info. - hasLimit bool - limitBeg uint64 - limitEnd uint64 - cursor uint64 - meetFirstBatch bool - - memTracker *memory.Tracker - diskTracker *disk.Tracker -} - -// Open implements the Executor interface. -func (e *CTEExec) Open(ctx context.Context) (err error) { - e.reset() - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } - - if e.seedExec == nil { - return errors.New("seedExec for CTEExec is nil") - } - if err = e.seedExec.Open(ctx); err != nil { - return err - } - - e.memTracker = memory.NewTracker(e.id, -1) - e.diskTracker = disk.NewTracker(e.id, -1) - e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) - e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) - - if e.recursiveExec != nil { - if err = e.recursiveExec.Open(ctx); err != nil { - return err - } - recursiveTypes := e.recursiveExec.base().retFieldTypes - e.iterOutTbl = cteutil.NewStorageRowContainer(recursiveTypes, e.maxChunkSize) - if err = e.iterOutTbl.OpenAndRef(); err != nil { - return err - } - } - - if e.isDistinct { - e.hashTbl = newConcurrentMapHashTable() - e.hCtx = &hashContext{ - allTypes: e.base().retFieldTypes, - } - // We use all columns to compute hash. - e.hCtx.keyColIdx = make([]int, len(e.hCtx.allTypes)) - for i := range e.hCtx.keyColIdx { - e.hCtx.keyColIdx[i] = i - } - } - return nil -} - -// Next implements the Executor interface. -func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { - req.Reset() - e.resTbl.Lock() - defer e.resTbl.Unlock() - if !e.resTbl.Done() { - resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) - iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) - var iterOutAction *chunk.SpillDiskAction - if e.iterOutTbl != nil { - iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) - } - - failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { - if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { - defer resAction.WaitForTest() - defer iterInAction.WaitForTest() - if iterOutAction != nil { - defer iterOutAction.WaitForTest() - } - } - }) - - if err = e.computeSeedPart(ctx); err != nil { - // Don't put it in defer. - // Because it should be called only when the filling process is not completed. - if err1 := e.reopenTbls(); err1 != nil { - return err1 - } - return err - } - if err = e.computeRecursivePart(ctx); err != nil { - if err1 := e.reopenTbls(); err1 != nil { - return err1 - } - return err - } - e.resTbl.SetDone() - } - - if e.hasLimit { - return e.nextChunkLimit(req) - } - if e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - // Need to copy chunk to make sure upper operator will not change chunk in resTbl. - // Also we ignore copying rows not selected, because some operators like Projection - // doesn't support swap column if chunk.sel is no nil. - req.SwapColumns(res.CopyConstructSel()) - e.chkIdx++ - } - return nil -} - -// Close implements the Executor interface. -func (e *CTEExec) Close() (err error) { - e.reset() - if err = e.seedExec.Close(); err != nil { - return err - } - if e.recursiveExec != nil { - if err = e.recursiveExec.Close(); err != nil { - return err - } - // `iterInTbl` and `resTbl` are shared by multiple operators, - // so will be closed when the SQL finishes. - if e.iterOutTbl != nil { - if err = e.iterOutTbl.DerefAndClose(); err != nil { - return err - } - } - } - - return e.baseExecutor.Close() -} - -func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) { - e.curIter = 0 - e.iterInTbl.SetIter(e.curIter) - // This means iterInTbl's can be read. - defer close(e.iterInTbl.GetBegCh()) - chks := make([]*chunk.Chunk, 0, 10) - for { - if e.limitDone(e.iterInTbl) { - break - } - chk := newFirstChunk(e.seedExec) - if err = Next(ctx, e.seedExec, chk); err != nil { - return err - } - if chk.NumRows() == 0 { - break - } - if chk, err = e.tryDedupAndAdd(chk, e.iterInTbl, e.hashTbl); err != nil { - return err - } - chks = append(chks, chk) - } - // Initial resTbl is empty, so no need to deduplicate chk using resTbl. - // Just adding is ok. - for _, chk := range chks { - if err = e.resTbl.Add(chk); err != nil { - return err - } - } - e.curIter++ - e.iterInTbl.SetIter(e.curIter) - - return nil -} - -func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) { - if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 { - return nil - } - - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) - } - - if e.limitDone(e.resTbl) { - return nil - } - - for { - chk := newFirstChunk(e.recursiveExec) - if err = Next(ctx, e.recursiveExec, chk); err != nil { - return err - } - if chk.NumRows() == 0 { - if err = e.setupTblsForNewIteration(); err != nil { - return err - } - if e.limitDone(e.resTbl) { - break - } - if e.iterInTbl.NumChunks() == 0 { - break - } - // Next iteration begins. Need use iterOutTbl as input of next iteration. - e.curIter++ - e.iterInTbl.SetIter(e.curIter) - if e.curIter > e.ctx.GetSessionVars().CTEMaxRecursionDepth { - return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter) - } - // Make sure iterInTbl is setup before Close/Open, - // because some executors will read iterInTbl in Open() (like IndexLookupJoin). - if err = e.recursiveExec.Close(); err != nil { - return err - } - if err = e.recursiveExec.Open(ctx); err != nil { - return err - } - } else { - if err = e.iterOutTbl.Add(chk); err != nil { - return err - } - } - } - return nil -} - -// Get next chunk from resTbl for limit. -func (e *CTEExec) nextChunkLimit(req *chunk.Chunk) error { - if !e.meetFirstBatch { - for e.chkIdx < e.resTbl.NumChunks() { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if newCursor := e.cursor + numRows; newCursor >= e.limitBeg { - e.meetFirstBatch = true - begInChk, endInChk := e.limitBeg-e.cursor, numRows - if newCursor > e.limitEnd { - endInChk = e.limitEnd - e.cursor - } - e.cursor += endInChk - if begInChk == endInChk { - break - } - tmpChk := res.CopyConstructSel() - req.Append(tmpChk, int(begInChk), int(endInChk)) - return nil - } - e.cursor += numRows - } - } - if e.chkIdx < e.resTbl.NumChunks() && e.cursor < e.limitEnd { - res, err := e.resTbl.GetChunk(e.chkIdx) - if err != nil { - return err - } - e.chkIdx++ - numRows := uint64(res.NumRows()) - if e.cursor+numRows > e.limitEnd { - numRows = e.limitEnd - e.cursor - req.Append(res.CopyConstructSel(), 0, int(numRows)) - } else { - req.SwapColumns(res.CopyConstructSel()) - } - e.cursor += numRows - } - return nil -} - -func (e *CTEExec) setupTblsForNewIteration() (err error) { - num := e.iterOutTbl.NumChunks() - chks := make([]*chunk.Chunk, 0, num) - // Setup resTbl's data. - for i := 0; i < num; i++ { - chk, err := e.iterOutTbl.GetChunk(i) - if err != nil { - return err - } - // Data should be copied in UNION DISTINCT. - // Because deduplicate() will change data in iterOutTbl, - // which will cause panic when spilling data into disk concurrently. - if e.isDistinct { - chk = chk.CopyConstruct() - } - chk, err = e.tryDedupAndAdd(chk, e.resTbl, e.hashTbl) - if err != nil { - return err - } - chks = append(chks, chk) - } - - // Setup new iteration data in iterInTbl. - if err = e.iterInTbl.Reopen(); err != nil { - return err - } - defer close(e.iterInTbl.GetBegCh()) - if e.isDistinct { - // Already deduplicated by resTbl, adding directly is ok. - for _, chk := range chks { - if err = e.iterInTbl.Add(chk); err != nil { - return err - } - } - } else { - if err = e.iterInTbl.SwapData(e.iterOutTbl); err != nil { - return err - } - } - - // Clear data in iterOutTbl. - return e.iterOutTbl.Reopen() -} - -func (e *CTEExec) reset() { - e.curIter = 0 - e.chkIdx = 0 - e.hashTbl = nil - e.cursor = 0 - e.meetFirstBatch = false -} - -func (e *CTEExec) reopenTbls() (err error) { - e.hashTbl = newConcurrentMapHashTable() - if err := e.resTbl.Reopen(); err != nil { - return err - } - return e.iterInTbl.Reopen() -} - -// Check if tbl meets the requirement of limit. -func (e *CTEExec) limitDone(tbl cteutil.Storage) bool { - return e.hasLimit && uint64(tbl.NumRows()) >= e.limitEnd -} - -func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, - parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { - memTracker := tbl.GetMemTracker() - memTracker.SetLabel(memory.LabelForCTEStorage) - memTracker.AttachTo(parentMemTracker) - - diskTracker := tbl.GetDiskTracker() - diskTracker.SetLabel(memory.LabelForCTEStorage) - diskTracker.AttachTo(parentDiskTracker) - - if config.GetGlobalConfig().OOMUseTmpStorage { - actionSpill = tbl.ActionSpill() - failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { - if val.(bool) { - actionSpill = tbl.(*cteutil.StorageRC).ActionSpillForTest() - } - }) - ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionSpill) - } - return actionSpill -} - -func (e *CTEExec) tryDedupAndAdd(chk *chunk.Chunk, - storage cteutil.Storage, - hashTbl baseHashTable) (res *chunk.Chunk, err error) { - if e.isDistinct { - if chk, err = e.deduplicate(chk, storage, hashTbl); err != nil { - return nil, err - } - } - return chk, storage.Add(chk) -} - -// Compute hash values in chk and put it in hCtx.hashVals. -// Use the returned sel to choose the computed hash values. -func (e *CTEExec) computeChunkHash(chk *chunk.Chunk) (sel []int, err error) { - numRows := chk.NumRows() - e.hCtx.initHash(numRows) - // Continue to reset to make sure all hasher is new. - for i := numRows; i < len(e.hCtx.hashVals); i++ { - e.hCtx.hashVals[i].Reset() - } - sel = chk.Sel() - var hashBitMap []bool - if sel != nil { - hashBitMap = make([]bool, chk.Capacity()) - for _, val := range sel { - hashBitMap[val] = true - } - } else { - // All rows is selected, sel will be [0....numRows). - // e.sel is setup when building executor. - sel = e.sel - } - - for i := 0; i < chk.NumCols(); i++ { - if err = codec.HashChunkSelected(e.ctx.GetSessionVars().StmtCtx, e.hCtx.hashVals, - chk, e.hCtx.allTypes[i], i, e.hCtx.buf, e.hCtx.hasNull, - hashBitMap, false); err != nil { - return nil, err - } - } - return sel, nil -} - -// Use hashTbl to deduplicate rows, and unique rows will be added to hashTbl. -// Duplicated rows are only marked to be removed by sel in Chunk, instead of really deleted. -func (e *CTEExec) deduplicate(chk *chunk.Chunk, - storage cteutil.Storage, - hashTbl baseHashTable) (chkNoDup *chunk.Chunk, err error) { - numRows := chk.NumRows() - if numRows == 0 { - return chk, nil - } - - // 1. Compute hash values for chunk. - chkHashTbl := newConcurrentMapHashTable() - selOri, err := e.computeChunkHash(chk) - if err != nil { - return nil, err - } - - // 2. Filter rows duplicated in input chunk. - // This sel is for filtering rows duplicated in cur chk. - selChk := make([]int, 0, numRows) - for i := 0; i < numRows; i++ { - key := e.hCtx.hashVals[selOri[i]].Sum64() - row := chk.GetRow(i) - - hasDup, err := e.checkHasDup(key, row, chk, storage, chkHashTbl) - if err != nil { - return nil, err - } - if hasDup { - continue - } - - selChk = append(selChk, selOri[i]) - - rowPtr := chunk.RowPtr{ChkIdx: uint32(0), RowIdx: uint32(i)} - chkHashTbl.Put(key, rowPtr) - } - chk.SetSel(selChk) - chkIdx := storage.NumChunks() - - // 3. Filter rows duplicated in RowContainer. - // This sel is for filtering rows duplicated in cteutil.Storage. - selStorage := make([]int, 0, len(selChk)) - for i := 0; i < len(selChk); i++ { - key := e.hCtx.hashVals[selChk[i]].Sum64() - row := chk.GetRow(i) - - hasDup, err := e.checkHasDup(key, row, nil, storage, hashTbl) - if err != nil { - return nil, err - } - if hasDup { - continue - } - - rowIdx := len(selStorage) - selStorage = append(selStorage, selChk[i]) - - rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} - hashTbl.Put(key, rowPtr) - } - - chk.SetSel(selStorage) - return chk, nil -} - -// Use the row's probe key to check if it already exists in chk or storage. -// We also need to compare the row's real encoding value to avoid hash collision. -func (e *CTEExec) checkHasDup(probeKey uint64, - row chunk.Row, - curChk *chunk.Chunk, - storage cteutil.Storage, - hashTbl baseHashTable) (hasDup bool, err error) { - ptrs := hashTbl.Get(probeKey) - - if len(ptrs) == 0 { - return false, nil - } - - for _, ptr := range ptrs { - var matchedRow chunk.Row - if curChk != nil { - matchedRow = curChk.GetRow(int(ptr.RowIdx)) - } else { - matchedRow, err = storage.GetRow(ptr) - } - if err != nil { - return false, err - } - isEqual, err := codec.EqualChunkRow(e.ctx.GetSessionVars().StmtCtx, - row, e.hCtx.allTypes, e.hCtx.keyColIdx, - matchedRow, e.hCtx.allTypes, e.hCtx.keyColIdx) - if err != nil { - return false, err - } - if isEqual { - return true, nil - } - } - return false, nil -} diff --git a/executor/distsql.go b/executor/distsql.go index a9a5bb41777fe..a0519dd460622 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -200,19 +200,10 @@ type IndexReaderExecutor struct { } // Close clears all resources hold by current object. -<<<<<<< HEAD -func (e *IndexReaderExecutor) Close() error { - err := e.result.Close() -======= func (e *IndexReaderExecutor) Close() (err error) { - if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { - return nil - } - if e.result != nil { err = e.result.Close() } ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.result = nil e.ctx.StoreQueryFeedback(e.feedback) return err diff --git a/executor/executor_test.go b/executor/executor_test.go index 178dd27ab8dac..1388ab26756a5 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -21,12 +21,8 @@ import ( "math" "net" "os" -<<<<<<< HEAD -======= - "path/filepath" "reflect" "runtime" ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) "strconv" "strings" "sync" @@ -8285,215 +8281,6 @@ func (s *testSuite) TestIssue26532(c *C) { tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "")) tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "")) } -<<<<<<< HEAD -======= - -func (s *testSuite) TestIssue25447(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2") - tk.MustExec("create table t1(a int, b varchar(8))") - tk.MustExec("insert into t1 values(1,'1')") - tk.MustExec("create table t2(a int , b varchar(8) GENERATED ALWAYS AS (c) VIRTUAL, c varchar(8), PRIMARY KEY (a))") - tk.MustExec("insert into t2(a) values(1)") - tk.MustQuery("select /*+ tidb_inlj(t2) */ t2.b, t1.b from t1 join t2 ON t2.a=t1.a").Check(testkit.Rows(" 1")) -} - -func (s *testSuite) TestIssue23602(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("USE test") - tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn - tk.MustExec("CREATE TABLE t (a bigint unsigned PRIMARY KEY)") - defer tk.MustExec("DROP TABLE t") - tk.MustExec("INSERT INTO t VALUES (0),(1),(2),(3),(18446744073709551600),(18446744073709551605),(18446744073709551610),(18446744073709551615)") - tk.MustExec("ANALYZE TABLE t") - tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a >= 0x1 AND a <= 0x2`).Check(testkit.Rows( - "TableReader 2.00 root data:TableRangeScan]\n" + - "[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false")) - tk.MustQuery(`EXPLAIN FORMAT = 'brief' SELECT a FROM t WHERE a BETWEEN 0x1 AND 0x2`).Check(testkit.Rows( - "TableReader 2.00 root data:TableRangeScan]\n" + - "[└─TableRangeScan 2.00 cop[tikv] table:t range:[1,2], keep order:false")) - tk.MustQuery("SELECT a FROM t WHERE a BETWEEN 0xFFFFFFFFFFFFFFF5 AND X'FFFFFFFFFFFFFFFA'").Check(testkit.Rows("18446744073709551605", "18446744073709551610")) -} - -func (s *testSuite) TestCTEWithIndexLookupJoinDeadLock(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("create table t (a int(11) default null,b int(11) default null,key b (b),key ba (b))") - tk.MustExec("create table t1 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))") - tk.MustExec("create table t2 (a int(11) default null,b int(11) default null,key idx_ab (a,b),key idx_a (a),key idx_b (b))") - // It's easy to reproduce this problem in 30 times execution of IndexLookUpJoin. - for i := 0; i < 30; i++ { - tk.MustExec("with cte as (with cte1 as (select * from t2 use index(idx_ab) where a > 1 and b > 1) select * from cte1) select /*+use_index(t1 idx_ab)*/ * from cte join t1 on t1.a=cte.a;") - } -} - -func (s *testSuite) TestGetResultRowsCount(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a int)") - for i := 1; i <= 10; i++ { - tk.MustExec(fmt.Sprintf("insert into t values (%v)", i)) - } - cases := []struct { - sql string - row int64 - }{ - {"select * from t", 10}, - {"select * from t where a < 0", 0}, - {"select * from t where a <= 3", 3}, - {"insert into t values (11)", 0}, - {"replace into t values (12)", 0}, - {"update t set a=13 where a=12", 0}, - } - - for _, ca := range cases { - if strings.HasPrefix(ca.sql, "select") { - tk.MustQuery(ca.sql) - } else { - tk.MustExec(ca.sql) - } - info := tk.Se.ShowProcess() - c.Assert(info, NotNil) - p, ok := info.Plan.(plannercore.Plan) - c.Assert(ok, IsTrue) - cnt := executor.GetResultRowsCount(tk.Se, p) - c.Assert(ca.row, Equals, cnt, Commentf("sql: %v", ca.sql)) - } -} - -func checkFileName(s string) bool { - files := []string{ - "config.toml", - "meta.txt", - "stats/test.t_dump_single.json", - "schema/test.t_dump_single.schema.txt", - "variables.toml", - "sqls.sql", - "session_bindings.sql", - "global_bindings.sql", - "explain.txt", - } - for _, f := range files { - if strings.Compare(f, s) == 0 { - return true - } - } - return false -} - -func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t_dump_single") - tk.MustExec("create table t_dump_single(a int)") - res := tk.MustQuery("plan replayer dump explain select * from t_dump_single") - path := s.testData.ConvertRowsToStrings(res.Rows()) - - reader, err := zip.OpenReader(filepath.Join(domain.GetPlanReplayerDirName(), path[0])) - c.Assert(err, IsNil) - defer reader.Close() - for _, file := range reader.File { - c.Assert(checkFileName(file.Name), IsTrue) - } -} - -func (s *testSuiteP1) TestIssue28935(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("set @@tidb_enable_vectorized_expression=true") - tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a")) - tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows(" ")) - tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("")) - - tk.MustExec("set @@tidb_enable_vectorized_expression=false") - tk.MustQuery(`select trim(leading from " a "), trim(both from " a "), trim(trailing from " a ")`).Check(testkit.Rows("a a a")) - tk.MustQuery(`select trim(leading null from " a "), trim(both null from " a "), trim(trailing null from " a ")`).Check(testkit.Rows(" ")) - tk.MustQuery(`select trim(null from " a ")`).Check(testkit.Rows("")) -} - -func (s *testSuiteP1) TestIssue29412(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t29142_1") - tk.MustExec("drop table if exists t29142_2") - tk.MustExec("create table t29142_1(a int);") - tk.MustExec("create table t29142_2(a double);") - tk.MustExec("insert into t29142_1 value(20);") - tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil) -} - -func (s *testSerialSuite) TestIssue28650(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2;") - tk.MustExec("create table t1(a int, index(a));") - tk.MustExec("create table t2(a int, c int, b char(50), index(a,c,b));") - tk.MustExec("set tidb_enable_rate_limit_action=off;") - - wg := &sync.WaitGroup{} - sql := `explain analyze - select /*+ stream_agg(@sel_1) stream_agg(@sel_3) %s(@sel_2 t2)*/ count(1) from - ( - SELECT t2.a AS t2_external_user_ext_id, t2.b AS t2_t1_ext_id FROM t2 INNER JOIN (SELECT t1.a AS d_t1_ext_id FROM t1 GROUP BY t1.a) AS anon_1 ON anon_1.d_t1_ext_id = t2.a WHERE t2.c = 123 AND t2.b - IN ("%s") ) tmp` - - wg.Add(1) - sqls := make([]string, 2) - go func() { - defer wg.Done() - inElems := make([]string, 1000) - for i := 0; i < len(inElems); i++ { - inElems[i] = fmt.Sprintf("wm_%dbDgAAwCD-v1QB%dxky-g_dxxQCw", rand.Intn(100), rand.Intn(100)) - } - sqls[0] = fmt.Sprintf(sql, "inl_join", strings.Join(inElems, "\",\"")) - sqls[1] = fmt.Sprintf(sql, "inl_hash_join", strings.Join(inElems, "\",\"")) - }() - - tk.MustExec("insert into t1 select rand()*400;") - for i := 0; i < 10; i++ { - tk.MustExec("insert into t1 select rand()*400 from t1;") - } - config.UpdateGlobal(func(conf *config.Config) { - conf.OOMAction = config.OOMActionCancel - }) - defer func() { - config.UpdateGlobal(func(conf *config.Config) { - conf.OOMAction = config.OOMActionLog - }) - }() - wg.Wait() - for _, sql := range sqls { - tk.MustExec("set @@tidb_mem_quota_query = 1073741824") // 1GB - c.Assert(tk.QueryToErr(sql), IsNil) - tk.MustExec("set @@tidb_mem_quota_query = 33554432") // 32MB, out of memory during executing - c.Assert(strings.Contains(tk.QueryToErr(sql).Error(), "Out Of Memory Quota!"), IsTrue) - tk.MustExec("set @@tidb_mem_quota_query = 65536") // 64KB, out of memory during building the plan - func() { - defer func() { - r := recover() - c.Assert(r, NotNil) - err := errors.Errorf("%v", r) - c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) - }() - tk.MustExec(sql) - }() - } -} - -func (s *testSerialSuite) TestIssue30289(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - fpName := "github.com/pingcap/tidb/executor/issue30289" - c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) - defer func() { - c.Assert(failpoint.Disable(fpName), IsNil) - }() - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") - err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") - c.Assert(err.Error(), Matches, "issue30289 build return error") -} // Test invoke Close without invoking Open before for each operators. func (s *testSerialSuite) TestUnreasonablyClose(c *C) { @@ -8621,4 +8408,3 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { } c.Assert(opsAlreadyCoveredMask, Equals, opsNeedsCoveredMask, Commentf("these operators are not covered %s", commentBuf.String())) } ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) diff --git a/executor/join.go b/executor/join.go index 4b09d6e787e81..dfb2d40fe33d1 100644 --- a/executor/join.go +++ b/executor/join.go @@ -159,10 +159,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } -<<<<<<< HEAD -======= ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) From b953bd06144ce468664e0f2bf97f18ec0f13b66e Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 17 Dec 2021 12:03:01 +0800 Subject: [PATCH 3/5] fix ci --- executor/builder.go | 4 ++-- executor/executor_test.go | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 9972c99759d91..fd44b1574063b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -103,9 +103,9 @@ type MockExecutorBuilder struct { } // NewMockExecutorBuilderForTest is ONLY used in test. -func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder { +func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema) *MockExecutorBuilder { return &MockExecutorBuilder{ - executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)} + executorBuilder: newExecutorBuilder(ctx, is)} } // Build builds an executor tree according to `p`. diff --git a/executor/executor_test.go b/executor/executor_test.go index 1388ab26756a5..fb6254c229d65 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8310,8 +8310,6 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { &plannercore.PhysicalLimit{}, &plannercore.PhysicalSort{}, &plannercore.PhysicalTopN{}, - &plannercore.PhysicalCTE{}, - &plannercore.PhysicalCTETable{}, &plannercore.PhysicalMaxOneRow{}, &plannercore.PhysicalProjection{}, &plannercore.PhysicalSelection{}, @@ -8320,7 +8318,7 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { &plannercore.PhysicalShuffle{}, &plannercore.PhysicalUnionAll{}, } - executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") + executorBuilder := executor.NewMockExecutorBuilderForTest(se, is) var opsNeedsCoveredMask uint64 = 1< Date: Mon, 20 Dec 2021 11:15:40 +0800 Subject: [PATCH 4/5] fix ci --- executor/executor_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index fb6254c229d65..f0ffc6ac926d1 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8367,10 +8367,6 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { } c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) switch x := ch.(type) { - case *plannercore.PhysicalCTE: - newChild = append(newChild, x.RecurPlan) - newChild = append(newChild, x.SeedPlan) - continue case *plannercore.PhysicalShuffle: newChild = append(newChild, x.DataSources...) newChild = append(newChild, x.Tails...) From 5482ef49b26c0a6d167bebe86cf09edfd7b4444b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 20 Dec 2021 13:10:09 +0800 Subject: [PATCH 5/5] remove unsupported case --- executor/executor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 538c975279d3b..52a8272868935 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8338,7 +8338,6 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { "select a + 1 from t", "select count(*) a from t having a > 1", "select * from t where a = 1.1", - "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", "select sum(f) over (partition by f) from t", "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d",