Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, stats: extract topn from cm sketch (#11409) #13428

Merged
merged 2 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions executor/analyze.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
maxSketchSize = 10000
defaultCMSketchDepth = 5
defaultCMSketchWidth = 2048
defaultNumTopN = uint32(20)
)

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -336,7 +337,8 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
}
}
}
return hist, cms, nil
err := hist.ExtractTopN(cms, len(e.idxInfo.Columns), defaultNumTopN)
return hist, cms, err
}

func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool) (hist *statistics.Histogram, cms *statistics.CMSketch, err error) {
Expand Down Expand Up @@ -509,6 +511,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range) (hists []*statis
cms = append(cms, nil)
}
for i, col := range e.colsInfo {
collectors[i].ExtractTopN(defaultNumTopN)
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
Expand Down Expand Up @@ -1023,14 +1026,13 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
data[i] = append(data[i], sample.Value.GetBytes()[:preLen])
}
}
numTop := uint32(20)
cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], numTop, uint64(rowCount))
cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], defaultNumTopN, uint64(rowCount))
// Build CM Sketch for each prefix and merge them into one.
for i := 1; i < len(idxInfo.Columns); i++ {
var curCMSketch *statistics.CMSketch
// `ndv` should be the ndv of full index, so just rewrite it here.
curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], numTop, uint64(rowCount))
err := cmSketch.MergeCMSketch(curCMSketch, numTop)
curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], defaultNumTopN, uint64(rowCount))
err := cmSketch.MergeCMSketch(curCMSketch, defaultNumTopN)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1207,7 +1209,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
return analyzeResult{Err: err, job: idxExec.job}
}
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS, defaultNumTopN)
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
Expand Down
27 changes: 27 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,30 @@ func (s *testSuite1) TestFailedAnalyzeRequest(c *C) {
c.Assert(err.Error(), Equals, "mock buildStatsFromResult error")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/buildStatsFromResult"), IsNil)
}

func (s *testSuite1) TestExtractTopN(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, 0)", i+10))
}
tk.MustExec("analyze table t")
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tblStats := s.dom.StatsHandle().GetTableStats(tblInfo)
colStats := tblStats.Columns[tblInfo.Columns[1].ID]
c.Assert(len(colStats.CMSketch.TopN()), Equals, 1)
item := colStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
idxStats := tblStats.Indices[tblInfo.Indices[0].ID]
c.Assert(len(idxStats.CMSketch.TopN()), Equals, 1)
item = idxStats.CMSketch.TopN()[0]
c.Assert(item.Count, Equals, uint64(11))
}
1 change: 1 addition & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,7 @@ func (s *testSuite1) SetUpSuite(c *C) {
mockstore.WithHijackClient(hijackClient),
)
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom.SetStatsUpdating(true)
Expand Down
37 changes: 29 additions & 8 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func newTopNHelper(sample [][]byte, numTop uint32) *topNHelper {
if actualNumTop >= numTop && sorted[actualNumTop].cnt*3 < sorted[numTop-1].cnt*2 {
break
}
if sorted[actualNumTop].cnt == 1 {
break
}
sumTopN += sorted[actualNumTop].cnt
}

Expand Down Expand Up @@ -247,6 +250,14 @@ func (c *CMSketch) setValue(h1, h2 uint64, count uint64) {
}
}

func (c *CMSketch) subValue(h1, h2 uint64, count uint64) {
c.count -= count
for i := range c.table {
j := (h1 + h2*uint64(i)) % uint64(c.width)
c.table[i][j] = c.table[i][j] - uint32(count)
}
}

func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (uint64, error) {
bytes, err := tablecodec.EncodeValue(sc, val)
if err != nil {
Expand Down Expand Up @@ -290,7 +301,7 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32) {
func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32, usingMax bool) {
counter := make(map[hack.MutableString]uint64)
for _, metas := range lTopN {
for _, meta := range metas {
Expand All @@ -299,7 +310,11 @@ func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*T
}
for _, metas := range rTopN {
for _, meta := range metas {
counter[hack.String(meta.Data)] += meta.Count
if usingMax {
counter[hack.String(meta.Data)] = mathutil.MaxUint64(counter[hack.String(meta.Data)], meta.Count)
} else {
counter[hack.String(meta.Data)] += meta.Count
}
}
}
sorted := make([]uint64, len(counter))
Expand Down Expand Up @@ -332,7 +347,7 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
c.mergeTopN(c.topN, rc.topN, numTopN)
c.mergeTopN(c.topN, rc.topN, numTopN, false)
}
c.count += rc.count
for i := range c.table {
Expand All @@ -351,12 +366,12 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
// (3): For values that appears both in `c` and `rc`, if they do not appear partially in `c` and `rc`, for example,
// if `v` appears 5 times in the table, it can appears 5 times in `c` and 3 times in `rc`, then `max` also gives the correct answer.
// So in fact, if we can know the number of appearances of each value in the first place, it is better to use `max` to construct the CM sketch rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch, numTopN uint32) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
c.mergeTopN(c.topN, rc.topN, numTopN, true)
}
for i := range c.table {
c.count = 0
Expand Down Expand Up @@ -399,10 +414,10 @@ func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
c.count = c.count + uint64(counter)
}
}
c.defaultValue = protoSketch.DefaultValue
if len(protoSketch.TopN) == 0 {
return c
}
c.defaultValue = protoSketch.DefaultValue
c.topN = make(map[uint64][]*TopNMeta)
for _, e := range protoSketch.TopN {
h1, h2 := murmur3.Sum128(e.Data)
Expand Down Expand Up @@ -458,9 +473,15 @@ func LoadCMSketchWithTopN(exec sqlexec.RestrictedSQLExecutor, tableID, isIndex,
return decodeCMSketch(cms, topN)
}

// TotalCount returns the count, it is only used for test.
// TotalCount returns the total count in the sketch, it is only used for test.
func (c *CMSketch) TotalCount() uint64 {
return c.count
res := c.count
for _, metas := range c.topN {
for _, meta := range metas {
res += meta.Count
}
}
return res
}

// Equal tests if two CM Sketch equal, it is only used for test.
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
}

func (s *testStatsSuite) TestIndexQueryFeedback(c *C) {
c.Skip("support update the topn of index equal conditions")
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)

Expand Down
61 changes: 61 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"sort"
"strings"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1069,3 +1071,62 @@ func matchPrefix(row chunk.Row, colIdx int, ad *types.Datum) bool {
}
return false
}

func getIndexPrefixLens(data []byte, numCols int) (prefixLens []int, err error) {
prefixLens = make([]int, 0, numCols)
var colData []byte
prefixLen := 0
for len(data) > 0 {
colData, data, err = codec.CutOne(data)
if err != nil {
return nil, err
}
prefixLen += len(colData)
prefixLens = append(prefixLens, prefixLen)
}
return prefixLens, nil
}

// ExtractTopN extracts topn from histogram.
func (hg *Histogram) ExtractTopN(cms *CMSketch, numCols int, numTopN uint32) error {
if hg.Len() == 0 || cms == nil || numTopN == 0 {
return nil
}
dataSet := make(map[string]struct{}, hg.Bounds.NumRows())
dataCnts := make([]dataCnt, 0, hg.Bounds.NumRows())
hg.PreCalculateScalar()
// Set a limit on the frequency of boundary values to avoid extract values with low frequency.
limit := hg.notNullCount() / float64(hg.Len())
// Since our histogram are equal depth, they must occurs on the boundaries of buckets.
for i := 0; i < hg.Bounds.NumRows(); i++ {
data := hg.Bounds.GetRow(i).GetBytes(0)
prefixLens, err := getIndexPrefixLens(data, numCols)
if err != nil {
return err
}
for _, prefixLen := range prefixLens {
prefixColData := data[:prefixLen]
_, ok := dataSet[string(prefixColData)]
if ok {
continue
}
dataSet[string(prefixColData)] = struct{}{}
res := hg.BetweenRowCount(types.NewBytesDatum(prefixColData), types.NewBytesDatum(kv.Key(prefixColData).PrefixNext()))
if res >= limit {
dataCnts = append(dataCnts, dataCnt{prefixColData, uint64(res)})
}
}
}
sort.SliceStable(dataCnts, func(i, j int) bool { return dataCnts[i].cnt >= dataCnts[j].cnt })
cms.topN = make(map[uint64][]*TopNMeta)
if len(dataCnts) > int(numTopN) {
dataCnts = dataCnts[:numTopN]
}
for _, dataCnt := range dataCnts {
h1, h2 := murmur3.Sum128(dataCnt.data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, dataCnt.data, realCnt})
}
return nil
}
24 changes: 24 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
)

// SampleItem is an item of sampled column value.
Expand Down Expand Up @@ -257,3 +258,26 @@ func RowToDatums(row chunk.Row, fields []*ast.ResultField) []types.Datum {
}
return datums
}

// ExtractTopN extracts the topn from the CM Sketch.
func (c *SampleCollector) ExtractTopN(numTop uint32) {
if numTop == 0 {
return
}
values := make([][]byte, 0, len(c.Samples))
for _, sample := range c.Samples {
values = append(values, sample.Value.GetBytes())
}
helper := newTopNHelper(values, numTop)
cms := c.CMSketch
cms.topN = make(map[uint64][]*TopNMeta)
// Process them decreasingly so we can handle most frequent values first and reduce the probability of hash collision
// by small values.
for i := uint32(0); i < helper.actualNumTop; i++ {
data := helper.sorted[i].data
h1, h2 := murmur3.Sum128(data)
realCnt := cms.queryHashValue(h1, h2)
cms.subValue(h1, h2, realCnt)
cms.topN[h1] = append(cms.topN[h1], &TopNMeta{h2, data, realCnt})
}
}