Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: reuse fmsketch #47070

Merged
merged 12 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ go_library(
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
9 changes: 7 additions & 2 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type globalStatsKey struct {
Expand Down Expand Up @@ -53,10 +54,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs))

tableAllPartitionStats := make(map[int64]*statistics.Table)
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)
maps.Clear(tableAllPartitionStats)

for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
Expand Down Expand Up @@ -126,6 +127,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob

FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr)
}

for _, value := range tableAllPartitionStats {
value.ReleaseAndPutToPool()
}
}

for tableID := range tableIDs {
Expand Down
49 changes: 34 additions & 15 deletions statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
"github.com/twmb/murmur3"
"golang.org/x/exp/maps"
)

var murmur3Pool = sync.Pool{
Expand All @@ -32,6 +33,15 @@ var murmur3Pool = sync.Pool{
},
}

// fmSketchPool recycles FMSketch objects so that their hashset maps can be
// reused instead of being reallocated. A freshly created sketch starts with an
// empty hashset and zero-valued mask and maxSize.
var fmSketchPool = sync.Pool{
	New: func() any {
		sketch := new(FMSketch)
		sketch.hashset = make(map[uint64]bool)
		return sketch
	},
}

// FMSketch is used to count the number of distinct elements in a set.
type FMSketch struct {
hashset map[uint64]bool
Expand All @@ -41,26 +51,22 @@ type FMSketch struct {

// NewFMSketch returns an FM sketch whose maxSize is set to the given value.
//
// The sketch is taken from fmSketchPool rather than allocated directly, so it
// may be a recycled object. Sketches are reset by DestroyAndPutToPool before
// they go back into the pool, and the pool's New function creates them with an
// empty hashset, so only maxSize needs to be assigned here.
func NewFMSketch(maxSize int) *FMSketch {
	result := fmSketchPool.Get().(*FMSketch)
	result.maxSize = maxSize
	return result
}

// Copy makes a copy for current FMSketch.
func (s *FMSketch) Copy() *FMSketch {
if s == nil {
return nil
}
hashset := make(map[uint64]bool)
result := NewFMSketch(s.maxSize)
for key, value := range s.hashset {
hashset[key] = value
}
return &FMSketch{
hashset: hashset,
mask: s.mask,
maxSize: s.maxSize,
result.hashset[key] = value
}
result.mask = s.mask
return result
}

// NDV returns the ndv of the sketch.
Expand Down Expand Up @@ -159,10 +165,8 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch {
if protoSketch == nil {
return nil
}
sketch := &FMSketch{
hashset: make(map[uint64]bool, len(protoSketch.Hashset)),
mask: protoSketch.Mask,
}
sketch := fmSketchPool.Get().(*FMSketch)
sketch.mask = protoSketch.Mask
for _, val := range protoSketch.Hashset {
sketch.hashset[val] = true
}
Expand Down Expand Up @@ -201,3 +205,18 @@ func (s *FMSketch) MemoryUsage() (sum int64) {
sum = int64(16 + 9*len(s.hashset))
return
}

// reset zeroes mask and maxSize and removes every entry from hashset while
// keeping the map itself, so the sketch can be handed out again.
func (s *FMSketch) reset() {
	s.mask, s.maxSize = 0, 0
	maps.Clear(s.hashset)
}

// DestroyAndPutToPool resets the FMSketch and hands it back to fmSketchPool.
// Calling it on a nil sketch is a no-op.
func (s *FMSketch) DestroyAndPutToPool() {
	if s != nil {
		s.reset()
		fmSketchPool.Put(s)
	}
}
25 changes: 16 additions & 9 deletions statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
partitionNum := len(globalTableInfo.Partition.Definitions)
externalCache := false
if allPartitionStats == nil {
allPartitionStats = make(map[int64]*statistics.Table)
} else {
externalCache = true
}
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
Expand Down Expand Up @@ -133,12 +139,10 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(

tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
if allPartitionStats != nil {
partitionStats, ok = allPartitionStats[partitionID]
}

// If preload partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats.
if allPartitionStats == nil || partitionStats == nil || !ok {
partitionStats, ok = allPartitionStats[partitionID]
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
if !ok {
var err1 error
partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
if err1 != nil {
Expand All @@ -149,9 +153,6 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
err = err1
return
}
if allPartitionStats == nil {
allPartitionStats = make(map[int64]*statistics.Table)
}
allPartitionStats[partitionID] = partitionStats
}

Expand Down Expand Up @@ -249,16 +250,22 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
globalStats.Fms[i] = allFms[i][0].Copy()
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
}

// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Fms[i].DestroyAndPutToPool()
globalStats.Hg[i].NDV = globalStatsNDV
}

if !externalCache {
for _, value := range allPartitionStats {
value.ReleaseAndPutToPool()
}
}
return
}

Expand Down
12 changes: 12 additions & 0 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,18 @@ func (t *Table) IsOutdated() bool {
return false
}

// ReleaseAndPutToPool puts the FMSketch of every column and index of the Table back to the pool and then clears the Columns and Indices maps.
func (t *Table) ReleaseAndPutToPool() {
for _, col := range t.Columns {
col.FMSketch.DestroyAndPutToPool()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safer to set FMSketch to nil after putting to the pool.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok to me to improve them in another PR or in another way at your convenience.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I used maps.Clear to clean it up.

}
maps.Clear(t.Columns)
for _, idx := range t.Indices {
idx.FMSketch.DestroyAndPutToPool()
}
maps.Clear(t.Indices)
}

// ID2UniqueID generates a new HistColl whose `Columns` is built from UniqueID of given columns.
func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl {
cols := make(map[int64]*Column)
Expand Down