statistics: correct behavior of non-lite InitStats and stats sync load of no stats column #57803

Merged (20 commits) on Dec 3, 2024
70 changes: 56 additions & 14 deletions pkg/statistics/handle/bootstrap.go
@@ -244,15 +244,20 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
Flag: row.GetInt64(10),
StatsVer: statsVer,
}
// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
lastAnalyzePos.Copy(&col.LastAnalyzePos)
table.SetCol(hist.ID, col)
table.ColAndIdxExistenceMap.InsertCol(colInfo.ID, statsVer != statistics.Version0 || ndv > 0 || nullCount > 0)
if statsVer != statistics.Version0 {
// The LastAnalyzeVersion is added by ALTER table so its value might be 0.
table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, version)
// We will also set int primary key's loaded status to evicted.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
} else if col.NDV > 0 || col.NullCount > 0 {
// If NDV > 0 or NullCount > 0, we also treat it as the one having its statistics. See the comments of StatsAvailable in column.go.
// So we align its status as evicted too.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
// Otherwise the column's stats is not initialized.
}
}
if table != nil {
@@ -261,8 +266,19 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
}
}

// initStatsHistogramsSQLGen generates the SQL to load all stats_histograms records.
// We need to read all the records since we need to initialize table.ColAndIdxExistenceMap.
func initStatsHistogramsSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " where table_id >= %? and table_id < %?" + orderSuffix
}
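For reference, the two statements this helper assembles (pieced together from the string constants above; the %? markers are placeholders that the paging caller later binds to task.StartTid and task.EndTid):

-- isPaging = false: full scan of mysql.stats_histograms
select /*+ ORDER_INDEX(mysql.stats_histograms,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id

-- isPaging = true: one page of table IDs
select /*+ ORDER_INDEX(mysql.stats_histograms,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %? order by table_id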

func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.StatsCache) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id"
sql := initStatsHistogramsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -285,7 +301,7 @@ func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.S
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_histograms,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms order by table_id"
sql := initStatsHistogramsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -319,10 +335,7 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
}()

sctx := se.(sessionctx.Context)
// Why do we need to add `is_index=1` in the SQL?
// because it is aligned to the `initStatsTopN` function, which only loads the topn of the index too.
// the other will be loaded by sync load.
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %? and is_index=1"
sql := initStatsHistogramsSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -401,8 +414,20 @@ func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
}
}

// initStatsTopNSQLGen generates the SQL to load all stats_top_n records.
// We only need to load the indexes' TopN since we only record the existence of columns in ColAndIdxExistenceMap.
// The columns' stats are not loaded during the bootstrap process.
func initStatsTopNSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl) */ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix
}

func (h *Handle) initStatsTopN(cache statstypes.StatsCache, totalMemory uint64) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id"
sql := initStatsTopNSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -435,7 +460,7 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %? order by table_id"
sql := initStatsTopNSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -619,6 +644,18 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}
}

// initStatsBucketsSQLGen generates the SQL to load all stats_buckets records.
// We only need to load the indexes' buckets since we only record the existence of columns in ColAndIdxExistenceMap.
// The columns' stats are not loaded during the bootstrap process.
func initStatsBucketsSQLGen(isPaging bool) string {
selectPrefix := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl) */ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where is_index=1"
orderSuffix := " order by table_id"
if !isPaging {
return selectPrefix + orderSuffix
}
return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix
}

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint64) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
@@ -629,7 +666,7 @@
return errors.Trace(err)
}
} else {
sql := "select /*+ ORDER_INDEX(mysql.stats_buckets,tbl)*/ HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
sql := initStatsBucketsSQLGen(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
@@ -668,7 +705,7 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? order by table_id, is_index, hist_id, bucket_id"
sql := initStatsBucketsSQLGen(true)
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
@@ -719,8 +756,10 @@ func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache, totalM

// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
// 1. Basic stats meta data is loaded.(count, modify count, etc.)
// 2. Column/index stats are loaded. (only histogram)
// 2. Column/index stats are marked as existing or not by initializing the table.ColAndIdxExistenceMap, based on data from mysql.stats_histograms.
// 3. TopN, Bucket, FMSketch are not loaded.
// And to work with auto analyze's needs, we need to read all the tables' stats meta into memory.
// The sync/async load of the stats or other processes haven't done a full initialization of the table.ColAndIdxExistenceMap, so we need to do it here.
func (h *Handle) InitStatsLite(ctx context.Context) (err error) {
defer func() {
_, err1 := util.Exec(h.initStatsCtx, "commit")
@@ -750,7 +789,10 @@

// InitStats initiates the stats cache.
// 1. Basic stats meta data is loaded.(count, modify count, etc.)
// 2. Column/index stats are loaded. (histogram, topn, buckets, FMSketch)
// 2. Index stats are fully loaded. (histogram, topn, buckets)
// 3. Column stats are marked as existing or not by initializing the table.ColAndIdxExistenceMap, based on data from mysql.stats_histograms.
// To work with auto-analyze's needs, we need to read all stats meta info into memory.
// The sync/async load of the stats or other processes haven't done a full initialization of the table.ColAndIdxExistenceMap, so we need to do it here.
func (h *Handle) InitStats(ctx context.Context, is infoschema.InfoSchema) (err error) {
totalMemory, err := memory.MemTotal()
if err != nil {
@@ -88,7 +88,7 @@ func testConcurrentlyInitStats(t *testing.T) {
tk.MustQuery(fmt.Sprintf("explain select * from t%v where b = 1", i)).CheckNotContain("pseudo")
}
for i := 1; i < 10; i++ {
tk.MustQuery(fmt.Sprintf("explain select * from t%v where c = 1", i)).CheckNotContain("pseudo")
Member Author commented on the change below: pk = 1 will result in PointGet. Will not trigger the stats load. (A short illustration follows this hunk.)

tk.MustQuery(fmt.Sprintf("explain select * from t%v where c >= 1", i)).CheckNotContain("pseudo")
}
for i := 1; i < 10; i++ {
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr(fmt.Sprintf("t%v", i)))
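To illustrate the comment above — a sketch, assuming column c is the integer primary key of the t1..t9 test tables created in testConcurrentlyInitStats:

-- Equality on the integer primary key is planned as a PointGet; it reads no
-- histograms, so it never queues a stats sync-load task.
explain select * from t1 where c = 1;
-- A range predicate on the same column needs a row-count estimate, so the
-- optimizer requests the column's stats and the sync load is triggered.
explain select * from t1 where c >= 1;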
35 changes: 20 additions & 15 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
@@ -305,6 +305,9 @@ func TestInitStats(t *testing.T) {
require.NoError(t, h.Update(context.Background(), is))
// Index and pk are loaded.
needed := fmt.Sprintf(`Table:%v RealtimeCount:6
column:1 ndv:6 totColSize:0
column:2 ndv:6 totColSize:6
column:3 ndv:6 totColSize:6
index:1 ndv:6
num: 1 lower_bound: 1 upper_bound: 1 repeats: 1 ndv: 0
num: 1 lower_bound: 2 upper_bound: 2 repeats: 1 ndv: 0
@@ -363,7 +366,7 @@ func TestInitStatsVer2(t *testing.T) {
}()
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = false
initStatsVer2(t, false)
initStatsVer2(t)
}

func TestInitStatsVer2Concurrency(t *testing.T) {
@@ -375,18 +378,21 @@
}()
config.GetGlobalConfig().Performance.LiteInitStats = false
config.GetGlobalConfig().Performance.ConcurrentlyInitStats = true
initStatsVer2(t, true)
initStatsVer2(t)
}

func initStatsVer2(t *testing.T, isConcurrency bool) {
func initStatsVer2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_analyze_version=2")
tk.MustExec("create table t(a int, b int, c int, index idx(a), index idxab(a, b))")
tk.MustExec("create table t(a int, b int, c int, d int, index idx(a), index idxab(a, b))")
dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())
analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "c")
tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (4, 4, 4), (4, 4, 4)")
tk.MustExec("insert into t values(1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4), (4, 4, 4, 4), (4, 4, 4, 4)")
tk.MustExec("analyze table t with 2 topn, 3 buckets")
tk.MustExec("alter table t add column e int default 1")
dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())
h := dom.StatsHandle()
is := dom.InfoSchema()
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
@@ -398,16 +404,15 @@ func initStatsVer2(t *testing.T, isConcurrency bool) {
h.Clear()
require.NoError(t, h.InitStats(context.Background(), is))
table0 := h.GetTableStats(tbl.Meta())
if isConcurrency {
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
} else {
require.Equal(t, uint8(0x33), table0.GetCol(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x33), table0.GetCol(2).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x33), table0.GetCol(3).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
}
Comment on lines -401 to -410 — @winoros (Member Author), Nov 28, 2024:
In fact, we should not distinguish between the single-thread init and the concurrent init. This PR makes them consistent.

require.Equal(t, 5, table0.ColNum())
require.True(t, table0.GetCol(1).IsAllEvicted())
require.True(t, table0.GetCol(2).IsAllEvicted())
require.True(t, table0.GetCol(3).IsAllEvicted())
require.True(t, !table0.GetCol(4).IsStatsInitialized())
require.True(t, table0.GetCol(5).IsStatsInitialized())
Comment on lines +408 to +412 — Member Author:
Columns 1/2/3 are analyzed. Column 4 is not analyzed and has no record in storage.
Column 5 is created by ADD COLUMN with a default value, so it has stats even though it is not analyzed.
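A rough way to observe this in storage (a hypothetical query; <table_id> stands for t's physical table ID, and hist_id 1..5 correspond to columns a..e):

-- Columns a/b/c have rows written by ANALYZE (stats_ver > 0).
-- Column d has no row at all, so InitStats leaves it uninitialized.
-- Column e gets a row from the ADD COLUMN ... DEFAULT 1 DDL event
-- (stats_ver = 0 but distinct_count > 0), so it still counts as having stats.
select hist_id, distinct_count, null_count, stats_ver
from mysql.stats_histograms where table_id = <table_id> and is_index = 0;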

require.Equal(t, 2, table0.IdxNum())
require.Equal(t, uint8(0x3), table0.GetIdx(1).LastAnalyzePos.GetBytes()[0])
require.Equal(t, uint8(0x3), table0.GetIdx(2).LastAnalyzePos.GetBytes()[0])
h.Clear()
require.NoError(t, h.InitStats(context.Background(), is))
table1 := h.GetTableStats(tbl.Meta())
3 changes: 1 addition & 2 deletions pkg/statistics/handle/syncload/BUILD.bazel
@@ -18,7 +18,6 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/table",
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
@@ -36,7 +35,7 @@ go_test(
srcs = ["stats_syncload_test.go"],
flaky = True,
race = "on",
shard_count = 7,
shard_count = 8,
deps = [
":syncload",
"//pkg/config",
25 changes: 13 additions & 12 deletions pkg/statistics/handle/syncload/stats_syncload.go
@@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -314,30 +313,31 @@ func (s *statsSyncLoad) handleOneItemTask(task *statstypes.NeededItemTask) (err
}

item := task.Item.TableItemID
tbl, ok := s.statsHandle.Get(item.TableID)
statsTbl, ok := s.statsHandle.Get(item.TableID)

if !ok {
return nil
}
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tblInfo, ok := s.statsHandle.TableInfoByID(is, item.TableID)
tbl, ok := s.statsHandle.TableInfoByID(is, item.TableID)
if !ok {
return nil
}
isPkIsHandle := tblInfo.Meta().PKIsHandle
tblInfo := tbl.Meta()
isPkIsHandle := tblInfo.PKIsHandle
wrapper := &statsWrapper{}
if item.IsIndex {
index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
index, loadNeeded := statsTbl.IndexIsLoadNeeded(item.ID)
if !loadNeeded {
return nil
}
if index != nil {
wrapper.idxInfo = index.Info
} else {
wrapper.idxInfo = tblInfo.Meta().FindIndexByID(item.ID)
wrapper.idxInfo = tblInfo.FindIndexByID(item.ID)
}
} else {
col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
col, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
if !loadNeeded {
return nil
}
@@ -346,7 +346,7 @@
} else {
// Now, we cannot init the column info in the ColAndIdxExistenceMap when to disable lite-init-stats.
// so we have to get the column info from the domain.
wrapper.colInfo = tblInfo.Meta().GetColumnByID(item.ID)
wrapper.colInfo = tblInfo.GetColumnByID(item.ID)
}
if skipTypes != nil {
_, skip := skipTypes[types.TypeToStr(wrapper.colInfo.FieldType.GetType(), wrapper.colInfo.FieldType.GetCharset())]
@@ -410,7 +410,8 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
}
if hg == nil {
logutil.BgLogger().Warn("fail to get hist meta for this histogram, possibly a deleted one", zap.Int64("table_id", item.TableID),
zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex))
zap.Int64("hist_id", item.ID), zap.Bool("is_index", item.IsIndex),
)
return nil, errGetHistMeta
}
if item.IsIndex {
@@ -560,7 +561,7 @@ func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, r
}

// updateCachedItem updates the column/index hist to global statsCache.
func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
func (s *statsSyncLoad) updateCachedItem(tblInfo *model.TableInfo, item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index, fullLoaded bool) (updated bool) {
s.StatsLoad.Lock()
defer s.StatsLoad.Unlock()
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
@@ -572,13 +573,13 @@ func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableIt
if !tbl.ColAndIdxExistenceMap.Checked() {
tbl = tbl.Copy()
for _, col := range tbl.HistColl.GetColSlice() {
if tblInfo.Meta().FindColumnByID(col.ID) == nil {
if tblInfo.FindColumnByID(col.ID) == nil {
tbl.HistColl.DelCol(col.ID)
tbl.ColAndIdxExistenceMap.DeleteColAnalyzed(col.ID)
}
}
for _, idx := range tbl.HistColl.GetIdxSlice() {
if tblInfo.Meta().FindIndexByID(idx.ID) == nil {
if tblInfo.FindIndexByID(idx.ID) == nil {
tbl.HistColl.DelIdx(idx.ID)
tbl.ColAndIdxExistenceMap.DeleteIdxAnalyzed(idx.ID)
}