Skip to content

Commit

Permalink
statstics: reuse fmsketch (#47070) (#49573)
Browse files Browse the repository at this point in the history
close #47071
  • Loading branch information
ti-chi-bot authored Dec 19, 2023
1 parent 10caef0 commit ba5d946
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 28 deletions.
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand Down
9 changes: 8 additions & 1 deletion executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type globalStatsKey struct {
Expand Down Expand Up @@ -52,9 +53,11 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
for globalStatsID := range globalStatsMap {
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}

statsHandle := domain.GetDomain(e.ctx).StatsHandle()
tableAllPartitionStats := make(map[int64]*statistics.Table)
for tableID := range globalStatsTableIDs {
tableAllPartitionStats := make(map[int64]*statistics.Table)
maps.Clear(tableAllPartitionStats)
for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
continue
Expand Down Expand Up @@ -112,6 +115,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}

for _, value := range tableAllPartitionStats {
value.ReleaseAndPutToPool()
}
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_twmb_murmur3//:murmur3",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
81 changes: 54 additions & 27 deletions statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,57 @@ package statistics

import (
"hash"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
"github.com/twmb/murmur3"
"golang.org/x/exp/maps"
)

var murmur3Pool = sync.Pool{
New: func() any {
return murmur3.New64()
},
}

var fmSketchPool = sync.Pool{
New: func() any {
return &FMSketch{
hashset: make(map[uint64]bool),
maxSize: 0,
}
},
}

// FMSketch is used to count the number of distinct elements in a set.
type FMSketch struct {
hashset map[uint64]bool
mask uint64
maxSize int
hashFunc hash.Hash64
hashset map[uint64]bool
mask uint64
maxSize int
}

// NewFMSketch returns a new FM sketch.
func NewFMSketch(maxSize int) *FMSketch {
return &FMSketch{
hashset: make(map[uint64]bool),
maxSize: maxSize,
hashFunc: murmur3.New64(),
}
result := fmSketchPool.Get().(*FMSketch)
result.maxSize = maxSize
return result
}

// Copy makes a copy for current FMSketch.
func (s *FMSketch) Copy() *FMSketch {
if s == nil {
return nil
}
hashset := make(map[uint64]bool)
result := NewFMSketch(s.maxSize)
for key, value := range s.hashset {
hashset[key] = value
}
return &FMSketch{
hashset: hashset,
mask: s.mask,
maxSize: s.maxSize,
hashFunc: murmur3.New64(),
result.hashset[key] = value
}
result.mask = s.mask
return result
}

// NDV returns the ndv of the sketch.
Expand Down Expand Up @@ -88,31 +98,35 @@ func (s *FMSketch) InsertValue(sc *stmtctx.StatementContext, value types.Datum)
if err != nil {
return errors.Trace(err)
}
s.hashFunc.Reset()
_, err = s.hashFunc.Write(bytes)
hashFunc := murmur3Pool.Get().(hash.Hash64)
hashFunc.Reset()
defer murmur3Pool.Put(hashFunc)
_, err = hashFunc.Write(bytes)
if err != nil {
return errors.Trace(err)
}
s.insertHashValue(s.hashFunc.Sum64())
s.insertHashValue(hashFunc.Sum64())
return nil
}

// InsertRowValue inserts multi-column values to the sketch.
func (s *FMSketch) InsertRowValue(sc *stmtctx.StatementContext, values []types.Datum) error {
b := make([]byte, 0, 8)
s.hashFunc.Reset()
hashFunc := murmur3Pool.Get().(hash.Hash64)
hashFunc.Reset()
defer murmur3Pool.Put(hashFunc)
for _, v := range values {
b = b[:0]
b, err := codec.EncodeValue(sc, b, v)
if err != nil {
return err
}
_, err = s.hashFunc.Write(b)
_, err = hashFunc.Write(b)
if err != nil {
return err
}
}
s.insertHashValue(s.hashFunc.Sum64())
s.insertHashValue(hashFunc.Sum64())
return nil
}

Expand Down Expand Up @@ -151,10 +165,8 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch {
if protoSketch == nil {
return nil
}
sketch := &FMSketch{
hashset: make(map[uint64]bool, len(protoSketch.Hashset)),
mask: protoSketch.Mask,
}
sketch := fmSketchPool.Get().(*FMSketch)
sketch.mask = protoSketch.Mask
for _, val := range protoSketch.Hashset {
sketch.hashset[val] = true
}
Expand Down Expand Up @@ -194,3 +206,18 @@ func (s *FMSketch) MemoryUsage() (sum int64) {
sum = int64(16 + 9*len(s.hashset))
return
}

func (s *FMSketch) reset() {
maps.Clear(s.hashset)
s.mask = 0
s.maxSize = 0
}

// DestroyAndPutToPool resets the FMSketch and puts it to the pool.
func (s *FMSketch) DestroyAndPutToPool() {
if s == nil {
return
}
s.reset()
fmSketchPool.Put(s)
}
13 changes: 13 additions & 0 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/util/tracing"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -824,6 +825,18 @@ func GetOrdinalOfRangeCond(sc *stmtctx.StatementContext, ran *ranger.Range) int
return len(ran.LowVal)
}

// ReleaseAndPutToPool releases data structures of Table and put itself back to pool.
func (t *Table) ReleaseAndPutToPool() {
for _, col := range t.Columns {
col.FMSketch.DestroyAndPutToPool()
}
maps.Clear(t.Columns)
for _, idx := range t.Indices {
idx.FMSketch.DestroyAndPutToPool()
}
maps.Clear(t.Indices)
}

// ID2UniqueID generates a new HistColl whose `Columns` is built from UniqueID of given columns.
func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl {
cols := make(map[int64]*Column)
Expand Down

0 comments on commit ba5d946

Please sign in to comment.