-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: add some memory tracker in HashJoin #33918
Merged
Merged
Changes from 6 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
6aee861
implement
wshwsh12 25a1a8c
add some ut
wshwsh12 8468511
fix get memory delta
wshwsh12 34e838d
fix
wshwsh12 e2a0552
Merge remote-tracking branch 'upstream/master' into hashmap-memory
wshwsh12 33551d5
fix
wshwsh12 7d358db
s/GetMemoryDelta/GetAndCleanMemoryDelta
wshwsh12 57dfb7a
Merge branch 'master' into hashmap-memory
ti-chi-bot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
"hash/fnv" | ||
"sync/atomic" | ||
"time" | ||
"unsafe" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/tidb/sessionctx" | ||
|
@@ -29,6 +30,7 @@ import ( | |
"github.com/pingcap/tidb/util/codec" | ||
"github.com/pingcap/tidb/util/disk" | ||
"github.com/pingcap/tidb/util/execdetails" | ||
"github.com/pingcap/tidb/util/hack" | ||
"github.com/pingcap/tidb/util/memory" | ||
) | ||
|
||
|
@@ -83,6 +85,7 @@ type hashRowContainer struct { | |
hashTable baseHashTable | ||
|
||
rowContainer *chunk.RowContainer | ||
memTracker *memory.Tracker | ||
} | ||
|
||
func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { | ||
|
@@ -94,7 +97,9 @@ func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContex | |
stat: new(hashStatistic), | ||
hashTable: newConcurrentMapHashTable(), | ||
rowContainer: rc, | ||
memTracker: memory.NewTracker(memory.LabelForRowContainer, -1), | ||
} | ||
rc.GetMemTracker().AttachTo(c.GetMemTracker()) | ||
return c | ||
} | ||
|
||
|
@@ -186,6 +191,7 @@ func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNu | |
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)} | ||
c.hashTable.Put(key, rowPtr) | ||
} | ||
c.GetMemTracker().Consume(c.hashTable.GetMemoryDelta()) | ||
return nil | ||
} | ||
|
||
|
@@ -219,7 +225,7 @@ func (c *hashRowContainer) Close() error { | |
} | ||
|
||
// GetMemTracker returns the underlying memory usage tracker in hashRowContainer. | ||
func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.rowContainer.GetMemTracker() } | ||
func (c *hashRowContainer) GetMemTracker() *memory.Tracker { return c.memTracker } | ||
|
||
// GetDiskTracker returns the underlying disk usage tracker in hashRowContainer. | ||
func (c *hashRowContainer) GetDiskTracker() *disk.Tracker { return c.rowContainer.GetDiskTracker() } | ||
|
@@ -251,7 +257,7 @@ func newEntryStore() *entryStore { | |
return es | ||
} | ||
|
||
func (es *entryStore) GetStore() (e *entry) { | ||
func (es *entryStore) GetStore() (e *entry, memDelta int64) { | ||
sliceIdx := uint32(len(es.slices) - 1) | ||
slice := es.slices[sliceIdx] | ||
if es.cursor >= cap(slice) { | ||
|
@@ -263,6 +269,7 @@ func (es *entryStore) GetStore() (e *entry) { | |
es.slices = append(es.slices, slice) | ||
sliceIdx++ | ||
es.cursor = 0 | ||
memDelta = int64(unsafe.Sizeof(entry{})) * int64(size) | ||
} | ||
e = &es.slices[sliceIdx][es.cursor] | ||
es.cursor++ | ||
|
@@ -273,6 +280,9 @@ type baseHashTable interface { | |
Put(hashKey uint64, rowPtr chunk.RowPtr) | ||
Get(hashKey uint64) (rowPtrs []chunk.RowPtr) | ||
Len() uint64 | ||
// GetMemoryDelta gets the memDelta of the baseHashTable. Memory delta will be cleared after each fetch. | ||
// It indicates the memory delta of the baseHashTable since the last calling GetMemoryDelta(). | ||
GetMemoryDelta() int64 | ||
} | ||
|
||
// TODO (fangzhuhe) remove unsafeHashTable later if it not used anymore | ||
|
@@ -283,6 +293,9 @@ type unsafeHashTable struct { | |
hashMap map[uint64]*entry | ||
entryStore *entryStore | ||
length uint64 | ||
|
||
bInMap int64 // indicate there are 2^bInMap buckets in hashMap | ||
memDelta int64 // the memory delta of the unsafeHashTable since the last calling GetMemoryDelta() | ||
} | ||
|
||
// newUnsafeHashTable creates a new unsafeHashTable. estCount means the estimated size of the hashMap. | ||
|
@@ -297,11 +310,16 @@ func newUnsafeHashTable(estCount int) *unsafeHashTable { | |
// Put puts the key/rowPtr pairs to the unsafeHashTable, multiple rowPtrs are stored in a list. | ||
func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) { | ||
oldEntry := ht.hashMap[hashKey] | ||
newEntry := ht.entryStore.GetStore() | ||
newEntry, memDelta := ht.entryStore.GetStore() | ||
newEntry.ptr = rowPtr | ||
newEntry.next = oldEntry | ||
ht.hashMap[hashKey] = newEntry | ||
if len(ht.hashMap) > (1<<ht.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen { | ||
memDelta += hack.DefBucketMemoryUsageForMapIntToPtr * (1 << ht.bInMap) | ||
ht.bInMap++ | ||
} | ||
ht.length++ | ||
ht.memDelta += memDelta | ||
} | ||
|
||
// Get gets the values of the "key" and appends them to "values". | ||
|
@@ -318,11 +336,19 @@ func (ht *unsafeHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) { | |
// if the same key is put more than once. | ||
func (ht *unsafeHashTable) Len() uint64 { return ht.length } | ||
|
||
// GetMemoryDelta gets the memDelta of the unsafeHashTable. | ||
func (ht *unsafeHashTable) GetMemoryDelta() int64 { | ||
memDelta := ht.memDelta | ||
ht.memDelta = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why set this to 0? |
||
return memDelta | ||
} | ||
|
||
// concurrentMapHashTable is a concurrent hash table built on concurrentMap | ||
type concurrentMapHashTable struct { | ||
hashMap concurrentMap | ||
entryStore *entryStore | ||
length uint64 | ||
memDelta int64 // the memory delta of the concurrentMapHashTable since the last calling GetMemoryDelta() | ||
} | ||
|
||
// newConcurrentMapHashTable creates a concurrentMapHashTable | ||
|
@@ -331,6 +357,7 @@ func newConcurrentMapHashTable() *concurrentMapHashTable { | |
ht.hashMap = newConcurrentMap() | ||
ht.entryStore = newEntryStore() | ||
ht.length = 0 | ||
ht.memDelta = hack.DefBucketMemoryUsageForMapIntToPtr + int64(unsafe.Sizeof(entry{}))*initialEntrySliceLen | ||
return ht | ||
} | ||
|
||
|
@@ -341,10 +368,13 @@ func (ht *concurrentMapHashTable) Len() uint64 { | |
|
||
// Put puts the key/rowPtr pairs to the concurrentMapHashTable, multiple rowPtrs are stored in a list. | ||
func (ht *concurrentMapHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) { | ||
newEntry := ht.entryStore.GetStore() | ||
newEntry, memDelta := ht.entryStore.GetStore() | ||
newEntry.ptr = rowPtr | ||
newEntry.next = nil | ||
ht.hashMap.Insert(hashKey, newEntry) | ||
memDelta += ht.hashMap.Insert(hashKey, newEntry) | ||
if memDelta != 0 { | ||
atomic.AddInt64(&ht.memDelta, memDelta) | ||
} | ||
atomic.AddUint64(&ht.length, 1) | ||
} | ||
|
||
|
@@ -357,3 +387,15 @@ func (ht *concurrentMapHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) { | |
} | ||
return | ||
} | ||
|
||
// GetMemoryDelta gets the memDelta of the concurrentMapHashTable. Memory delta will be cleared after each fetch. | ||
func (ht *concurrentMapHashTable) GetMemoryDelta() int64 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GetAndCleanMemoryDelta |
||
var memDelta int64 | ||
for { | ||
memDelta = atomic.LoadInt64(&ht.memDelta) | ||
if atomic.CompareAndSwapInt64(&ht.memDelta, memDelta, 0) { | ||
break | ||
} | ||
} | ||
return memDelta | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment for this func