diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index 96cd3b0157f17..348eb6cb57619 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -287,8 +287,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper) } - partialResultsBuffer := getPartialResultsBufferFromPool() - groupKey := getGroupKeyFromPool() + partialResultsBuffer, groupKeyBuf := getBuffer() e.partialWorkers[i] = HashAggPartialWorker{ baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker), idForTest: i, @@ -302,7 +301,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc partialResultsMap: partialResultsMap, groupByItems: e.GroupByItems, chk: exec.TryNewCacheChunk(e.Children(0)), - groupKey: *groupKey, + groupKeyBuf: *groupKeyBuf, serializeHelpers: aggfuncs.NewSerializeHelper(), isSpillPrepared: false, spillHelper: e.spillHelper, diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go index f40e6574a99f1..17498e1a9cbba 100644 --- a/pkg/executor/aggregate/agg_hash_partial_worker.go +++ b/pkg/executor/aggregate/agg_hash_partial_worker.go @@ -56,7 +56,7 @@ type HashAggPartialWorker struct { partialResultsMapMem atomic.Int64 groupByItems []expression.Expression - groupKey [][]byte + groupKeyBuf [][]byte // chk stores the input data from child, // and is reused by childExec and partial worker. chk *chunk.Chunk @@ -202,8 +202,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG // We must ensure that there is no panic before `waitGroup.Done()` or there will be hang waitGroup.Done() - tryToRecyclePartialResultsBuffer(&w.partialResultsBuffer) - tryToRecycleGroupKey(&w.groupKey) + tryRecycleBuffer(&w.partialResultsBuffer, &w.groupKeyBuf) }() intestBeforePartialWorkerRun() @@ -257,15 +256,15 @@ func (w *HashAggPartialWorker) getPartialResultsOfEachRow(groupKey [][]byte, fin } func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *chunk.Chunk, finalConcurrency int) (err error) { - memSize := getGroupKeyMemUsage(w.groupKey) - w.groupKey, err = GetGroupKey(w.ctx, chk, w.groupKey, w.groupByItems) + memSize := getGroupKeyMemUsage(w.groupKeyBuf) + w.groupKeyBuf, err = GetGroupKey(w.ctx, chk, w.groupKeyBuf, w.groupByItems) failpoint.Inject("ConsumeRandomPanic", nil) - w.memTracker.Consume(getGroupKeyMemUsage(w.groupKey) - memSize) + w.memTracker.Consume(getGroupKeyMemUsage(w.groupKeyBuf) - memSize) if err != nil { return err } - partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKey, finalConcurrency) + partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKeyBuf, finalConcurrency) numRows := chk.NumRows() rows := make([]chunk.Row, 1) diff --git a/pkg/executor/aggregate/agg_util.go b/pkg/executor/aggregate/agg_util.go index ea27df8095374..829612802f7c3 100644 --- a/pkg/executor/aggregate/agg_util.go +++ b/pkg/executor/aggregate/agg_util.go @@ -58,27 +58,23 @@ var groupKeyPool = sync.Pool{ }, } -func getPartialResultsBufferFromPool() *[][]aggfuncs.PartialResult { +func getBuffer() (*[][]aggfuncs.PartialResult, *[][]byte) { partialResultsBuffer := partialResultsBufferPool.Get().(*[][]aggfuncs.PartialResult) *partialResultsBuffer = (*partialResultsBuffer)[:0] - return partialResultsBuffer -} - -func getGroupKeyFromPool() *[][]byte { groupKey := groupKeyPool.Get().(*[][]byte) *groupKey = (*groupKey)[:0] - return groupKey + return partialResultsBuffer, groupKey } -func tryToRecyclePartialResultsBuffer(buf *[][]aggfuncs.PartialResult) { +// tryRecycleBuffer recycles small buffers only. This approach reduces the CPU pressure +// from memory allocation during high concurrency aggregation computations (like DDL's scheduled tasks), +// and also prevents the pool from holding too much memory and causing memory pressure. +func tryRecycleBuffer(buf *[][]aggfuncs.PartialResult, groupKey *[][]byte) { if cap(*buf) <= defaultPartialResultsBufferCap { partialResultsBufferPool.Put(buf) } -} - -func tryToRecycleGroupKey(buf *[][]byte) { - if cap(*buf) <= defaultGroupKeyCap { - groupKeyPool.Put(buf) + if cap(*groupKey) <= defaultGroupKeyCap { + groupKeyPool.Put(groupKey) } }