diff --git a/executor/join.go b/executor/join.go
index 2454cc09b8bd1..1cca0658da1bd 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -21,8 +21,6 @@ import (
 	"github.com/juju/errors"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/plan"
-	"github.com/pingcap/tidb/sessionctx/stmtctx"
-	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/terror"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
@@ -51,7 +49,6 @@ type HashJoinExec struct {
 	concurrency     uint // concurrency is number of concurrent channels and join workers.
 	hashTable       *mvmap.MVMap
 	hashJoinBuffers []*hashJoinBuffer
-	outerBufferChs  []chan *execResult
 	workerWaitGroup sync.WaitGroup // workerWaitGroup is for sync multiple join workers.
 	finished        atomic.Value
 	closeCh         chan struct{} // closeCh add a lock for closing executor.
@@ -62,11 +59,6 @@ type HashJoinExec struct {
 	// to avoid the concurrency of joinResultGenerator.chk and joinResultGenerator.selected.
 	resultGenerators []joinResultGenerator
 
-	resultBufferCh chan *execResult // Channels for output.
-	resultBuffer   []Row
-	resultCursor   int
-
-	// for chunk execution
 	outerKeyColIdx []int
 	innerKeyColIdx []int
 	innerResult    *chunk.List
@@ -107,33 +99,27 @@ func (e *HashJoinExec) Close() error {
 	close(e.closeCh)
 	e.finished.Store(true)
 	if e.prepared {
-		if e.resultBufferCh != nil {
-			for range e.resultBufferCh {
-			}
-		} else {
-			if e.joinResultCh != nil {
-				for range e.joinResultCh {
-				}
+		if e.joinResultCh != nil {
+			for range e.joinResultCh {
 			}
-			if e.outerChkResourceCh != nil {
-				close(e.outerChkResourceCh)
-				for range e.outerChkResourceCh {
-				}
+		}
+		if e.outerChkResourceCh != nil {
+			close(e.outerChkResourceCh)
+			for range e.outerChkResourceCh {
 			}
-			for i := range e.outerResultChs {
-				for range e.outerResultChs[i] {
-				}
+		}
+		for i := range e.outerResultChs {
+			for range e.outerResultChs[i] {
 			}
-			for i := range e.joinChkResourceCh {
-				close(e.joinChkResourceCh[i])
-				for range e.joinChkResourceCh[i] {
-				}
+		}
+		for i := range e.joinChkResourceCh {
+			close(e.joinChkResourceCh[i])
+			for range e.joinChkResourceCh[i] {
 			}
-			e.outerChkResourceCh = nil
-			e.joinChkResourceCh = nil
 		}
+		e.outerChkResourceCh = nil
+		e.joinChkResourceCh = nil
 	}
-	e.resultBuffer = nil
 
 	err := e.baseExecutor.Close()
 	return errors.Trace(err)
@@ -162,63 +148,9 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
 	e.closeCh = make(chan struct{})
 	e.finished.Store(false)
 	e.workerWaitGroup = sync.WaitGroup{}
-	e.resultCursor = 0
 	return nil
 }
 
-// makeJoinRow simply creates a new row that appends row b to row a.
-func makeJoinRow(a Row, b Row) Row {
-	ret := make([]types.Datum, 0, len(a)+len(b))
-	ret = append(ret, a...)
-	ret = append(ret, b...)
-	return ret
-}
-
-func (e *HashJoinExec) encodeRow(b []byte, row Row) ([]byte, error) {
-	sc := e.ctx.GetSessionVars().StmtCtx
-	for _, datum := range row {
-		tmp, err := tablecodec.EncodeValue(sc, datum)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		b = append(b, tmp...)
-	}
-	return b, nil
-}
-
-func (e *HashJoinExec) decodeRow(data []byte) (Row, error) {
-	values := make([]types.Datum, e.innerExec.Schema().Len())
-	err := codec.SetRawValues(data, values)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	err = decodeRawValues(values, e.innerExec.Schema(), e.ctx.GetSessionVars().GetTimeZone())
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	return values, nil
-}
-
-// getJoinKey gets the hash key when given a row and hash columns.
-// It will return a boolean value representing if the hash key has null, a byte slice representing the result hash code.
-func getJoinKey(sc *stmtctx.StatementContext, cols []*expression.Column, row Row, vals []types.Datum, bytes []byte) (bool, []byte, error) {
-	var err error
-	for i, col := range cols {
-		vals[i], err = col.Eval(row)
-		if err != nil {
-			return false, nil, errors.Trace(err)
-		}
-		if vals[i].IsNull() {
-			return true, nil, nil
-		}
-	}
-	if len(vals) == 0 {
-		return false, nil, nil
-	}
-	bytes, err = codec.HashValues(sc, bytes, vals...)
-	return false, bytes, errors.Trace(err)
-}
-
 func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyBuf []byte) (hasNull bool, _ []byte, err error) {
 	var keyColIdx []int
 	var allTypes []*types.FieldType
@@ -244,51 +176,6 @@ func (e *HashJoinExec) getJoinKeyFromChkRow(isOuterKey bool, row chunk.Row, keyB
 	return false, keyBuf, err
 }
 
-// fetchOuterRows fetches rows from the big table in a background goroutine
-// and sends the rows to multiple channels which will be read by multiple join workers.
-func (e *HashJoinExec) fetchOuterRows(ctx context.Context) {
-	defer func() {
-		for _, outerBufferCh := range e.outerBufferChs {
-			close(outerBufferCh)
-		}
-		e.workerWaitGroup.Done()
-	}()
-
-	bufferCapacity, maxBufferCapacity := 1, 128
-	for i, noMoreData := uint(0), false; !noMoreData; i = (i + 1) % e.concurrency {
-		outerBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}
-
-		for !noMoreData && len(outerBuffer.rows) < bufferCapacity {
-			if e.finished.Load().(bool) {
-				return
-			}
-
-			outerRow, err := e.outerExec.Next(ctx)
-			if err != nil || outerRow == nil {
-				outerBuffer.err = errors.Trace(err)
-				noMoreData = true
-				break
-			}
-
-			outerBuffer.rows = append(outerBuffer.rows, outerRow)
-		}
-
-		if noMoreData && len(outerBuffer.rows) == 0 && outerBuffer.err == nil {
-			break
-		}
-
-		select {
-		// TODO: Recover the code.
-		// case <-e.ctx.GoCtx().Done():
-		// 	return
-		case e.outerBufferChs[i] <- outerBuffer:
-			if !noMoreData && bufferCapacity < maxBufferCapacity {
-				bufferCapacity <<= 1
-			}
-		}
-	}
-}
-
 // fetchOuterChunks get chunks from fetches chunks from the big table in a background goroutine
 // and sends the chunks to multiple channels which will be read by multiple join workers.
 func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
@@ -391,172 +278,17 @@ func (e *HashJoinExec) fetchOuterAndProbeHashTable(ctx context.Context) {
 	// Start e.concurrency join workers to probe hash table and join inner and outer rows.
 	for i := uint(0); i < e.concurrency; i++ {
 		e.workerWaitGroup.Add(1)
-		go e.runJoinWorker4Chunk(i)
+		go e.runJoinWorker(i)
 	}
-	go e.waitJoinWorkersAndCloseResultChan(true)
+	go e.waitJoinWorkersAndCloseResultChan()
 }
 
-// prepare4Rows runs the first time when 'Next' is called,
-// it first starts one goroutine to reads all data from the small table to build a hash table,
-// then starts one worker goroutine to fetch rows/chunk from the big table,
-// and, then starts multiple join worker goroutines.
-func (e *HashJoinExec) prepare4Row(ctx context.Context) error {
-	e.resultGenerators = e.resultGenerators[:1]
-	e.hashTable = mvmap.NewMVMap()
-	var buffer []byte
-	for {
-		innerRow, err := e.innerExec.Next(ctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if innerRow == nil {
-			break
-		}
-
-		hasNull, joinKey, err := getJoinKey(e.ctx.GetSessionVars().StmtCtx, e.innerKeys, innerRow, e.hashJoinBuffers[0].data, nil)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if hasNull {
-			continue
-		}
-
-		buffer = buffer[:0]
-		buffer, err = e.encodeRow(buffer, innerRow)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		e.hashTable.Put(joinKey, buffer)
-	}
-
-	e.prepared = true
-	e.resultBufferCh = make(chan *execResult, e.concurrency)
-
-	// If it's inner join and the small table is filtered out, there is no need to fetch big table and
-	// start join workers to do the join work. Otherwise, we start one goroutine to fetch outer rows
-	// and e.concurrency goroutines to concatenate the matched inner and outer rows and filter the result.
-	if !(e.hashTable.Len() == 0 && e.joinType == plan.InnerJoin) {
-		e.outerBufferChs = make([]chan *execResult, e.concurrency)
-		for i := uint(0); i < e.concurrency; i++ {
-			e.outerBufferChs[i] = make(chan *execResult, e.concurrency)
-		}
-
-		// Start a worker to fetch outer rows and partition them to join workers.
-		e.workerWaitGroup.Add(1)
-		go e.fetchOuterRows(ctx)
-
-		// Start e.concurrency join workers to probe hash table and join inner and outer rows.
-		for i := uint(0); i < e.concurrency; i++ {
-			e.workerWaitGroup.Add(1)
-			go e.runJoinWorker(i)
-		}
-	}
-
-	// start a goroutine to wait join workers finish their job and close channels.
-	go e.waitJoinWorkersAndCloseResultChan(false)
-	return nil
-}
-
-func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan(forChunk bool) {
+func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
 	e.workerWaitGroup.Wait()
-	if !forChunk {
-		close(e.resultBufferCh)
-	} else {
-		close(e.joinResultCh)
-	}
-}
-
-// filterOuters filters the outer rows stored in "outerBuffer" and move all the matched rows ahead of the unmatched rows.
-// The number of matched outer rows is returned as the first value.
-func (e *HashJoinExec) filterOuters(outerBuffer *execResult, outerFilterResult []bool) (int, error) {
-	if e.outerFilter == nil {
-		return len(outerBuffer.rows), nil
-	}
-
-	outerFilterResult = outerFilterResult[:0]
-	for _, outerRow := range outerBuffer.rows {
-		matched, err := expression.EvalBool(e.ctx, e.outerFilter, outerRow)
-		if err != nil {
-			return 0, errors.Trace(err)
-		}
-		outerFilterResult = append(outerFilterResult, matched)
-	}
-
-	i, j := 0, len(outerBuffer.rows)-1
-	for i <= j {
-		for i <= j && outerFilterResult[i] {
-			i++
-		}
-		for i <= j && !outerFilterResult[j] {
-			j--
-		}
-		if i <= j {
-			outerFilterResult[i], outerFilterResult[j] = outerFilterResult[j], outerFilterResult[i]
-			outerBuffer.rows[i], outerBuffer.rows[j] = outerBuffer.rows[j], outerBuffer.rows[i]
-		}
-	}
-	return i, nil
+	close(e.joinResultCh)
 }
 
-// runJoinWorker does join job in one goroutine.
 func (e *HashJoinExec) runJoinWorker(workerID uint) {
-	bufferCapacity := 1024
-	resultBuffer := &execResult{rows: make([]Row, 0, bufferCapacity)}
-	outerFilterResult := make([]bool, 0, bufferCapacity)
-
-	var outerBuffer *execResult
-	for ok := true; ok; {
-		select {
-		// TODO: Recover the code.
-		// case <-e.ctx.GoCtx().Done():
-		// 	ok = false
-		case outerBuffer, ok = <-e.outerBufferChs[workerID]:
-		}
-
-		if !ok || e.finished.Load().(bool) {
-			break
-		}
-		if outerBuffer.err != nil {
-			resultBuffer.err = errors.Trace(outerBuffer.err)
-			break
-		}
-
-		numMatchedOuters, err := e.filterOuters(outerBuffer, outerFilterResult)
-		if err != nil {
-			outerBuffer.err = errors.Trace(err)
-			break
-		}
-		// process unmatched outer rows.
-		for _, unMatchedOuter := range outerBuffer.rows[numMatchedOuters:] {
-			resultBuffer.rows, resultBuffer.err = e.resultGenerators[0].emit(unMatchedOuter, nil, resultBuffer.rows)
-			if resultBuffer.err != nil {
-				resultBuffer.err = errors.Trace(resultBuffer.err)
-				break
-			}
-		}
-		if resultBuffer.err != nil {
-			break
-		}
-		// process matched outer rows.
-		for _, outerRow := range outerBuffer.rows[:numMatchedOuters] {
-			if len(resultBuffer.rows) >= bufferCapacity {
-				e.resultBufferCh <- resultBuffer
-				resultBuffer = &execResult{rows: make([]Row, 0, bufferCapacity)}
-			}
-			ok = e.joinOuterRow(workerID, outerRow, resultBuffer)
-			if !ok {
-				break
-			}
-		}
-	}
-
-	if len(resultBuffer.rows) > 0 || resultBuffer.err != nil {
-		e.resultBufferCh <- resultBuffer
-	}
-	e.workerWaitGroup.Done()
-}
-
-func (e *HashJoinExec) runJoinWorker4Chunk(workerID uint) {
 	defer func() {
 		if r := recover(); r != nil {
 			e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
@@ -603,50 +335,6 @@ func (e *HashJoinExec) runJoinWorker4Chunk(workerID uint) {
 	}
 }
 
-// joinOuterRow creates result rows from a row in a big table and sends them to resultRows channel.
-// Every matching row generates a result row.
-// If there are no matching rows and it is outer join, a null filled result row is created.
-func (e *HashJoinExec) joinOuterRow(workerID uint, outerRow Row, resultBuffer *execResult) bool {
-	buffer := e.hashJoinBuffers[workerID]
-	hasNull, joinKey, err := getJoinKey(e.ctx.GetSessionVars().StmtCtx, e.outerKeys, outerRow, buffer.data, buffer.bytes[:0:cap(buffer.bytes)])
-	if err != nil {
-		resultBuffer.err = errors.Trace(err)
-		return false
-	}
-
-	if hasNull {
-		resultBuffer.rows, resultBuffer.err = e.resultGenerators[0].emit(outerRow, nil, resultBuffer.rows)
-		resultBuffer.err = errors.Trace(resultBuffer.err)
-		return true
-	}
-
-	e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
-	values := e.hashTableValBufs[workerID]
-	if len(values) == 0 {
-		resultBuffer.rows, resultBuffer.err = e.resultGenerators[0].emit(outerRow, nil, resultBuffer.rows)
-		resultBuffer.err = errors.Trace(resultBuffer.err)
-		return true
-	}
-
-	innerRows := make([]Row, 0, len(values))
-	for _, value := range values {
-		innerRow, err1 := e.decodeRow(value)
-		if err1 != nil {
-			resultBuffer.rows = nil
-			resultBuffer.err = errors.Trace(err1)
-			return false
-		}
-		innerRows = append(innerRows, innerRow)
-	}
-
-	resultBuffer.rows, resultBuffer.err = e.resultGenerators[0].emit(outerRow, innerRows, resultBuffer.rows)
-	if resultBuffer.err != nil {
-		resultBuffer.err = errors.Trace(resultBuffer.err)
-		return false
-	}
-	return true
-}
-
 func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.Row, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
 	buffer := e.hashJoinBuffers[workerID]
 
@@ -741,38 +429,6 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
 	return true, joinResult
 }
 
-// Next implements the Executor Next interface.
-func (e *HashJoinExec) Next(ctx context.Context) (Row, error) {
-	if !e.prepared {
-		if err := e.prepare4Row(ctx); err != nil {
-			return nil, errors.Trace(err)
-		}
-	}
-
-	if e.resultCursor >= len(e.resultBuffer) {
-		e.resultCursor = 0
-		select {
-		case resultBuffer, ok := <-e.resultBufferCh:
-			if !ok {
-				return nil, nil
-			}
-			if resultBuffer.err != nil {
-				e.finished.Store(true)
-				return nil, errors.Trace(resultBuffer.err)
-			}
-			e.resultBuffer = resultBuffer.rows
-			// TODO: Recover the code.
-			// case <-e.ctx.GoCtx().Done():
-			// 	return nil, nil
-		}
-	}
-
-	// len(e.resultBuffer) > 0 is guaranteed in the above "select".
-	result := e.resultBuffer[e.resultCursor]
-	e.resultCursor++
-	return result, nil
-}
-
 // NextChunk implements the Executor NextChunk interface.
 // hash join constructs the result following these steps:
 // step 1. fetch data from inner child and build a hash table;
diff --git a/executor/join_result_generators.go b/executor/join_result_generators.go
index 173016344c9ae..d702c8e01b117 100644
--- a/executor/join_result_generators.go
+++ b/executor/join_result_generators.go
@@ -645,3 +645,11 @@ func (outputer *innerJoinResultGenerator) emitToChunk(outer chunk.Row, inners ch
 	}
 	return nil
 }
+
+// makeJoinRow simply creates a new row that appends row b to row a.
+func makeJoinRow(a Row, b Row) Row {
+	ret := make([]types.Datum, 0, len(a)+len(b))
+	ret = append(ret, a...)
+	ret = append(ret, b...)
+	return ret
+}
diff --git a/executor/merge_join_test.go b/executor/merge_join_test.go
index 78342ab0fd72d..7356381ea7b75 100644
--- a/executor/merge_join_test.go
+++ b/executor/merge_join_test.go
@@ -236,6 +236,7 @@ func checkPlanAndRun(tk *testkit.TestKit, c *C, plan string, sql string) *testki
 }
 
 func (s *testSuite) TestMergeJoin(c *C) {
+	// FIXME: the TIDB_SMJ hint does not really work when there is no index on join onCondition.
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")