diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go
index a8c5fda843448..348eb6cb57619 100644
--- a/pkg/executor/aggregate/agg_hash_executor.go
+++ b/pkg/executor/aggregate/agg_hash_executor.go
@@ -287,6 +287,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
 			partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper)
 		}

+		partialResultsBuffer, groupKeyBuf := getBuffer()
 		e.partialWorkers[i] = HashAggPartialWorker{
 			baseHashAggWorker:    newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
 			idForTest:            i,
@@ -295,12 +296,12 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
 			outputChs:            e.partialOutputChs,
 			giveBackCh:           e.inputCh,
 			BInMaps:              make([]int, finalConcurrency),
-			partialResultsBuffer: make([][]aggfuncs.PartialResult, 0, 2048),
+			partialResultsBuffer: *partialResultsBuffer,
 			globalOutputCh:       e.finalOutputCh,
 			partialResultsMap:    partialResultsMap,
 			groupByItems:         e.GroupByItems,
 			chk:                  exec.TryNewCacheChunk(e.Children(0)),
-			groupKey:             make([][]byte, 0, 8),
+			groupKeyBuf:          *groupKeyBuf,
 			serializeHelpers:     aggfuncs.NewSerializeHelper(),
 			isSpillPrepared:      false,
 			spillHelper:          e.spillHelper,
@@ -338,9 +339,7 @@ func (e *HashAggExec) initFinalWorkers(finalConcurrency int) {
 			inputCh:                    e.partialOutputChs[i],
 			outputCh:                   e.finalOutputCh,
 			finalResultHolderCh:        make(chan *chunk.Chunk, 1),
-			rowBuffer:                  make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow:                 chunk.MutRowFromTypes(exec.RetTypes(e)),
-			groupKeys:                  make([][]byte, 0, 8),
 			spillHelper:                e.spillHelper,
 			restoredAggResultMapperMem: 0,
 		}
diff --git a/pkg/executor/aggregate/agg_hash_final_worker.go b/pkg/executor/aggregate/agg_hash_final_worker.go
index 684ab110d137b..d2cc2cb047f10 100644
--- a/pkg/executor/aggregate/agg_hash_final_worker.go
+++ b/pkg/executor/aggregate/agg_hash_final_worker.go
@@ -23,7 +23,6 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/executor/aggfuncs"
 	"github.com/pingcap/tidb/pkg/sessionctx"
-	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/pingcap/tidb/pkg/util/hack"
 	"github.com/pingcap/tidb/pkg/util/logutil"
@@ -42,14 +41,12 @@ type AfFinalResult struct {
 type HashAggFinalWorker struct {
 	baseHashAggWorker

-	rowBuffer           []types.Datum
 	mutableRow          chunk.MutRow
 	partialResultMap    aggfuncs.AggPartialResultMapper
 	BInMap              int
 	inputCh             chan *aggfuncs.AggPartialResultMapper
 	outputCh            chan *AfFinalResult
 	finalResultHolderCh chan *chunk.Chunk
-	groupKeys           [][]byte

 	spillHelper *parallelHashAggSpillHelper

diff --git a/pkg/executor/aggregate/agg_hash_partial_worker.go b/pkg/executor/aggregate/agg_hash_partial_worker.go
index bacd5491f19e5..17498e1a9cbba 100644
--- a/pkg/executor/aggregate/agg_hash_partial_worker.go
+++ b/pkg/executor/aggregate/agg_hash_partial_worker.go
@@ -56,7 +56,7 @@ type HashAggPartialWorker struct {
 	partialResultsMapMem atomic.Int64

 	groupByItems []expression.Expression
-	groupKey     [][]byte
+	groupKeyBuf  [][]byte
 	// chk stores the input data from child,
 	// and is reused by childExec and partial worker.
 	chk *chunk.Chunk
@@ -201,6 +201,8 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG

 		// We must ensure that there is no panic before `waitGroup.Done()` or there will be hang
 		waitGroup.Done()
+
+		tryRecycleBuffer(&w.partialResultsBuffer, &w.groupKeyBuf)
 	}()

 	intestBeforePartialWorkerRun()
@@ -254,15 +256,15 @@ func (w *HashAggPartialWorker) getPartialResultsOfEachRow(groupKey [][]byte, fin
 }

 func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *chunk.Chunk, finalConcurrency int) (err error) {
-	memSize := getGroupKeyMemUsage(w.groupKey)
-	w.groupKey, err = GetGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
+	memSize := getGroupKeyMemUsage(w.groupKeyBuf)
+	w.groupKeyBuf, err = GetGroupKey(w.ctx, chk, w.groupKeyBuf, w.groupByItems)
 	failpoint.Inject("ConsumeRandomPanic", nil)
-	w.memTracker.Consume(getGroupKeyMemUsage(w.groupKey) - memSize)
+	w.memTracker.Consume(getGroupKeyMemUsage(w.groupKeyBuf) - memSize)
 	if err != nil {
 		return err
 	}

-	partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKey, finalConcurrency)
+	partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKeyBuf, finalConcurrency)

 	numRows := chk.NumRows()
 	rows := make([]chunk.Row, 1)
diff --git a/pkg/executor/aggregate/agg_util.go b/pkg/executor/aggregate/agg_util.go
index 69daafb1f84d5..829612802f7c3 100644
--- a/pkg/executor/aggregate/agg_util.go
+++ b/pkg/executor/aggregate/agg_util.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"math/rand"
 	"slices"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -40,6 +41,43 @@ import (
 	"go.uber.org/zap"
 )

+const defaultPartialResultsBufferCap = 2048
+const defaultGroupKeyCap = 8
+
+var partialResultsBufferPool = sync.Pool{
+	New: func() any {
+		s := make([][]aggfuncs.PartialResult, 0, defaultPartialResultsBufferCap)
+		return &s
+	},
+}
+
+var groupKeyPool = sync.Pool{
+	New: func() any {
+		s := make([][]byte, 0, defaultGroupKeyCap)
+		return &s
+	},
+}
+
+func getBuffer() (*[][]aggfuncs.PartialResult, *[][]byte) {
+	partialResultsBuffer := partialResultsBufferPool.Get().(*[][]aggfuncs.PartialResult)
+	*partialResultsBuffer = (*partialResultsBuffer)[:0]
+	groupKey := groupKeyPool.Get().(*[][]byte)
+	*groupKey = (*groupKey)[:0]
+	return partialResultsBuffer, groupKey
+}
+
+// tryRecycleBuffer recycles small buffers only. This approach reduces the CPU pressure
+// from memory allocation during high concurrency aggregation computations (like DDL's scheduled tasks),
+// and also prevents the pool from holding too much memory and causing memory pressure.
+func tryRecycleBuffer(buf *[][]aggfuncs.PartialResult, groupKey *[][]byte) {
+	if cap(*buf) <= defaultPartialResultsBufferCap {
+		partialResultsBufferPool.Put(buf)
+	}
+	if cap(*groupKey) <= defaultGroupKeyCap {
+		groupKeyPool.Put(groupKey)
+	}
+}
+
 func closeBaseExecutor(b *exec.BaseExecutor) {
 	if r := recover(); r != nil {
 		// Release the resource, but throw the panic again and let the top level handle it.
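Reviewer note: the new getBuffer/tryRecycleBuffer helpers in agg_util.go follow a "pooled slice with a capacity cap" pattern: Get hands out a length-zero slice from a sync.Pool, and Put only recycles buffers whose capacity stayed at or below the default, so the pool never pins oversized allocations. The standalone sketch below only illustrates that pattern and is not part of the patch; the names keyBufPool, getKeyBuf, putKeyBuf, and maxPooledCap are made up for the example.

package main

import (
	"fmt"
	"sync"
)

// maxPooledCap plays the role of defaultGroupKeyCap in the patch.
const maxPooledCap = 8

// The pool stores *[][]byte rather than [][]byte so Put/Get pass a single
// pointer through the interface instead of heap-allocating a copy of the
// slice header on every Put.
var keyBufPool = sync.Pool{
	New: func() any {
		s := make([][]byte, 0, maxPooledCap)
		return &s
	},
}

// getKeyBuf returns an empty buffer, reusing a pooled one when available.
func getKeyBuf() *[][]byte {
	buf := keyBufPool.Get().(*[][]byte)
	*buf = (*buf)[:0] // reset length, keep capacity
	return buf
}

// putKeyBuf recycles only buffers that stayed small; anything that grew past
// the cap is dropped and left to the garbage collector.
func putKeyBuf(buf *[][]byte) {
	if cap(*buf) <= maxPooledCap {
		keyBufPool.Put(buf)
	}
}

func main() {
	buf := getKeyBuf()
	*buf = append(*buf, []byte("group-key-1"), []byte("group-key-2"))
	fmt.Println(len(*buf), cap(*buf)) // 2 8
	putKeyBuf(buf)                    // still within the cap, so it returns to the pool
}

The capacity check on Put is what keeps the pool's footprint bounded: a query that happens to grow a very large group-key or partial-result buffer does not leave that buffer cached for every later aggregation.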