From 9478a6bfafaf15b4e2be1e5ebbf3c9a77e91945d Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Thu, 21 Apr 2022 13:36:03 +0800 Subject: [PATCH] util: track the memory usage of rowPtrs in sortedRowContainer (#34128) ref pingcap/tidb#33877 --- util/chunk/row_container.go | 24 ++++++++++++++---------- util/chunk/row_container_test.go | 4 ++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index e2104c52b45dc..20d5496fa533d 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -78,10 +78,6 @@ func (m *mutexForRowContainer) RUnlock() { type RowContainer struct { m *mutexForRowContainer - fieldType []*types.FieldType - chunkSize int - numRow int - memTracker *memory.Tracker diskTracker *disk.Tracker actionSpill *SpillDiskAction @@ -97,8 +93,6 @@ func NewRowContainer(fieldType []*types.FieldType, chunkSize int) *RowContainer rLock: rLock, wLocks: []*sync.RWMutex{rLock}, }, - fieldType: fieldType, - chunkSize: chunkSize, memTracker: memory.NewTracker(memory.LabelForRowContainer, -1), diskTracker: disk.NewTracker(memory.LabelForRowContainer, -1), } @@ -438,20 +432,24 @@ type SortedRowContainer struct { keyCmpFuncs []CompareFunc actionSpill *SortAndSpillDiskAction + memTracker *memory.Tracker } // NewSortedRowContainer creates a new SortedRowContainer in memory. func NewSortedRowContainer(fieldType []*types.FieldType, chunkSize int, ByItemsDesc []bool, keyColumns []int, keyCmpFuncs []CompareFunc) *SortedRowContainer { - return &SortedRowContainer{RowContainer: NewRowContainer(fieldType, chunkSize), + src := SortedRowContainer{RowContainer: NewRowContainer(fieldType, chunkSize), ByItemsDesc: ByItemsDesc, keyColumns: keyColumns, keyCmpFuncs: keyCmpFuncs} + src.memTracker = memory.NewTracker(memory.LabelForRowContainer, -1) + src.RowContainer.GetMemTracker().AttachTo(src.GetMemTracker()) + return &src } // Close close the SortedRowContainer func (c *SortedRowContainer) Close() error { c.ptrM.Lock() defer c.ptrM.Unlock() - c.GetMemTracker().Consume(int64(-8 * cap(c.ptrM.rowPtrs))) + c.GetMemTracker().Consume(int64(-8 * c.NumRow())) c.ptrM.rowPtrs = nil return c.RowContainer.Close() } @@ -486,7 +484,7 @@ func (c *SortedRowContainer) Sort() { if c.ptrM.rowPtrs != nil { return } - c.ptrM.rowPtrs = make([]RowPtr, 0, c.NumRow()) + c.ptrM.rowPtrs = make([]RowPtr, 0, c.NumRow()) // The memory usage has been tracked in SortedRowContainer.Add() function for chkIdx := 0; chkIdx < c.NumChunks(); chkIdx++ { rowChk, err := c.GetChunk(chkIdx) // err must be nil, because the chunk is in memory. @@ -498,7 +496,6 @@ func (c *SortedRowContainer) Sort() { } } sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess) - c.GetMemTracker().Consume(int64(8 * c.numRow)) } func (c *SortedRowContainer) sortAndSpillToDisk() { @@ -513,6 +510,8 @@ func (c *SortedRowContainer) Add(chk *Chunk) (err error) { if c.ptrM.rowPtrs != nil { return ErrCannotAddBecauseSorted } + // Consume the memory usage of rowPtrs in advance + c.GetMemTracker().Consume(int64(chk.NumRows() * 8)) return c.RowContainer.Add(chk) } @@ -544,6 +543,11 @@ func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction { return c.actionSpill } +// GetMemTracker return the memory tracker for the sortedRowContainer +func (c *SortedRowContainer) GetMemTracker() *memory.Tracker { + return c.memTracker +} + // SortAndSpillDiskAction implements memory.ActionOnExceed for chunk.List. If // the memory quota of a query is exceeded, SortAndSpillDiskAction.Action is // triggered. diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 3803a1451df7a..b0972c179388a 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -156,14 +156,14 @@ func TestSortedRowContainerSortSpillAction(t *testing.T) { var tracker *memory.Tracker var err error tracker = rc.GetMemTracker() - tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1) tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) require.False(t, rc.AlreadySpilledSafeForTest()) err = rc.Add(chk) rc.actionSpill.WaitForTest() require.NoError(t, err) require.False(t, rc.AlreadySpilledSafeForTest()) - require.Equal(t, chk.MemoryUsage(), rc.GetMemTracker().BytesConsumed()) + require.Equal(t, chk.MemoryUsage()+int64(8*chk.NumRows()), rc.GetMemTracker().BytesConsumed()) // The following line is erroneous, since chk is already handled by rc, Add it again causes duplicated memory usage account. // It is only for test of spill, do not double-add a chunk elsewhere. err = rc.Add(chk)