Commit
address comments
xzhangxian1008 committed May 8, 2024
1 parent 5c6bc27 commit 61ddc98
Showing 3 changed files with 16 additions and 22 deletions.
5 changes: 2 additions & 3 deletions pkg/executor/aggregate/agg_hash_executor.go
@@ -287,8 +287,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
             partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper)
         }

-        partialResultsBuffer := getPartialResultsBufferFromPool()
-        groupKey := getGroupKeyFromPool()
+        partialResultsBuffer, groupKeyBuf := getBuffer()
         e.partialWorkers[i] = HashAggPartialWorker{
             baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
             idForTest: i,
@@ -302,7 +301,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
             partialResultsMap: partialResultsMap,
             groupByItems: e.GroupByItems,
             chk: exec.TryNewCacheChunk(e.Children(0)),
-            groupKey: *groupKey,
+            groupKeyBuf: *groupKeyBuf,
             serializeHelpers: aggfuncs.NewSerializeHelper(),
             isSpillPrepared: false,
             spillHelper: e.spillHelper,
13 changes: 6 additions & 7 deletions pkg/executor/aggregate/agg_hash_partial_worker.go
@@ -56,7 +56,7 @@ type HashAggPartialWorker struct {
     partialResultsMapMem atomic.Int64

     groupByItems []expression.Expression
-    groupKey [][]byte
+    groupKeyBuf [][]byte
     // chk stores the input data from child,
     // and is reused by childExec and partial worker.
     chk *chunk.Chunk
@@ -202,8 +202,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
         // We must ensure that there is no panic before `waitGroup.Done()` or there will be hang
         waitGroup.Done()

-        tryToRecyclePartialResultsBuffer(&w.partialResultsBuffer)
-        tryToRecycleGroupKey(&w.groupKey)
+        tryRecycleBuffer(&w.partialResultsBuffer, &w.groupKeyBuf)
     }()

     intestBeforePartialWorkerRun()
@@ -257,15 +256,15 @@ func (w *HashAggPartialWorker) getPartialResultsOfEachRow(groupKey [][]byte, fin
 }

 func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *chunk.Chunk, finalConcurrency int) (err error) {
-    memSize := getGroupKeyMemUsage(w.groupKey)
-    w.groupKey, err = GetGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
+    memSize := getGroupKeyMemUsage(w.groupKeyBuf)
+    w.groupKeyBuf, err = GetGroupKey(w.ctx, chk, w.groupKeyBuf, w.groupByItems)
     failpoint.Inject("ConsumeRandomPanic", nil)
-    w.memTracker.Consume(getGroupKeyMemUsage(w.groupKey) - memSize)
+    w.memTracker.Consume(getGroupKeyMemUsage(w.groupKeyBuf) - memSize)
     if err != nil {
         return err
     }

-    partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKey, finalConcurrency)
+    partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKeyBuf, finalConcurrency)

     numRows := chk.NumRows()
     rows := make([]chunk.Row, 1)
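The deferred cleanup in run() above calls waitGroup.Done() before tryRecycleBuffer, and the in-code comment explains why: nothing may panic before Done(), or the goroutine blocked in Wait() hangs. The following self-contained Go sketch (the worker, bufPool, and recycleBuf names are hypothetical stand-ins, not part of this commit) illustrates that ordering.

package main

import (
    "fmt"
    "sync"
)

// bufPool hands out reusable [][]byte slices, loosely mirroring groupKeyPool.
var bufPool = sync.Pool{
    New: func() any {
        buf := make([][]byte, 0, 8)
        return &buf
    },
}

// recycleBuf is a hypothetical stand-in for tryRecycleBuffer.
func recycleBuf(buf *[][]byte) {
    *buf = (*buf)[:0]
    bufPool.Put(buf)
}

// worker is a hypothetical stand-in for HashAggPartialWorker.
type worker struct {
    groupKeyBuf [][]byte
}

func (w *worker) run(wg *sync.WaitGroup) {
    defer func() {
        // Signal completion first: if recycling ran first and panicked,
        // Done() would never execute and wg.Wait() would hang.
        wg.Done()
        recycleBuf(&w.groupKeyBuf)
    }()
    w.groupKeyBuf = append(w.groupKeyBuf, []byte("group-key"))
}

func main() {
    var wg sync.WaitGroup
    w := &worker{groupKeyBuf: *bufPool.Get().(*[][]byte)}
    wg.Add(1)
    go w.run(&wg)
    wg.Wait()
    fmt.Println("worker finished, buffer recycled")
}

Because the buffers are recycled only after Done(), a failure during recycling can no longer leave the coordinating goroutine stuck in Wait().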
20 changes: 8 additions & 12 deletions pkg/executor/aggregate/agg_util.go
@@ -58,27 +58,23 @@ var groupKeyPool = sync.Pool{
     },
 }

-func getPartialResultsBufferFromPool() *[][]aggfuncs.PartialResult {
+func getBuffer() (*[][]aggfuncs.PartialResult, *[][]byte) {
     partialResultsBuffer := partialResultsBufferPool.Get().(*[][]aggfuncs.PartialResult)
     *partialResultsBuffer = (*partialResultsBuffer)[:0]
-    return partialResultsBuffer
-}
-
-func getGroupKeyFromPool() *[][]byte {
     groupKey := groupKeyPool.Get().(*[][]byte)
     *groupKey = (*groupKey)[:0]
-    return groupKey
+    return partialResultsBuffer, groupKey
 }

-func tryToRecyclePartialResultsBuffer(buf *[][]aggfuncs.PartialResult) {
+// tryRecycleBuffer recycles small buffers only. This approach reduces the CPU pressure
+// from memory allocation during high concurrency aggregation computations (like DDL's scheduled tasks),
+// and also prevents the pool from holding too much memory and causing memory pressure.
+func tryRecycleBuffer(buf *[][]aggfuncs.PartialResult, groupKey *[][]byte) {
     if cap(*buf) <= defaultPartialResultsBufferCap {
         partialResultsBufferPool.Put(buf)
     }
-}
-
-func tryToRecycleGroupKey(buf *[][]byte) {
-    if cap(*buf) <= defaultGroupKeyCap {
-        groupKeyPool.Put(buf)
+    if cap(*groupKey) <= defaultGroupKeyCap {
+        groupKeyPool.Put(groupKey)
     }
 }
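For readers unfamiliar with the capacity-bounded pooling that getBuffer and tryRecycleBuffer implement, here is a minimal, self-contained sketch of the same pattern with simplified element types; maxRecycledCap, getGroupKeyBuf, and tryRecycleGroupKeyBuf are illustrative names only (the real code uses defaultPartialResultsBufferCap and defaultGroupKeyCap).

package main

import (
    "fmt"
    "sync"
)

// maxRecycledCap is an illustrative bound: buffers that grew beyond it are
// dropped instead of pooled, so the pool never pins an oversized allocation.
const maxRecycledCap = 1 << 10

var groupKeyPool = sync.Pool{
    New: func() any {
        buf := make([][]byte, 0, 64)
        return &buf
    },
}

// getGroupKeyBuf fetches a pooled slice and resets its length, keeping capacity.
func getGroupKeyBuf() *[][]byte {
    buf := groupKeyPool.Get().(*[][]byte)
    *buf = (*buf)[:0]
    return buf
}

// tryRecycleGroupKeyBuf returns the slice to the pool only while it is small,
// trading occasional reallocation for a bounded pool footprint.
func tryRecycleGroupKeyBuf(buf *[][]byte) {
    if cap(*buf) <= maxRecycledCap {
        groupKeyPool.Put(buf)
    }
}

func main() {
    buf := getGroupKeyBuf()
    *buf = append(*buf, []byte("group-key"))
    fmt.Println("len:", len(*buf), "cap:", cap(*buf))
    tryRecycleGroupKeyBuf(buf)

    // A later Get may hand back the same backing array (sync.Pool gives no guarantee).
    reused := getGroupKeyBuf()
    fmt.Println("reused len:", len(*reused), "cap:", cap(*reused))
}

Capping what goes back into the pool is exactly what the commit's comment describes: frequent small allocations are avoided under high-concurrency aggregation, while the pool cannot accumulate unboundedly large buffers.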
