executor: remove unnecessary memory allocation in (*HashAggExec).initPartialWorkers (pingcap#52418)

close pingcap#52321
xzhangxian1008 authored and RidRisR committed May 23, 2024
1 parent 6a7dc75 commit 000386b
Showing 4 changed files with 48 additions and 12 deletions.
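
The mechanism, as a minimal self-contained sketch before the per-file diffs (illustrative only: PartialResult stands in for aggfuncs.PartialResult, the function names are invented for the sketch, and the capacities match the constants added below):

package main

import (
    "fmt"
    "sync"
)

type PartialResult uintptr // stand-in for aggfuncs.PartialResult

// Before this commit: two fresh allocations per partial worker, per query.
func newBuffersBefore() ([][]PartialResult, [][]byte) {
    return make([][]PartialResult, 0, 2048), make([][]byte, 0, 8)
}

// After: a process-wide pool hands out reset, previously used buffers.
var resultsPool = sync.Pool{
    New: func() any {
        s := make([][]PartialResult, 0, 2048)
        return &s // a pointer, so Put/Get never box the slice header
    },
}

func newBuffersAfter() *[][]PartialResult {
    buf := resultsPool.Get().(*[][]PartialResult)
    *buf = (*buf)[:0] // drop stale contents, keep the capacity
    return buf
}

func main() {
    _, _ = newBuffersBefore()
    buf := newBuffersAfter()
    *buf = append(*buf, make([]PartialResult, 4))
    fmt.Println(len(*buf), cap(*buf)) // 1 2048
    resultsPool.Put(buf) // the next worker reuses the same backing array
}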
pkg/executor/aggregate/agg_hash_executor.go (7 changes: 3 additions & 4 deletions)
@@ -291,6 +291,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
             partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper)
         }

+        partialResultsBuffer, groupKeyBuf := getBuffer()
         e.partialWorkers[i] = HashAggPartialWorker{
             baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
             idForTest:         i,
@@ -299,12 +300,12 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
             outputChs:            e.partialOutputChs,
             giveBackCh:           e.inputCh,
             BInMaps:              make([]int, finalConcurrency),
-            partialResultsBuffer: make([][]aggfuncs.PartialResult, 0, 2048),
+            partialResultsBuffer: *partialResultsBuffer,
             globalOutputCh:       e.finalOutputCh,
             partialResultsMap:    partialResultsMap,
             groupByItems:         e.GroupByItems,
             chk:                  exec.TryNewCacheChunk(e.Children(0)),
-            groupKey:             make([][]byte, 0, 8),
+            groupKeyBuf:          *groupKeyBuf,
             serializeHelpers:     aggfuncs.NewSerializeHelper(),
             isSpillPrepared:      false,
             spillHelper:          e.spillHelper,
@@ -342,9 +343,7 @@ func (e *HashAggExec) initFinalWorkers(finalConcurrency int) {
             inputCh:                    e.partialOutputChs[i],
             outputCh:                   e.finalOutputCh,
             finalResultHolderCh:        make(chan *chunk.Chunk, 1),
-            rowBuffer:                  make([]types.Datum, 0, e.Schema().Len()),
             mutableRow:                 chunk.MutRowFromTypes(exec.RetTypes(e)),
-            groupKeys:                  make([][]byte, 0, 8),
             spillHelper:                e.spillHelper,
             restoredAggResultMapperMem: 0,
         }
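
A note on the *partialResultsBuffer and *groupKeyBuf dereferences above: the worker field stores a copy of the pooled slice header, not the pointer itself, so the field behaves like an ordinary slice while sharing the pooled backing array. A minimal sketch of that semantics (illustrative names only):

package main

import "fmt"

func main() {
    pooled := make([][]byte, 0, 8)
    p := &pooled // what the pool hands out
    field := *p  // what the worker stores: a copy of the slice header
    field = append(field, []byte("k")) // reuses the shared 8-cap backing array

    fmt.Println(len(field), cap(field)) // 1 8
    fmt.Println(len(pooled))            // 0: the pooled header went stale,
    // which is why recycling passes the live header back by address
    // (&w.groupKeyBuf in the partial-worker diff below), not the pointer p.
}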
pkg/executor/aggregate/agg_hash_final_worker.go (3 changes: 0 additions & 3 deletions)
@@ -23,7 +23,6 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/pkg/executor/aggfuncs"
     "github.com/pingcap/tidb/pkg/sessionctx"
-    "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util/chunk"
     "github.com/pingcap/tidb/pkg/util/hack"
     "github.com/pingcap/tidb/pkg/util/logutil"
@@ -42,14 +41,12 @@ type AfFinalResult struct {
 type HashAggFinalWorker struct {
     baseHashAggWorker

-    rowBuffer           []types.Datum
     mutableRow          chunk.MutRow
     partialResultMap    aggfuncs.AggPartialResultMapper
     BInMap              int
     inputCh             chan *aggfuncs.AggPartialResultMapper
     outputCh            chan *AfFinalResult
     finalResultHolderCh chan *chunk.Chunk
-    groupKeys           [][]byte

     spillHelper *parallelHashAggSpillHelper

pkg/executor/aggregate/agg_hash_partial_worker.go (12 changes: 7 additions & 5 deletions)
@@ -56,7 +56,7 @@ type HashAggPartialWorker struct {
     partialResultsMapMem atomic.Int64

     groupByItems []expression.Expression
-    groupKey     [][]byte
+    groupKeyBuf  [][]byte
     // chk stores the input data from child,
     // and is reused by childExec and partial worker.
     chk *chunk.Chunk
@@ -201,6 +201,8 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG

         // We must ensure that there is no panic before `waitGroup.Done()` or there will be hang
         waitGroup.Done()
+
+        tryRecycleBuffer(&w.partialResultsBuffer, &w.groupKeyBuf)
     }()

     intestBeforePartialWorkerRun()
@@ -254,15 +256,15 @@ func (w *HashAggPartialWorker) getPartialResultsOfEachRow(groupKey [][]byte, fin
 }

 func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *chunk.Chunk, finalConcurrency int) (err error) {
-    memSize := getGroupKeyMemUsage(w.groupKey)
-    w.groupKey, err = GetGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
+    memSize := getGroupKeyMemUsage(w.groupKeyBuf)
+    w.groupKeyBuf, err = GetGroupKey(w.ctx, chk, w.groupKeyBuf, w.groupByItems)
     failpoint.Inject("ConsumeRandomPanic", nil)
-    w.memTracker.Consume(getGroupKeyMemUsage(w.groupKey) - memSize)
+    w.memTracker.Consume(getGroupKeyMemUsage(w.groupKeyBuf) - memSize)
     if err != nil {
         return err
     }

-    partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKey, finalConcurrency)
+    partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKeyBuf, finalConcurrency)

     numRows := chk.NumRows()
     rows := make([]chunk.Row, 1)
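
The recycling call sits inside the worker's deferred function, after waitGroup.Done(). A self-contained sketch of that ordering (the worker type and pool below are simplified stand-ins, not the commit's code): the coordinator is released first, so even a slow or panicking recycle step cannot leave Wait() hanging.

package main

import (
    "fmt"
    "sync"
)

var keyPool = sync.Pool{
    New: func() any {
        s := make([][]byte, 0, 8)
        return &s
    },
}

type partialWorker struct{ groupKeyBuf [][]byte }

func (w *partialWorker) run(wg *sync.WaitGroup) {
    defer func() {
        wg.Done()                   // release the coordinator first...
        keyPool.Put(&w.groupKeyBuf) // ...then offer the buffer back
    }()
    w.groupKeyBuf = append(w.groupKeyBuf[:0], []byte("group-key"))
}

func main() {
    w := &partialWorker{groupKeyBuf: *keyPool.Get().(*[][]byte)}
    var wg sync.WaitGroup
    wg.Add(1)
    go w.run(&wg)
    wg.Wait()
    fmt.Println("worker done; buffer offered back to the pool")
}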
pkg/executor/aggregate/agg_util.go (38 changes: 38 additions & 0 deletions)
@@ -20,6 +20,7 @@ import (
     "fmt"
     "math/rand"
     "slices"
+    "sync"
     "sync/atomic"
     "time"

@@ -40,6 +41,43 @@ import (
     "go.uber.org/zap"
 )

+const defaultPartialResultsBufferCap = 2048
+const defaultGroupKeyCap = 8
+
+var partialResultsBufferPool = sync.Pool{
+    New: func() any {
+        s := make([][]aggfuncs.PartialResult, 0, defaultPartialResultsBufferCap)
+        return &s
+    },
+}
+
+var groupKeyPool = sync.Pool{
+    New: func() any {
+        s := make([][]byte, 0, defaultGroupKeyCap)
+        return &s
+    },
+}
+
+func getBuffer() (*[][]aggfuncs.PartialResult, *[][]byte) {
+    partialResultsBuffer := partialResultsBufferPool.Get().(*[][]aggfuncs.PartialResult)
+    *partialResultsBuffer = (*partialResultsBuffer)[:0]
+    groupKey := groupKeyPool.Get().(*[][]byte)
+    *groupKey = (*groupKey)[:0]
+    return partialResultsBuffer, groupKey
+}
+
+// tryRecycleBuffer recycles small buffers only. This approach reduces the CPU pressure
+// from memory allocation during high concurrency aggregation computations (like DDL's scheduled tasks),
+// and also prevents the pool from holding too much memory and causing memory pressure.
+func tryRecycleBuffer(buf *[][]aggfuncs.PartialResult, groupKey *[][]byte) {
+    if cap(*buf) <= defaultPartialResultsBufferCap {
+        partialResultsBufferPool.Put(buf)
+    }
+    if cap(*groupKey) <= defaultGroupKeyCap {
+        groupKeyPool.Put(groupKey)
+    }
+}
+
 func closeBaseExecutor(b *exec.BaseExecutor) {
     if r := recover(); r != nil {
         // Release the resource, but throw the panic again and let the top level handle it.
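
Two design points in the block above are worth calling out, with a hedged usage sketch against the signatures added in this diff. First, the pools store pointers to slices rather than the slices themselves: putting a bare slice into a sync.Pool boxes its header into an interface value, costing an allocation on every Put (the pattern staticcheck flags as SA6002); a pointer avoids that. Second, tryRecycleBuffer only returns buffers that never outgrew their default capacity, so one query with a huge group count cannot pin a large backing array inside the pool. The helper below is hypothetical, written for illustration in the same package:

// Hedged usage sketch: how a caller is expected to pair getBuffer
// with tryRecycleBuffer. exampleBufferRoundTrip is not in the commit.
func exampleBufferRoundTrip() {
    partialResultsBuffer, groupKeyBuf := getBuffer() // reset, pooled buffers
    prs, keys := *partialResultsBuffer, *groupKeyBuf // live slice headers

    keys = append(keys, []byte("group-key")) // ordinary slice usage; may re-grow
    _ = prs

    // Offer the live headers back; anything grown past the default caps
    // is dropped and left to the garbage collector.
    tryRecycleBuffer(&prs, &keys)
}

In the executor itself, the second half of this round trip happens in the deferred function of HashAggPartialWorker.run, shown in the agg_hash_partial_worker.go hunk above.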
