executor: remove unnecessary memory allocation in (*HashAggExec).initPartialWorkers #52418

Merged · 5 commits · May 13, 2024
7 changes: 3 additions & 4 deletions pkg/executor/aggregate/agg_hash_executor.go
@@ -287,6 +287,7 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
partialResultsMap[i] = make(aggfuncs.AggPartialResultMapper)
}

+ partialResultsBuffer, groupKeyBuf := getBuffer()
e.partialWorkers[i] = HashAggPartialWorker{
baseHashAggWorker: newBaseHashAggWorker(e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
idForTest: i,
@@ -295,12 +296,12 @@ func (e *HashAggExec) initPartialWorkers(partialConcurrency int, finalConcurrenc
outputChs: e.partialOutputChs,
giveBackCh: e.inputCh,
BInMaps: make([]int, finalConcurrency),
- partialResultsBuffer: make([][]aggfuncs.PartialResult, 0, 2048),
+ partialResultsBuffer: *partialResultsBuffer,
globalOutputCh: e.finalOutputCh,
partialResultsMap: partialResultsMap,
groupByItems: e.GroupByItems,
chk: exec.TryNewCacheChunk(e.Children(0)),
- groupKey: make([][]byte, 0, 8),
+ groupKeyBuf: *groupKeyBuf,
serializeHelpers: aggfuncs.NewSerializeHelper(),
isSpillPrepared: false,
spillHelper: e.spillHelper,
@@ -338,9 +339,7 @@ func (e *HashAggExec) initFinalWorkers(finalConcurrency int) {
inputCh: e.partialOutputChs[i],
outputCh: e.finalOutputCh,
finalResultHolderCh: make(chan *chunk.Chunk, 1),
- rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(exec.RetTypes(e)),
- groupKeys: make([][]byte, 0, 8),
spillHelper: e.spillHelper,
restoredAggResultMapperMem: 0,
}
3 changes: 0 additions & 3 deletions pkg/executor/aggregate/agg_hash_final_worker.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/aggfuncs"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -42,14 +41,12 @@ type AfFinalResult struct {
type HashAggFinalWorker struct {
baseHashAggWorker

- rowBuffer []types.Datum
mutableRow chunk.MutRow
partialResultMap aggfuncs.AggPartialResultMapper
BInMap int
inputCh chan *aggfuncs.AggPartialResultMapper
outputCh chan *AfFinalResult
finalResultHolderCh chan *chunk.Chunk
- groupKeys [][]byte

spillHelper *parallelHashAggSpillHelper

12 changes: 7 additions & 5 deletions pkg/executor/aggregate/agg_hash_partial_worker.go
@@ -56,7 +56,7 @@ type HashAggPartialWorker struct {
partialResultsMapMem atomic.Int64

groupByItems []expression.Expression
- groupKey [][]byte
+ groupKeyBuf [][]byte
// chk stores the input data from child,
// and is reused by childExec and partial worker.
chk *chunk.Chunk
@@ -201,6 +201,8 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG

// We must ensure that there is no panic before `waitGroup.Done()` or there will be hang
waitGroup.Done()

tryRecycleBuffer(&w.partialResultsBuffer, &w.groupKeyBuf)
}()

intestBeforePartialWorkerRun()
@@ -254,15 +256,15 @@ func (w *HashAggPartialWorker) getPartialResultsOfEachRow(groupKey [][]byte, fin
}

func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, chk *chunk.Chunk, finalConcurrency int) (err error) {
- memSize := getGroupKeyMemUsage(w.groupKey)
- w.groupKey, err = GetGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
+ memSize := getGroupKeyMemUsage(w.groupKeyBuf)
+ w.groupKeyBuf, err = GetGroupKey(w.ctx, chk, w.groupKeyBuf, w.groupByItems)
failpoint.Inject("ConsumeRandomPanic", nil)
- w.memTracker.Consume(getGroupKeyMemUsage(w.groupKey) - memSize)
+ w.memTracker.Consume(getGroupKeyMemUsage(w.groupKeyBuf) - memSize)
if err != nil {
return err
}

- partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKey, finalConcurrency)
+ partialResultOfEachRow := w.getPartialResultsOfEachRow(w.groupKeyBuf, finalConcurrency)

numRows := chk.NumRows()
rows := make([]chunk.Row, 1)
38 changes: 38 additions & 0 deletions pkg/executor/aggregate/agg_util.go
@@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"slices"
"sync"
"sync/atomic"
"time"

@@ -40,6 +41,43 @@ import (
"go.uber.org/zap"
)

const defaultPartialResultsBufferCap = 2048
const defaultGroupKeyCap = 8

var partialResultsBufferPool = sync.Pool{
New: func() any {
s := make([][]aggfuncs.PartialResult, 0, defaultPartialResultsBufferCap)
return &s
},
}

var groupKeyPool = sync.Pool{
New: func() any {
s := make([][]byte, 0, defaultGroupKeyCap)
return &s
},
}

func getBuffer() (*[][]aggfuncs.PartialResult, *[][]byte) {
Contributor:

Why use *[][]xx instead of [][]xx?

Contributor (author):

It's better to return a pointer type in sync.Pool.
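
To make the reasoning in that reply concrete: putting a plain slice into a sync.Pool boxes the three-word slice header into an interface value, which allocates on every Put, while putting a pointer to the slice boxes only a single word and avoids that per-Put allocation (this is what staticcheck's SA6002 check flags). Below is a minimal standalone sketch, not part of this PR and with illustrative pool names, that measures the difference:

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// Pool storing *[]byte: only a one-word pointer is boxed into the
// interface, so Put does not allocate.
var ptrPool = sync.Pool{
	New: func() any {
		s := make([]byte, 0, 1024)
		return &s
	},
}

// Pool storing []byte by value: converting the slice header to an
// interface copies it to the heap, so every Put allocates.
var valPool = sync.Pool{
	New: func() any {
		return make([]byte, 0, 1024)
	},
}

func main() {
	ptrAllocs := testing.AllocsPerRun(1000, func() {
		buf := ptrPool.Get().(*[]byte)
		*buf = (*buf)[:0]
		ptrPool.Put(buf)
	})
	valAllocs := testing.AllocsPerRun(1000, func() {
		buf := valPool.Get().([]byte)
		buf = buf[:0]
		valPool.Put(buf) // slice header escapes to the heap here
	})
	fmt.Printf("allocs per Get/Put: pointer pool %.0f, value pool %.0f\n", ptrAllocs, valAllocs)
}
```

On a typical Go toolchain the pointer pool reports roughly zero allocations per Get/Put cycle, while the value pool reports about one per cycle.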

partialResultsBuffer := partialResultsBufferPool.Get().(*[][]aggfuncs.PartialResult)
*partialResultsBuffer = (*partialResultsBuffer)[:0]
groupKey := groupKeyPool.Get().(*[][]byte)
*groupKey = (*groupKey)[:0]
return partialResultsBuffer, groupKey
}

// tryRecycleBuffer recycles small buffers only. This approach reduces the CPU pressure
// from memory allocation during high concurrency aggregation computations (like DDL's scheduled tasks),
// and also prevents the pool from holding too much memory and causing memory pressure.
func tryRecycleBuffer(buf *[][]aggfuncs.PartialResult, groupKey *[][]byte) {
if cap(*buf) <= defaultPartialResultsBufferCap {
partialResultsBufferPool.Put(buf)
}
if cap(*groupKey) <= defaultGroupKeyCap {
groupKeyPool.Put(groupKey)
}
}

func closeBaseExecutor(b *exec.BaseExecutor) {
if r := recover(); r != nil {
// Release the resource, but throw the panic again and let the top level handle it.
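
Taken together, the new helpers form a borrow/return pattern: initPartialWorkers borrows each worker's scratch buffers via getBuffer, and the worker's deferred cleanup hands them back through tryRecycleBuffer only while they are still small. A rough standalone sketch of that lifecycle (hypothetical names, not the PR's code):

```go
package main

import (
	"fmt"
	"sync"
)

// defaultCap plays the role of defaultGroupKeyCap: buffers at or below this
// capacity are worth keeping in the pool.
const defaultCap = 8

var bufPool = sync.Pool{
	New: func() any {
		s := make([][]byte, 0, defaultCap)
		return &s
	},
}

// getBuf borrows a buffer from the pool and resets its length.
func getBuf() *[][]byte {
	b := bufPool.Get().(*[][]byte)
	*b = (*b)[:0]
	return b
}

// tryRecycle returns the buffer only while it is still small, so the pool
// never pins buffers that grew far beyond the default capacity.
func tryRecycle(b *[][]byte) {
	if cap(*b) <= defaultCap {
		bufPool.Put(b)
	}
}

// worker simulates one aggregation worker run using a pooled scratch buffer.
func worker(rows int) {
	buf := getBuf()
	defer tryRecycle(buf)
	for i := 0; i < rows; i++ {
		*buf = append(*buf, []byte("key")) // scratch use; may grow past defaultCap
	}
	fmt.Printf("processed %d rows, buffer cap %d\n", rows, cap(*buf))
}

func main() {
	worker(4)  // stays small: recycled into the pool
	worker(64) // grew large: dropped and left to the GC
}
```

Dropping oversized buffers instead of pooling them trades an occasional future allocation for a bounded pool, which matches the rationale given in the tryRecycleBuffer comment.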