Skip to content

Commit

Permalink
statistics: update RecordHistoricalStatsMeta to handle multiple table…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Jan 22, 2025
1 parent ba79f50 commit a1beeb1
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 88 deletions.
36 changes: 36 additions & 0 deletions pkg/executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jt)
require.False(t, jt.IsHistoricalStats)
}

func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))")
tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))")
// Insert some data.
tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')")
tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')")
// Analyze the tables.
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
h := dom.StatsHandle()
// Update the stats cache.
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))

// Insert more data.
tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')")
tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')")
// Dump stats delta to kv.
require.NoError(t, h.DumpStatsDeltaToKV(true))

// Check historical stats meta.
tbl1, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t1"))
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
require.NoError(t, err)
rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl1.Meta().ID).Rows()
version1 := rows[0][0].(string)
rows = tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl2.Meta().ID).Rows()
version2 := rows[0][0].(string)
require.Equal(t, version1, version2)
}
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ func (h subscriber) recordHistoricalStatsMeta(
return history.RecordHistoricalStatsMeta(
ctx,
sctx,
id,
startTS,
util.StatsMetaHistorySourceSchemaChange,
id,
)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/history/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ go_library(
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/statistics/util",
"//pkg/util/logutil",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
Expand Down
155 changes: 110 additions & 45 deletions pkg/statistics/handle/history/history_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ package history

import (
"context"
"fmt"
"slices"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
statsutil "github.com/pingcap/tidb/pkg/statistics/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,7 +61,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
return 0, errors.Trace(err)
}
if js == nil {
logutil.BgLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
statslogutil.StatsLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
return 0, nil
}
var version uint64
Expand All @@ -67,31 +72,41 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
return version, err
}

// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) {
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) {
if version == 0 {
return
}
if !enforce {
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return

var targetedTableIDs []int64
if enforce {
targetedTableIDs = tableIDs
} else {
targetedTableIDs = make([]int64, 0, len(tableIDs))
for _, tableID := range tableIDs {
tbl, ok := sh.statsHandle.Get(tableID)
if tableID == 0 || !ok || !tbl.IsInitialized() {
continue
}
targetedTableIDs = append(targetedTableIDs, tableID)
}
}
// Sort the tableIDs to avoid deadlocks.
slices.Sort(targetedTableIDs)

err := handleutil.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
if !sctx.GetSessionVars().EnableHistoricalStats {
return nil
}
return RecordHistoricalStatsMeta(handleutil.StatsCtx, sctx, tableID, version, source)
return RecordHistoricalStatsMeta(handleutil.StatsCtx, sctx, version, source, targetedTableIDs...)
}, handleutil.FlagWrapTxn)
if err != nil { // just log the error, hide the error from the outside caller.
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),

if err != nil {
statslogutil.StatsLogger().Error("record historical stats meta failed",
zap.Uint64("version", version),
zap.String("source", source),
zap.Int64s("tableIDs", tableIDs),
zap.Int64s("targetedTableIDs", targetedTableIDs),
zap.Error(err))
}
}
Expand All @@ -105,46 +120,96 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error
return
}

// RecordHistoricalStatsMeta records the historical stats meta.
// RecordHistoricalStatsMeta records the historical stats meta for multiple tables.
func RecordHistoricalStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
tableID int64,
version uint64,
source string,
tableIDs ...int64,
) error {
if tableID == 0 || version == 0 {
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
}
rows, _, err := handleutil.ExecRowsWithCtx(
ctx,
sctx,
"select modify_count, count from mysql.stats_meta where table_id = %? and version = %?",
tableID,
version,
)
if err != nil {
intest.Assert(version != 0, "version should not be zero")
intest.AssertFunc(func() bool {
for _, id := range tableIDs {
if id == 0 {
return false
}
}
return true
}, "tableIDs should not contain 0")
intest.AssertFunc(func() bool {
return slices.IsSorted(tableIDs)
}, "tableIDs should be sorted")
if len(tableIDs) == 0 {
return nil
}

// Enable prepared statement cache to avoid repeated compilation of the same statement.
if _, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = ON"); err != nil {
return errors.Trace(err)
}
defer func() {
_, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = OFF")
if err != nil {
statslogutil.StatsLogger().Error("failed to disable prepared statement cache", zap.Error(errors.Trace(err)))
}
}()
prepareSelectForUpdate := `
PREPARE select_stmt FROM 'SELECT * FROM mysql.stats_meta WHERE table_id = ? AND version = ? FOR UPDATE'
`
if _, err := handleutil.ExecWithCtx(ctx, sctx, prepareSelectForUpdate); err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.New("no historical meta stats can be recorded")
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
if _, err := handleutil.ExecWithCtx(
ctx,
sctx,
sql,
tableID,
modifyCount,
count,
version,
source,
); err != nil {
defer func() {
_, err := handleutil.ExecWithCtx(ctx, sctx, "DEALLOCATE PREPARE select_stmt")
if err != nil {
statslogutil.StatsLogger().Error("failed to deallocate prepared statement", zap.Error(errors.Trace(err)))
}
}()

// Lock the rows one by one to avoid deadlocks.
for _, tableID := range tableIDs {
_, err := handleutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("SET @table_id = %d, @version = %d", tableID, version))
if err != nil {
return errors.Trace(err)
}
rows, _, err := handleutil.ExecRowsWithCtx(ctx, sctx, "EXECUTE select_stmt USING @table_id, @version")
if err != nil {
return errors.Trace(err)
}
intest.Assert(len(rows) != 0, "no historical meta stats can be recorded")
if len(rows) == 0 {
statslogutil.StatsLogger().Warn("no historical meta stats can be recorded",
zap.Int64("tableID", tableID),
zap.Uint64("version", version),
)
}
}

// Convert tableIDs to string for SQL IN clause
tableIDStrs := make([]string, 0, len(tableIDs))
for _, id := range tableIDs {
tableIDStrs = append(tableIDStrs, strconv.FormatInt(id, 10))
}
tableIDsStr := strings.Join(tableIDStrs, ",")

// Single query that combines SELECT and INSERT
sql := fmt.Sprintf(`REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time)
SELECT table_id, modify_count, count, %d, '%s', NOW()
FROM mysql.stats_meta
WHERE table_id IN (%s) AND version = %d`,
version, source, tableIDsStr, version)

_, err := handleutil.ExecWithCtx(ctx, sctx, sql)
if err != nil {
return errors.Trace(err)
}
cache.TableRowStatsCache.Invalidate(tableID)

// Invalidate cache for all tables
for _, tableID := range tableIDs {
cache.TableRowStatsCache.Invalidate(tableID)
}

return nil
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand All @@ -77,7 +77,7 @@ func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand All @@ -104,7 +104,7 @@ func (s *statsReadWriter) UpdateStatsMetaVersionForGC(physicalID int64) (err err
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand Down Expand Up @@ -135,7 +135,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
tableID := results.TableID.GetStatisticsID()
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, true, tableID)
}
return err
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s *statsReadWriter) SaveStatsToStorage(
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
}
return
}
Expand All @@ -198,7 +198,7 @@ func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, s
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
}
return
}
Expand All @@ -211,7 +211,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64,
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand All @@ -224,7 +224,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand All @@ -237,7 +237,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ type IndexUsage interface {

// StatsHistory is used to manage historical stats.
type StatsHistory interface {
// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history.
RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool)
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64)

// CheckHistoricalStatsEnable check whether historical stats is enabled.
CheckHistoricalStatsEnable() (enable bool, err error)
Expand Down
Loading

0 comments on commit a1beeb1

Please sign in to comment.