diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go new file mode 100644 index 0000000000000..cbafe9e9fde9d --- /dev/null +++ b/executor/index_lookup_hash_join.go @@ -0,0 +1,754 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "hash" + "hash/fnv" + "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/ranger" +) + +// numResChkHold indicates the number of resource chunks that an inner worker +// holds at the same time. +// It's used in 2 cases individually: +// 1. IndexMergeJoin +// 2. IndexNestedLoopHashJoin: +// It's used when IndexNestedLoopHashJoin.keepOuterOrder is true. +// Otherwise, there will be at most `concurrency` resource chunks throughout +// the execution of IndexNestedLoopHashJoin. +const numResChkHold = 4 + +// IndexNestedLoopHashJoin employs one outer worker and N inner workers to +// execute concurrently. The output order is not promised. +// +// The execution flow is very similar to IndexLookUpReader: +// 1. The outer worker reads N outer rows, builds a task and sends it to the +// inner worker channel. +// 2. The inner worker receives the tasks and does 3 things for every task: +// 1. builds hash table from the outer rows +// 2. builds key ranges from outer rows and fetches inner rows +// 3. probes the hash table and sends the join result to the main thread channel. +// Note: step 1 and step 2 runs concurrently. +// 3. The main thread receives the join results. +type IndexNestedLoopHashJoin struct { + IndexLookUpJoin + resultCh chan *indexHashJoinResult + joinChkResourceCh []chan *chunk.Chunk + // We build individual joiner for each inner worker when using chunk-based + // execution, to avoid the concurrency of joiner.chk and joiner.selected. + joiners []joiner + keepOuterOrder bool + curTask *indexHashJoinTask + // taskCh is only used when `keepOuterOrder` is true. + taskCh chan *indexHashJoinTask +} + +type indexHashJoinOuterWorker struct { + outerWorker + innerCh chan *indexHashJoinTask + keepOuterOrder bool + // taskCh is only used when the outer order needs to be promised. + taskCh chan *indexHashJoinTask +} + +type indexHashJoinInnerWorker struct { + innerWorker + matchedOuterPtrs []chunk.RowPtr + joiner joiner + joinChkResourceCh chan *chunk.Chunk + // resultCh is valid only when indexNestedLoopHashJoin do not need to keep + // order. Otherwise, it will be nil. + resultCh chan *indexHashJoinResult + taskCh <-chan *indexHashJoinTask + wg *sync.WaitGroup + joinKeyBuf []byte + outerRowStatus []outerRowStatusFlag +} + +type indexHashJoinResult struct { + chk *chunk.Chunk + err error + src chan<- *chunk.Chunk +} + +type indexHashJoinTask struct { + *lookUpJoinTask + outerRowStatus [][]outerRowStatusFlag + lookupMap baseHashTable + err error + keepOuterOrder bool + // resultCh is only used when the outer order needs to be promised. + resultCh chan *indexHashJoinResult + // matchedInnerRowPtrs is only valid when the outer order needs to be + // promised. Otherwise, it will be nil. + // len(matchedInnerRowPtrs) equals to + // lookUpJoinTask.outerResult.NumChunks(), and the elements of every + // matchedInnerRowPtrs[chkIdx][rowIdx] indicates the matched inner row ptrs + // of the corresponding outer row. + matchedInnerRowPtrs [][][]chunk.RowPtr +} + +// Open implements the IndexNestedLoopHashJoin Executor interface. +func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { + // Be careful, very dirty hack in this line!!! + // IndexLookUpJoin need to rebuild executor (the dataReaderBuilder) during + // executing. However `executor.Next()` is lazy evaluation when the RecordSet + // result is drained. + // Lazy evaluation means the saved session context may change during executor's + // building and its running. + // A specific sequence for example: + // + // e := buildExecutor() // txn at build time + // recordSet := runStmt(e) + // session.CommitTxn() // txn closed + // recordSet.Next() + // e.dataReaderBuilder.Build() // txn is used again, which is already closed + // + // The trick here is `getSnapshotTS` will cache snapshot ts in the dataReaderBuilder, + // so even txn is destroyed later, the dataReaderBuilder could still use the + // cached snapshot ts to construct DAG. + _, err := e.innerCtx.readerBuilder.getSnapshotTS() + if err != nil { + return err + } + + err = e.children[0].Open(ctx) + if err != nil { + return err + } + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.innerPtrBytes = make([][]byte, 0, 8) + e.startWorkers(ctx) + return nil +} + +func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { + concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + workerCtx, cancelFunc := context.WithCancel(ctx) + e.cancelFunc = cancelFunc + innerCh := make(chan *indexHashJoinTask, concurrency) + if e.keepOuterOrder { + e.taskCh = make(chan *indexHashJoinTask, concurrency) + } + e.workerWg.Add(1) + ow := e.newOuterWorker(innerCh) + go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers) + + if !e.keepOuterOrder { + e.resultCh = make(chan *indexHashJoinResult, concurrency) + } else { + // When `keepOuterOrder` is true, each task holds their own `resultCh` + // individually, thus we do not need a global resultCh. + e.resultCh = nil + } + e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + for i := 0; i < concurrency; i++ { + if !e.keepOuterOrder { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1) + e.joinChkResourceCh[i] <- newFirstChunk(e) + } else { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold) + for j := 0; j < numResChkHold; j++ { + e.joinChkResourceCh[i] <- newFirstChunk(e) + } + } + } + + e.workerWg.Add(concurrency) + for i := 0; i < concurrency; i++ { + workerID := i + go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers) + } + go e.wait4JoinWorkers() +} + +func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { + if r != nil { + e.resultCh <- &indexHashJoinResult{ + err: errors.New(fmt.Sprintf("%v", r)), + } + if e.cancelFunc != nil { + e.cancelFunc() + } + } + e.workerWg.Done() +} + +func (e *IndexNestedLoopHashJoin) wait4JoinWorkers() { + e.workerWg.Wait() + if e.resultCh != nil { + close(e.resultCh) + } + if e.taskCh != nil { + close(e.taskCh) + } +} + +// Next implements the IndexNestedLoopHashJoin Executor interface. +func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.keepOuterOrder { + return e.runInOrder(ctx, req) + } + // unordered run + var ( + result *indexHashJoinResult + ok bool + ) + select { + case result, ok = <-e.resultCh: + if !ok { + return nil + } + if result.err != nil { + return result.err + } + case <-ctx.Done(): + return ctx.Err() + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil +} + +func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error { + var ( + result *indexHashJoinResult + ok bool + ) + for { + if e.isDryUpTasks(ctx) { + return nil + } + select { + case result, ok = <-e.curTask.resultCh: + if !ok { + e.curTask = nil + continue + } + if result.err != nil { + return result.err + } + case <-ctx.Done(): + return ctx.Err() + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil + } +} + +// isDryUpTasks indicates whether all the tasks have been processed. +func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool { + if e.curTask != nil { + return false + } + var ok bool + select { + case e.curTask, ok = <-e.taskCh: + if !ok { + return true + } + case <-ctx.Done(): + return true + } + return false +} + +// Close implements the IndexNestedLoopHashJoin Executor interface. +func (e *IndexNestedLoopHashJoin) Close() error { + if e.cancelFunc != nil { + e.cancelFunc() + e.cancelFunc = nil + } + if e.resultCh != nil { + for range e.resultCh { + } + e.resultCh = nil + } + if e.taskCh != nil { + for range e.taskCh { + } + e.taskCh = nil + } + if e.runtimeStats != nil { + concurrency := cap(e.joinChkResourceCh) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) + } + for i := range e.joinChkResourceCh { + close(e.joinChkResourceCh[i]) + } + e.joinChkResourceCh = nil + return e.baseExecutor.Close() +} + +func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { + defer close(ow.innerCh) + for { + task, err := ow.buildTask(ctx) + failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { + err = errors.New("mockIndexHashJoinOuterWorkerErr") + }) + if err != nil { + task = &indexHashJoinTask{err: err} + ow.pushToChan(ctx, task, ow.innerCh) + if ow.keepOuterOrder { + ow.pushToChan(ctx, task, ow.taskCh) + } + return + } + if task == nil { + return + } + if finished := ow.pushToChan(ctx, task, ow.innerCh); finished { + return + } + if ow.keepOuterOrder { + if finished := ow.pushToChan(ctx, task, ow.taskCh); finished { + return + } + } + } +} + +func (ow *indexHashJoinOuterWorker) buildTask(ctx context.Context) (*indexHashJoinTask, error) { + task, err := ow.outerWorker.buildTask(ctx) + if task == nil || err != nil { + return nil, err + } + var ( + resultCh chan *indexHashJoinResult + matchedInnerRowPtrs [][][]chunk.RowPtr + ) + if ow.keepOuterOrder { + resultCh = make(chan *indexHashJoinResult, numResChkHold) + matchedInnerRowPtrs = make([][][]chunk.RowPtr, task.outerResult.NumChunks()) + for i := range matchedInnerRowPtrs { + matchedInnerRowPtrs[i] = make([][]chunk.RowPtr, task.outerResult.GetChunk(i).NumRows()) + } + } + numChks := task.outerResult.NumChunks() + outerRowStatus := make([][]outerRowStatusFlag, numChks) + for i := 0; i < numChks; i++ { + outerRowStatus[i] = make([]outerRowStatusFlag, task.outerResult.GetChunk(i).NumRows()) + } + return &indexHashJoinTask{ + lookUpJoinTask: task, + outerRowStatus: outerRowStatus, + keepOuterOrder: ow.keepOuterOrder, + resultCh: resultCh, + matchedInnerRowPtrs: matchedInnerRowPtrs, + }, nil +} + +func (ow *indexHashJoinOuterWorker) pushToChan(ctx context.Context, task *indexHashJoinTask, dst chan<- *indexHashJoinTask) bool { + select { + case <-ctx.Done(): + return true + case dst <- task: + } + return false +} + +func (e *IndexNestedLoopHashJoin) newOuterWorker(innerCh chan *indexHashJoinTask) *indexHashJoinOuterWorker { + ow := &indexHashJoinOuterWorker{ + outerWorker: outerWorker{ + outerCtx: e.outerCtx, + ctx: e.ctx, + executor: e.children[0], + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, + lookup: &e.IndexLookUpJoin, + }, + innerCh: innerCh, + keepOuterOrder: e.keepOuterOrder, + taskCh: e.taskCh, + } + return ow +} + +func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, workerID int) *indexHashJoinInnerWorker { + // Since multiple inner workers run concurrently, we should copy join's indexRanges for every worker to avoid data race. + copiedRanges := make([]*ranger.Range, 0, len(e.indexRanges)) + for _, ran := range e.indexRanges { + copiedRanges = append(copiedRanges, ran.Clone()) + } + iw := &indexHashJoinInnerWorker{ + innerWorker: innerWorker{ + innerCtx: e.innerCtx, + outerCtx: e.outerCtx, + ctx: e.ctx, + executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), + indexRanges: copiedRanges, + keyOff2IdxOff: e.keyOff2IdxOff, + }, + taskCh: taskCh, + joiner: e.joiners[workerID], + joinChkResourceCh: e.joinChkResourceCh[workerID], + resultCh: e.resultCh, + matchedOuterPtrs: make([]chunk.RowPtr, 0, e.maxChunkSize), + joinKeyBuf: make([]byte, 1), + outerRowStatus: make([]outerRowStatusFlag, 0, e.maxChunkSize), + } + if e.lastColHelper != nil { + // nextCwf.TmpConstant needs to be reset for every individual + // inner worker to avoid data race when the inner workers is running + // concurrently. + nextCwf := *e.lastColHelper + nextCwf.TmpConstant = make([]*expression.Constant, len(e.lastColHelper.TmpConstant)) + for i := range e.lastColHelper.TmpConstant { + nextCwf.TmpConstant[i] = &expression.Constant{RetType: nextCwf.TargetCol.RetType} + } + iw.nextColCompareFilters = &nextCwf + } + return iw +} + +func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.CancelFunc) { + var task *indexHashJoinTask + joinResult, ok := iw.getNewJoinResult(ctx) + if !ok { + cancelFunc() + return + } + h, resultCh := fnv.New64(), iw.resultCh + for { + select { + case <-ctx.Done(): + return + case task, ok = <-iw.taskCh: + } + if !ok { + break + } + if task.err != nil { + joinResult.err = task.err + break + } + if task.keepOuterOrder { + resultCh = task.resultCh + } + err := iw.handleTask(ctx, task, joinResult, h, resultCh) + if err != nil { + joinResult.err = err + break + } + if task.keepOuterOrder { + // We need to get a new result holder here because the old + // `joinResult` hash been sent to the `resultCh` or to the + // `joinChkResourceCh`. + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + cancelFunc() + return + } + } + } + failpoint.Inject("testIndexHashJoinInnerWorkerErr", func() { + joinResult.err = errors.New("mockIndexHashJoinInnerWorkerErr") + }) + if joinResult.err != nil { + resultCh <- joinResult + return + } + // When task.keepOuterOrder is TRUE(resultCh != iw.resultCh), the last + // joinResult will be checked when the a task has been processed, thus we do + // not need to check it here again. + if resultCh == iw.resultCh && joinResult.chk != nil && joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } +} + +func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*indexHashJoinResult, bool) { + joinResult := &indexHashJoinResult{ + src: iw.joinChkResourceCh, + } + ok := true + select { + case joinResult.chk, ok = <-iw.joinChkResourceCh: + case <-ctx.Done(): + return nil, false + } + return joinResult, ok +} + +func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { + buf, numChks := make([]byte, 1), task.outerResult.NumChunks() + task.lookupMap = newUnsafeHashTable(task.outerResult.Len()) + for chkIdx := 0; chkIdx < numChks; chkIdx++ { + chk := task.outerResult.GetChunk(chkIdx) + numRows := chk.NumRows() + OUTER: + for rowIdx := 0; rowIdx < numRows; rowIdx++ { + if task.outerMatch != nil && !task.outerMatch[chkIdx][rowIdx] { + continue + } + row := chk.GetRow(rowIdx) + keyColIdx := iw.outerCtx.keyCols + for _, i := range keyColIdx { + if row.IsNull(i) { + continue OUTER + } + } + h.Reset() + err := codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, row, iw.outerCtx.rowTypes, keyColIdx, buf) + failpoint.Inject("testIndexHashJoinBuildErr", func() { + err = errors.New("mockIndexHashJoinBuildErr") + }) + if err != nil { + // This panic will be recovered by the invoker. + panic(err.Error()) + } + rowPtr := chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} + task.lookupMap.Put(h.Sum64(), rowPtr) + } + } +} + +func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask) error { + lookUpContents, err := iw.constructLookupContent(task) + if err != nil { + return err + } + lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) + return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents) +} + +func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{}) { + if r != nil { + iw.resultCh <- &indexHashJoinResult{err: errors.Errorf("%v", r)} + } + iw.wg.Done() +} + +func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + iw.wg = &sync.WaitGroup{} + iw.wg.Add(1) + // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. + go util.WithRecovery(func() { iw.buildHashTableForOuterResult(ctx, task, h) }, iw.handleHashJoinInnerWorkerPanic) + err := iw.fetchInnerResults(ctx, task.lookUpJoinTask) + if err != nil { + return err + } + iw.wg.Wait() + if !task.keepOuterOrder { + return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh) + } + return iw.doJoinInOrder(ctx, task, joinResult, h, resultCh) +} + +func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + var ok bool + iter := chunk.NewIterator4List(task.innerResult) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + ok, joinResult = iw.joinMatchedInnerRow2Chunk(ctx, row, task, joinResult, h, iw.joinKeyBuf) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") + } + } + for chkIdx, outerRowStatus := range task.outerRowStatus { + chk := task.outerResult.GetChunk(chkIdx) + for rowIdx, val := range outerRowStatus { + if val == outerRowMatched { + continue + } + iw.joiner.onMissMatch(val == outerRowHasNull, chk.GetRow(rowIdx), joinResult.chk) + if joinResult.chk.IsFull() { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return ctx.Err() + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinUnordered failed") + } + } + } + } + return nil +} + +func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task *indexHashJoinTask, h hash.Hash64, buf []byte) (matchedRows []chunk.Row, matchedRowPtr []chunk.RowPtr, err error) { + h.Reset() + err = codec.HashChunkRow(iw.ctx.GetSessionVars().StmtCtx, h, innerRow, iw.rowTypes, iw.keyCols, buf) + if err != nil { + return nil, nil, err + } + iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64()) + if len(iw.matchedOuterPtrs) == 0 { + return nil, nil, nil + } + joinType := JoinerType(iw.joiner) + isSemiJoin := joinType == plannercore.SemiJoin || joinType == plannercore.LeftOuterSemiJoin + matchedRows = make([]chunk.Row, 0, len(iw.matchedOuterPtrs)) + matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs)) + for _, ptr := range iw.matchedOuterPtrs { + outerRow := task.outerResult.GetRow(ptr) + ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.rowTypes, iw.keyCols, outerRow, iw.outerCtx.rowTypes, iw.outerCtx.keyCols) + if err != nil { + return nil, nil, err + } + if !ok || (task.outerRowStatus[ptr.ChkIdx][ptr.RowIdx] == outerRowMatched && isSemiJoin) { + continue + } + matchedRows = append(matchedRows, outerRow) + matchedRowPtr = append(matchedRowPtr, chunk.RowPtr{ChkIdx: ptr.ChkIdx, RowIdx: ptr.RowIdx}) + } + return matchedRows, matchedRowPtr, nil +} + +func (iw *indexHashJoinInnerWorker) joinMatchedInnerRow2Chunk(ctx context.Context, innerRow chunk.Row, task *indexHashJoinTask, + joinResult *indexHashJoinResult, h hash.Hash64, buf []byte) (bool, *indexHashJoinResult) { + matchedOuterRows, matchedOuterRowPtr, err := iw.getMatchedOuterRows(innerRow, task, h, buf) + if err != nil { + joinResult.err = err + return false, joinResult + } + if len(matchedOuterRows) == 0 { + return true, joinResult + } + var ( + ok bool + iter = chunk.NewIterator4Slice(matchedOuterRows) + cursor = 0 + ) + for iter.Begin(); iter.Current() != iter.End(); { + iw.outerRowStatus, err = iw.joiner.tryToMatchOuters(iter, innerRow, joinResult.chk, iw.outerRowStatus) + if err != nil { + joinResult.err = err + return false, joinResult + } + for _, status := range iw.outerRowStatus { + chkIdx, rowIdx := matchedOuterRowPtr[cursor].ChkIdx, matchedOuterRowPtr[cursor].RowIdx + if status == outerRowMatched || task.outerRowStatus[chkIdx][rowIdx] == outerRowUnmatched { + task.outerRowStatus[chkIdx][rowIdx] = status + } + cursor++ + } + if joinResult.chk.IsFull() { + select { + case iw.resultCh <- joinResult: + case <-ctx.Done(): + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return false, joinResult + } + } + } + return true, joinResult +} + +func (iw *indexHashJoinInnerWorker) collectMatchedInnerPtrs4OuterRows(ctx context.Context, innerRow chunk.Row, innerRowPtr chunk.RowPtr, + task *indexHashJoinTask, h hash.Hash64, buf []byte) error { + _, matchedOuterRowIdx, err := iw.getMatchedOuterRows(innerRow, task, h, buf) + if err != nil { + return err + } + for _, outerRowPtr := range matchedOuterRowIdx { + chkIdx, rowIdx := outerRowPtr.ChkIdx, outerRowPtr.RowIdx + task.matchedInnerRowPtrs[chkIdx][rowIdx] = append(task.matchedInnerRowPtrs[chkIdx][rowIdx], innerRowPtr) + } + return nil +} + +// doJoinInOrder follows the following steps: +// 1. collect all the matched inner row ptrs for every outer row +// 2. do the join work +// 2.1 collect all the matched inner rows using the collected ptrs for every outer row +// 2.2 call tryToMatchInners for every outer row +// 2.3 call onMissMatch when no inner rows are matched +func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) (err error) { + defer func() { + if err == nil && joinResult.chk != nil { + if joinResult.chk.NumRows() > 0 { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return + } + } else { + joinResult.src <- joinResult.chk + } + } + close(resultCh) + }() + for i, numChunks := 0, task.innerResult.NumChunks(); i < numChunks; i++ { + for j, chk := 0, task.innerResult.GetChunk(i); j < chk.NumRows(); j++ { + row := chk.GetRow(j) + ptr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)} + err = iw.collectMatchedInnerPtrs4OuterRows(ctx, row, ptr, task, h, iw.joinKeyBuf) + if err != nil { + return err + } + } + } + // TODO: matchedInnerRowPtrs and matchedInnerRows can be moved to inner worker. + matchedInnerRows := make([]chunk.Row, len(task.matchedInnerRowPtrs)) + var hasMatched, hasNull, ok bool + for chkIdx, innerRowPtrs4Chk := range task.matchedInnerRowPtrs { + for outerRowIdx, innerRowPtrs := range innerRowPtrs4Chk { + matchedInnerRows, hasMatched, hasNull = matchedInnerRows[:0], false, false + outerRow := task.outerResult.GetChunk(chkIdx).GetRow(outerRowIdx) + for _, ptr := range innerRowPtrs { + matchedInnerRows = append(matchedInnerRows, task.innerResult.GetRow(ptr)) + } + iter := chunk.NewIterator4Slice(matchedInnerRows) + for iter.Begin(); iter.Current() != iter.End(); { + matched, isNull, err := iw.joiner.tryToMatchInners(outerRow, iter, joinResult.chk) + if err != nil { + return err + } + hasMatched, hasNull = matched || hasMatched, isNull || hasNull + if joinResult.chk.IsFull() { + select { + case resultCh <- joinResult: + case <-ctx.Done(): + return ctx.Err() + } + joinResult, ok = iw.getNewJoinResult(ctx) + if !ok { + return errors.New("indexHashJoinInnerWorker.doJoinInOrder failed") + } + } + } + if !hasMatched { + iw.joiner.onMissMatch(hasNull, outerRow, joinResult.chk) + } + } + } + return nil +} diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index a0f522dd3e695..ccd549675a403 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -296,7 +296,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): - return nil, nil + return nil, ctx.Err() } if task == nil { return nil, nil @@ -308,7 +308,7 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): - return nil, nil + return nil, ctx.Err() } e.task = task @@ -545,6 +545,9 @@ func (iw *innerWorker) constructDatumLookupKey(task *lookUpJoinTask, rowIdx int) if terror.ErrorEqual(err, types.ErrOverflow) { return nil, nil } + if terror.ErrorEqual(err, types.ErrTruncated) && (innerColType.Tp == mysql.TypeSet || innerColType.Tp == mysql.TypeEnum) { + return nil, nil + } return nil, err } cmp, err := outerValue.CompareDatum(sc, &innerValue) @@ -606,6 +609,14 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa innerResult.GetMemTracker().SetLabel(innerResultLabel) innerResult.GetMemTracker().AttachTo(task.memTracker) for { +<<<<<<< HEAD +======= + select { + case <-ctx.Done(): + return ctx.Err() + default: + } +>>>>>>> f5fa3e7... executor: fix index join error when join key is ENUM or SET (#19235) err := Next(ctx, innerExec, iw.executorChk) if err != nil { return err diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index 29caf510ba985..0ca03a3951af5 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -15,6 +15,7 @@ package executor_test import ( "context" + "fmt" . "github.com/pingcap/check" "github.com/pingcap/tidb/util/testkit" @@ -179,3 +180,58 @@ func (s *testSuite2) TestIndexJoinMultiCondition(c *C) { tk.MustExec("insert into t2 values (0,1), (0,2), (0,3)") tk.MustQuery("select /*+ TIDB_INLJ(t1) */ count(*) from t1, t2 where t1.a = t2.a and t1.b < t2.b").Check(testkit.Rows("3")) } +<<<<<<< HEAD +======= + +func (s *testSuite5) TestIssue16887(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_roles, admin_role_has_permissions") + tk.MustExec("CREATE TABLE `admin_role_has_permissions` (`permission_id` bigint(20) unsigned NOT NULL, `role_id` bigint(20) unsigned NOT NULL, PRIMARY KEY (`permission_id`,`role_id`), KEY `admin_role_has_permissions_role_id_foreign` (`role_id`))") + tk.MustExec("CREATE TABLE `admin_roles` (`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL COMMENT '角色名称', `created_at` timestamp NULL DEFAULT NULL, `updated_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`))") + tk.MustExec("INSERT INTO `admin_roles` (`id`, `name`, `created_at`, `updated_at`) VALUES(1, 'admin','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(2, 'developer','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(3, 'analyst','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(4, 'channel_admin','2020-04-27 02:40:03', '2020-04-27 02:40:03'),(5, 'test','2020-04-27 02:40:08', '2020-04-27 02:40:08')") + tk.MustExec("INSERT INTO `admin_role_has_permissions` (`permission_id`, `role_id`) VALUES(1, 1),(2, 1),(3, 1),(4, 1),(5, 1),(6, 1),(7, 1),(8, 1),(9, 1),(10, 1),(11, 1),(12, 1),(13, 1),(14, 1),(15, 1),(16, 1),(17, 1),(18, 1),(19, 1),(20, 1),(21, 1),(22, 1),(23, 1),(24, 1),(25, 1),(26, 1),(27, 1),(28, 1),(29, 1),(30, 1),(31, 1),(32, 1),(33, 1),(34, 1),(35, 1),(36, 1),(37, 1),(38, 1),(39, 1),(40, 1),(41, 1),(42, 1),(43, 1),(44, 1),(45, 1),(46, 1),(47, 1),(48, 1),(49, 1),(50, 1),(51, 1),(52, 1),(53, 1),(54, 1),(55, 1),(56, 1),(57, 1),(58, 1),(59, 1),(60, 1),(61, 1),(62, 1),(63, 1),(64, 1),(65, 1),(66, 1),(67, 1),(68, 1),(69, 1),(70, 1),(71, 1),(72, 1),(73, 1),(74, 1),(75, 1),(76, 1),(77, 1),(78, 1),(79, 1),(80, 1),(81, 1),(82, 1),(83, 1),(5, 4),(6, 4),(7, 4),(84, 5),(85, 5),(86, 5)") + rows := tk.MustQuery("SELECT /*+ inl_merge_join(admin_role_has_permissions) */ `admin_roles`.* FROM `admin_roles` INNER JOIN `admin_role_has_permissions` ON `admin_roles`.`id` = `admin_role_has_permissions`.`role_id` WHERE `admin_role_has_permissions`.`permission_id`\n IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67)").Rows() + c.Assert(len(rows), Equals, 70) + rows = tk.MustQuery("show warnings").Rows() + c.Assert(len(rows) > 0, Equals, true) +} + +func (s *testSuite5) TestIndexJoinEnumSetIssue19233(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("drop table if exists i;") + tk.MustExec("drop table if exists p1;") + tk.MustExec("drop table if exists p2;") + tk.MustExec(`CREATE TABLE p1 (type enum('HOST_PORT') NOT NULL, UNIQUE KEY (type)) ;`) + tk.MustExec(`CREATE TABLE p2 (type set('HOST_PORT') NOT NULL, UNIQUE KEY (type)) ;`) + tk.MustExec(`CREATE TABLE i (objectType varchar(64) NOT NULL);`) + tk.MustExec(`insert into i values ('SWITCH');`) + tk.MustExec(`create table t like i;`) + tk.MustExec(`insert into t values ('HOST_PORT');`) + tk.MustExec(`insert into t select * from t;`) + tk.MustExec(`insert into t select * from t;`) + tk.MustExec(`insert into t select * from t;`) + tk.MustExec(`insert into t select * from t;`) + tk.MustExec(`insert into t select * from t;`) + tk.MustExec(`insert into t select * from t;`) + + tk.MustExec(`insert into i select * from t;`) + + tk.MustExec(`insert into p1 values('HOST_PORT');`) + tk.MustExec(`insert into p2 values('HOST_PORT');`) + for _, table := range []string{"p1", "p2"} { + for _, hint := range []string{"INL_HASH_JOIN", "INL_MERGE_JOIN", "INL_JOIN"} { + sql := fmt.Sprintf(`select /*+ %s(%s) */ * from i, %s where i.objectType = %s.type;`, hint, table, table, table) + rows := tk.MustQuery(sql).Rows() + c.Assert(len(rows), Equals, 64) + for i := 0; i < len(rows); i++ { + c.Assert(fmt.Sprint(rows[i][0]), Equals, "HOST_PORT") + } + rows = tk.MustQuery("show warnings").Rows() + c.Assert(len(rows), Equals, 0) + } + } +} +>>>>>>> f5fa3e7... executor: fix index join error when join key is ENUM or SET (#19235) diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go new file mode 100644 index 0000000000000..43d0f6cc4964d --- /dev/null +++ b/executor/index_lookup_merge_join.go @@ -0,0 +1,739 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "fmt" + "runtime" + "sort" + "sync" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/ranger" + "go.uber.org/zap" +) + +// IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join +// It preserves the order of the outer table and support batch lookup. +// +// The execution flow is very similar to IndexLookUpReader: +// 1. outerWorker read N outer rows, build a task and send it to result channel and inner worker channel. +// 2. The innerWorker receives the task, builds key ranges from outer rows and fetch inner rows, then do merge join. +// 3. main thread receives the task and fetch results from the channel in task one by one. +// 4. If channel has been closed, main thread receives the next task. +type IndexLookUpMergeJoin struct { + baseExecutor + + resultCh <-chan *lookUpMergeJoinTask + cancelFunc context.CancelFunc + workerWg *sync.WaitGroup + + outerMergeCtx outerMergeCtx + innerMergeCtx innerMergeCtx + + joiners []joiner + joinChkResourceCh []chan *chunk.Chunk + isOuterJoin bool + + requiredRows int64 + + task *lookUpMergeJoinTask + + indexRanges []*ranger.Range + keyOff2IdxOff []int + + // lastColHelper store the information for last col if there's complicated filter like col > x_col and col < x_col + 100. + lastColHelper *plannercore.ColWithCmpFuncManager + + memTracker *memory.Tracker // track memory usage +} + +type outerMergeCtx struct { + rowTypes []*types.FieldType + joinKeys []*expression.Column + keyCols []int + filter expression.CNFExprs + needOuterSort bool + compareFuncs []expression.CompareFunc +} + +type innerMergeCtx struct { + readerBuilder *dataReaderBuilder + rowTypes []*types.FieldType + joinKeys []*expression.Column + keyCols []int + compareFuncs []expression.CompareFunc + colLens []int + desc bool + keyOff2KeyOffOrderByIdx []int +} + +type lookUpMergeJoinTask struct { + outerResult *chunk.List + outerOrderIdx []chunk.RowPtr + + innerResult *chunk.Chunk + innerIter chunk.Iterator + + sameKeyInnerRows []chunk.Row + sameKeyIter chunk.Iterator + + doneErr error + results chan *indexMergeJoinResult + + memTracker *memory.Tracker +} + +type outerMergeWorker struct { + outerMergeCtx + + lookup *IndexLookUpMergeJoin + + ctx sessionctx.Context + executor Executor + + maxBatchSize int + batchSize int + + nextColCompareFilters *plannercore.ColWithCmpFuncManager + + resultCh chan<- *lookUpMergeJoinTask + innerCh chan<- *lookUpMergeJoinTask + + parentMemTracker *memory.Tracker +} + +type innerMergeWorker struct { + innerMergeCtx + + taskCh <-chan *lookUpMergeJoinTask + joinChkResourceCh chan *chunk.Chunk + outerMergeCtx outerMergeCtx + ctx sessionctx.Context + innerExec Executor + joiner joiner + retFieldTypes []*types.FieldType + + maxChunkSize int + indexRanges []*ranger.Range + nextColCompareFilters *plannercore.ColWithCmpFuncManager + keyOff2IdxOff []int +} + +type indexMergeJoinResult struct { + chk *chunk.Chunk + src chan<- *chunk.Chunk +} + +// Open implements the Executor interface +func (e *IndexLookUpMergeJoin) Open(ctx context.Context) error { + // Be careful, very dirty hack in this line!!! + // IndexLookMergeUpJoin need to rebuild executor (the dataReaderBuilder) during + // executing. However `executor.Next()` is lazy evaluation when the RecordSet + // result is drained. + // Lazy evaluation means the saved session context may change during executor's + // building and its running. + // A specific sequence for example: + // + // e := buildExecutor() // txn at build time + // recordSet := runStmt(e) + // session.CommitTxn() // txn closed + // recordSet.Next() + // e.dataReaderBuilder.Build() // txn is used again, which is already closed + // + // The trick here is `getSnapshotTS` will cache snapshot ts in the dataReaderBuilder, + // so even txn is destroyed later, the dataReaderBuilder could still use the + // cached snapshot ts to construct DAG. + _, err := e.innerMergeCtx.readerBuilder.getSnapshotTS() + if err != nil { + return err + } + + err = e.children[0].Open(ctx) + if err != nil { + return err + } + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.startWorkers(ctx) + return nil +} + +func (e *IndexLookUpMergeJoin) startWorkers(ctx context.Context) { + // TODO: consider another session currency variable for index merge join. + // Because its parallelization is not complete. + concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + resultCh := make(chan *lookUpMergeJoinTask, concurrency) + e.resultCh = resultCh + e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) + for i := 0; i < concurrency; i++ { + e.joinChkResourceCh[i] = make(chan *chunk.Chunk, numResChkHold) + for j := 0; j < numResChkHold; j++ { + e.joinChkResourceCh[i] <- chunk.NewChunkWithCapacity(e.retFieldTypes, e.maxChunkSize) + } + } + workerCtx, cancelFunc := context.WithCancel(ctx) + e.cancelFunc = cancelFunc + innerCh := make(chan *lookUpMergeJoinTask, concurrency) + e.workerWg.Add(1) + go e.newOuterWorker(resultCh, innerCh).run(workerCtx, e.workerWg, e.cancelFunc) + e.workerWg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go e.newInnerMergeWorker(innerCh, i).run(workerCtx, e.workerWg, e.cancelFunc) + } +} + +func (e *IndexLookUpMergeJoin) newOuterWorker(resultCh, innerCh chan *lookUpMergeJoinTask) *outerMergeWorker { + omw := &outerMergeWorker{ + outerMergeCtx: e.outerMergeCtx, + ctx: e.ctx, + lookup: e, + executor: e.children[0], + resultCh: resultCh, + innerCh: innerCh, + batchSize: 32, + maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize, + parentMemTracker: e.memTracker, + nextColCompareFilters: e.lastColHelper, + } + failpoint.Inject("testIssue18068", func() { + omw.batchSize = 1 + }) + return omw +} + +func (e *IndexLookUpMergeJoin) newInnerMergeWorker(taskCh chan *lookUpMergeJoinTask, workID int) *innerMergeWorker { + // Since multiple inner workers run concurrently, we should copy join's indexRanges for every worker to avoid data race. + copiedRanges := make([]*ranger.Range, 0, len(e.indexRanges)) + for _, ran := range e.indexRanges { + copiedRanges = append(copiedRanges, ran.Clone()) + } + imw := &innerMergeWorker{ + innerMergeCtx: e.innerMergeCtx, + outerMergeCtx: e.outerMergeCtx, + taskCh: taskCh, + ctx: e.ctx, + indexRanges: copiedRanges, + keyOff2IdxOff: e.keyOff2IdxOff, + joiner: e.joiners[workID], + joinChkResourceCh: e.joinChkResourceCh[workID], + retFieldTypes: e.retFieldTypes, + maxChunkSize: e.maxChunkSize, + } + if e.lastColHelper != nil { + // nextCwf.TmpConstant needs to be reset for every individual + // inner worker to avoid data race when the inner workers is running + // concurrently. + nextCwf := *e.lastColHelper + nextCwf.TmpConstant = make([]*expression.Constant, len(e.lastColHelper.TmpConstant)) + for i := range e.lastColHelper.TmpConstant { + nextCwf.TmpConstant[i] = &expression.Constant{RetType: nextCwf.TargetCol.RetType} + } + imw.nextColCompareFilters = &nextCwf + } + return imw +} + +// Next implements the Executor interface +func (e *IndexLookUpMergeJoin) Next(ctx context.Context, req *chunk.Chunk) error { + if e.isOuterJoin { + atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows())) + } + req.Reset() + if e.task == nil { + e.getFinishedTask(ctx) + } + for e.task != nil { + select { + case result, ok := <-e.task.results: + if !ok { + if e.task.doneErr != nil { + return e.task.doneErr + } + e.getFinishedTask(ctx) + continue + } + req.SwapColumns(result.chk) + result.src <- result.chk + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +func (e *IndexLookUpMergeJoin) getFinishedTask(ctx context.Context) { + select { + case e.task = <-e.resultCh: + case <-ctx.Done(): + e.task = nil + } + + // TODO: reuse the finished task memory to build tasks. +} + +func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) { + defer func() { + if r := recover(); r != nil { + task := &lookUpMergeJoinTask{ + doneErr: errors.New(fmt.Sprintf("%v", r)), + results: make(chan *indexMergeJoinResult, numResChkHold), + } + close(task.results) + omw.resultCh <- task + cancelFunc() + } + close(omw.resultCh) + close(omw.innerCh) + wg.Done() + }() + for { + task, err := omw.buildTask(ctx) + if err != nil { + task.doneErr = err + close(task.results) + omw.pushToChan(ctx, task, omw.resultCh) + return + } + failpoint.Inject("mockIndexMergeJoinOOMPanic", nil) + if task == nil { + return + } + + if finished := omw.pushToChan(ctx, task, omw.innerCh); finished { + return + } + + if finished := omw.pushToChan(ctx, task, omw.resultCh); finished { + return + } + } +} + +func (omw *outerMergeWorker) pushToChan(ctx context.Context, task *lookUpMergeJoinTask, dst chan<- *lookUpMergeJoinTask) (finished bool) { + select { + case <-ctx.Done(): + return true + case dst <- task: + } + return false +} + +// buildTask builds a lookUpMergeJoinTask and read outer rows. +// When err is not nil, task must not be nil to send the error to the main thread via task +func (omw *outerMergeWorker) buildTask(ctx context.Context) (*lookUpMergeJoinTask, error) { + task := &lookUpMergeJoinTask{ + results: make(chan *indexMergeJoinResult, numResChkHold), + outerResult: chunk.NewList(omw.rowTypes, omw.executor.base().initCap, omw.executor.base().maxChunkSize), + } + task.memTracker = memory.NewTracker(memory.LabelForSimpleTask, -1) + task.memTracker.AttachTo(omw.parentMemTracker) + + omw.increaseBatchSize() + requiredRows := omw.batchSize + if omw.lookup.isOuterJoin { + requiredRows = int(atomic.LoadInt64(&omw.lookup.requiredRows)) + } + if requiredRows <= 0 || requiredRows > omw.maxBatchSize { + requiredRows = omw.maxBatchSize + } + for requiredRows > 0 { + execChk := newFirstChunk(omw.executor) + err := Next(ctx, omw.executor, execChk) + if err != nil { + return task, err + } + if execChk.NumRows() == 0 { + break + } + + task.outerResult.Add(execChk) + requiredRows -= execChk.NumRows() + task.memTracker.Consume(execChk.MemoryUsage()) + } + + if task.outerResult.Len() == 0 { + return nil, nil + } + + return task, nil +} + +func (omw *outerMergeWorker) increaseBatchSize() { + if omw.batchSize < omw.maxBatchSize { + omw.batchSize *= 2 + } + if omw.batchSize > omw.maxBatchSize { + omw.batchSize = omw.maxBatchSize + } +} + +func (imw *innerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) { + var task *lookUpMergeJoinTask + defer func() { + wg.Done() + if r := recover(); r != nil { + if task != nil { + task.doneErr = errors.Errorf("%v", r) + close(task.results) + } + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + logutil.Logger(ctx).Error("innerMergeWorker panicked", zap.String("stack", string(buf))) + cancelFunc() + } + }() + + for ok := true; ok; { + select { + case task, ok = <-imw.taskCh: + if !ok { + return + } + case <-ctx.Done(): + return + } + + err := imw.handleTask(ctx, task) + task.doneErr = err + close(task.results) + } +} + +func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJoinTask) (err error) { + numOuterChks := task.outerResult.NumChunks() + var outerMatch [][]bool + if imw.outerMergeCtx.filter != nil { + outerMatch = make([][]bool, numOuterChks) + for i := 0; i < numOuterChks; i++ { + chk := task.outerResult.GetChunk(i) + outerMatch[i] = make([]bool, chk.NumRows()) + outerMatch[i], err = expression.VectorizedFilter(imw.ctx, imw.outerMergeCtx.filter, chunk.NewIterator4Chunk(chk), outerMatch[i]) + if err != nil { + return err + } + } + } + task.outerOrderIdx = make([]chunk.RowPtr, 0, task.outerResult.Len()) + for i := 0; i < numOuterChks; i++ { + numRow := task.outerResult.GetChunk(i).NumRows() + for j := 0; j < numRow; j++ { + if len(outerMatch) == 0 || outerMatch[i][j] { + task.outerOrderIdx = append(task.outerOrderIdx, chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}) + } + } + } + task.memTracker.Consume(int64(cap(task.outerOrderIdx))) + failpoint.Inject("IndexMergeJoinMockOOM", func(val failpoint.Value) { + if val.(bool) { + panic("OOM test index merge join doesn't hang here.") + } + }) + // needOuterSort means the outer side property items can't guarantee the order of join keys. + // Because the necessary condition of merge join is both outer and inner keep order of join keys. + // In this case, we need sort the outer side. + if imw.outerMergeCtx.needOuterSort { + sort.Slice(task.outerOrderIdx, func(i, j int) bool { + idxI, idxJ := task.outerOrderIdx[i], task.outerOrderIdx[j] + rowI, rowJ := task.outerResult.GetRow(idxI), task.outerResult.GetRow(idxJ) + var cmp int64 + var err error + for _, keyOff := range imw.keyOff2KeyOffOrderByIdx { + joinKey := imw.outerMergeCtx.joinKeys[keyOff] + cmp, _, err = imw.outerMergeCtx.compareFuncs[keyOff](imw.ctx, joinKey, joinKey, rowI, rowJ) + terror.Log(err) + if cmp != 0 { + break + } + } + if cmp != 0 || imw.nextColCompareFilters == nil { + return (cmp < 0 && !imw.desc) || (cmp > 0 && imw.desc) + } + cmp = int64(imw.nextColCompareFilters.CompareRow(rowI, rowJ)) + return (cmp < 0 && !imw.desc) || (cmp > 0 && imw.desc) + }) + } + dLookUpKeys, err := imw.constructDatumLookupKeys(task) + if err != nil { + return err + } + dLookUpKeys = imw.dedupDatumLookUpKeys(dLookUpKeys) + // If the order requires descending, the deDupedLookUpContents is keep descending order before. + // So at the end, we should generate the ascending deDupedLookUpContents to build the correct range for inner read. + if imw.desc { + lenKeys := len(dLookUpKeys) + for i := 0; i < lenKeys/2; i++ { + dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i] + } + } + imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters) + if err != nil { + return err + } + defer terror.Call(imw.innerExec.Close) + _, err = imw.fetchNextInnerResult(ctx, task) + if err != nil { + return err + } + err = imw.doMergeJoin(ctx, task) + return err +} + +func (imw *innerMergeWorker) fetchNewChunkWhenFull(ctx context.Context, task *lookUpMergeJoinTask, chk **chunk.Chunk) (continueJoin bool) { + if !(*chk).IsFull() { + return true + } + select { + case task.results <- &indexMergeJoinResult{*chk, imw.joinChkResourceCh}: + case <-ctx.Done(): + return false + } + var ok bool + select { + case *chk, ok = <-imw.joinChkResourceCh: + if !ok { + return false + } + case <-ctx.Done(): + return false + } + (*chk).Reset() + return true +} + +func (imw *innerMergeWorker) doMergeJoin(ctx context.Context, task *lookUpMergeJoinTask) (err error) { + chk := <-imw.joinChkResourceCh + defer func() { + if chk == nil { + return + } + if chk.NumRows() > 0 { + select { + case task.results <- &indexMergeJoinResult{chk, imw.joinChkResourceCh}: + case <-ctx.Done(): + return + } + } else { + imw.joinChkResourceCh <- chk + } + }() + + initCmpResult := 1 + if imw.innerMergeCtx.desc { + initCmpResult = -1 + } + noneInnerRowsRemain := task.innerResult.NumRows() == 0 + + for _, outerIdx := range task.outerOrderIdx { + outerRow := task.outerResult.GetRow(outerIdx) + hasMatch, hasNull, cmpResult := false, false, initCmpResult + // If it has iterated out all inner rows and the inner rows with same key is empty, + // that means the outer row needn't match any inner rows. + if noneInnerRowsRemain && len(task.sameKeyInnerRows) == 0 { + goto missMatch + } + if len(task.sameKeyInnerRows) > 0 { + cmpResult, err = imw.compare(outerRow, task.sameKeyIter.Begin()) + if err != nil { + return err + } + } + if (cmpResult > 0 && !imw.innerMergeCtx.desc) || (cmpResult < 0 && imw.innerMergeCtx.desc) { + if noneInnerRowsRemain { + task.sameKeyInnerRows = task.sameKeyInnerRows[:0] + goto missMatch + } + noneInnerRowsRemain, err = imw.fetchInnerRowsWithSameKey(ctx, task, outerRow) + if err != nil { + return err + } + } + + for task.sameKeyIter.Current() != task.sameKeyIter.End() { + matched, isNull, err := imw.joiner.tryToMatchInners(outerRow, task.sameKeyIter, chk) + if err != nil { + return err + } + hasMatch = hasMatch || matched + hasNull = hasNull || isNull + if !imw.fetchNewChunkWhenFull(ctx, task, &chk) { + return nil + } + } + + missMatch: + if !hasMatch { + imw.joiner.onMissMatch(hasNull, outerRow, chk) + if !imw.fetchNewChunkWhenFull(ctx, task, &chk) { + return nil + } + } + } + + return nil +} + +// fetchInnerRowsWithSameKey collects the inner rows having the same key with one outer row. +func (imw *innerMergeWorker) fetchInnerRowsWithSameKey(ctx context.Context, task *lookUpMergeJoinTask, key chunk.Row) (noneInnerRows bool, err error) { + task.sameKeyInnerRows = task.sameKeyInnerRows[:0] + curRow := task.innerIter.Current() + var cmpRes int + for cmpRes, err = imw.compare(key, curRow); ((cmpRes >= 0 && !imw.desc) || (cmpRes <= 0 && imw.desc)) && err == nil; cmpRes, err = imw.compare(key, curRow) { + if cmpRes == 0 { + task.sameKeyInnerRows = append(task.sameKeyInnerRows, curRow) + } + curRow = task.innerIter.Next() + if curRow == task.innerIter.End() { + curRow, err = imw.fetchNextInnerResult(ctx, task) + if err != nil || task.innerResult.NumRows() == 0 { + break + } + } + } + task.sameKeyIter = chunk.NewIterator4Slice(task.sameKeyInnerRows) + task.sameKeyIter.Begin() + noneInnerRows = task.innerResult.NumRows() == 0 + return +} + +func (imw *innerMergeWorker) compare(outerRow, innerRow chunk.Row) (int, error) { + for _, keyOff := range imw.innerMergeCtx.keyOff2KeyOffOrderByIdx { + cmp, _, err := imw.innerMergeCtx.compareFuncs[keyOff](imw.ctx, imw.outerMergeCtx.joinKeys[keyOff], imw.innerMergeCtx.joinKeys[keyOff], outerRow, innerRow) + if err != nil || cmp != 0 { + return int(cmp), err + } + } + return 0, nil +} + +func (imw *innerMergeWorker) constructDatumLookupKeys(task *lookUpMergeJoinTask) ([]*indexJoinLookUpContent, error) { + numRows := len(task.outerOrderIdx) + dLookUpKeys := make([]*indexJoinLookUpContent, 0, numRows) + for i := 0; i < numRows; i++ { + dLookUpKey, err := imw.constructDatumLookupKey(task, task.outerOrderIdx[i]) + if err != nil { + return nil, err + } + if dLookUpKey == nil { + continue + } + dLookUpKeys = append(dLookUpKeys, dLookUpKey) + } + + return dLookUpKeys, nil +} + +func (imw *innerMergeWorker) constructDatumLookupKey(task *lookUpMergeJoinTask, rowIdx chunk.RowPtr) (*indexJoinLookUpContent, error) { + outerRow := task.outerResult.GetRow(rowIdx) + sc := imw.ctx.GetSessionVars().StmtCtx + keyLen := len(imw.keyCols) + dLookupKey := make([]types.Datum, 0, keyLen) + for i, keyCol := range imw.outerMergeCtx.keyCols { + outerValue := outerRow.GetDatum(keyCol, imw.outerMergeCtx.rowTypes[keyCol]) + // Join-on-condition can be promised to be equal-condition in + // IndexNestedLoopJoin, thus the filter will always be false if + // outerValue is null, and we don't need to lookup it. + if outerValue.IsNull() { + return nil, nil + } + innerColType := imw.rowTypes[imw.keyCols[i]] + innerValue, err := outerValue.ConvertTo(sc, innerColType) + if err != nil { + // If the converted outerValue overflows, we don't need to lookup it. + if terror.ErrorEqual(err, types.ErrOverflow) { + return nil, nil + } + if terror.ErrorEqual(err, types.ErrTruncated) && (innerColType.Tp == mysql.TypeSet || innerColType.Tp == mysql.TypeEnum) { + return nil, nil + } + return nil, err + } + cmp, err := outerValue.CompareDatum(sc, &innerValue) + if err != nil { + return nil, err + } + if cmp != 0 { + // If the converted outerValue is not equal to the origin outerValue, we don't need to lookup it. + return nil, nil + } + dLookupKey = append(dLookupKey, innerValue) + } + return &indexJoinLookUpContent{keys: dLookupKey, row: task.outerResult.GetRow(rowIdx)}, nil +} + +func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLookUpContent) []*indexJoinLookUpContent { + if len(lookUpContents) < 2 { + return lookUpContents + } + sc := imw.ctx.GetSessionVars().StmtCtx + deDupedLookUpContents := lookUpContents[:1] + for i := 1; i < len(lookUpContents); i++ { + cmp := compareRow(sc, lookUpContents[i].keys, lookUpContents[i-1].keys) + if cmp != 0 || (imw.nextColCompareFilters != nil && imw.nextColCompareFilters.CompareRow(lookUpContents[i].row, lookUpContents[i-1].row) != 0) { + deDupedLookUpContents = append(deDupedLookUpContents, lookUpContents[i]) + } + } + return deDupedLookUpContents +} + +// fetchNextInnerResult collects a chunk of inner results from inner child executor. +func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) { + task.innerResult = chunk.NewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize) + err = Next(ctx, imw.innerExec, task.innerResult) + task.innerIter = chunk.NewIterator4Chunk(task.innerResult) + beginRow = task.innerIter.Begin() + return +} + +// Close implements the Executor interface. +func (e *IndexLookUpMergeJoin) Close() error { + if e.cancelFunc != nil { + e.cancelFunc() + e.cancelFunc = nil + } + if e.resultCh != nil { + for range e.resultCh { + } + e.resultCh = nil + } + e.joinChkResourceCh = nil + // joinChkResourceCh is to recycle result chunks, used by inner worker. + // resultCh is the main thread get the results, used by main thread and inner worker. + // cancelFunc control the outer worker and outer worker close the task channel. + e.workerWg.Wait() + e.memTracker = nil + if e.runtimeStats != nil { + concurrency := cap(e.resultCh) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) + } + return e.baseExecutor.Close() +} diff --git a/types/datum.go b/types/datum.go index 4fa5b4a608775..69a9e80811cf5 100644 --- a/types/datum.go +++ b/types/datum.go @@ -30,8 +30,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types/json" "github.com/pingcap/tidb/util/hack" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" ) // Kind constants. @@ -1296,8 +1294,12 @@ func (d *Datum) convertToMysqlEnum(sc *stmtctx.StatementContext, target *FieldTy e, err = ParseEnumValue(target.Elems, uintDatum.GetUint64()) } if err != nil { +<<<<<<< HEAD logutil.Logger(context.Background()).Error("convert to MySQL enum failed", zap.Error(err)) err = errors.Trace(ErrTruncated) +======= + err = errors.Wrap(ErrTruncated, "convert to MySQL enum failed: "+err.Error()) +>>>>>>> f5fa3e7... executor: fix index join error when join key is ENUM or SET (#19235) } ret.SetValue(e) return ret, err @@ -1322,10 +1324,15 @@ func (d *Datum) convertToMysqlSet(sc *stmtctx.StatementContext, target *FieldTyp } if err != nil { - return invalidConv(d, target.Tp) + err = errors.Wrap(ErrTruncated, "convert to MySQL set failed: "+err.Error()) } +<<<<<<< HEAD ret.SetValue(s) return ret, nil +======= + ret.SetMysqlSet(s, target.Collate) + return ret, err +>>>>>>> f5fa3e7... executor: fix index join error when join key is ENUM or SET (#19235) } func (d *Datum) convertToMysqlJSON(sc *stmtctx.StatementContext, target *FieldType) (ret Datum, err error) {