diff --git a/executor/aggregate.go b/executor/aggregate.go index 02409fed792f4..3125b107677ca 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -86,8 +86,6 @@ type HashAggFinalWorker struct { type AfFinalResult struct { chk *chunk.Chunk err error - - giveBackCh chan *chunk.Chunk } // HashAggExec deals with all the aggregate functions. @@ -152,6 +150,7 @@ type HashAggExec struct { finishCh chan struct{} finalOutputCh chan *AfFinalResult + finalInputCh chan *chunk.Chunk partialOutputChs []chan *HashAggIntermData inputCh chan *HashAggInput partialInputChs []chan *chunk.Chunk @@ -247,6 +246,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { partialConcurrency := sessionVars.HashAggPartialConcurrency e.isChildReturnEmpty = true e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency) + e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency) e.inputCh = make(chan *HashAggInput, partialConcurrency) e.finishCh = make(chan struct{}, 1) @@ -291,11 +291,10 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { groupSet: set.NewStringSet(), inputCh: e.partialOutputChs[i], outputCh: e.finalOutputCh, - finalResultHolderCh: make(chan *chunk.Chunk, 1), + finalResultHolderCh: e.finalInputCh, rowBuffer: make([]types.Datum, 0, e.Schema().Len()), mutableRow: chunk.MutRowFromTypes(e.retTypes()), } - e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk() } } @@ -469,7 +468,6 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { if finished { return } - result.Reset() for groupKey := range w.groupSet { partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, []byte(groupKey), w.partialResultMap) for i, af := range w.aggFuncs { @@ -480,18 +478,15 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) { if len(w.aggFuncs) == 0 { result.SetNumVirtualRows(result.NumRows() + 1) } - if result.NumRows() == w.maxChunkSize { - w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} + if result.IsFull() { + w.outputCh <- &AfFinalResult{chk: result} result, finished = w.receiveFinalResultHolder() if finished { return } - result.Reset() } } - if result.NumRows() > 0 { - w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh} - } + w.outputCh <- &AfFinalResult{chk: result} } func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) { @@ -606,11 +601,18 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error e.prepare4ParallelExec(ctx) e.prepared = true } - for { + + // gofail: var parallelHashAggError bool + // if parallelHashAggError { + // return errors.New("HashAggExec.parallelExec error") + // } + + for !chk.IsFull() { + e.finalInputCh <- chk result, ok := <-e.finalOutputCh - if !ok || result.err != nil || result.chk.NumRows() == 0 { - if result != nil { - return errors.Trace(result.err) + if !ok { // all finalWorkers exited + if chk.NumRows() > 0 { // but there are some data left + return nil } if e.isChildReturnEmpty && e.defaultVal != nil { chk.Append(e.defaultVal, 0, 1) @@ -618,12 +620,11 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error e.isChildReturnEmpty = false return nil } - e.isChildReturnEmpty = false - chk.SwapColumns(result.chk) - // Put result.chk back to the corresponded final worker's finalResultHolderCh. - result.giveBackCh <- result.chk + if result.err != nil { + return result.err + } if chk.NumRows() > 0 { - break + e.isChildReturnEmpty = false } } return nil @@ -657,11 +658,11 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro chk.SetNumVirtualRows(chk.NumRows() + 1) } for i, af := range e.PartialAggFuncs { - if err := (af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)); err != nil { + if err := af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk); err != nil { return err } } - if chk.NumRows() == e.maxChunkSize { + if chk.IsFull() { e.cursor4GroupKey++ return nil } @@ -787,7 +788,7 @@ func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - for !e.executed && chk.NumRows() < e.maxChunkSize { + for !e.executed && !chk.IsFull() { err := e.consumeOneGroup(ctx, chk) if err != nil { e.executed = true