Skip to content

Commit

Permalink
executor: control Chunk size for StreamAgg&HashAgg (pingcap#9512)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Apr 12, 2019
1 parent a4707e2 commit dee51e9
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ type HashAggFinalWorker struct {
type AfFinalResult struct {
chk *chunk.Chunk
err error

giveBackCh chan *chunk.Chunk
}

// HashAggExec deals with all the aggregate functions.
Expand Down Expand Up @@ -152,6 +150,7 @@ type HashAggExec struct {

finishCh chan struct{}
finalOutputCh chan *AfFinalResult
finalInputCh chan *chunk.Chunk
partialOutputChs []chan *HashAggIntermData
inputCh chan *HashAggInput
partialInputChs []chan *chunk.Chunk
Expand Down Expand Up @@ -247,6 +246,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
partialConcurrency := sessionVars.HashAggPartialConcurrency
e.isChildReturnEmpty = true
e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
e.inputCh = make(chan *HashAggInput, partialConcurrency)
e.finishCh = make(chan struct{}, 1)

Expand Down Expand Up @@ -291,11 +291,10 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
groupSet: set.NewStringSet(),
inputCh: e.partialOutputChs[i],
outputCh: e.finalOutputCh,
finalResultHolderCh: make(chan *chunk.Chunk, 1),
finalResultHolderCh: e.finalInputCh,
rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(e.retTypes()),
}
e.finalWorkers[i].finalResultHolderCh <- e.newFirstChunk()
}
}

Expand Down Expand Up @@ -469,7 +468,6 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
if finished {
return
}
result.Reset()
for groupKey := range w.groupSet {
partialResults := w.getPartialResult(sctx.GetSessionVars().StmtCtx, []byte(groupKey), w.partialResultMap)
for i, af := range w.aggFuncs {
Expand All @@ -480,18 +478,15 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
if len(w.aggFuncs) == 0 {
result.SetNumVirtualRows(result.NumRows() + 1)
}
if result.NumRows() == w.maxChunkSize {
w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
if result.IsFull() {
w.outputCh <- &AfFinalResult{chk: result}
result, finished = w.receiveFinalResultHolder()
if finished {
return
}
result.Reset()
}
}
if result.NumRows() > 0 {
w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
}
w.outputCh <- &AfFinalResult{chk: result}
}

func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
Expand Down Expand Up @@ -606,24 +601,30 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
e.prepare4ParallelExec(ctx)
e.prepared = true
}
for {

// gofail: var parallelHashAggError bool
// if parallelHashAggError {
// return errors.New("HashAggExec.parallelExec error")
// }

for !chk.IsFull() {
e.finalInputCh <- chk
result, ok := <-e.finalOutputCh
if !ok || result.err != nil || result.chk.NumRows() == 0 {
if result != nil {
return errors.Trace(result.err)
if !ok { // all finalWorkers exited
if chk.NumRows() > 0 { // but there are some data left
return nil
}
if e.isChildReturnEmpty && e.defaultVal != nil {
chk.Append(e.defaultVal, 0, 1)
}
e.isChildReturnEmpty = false
return nil
}
e.isChildReturnEmpty = false
chk.SwapColumns(result.chk)
// Put result.chk back to the corresponded final worker's finalResultHolderCh.
result.giveBackCh <- result.chk
if result.err != nil {
return result.err
}
if chk.NumRows() > 0 {
break
e.isChildReturnEmpty = false
}
}
return nil
Expand Down Expand Up @@ -657,11 +658,11 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
chk.SetNumVirtualRows(chk.NumRows() + 1)
}
for i, af := range e.PartialAggFuncs {
if err := (af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)); err != nil {
if err := af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk); err != nil {
return err
}
}
if chk.NumRows() == e.maxChunkSize {
if chk.IsFull() {
e.cursor4GroupKey++
return nil
}
Expand Down Expand Up @@ -787,7 +788,7 @@ func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for !e.executed && chk.NumRows() < e.maxChunkSize {
for !e.executed && !chk.IsFull() {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
e.executed = true
Expand Down

0 comments on commit dee51e9

Please sign in to comment.