Skip to content

Commit

Permalink
Merge branch 'master' into more-robust
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Aug 25, 2022
2 parents d53eb52 + 7d9c684 commit 9f00a50
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 39 deletions.
9 changes: 5 additions & 4 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,18 +998,19 @@ func TestTiFlashBatchUnsupported(t *testing.T) {

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
s, teardown := createTiFlashContext(t)
tiflash := s.tiflash
defer teardown()
_ = testkit.NewTestKit(t, s.store)
timeout := time.Now().Add(10 * time.Second)
errMsg := "time out"
for time.Now().Before(timeout) {
time.Sleep(100 * time.Millisecond)
if s.tiflash.GroupIndex != 0 {
if tiflash.GetRuleGroupIndex() != 0 {
errMsg = "invalid group index"
break
}
}
require.Equal(t, placement.RuleIndexTiFlash, s.tiflash.GroupIndex, errMsg)
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexTable)
require.Greater(t, s.tiflash.GroupIndex, placement.RuleIndexPartition)
require.Equal(t, placement.RuleIndexTiFlash, tiflash.GetRuleGroupIndex(), errMsg)
require.Greater(t, tiflash.GetRuleGroupIndex(), placement.RuleIndexTable)
require.Greater(t, tiflash.GetRuleGroupIndex(), placement.RuleIndexPartition)
}
20 changes: 17 additions & 3 deletions domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (m *mockTiFlashTableInfo) String() string {
// MockTiFlash mocks a TiFlash, with necessary Pd support.
type MockTiFlash struct {
sync.Mutex
GroupIndex int
groupIndex int
StatusAddr string
StatusServer *httptest.Server
SyncStatus map[int]mockTiFlashTableInfo
Expand Down Expand Up @@ -373,7 +373,7 @@ func NewMockTiFlash() *MockTiFlash {
func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) error {
tiflash.Lock()
defer tiflash.Unlock()
tiflash.GroupIndex = placement.RuleIndexTiFlash
tiflash.groupIndex = placement.RuleIndexTiFlash
if !tiflash.PdEnabled {
logutil.BgLogger().Info("pd server is manually disabled, just quit")
return nil
Expand Down Expand Up @@ -492,6 +492,20 @@ func (tiflash *MockTiFlash) HandleGetStoresStat() *helper.StoresStat {
}
}

// SetRuleGroupIndex sets the group index of tiflash
func (tiflash *MockTiFlash) SetRuleGroupIndex(groupIndex int) {
tiflash.Lock()
defer tiflash.Unlock()
tiflash.groupIndex = groupIndex
}

// GetRuleGroupIndex gets the group index of tiflash
func (tiflash *MockTiFlash) GetRuleGroupIndex() int {
tiflash.Lock()
defer tiflash.Unlock()
return tiflash.groupIndex
}

// Compare supposed rule, and we actually get from TableInfo
func isRuleMatch(rule placement.TiFlashRule, startKey string, endKey string, count int, labels []string) bool {
// Compute startKey
Expand Down Expand Up @@ -598,7 +612,7 @@ func (m *mockTiFlashPlacementManager) SetTiFlashGroupConfig(_ context.Context) e
if m.tiflash == nil {
return nil
}
m.tiflash.GroupIndex = placement.RuleIndexTiFlash
m.tiflash.SetRuleGroupIndex(placement.RuleIndexTiFlash)
return nil
}

Expand Down
39 changes: 39 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
ExecRetryCount: a.retryCount,
IsExplicitTxn: sessVars.TxnCtx.IsExplicit,
IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
StatsLoadStatus: convertStatusIntoString(a.Ctx, stmtCtx.StatsLoadStatus),
IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
}
failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
Expand Down Expand Up @@ -1667,3 +1668,41 @@ func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) {
}
return sqlDigest, planDigest
}

func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.TableItemID]string) map[string]map[string]string {
if len(statsLoadStatus) < 1 {
return nil
}
is := domain.GetDomain(sctx).InfoSchema()
// tableName -> name -> status
r := make(map[string]map[string]string)
for item, status := range statsLoadStatus {
t, ok := is.TableByID(item.TableID)
if !ok {
t, _, _ = is.FindTableByPartitionID(item.TableID)
}
if t == nil {
logutil.BgLogger().Warn("record table item load status failed due to not finding table",
zap.Int64("tableID", item.TableID))
continue
}
tableName := t.Meta().Name.O
itemName := ""
if item.IsIndex {
itemName = t.Meta().FindIndexNameByID(item.ID)
} else {
itemName = t.Meta().FindColumnNameByID(item.ID)
}
if itemName == "" {
logutil.BgLogger().Warn("record table item load status failed due to not finding item",
zap.Int64("tableID", item.TableID),
zap.Int64("id", item.ID), zap.Bool("isIndex", item.IsIndex))
continue
}
if r[tableName] == nil {
r[tableName] = make(map[string]string)
}
r[tableName][itemName] = status
}
return r
}
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.EnableOptimizeTrace = false
sc.OptimizeTracer = nil
sc.OptimizerCETrace = nil
sc.StatsLoadStatus = make(map[model.TableItemID]string)
sc.IsSyncStatsFailed = false

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow
Expand Down
1 change: 1 addition & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
e.setMessage()
Expand Down
10 changes: 5 additions & 5 deletions executor/memtest/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ package memtest
import (
"testing"

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestGlobalMemoryTrackerOnCleanUp(t *testing.T) {
originConsume := executor.GlobalMemoryUsageTracker.BytesConsumed()
func TestInsertUpdateTrackerOnCleanUp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

originConsume := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
// assert insert
tk.MustExec("insert t (id) values (1)")
tk.MustExec("insert t (id) values (2)")
tk.MustExec("insert t (id) values (3)")
afterConsume := executor.GlobalMemoryUsageTracker.BytesConsumed()
afterConsume := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
require.Equal(t, afterConsume, originConsume)

originConsume = tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
// assert update
tk.MustExec("update t set id = 4 where id = 1")
tk.MustExec("update t set id = 5 where id = 2")
tk.MustExec("update t set id = 6 where id = 3")
afterConsume = executor.GlobalMemoryUsageTracker.BytesConsumed()
afterConsume = tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
require.Equal(t, afterConsume, originConsume)
}
2 changes: 1 addition & 1 deletion executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,14 @@ func (e *UpdateExec) composeGeneratedColumns(rowIdx int, newRowData []types.Datu

// Close implements the Executor Close interface.
func (e *UpdateExec) Close() error {
defer e.memTracker.ReplaceBytesUsed(0)
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.Valid() && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil)
}
}
defer e.memTracker.ReplaceBytesUsed(0)
return e.children[0].Close()
}

Expand Down
24 changes: 24 additions & 0 deletions planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -293,3 +294,26 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
t.Error("unexpected plan:", pp)
}
}

func TestPlanStatsStatusRecord(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.Performance.EnableStatsCacheMemQuota = true
})
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`create table t (b int,key b(b))`)
tk.MustExec("insert into t (b) values (1)")
tk.MustExec("analyze table t")
tk.MustQuery("select * from t where b >= 1")
require.Len(t, tk.Session().GetSessionVars().StmtCtx.StatsLoadStatus, 0)
// drop stats in order to change status
domain.GetDomain(tk.Session()).StatsHandle().SetStatsCacheCapacity(1)
tk.MustQuery("select * from t where b >= 1")
require.Len(t, tk.Session().GetSessionVars().StmtCtx.StatsLoadStatus, 2)
for _, status := range tk.Session().GetSessionVars().StmtCtx.StatsLoadStatus {
require.Equal(t, status, "allEvicted")
}
}
2 changes: 2 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ type StatementContext struct {
// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
IsSQLAndPlanRegistered atomic2.Bool

// StatsLoadStatus records StatsLoadedStatus for the index/column which is used in query
StatsLoadStatus map[model.TableItemID]string
// IsSyncStatsFailed indicates whether any failure happened during sync stats
IsSyncStatsFailed bool
}
Expand Down
23 changes: 22 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,8 @@ type SlowQueryLogItems struct {
ResultRows int64
IsExplicitTxn bool
IsWriteCacheTable bool
// table -> name -> status
StatsLoadStatus map[string]map[string]string
IsSyncStatsFailed bool
}

Expand Down Expand Up @@ -2665,6 +2667,9 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
buf.WriteString(k + ":" + vStr)
firstComma = true
}
if v != 0 && len(logItems.StatsLoadStatus[k]) > 0 {
writeStatsLoadStatusItems(&buf, logItems.StatsLoadStatus[k])
}
}
buf.WriteString("\n")
}
Expand Down Expand Up @@ -2748,7 +2753,6 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
if len(logItems.BinaryPlan) != 0 {
writeSlowLogItem(&buf, SlowLogBinaryPlan, logItems.BinaryPlan)
}

if logItems.PrevStmt != "" {
writeSlowLogItem(&buf, SlowLogPrevStmt, logItems.PrevStmt)
}
Expand All @@ -2762,9 +2766,26 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
if len(logItems.SQL) == 0 || logItems.SQL[len(logItems.SQL)-1] != ';' {
buf.WriteString(";")
}

return buf.String()
}

func writeStatsLoadStatusItems(buf *bytes.Buffer, loadStatus map[string]string) {
if len(loadStatus) > 0 {
buf.WriteString("[")
firstComma := false
for name, status := range loadStatus {
if firstComma {
buf.WriteString("," + name + ":" + status)
} else {
buf.WriteString(name + ":" + status)
firstComma = true
}
}
buf.WriteString("]")
}
}

// writeSlowLogItem writes a slow log item in the form of: "# ${key}:${value}"
func writeSlowLogItem(buf *bytes.Buffer, key, value string) {
buf.WriteString(SlowLogRowPrefixStr + key + SlowLogSpaceMarkStr + value + "\n")
Expand Down
10 changes: 8 additions & 2 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ func TestSlowLogFormat(t *testing.T) {
},
}
statsInfos := make(map[string]uint64)
statsInfos["t1"] = 0
statsInfos["t1"] = 123
loadStatus := make(map[string]map[string]string)
loadStatus["t1"] = map[string]string{
"col1": "unInitialized",
}

copTasks := &stmtctx.CopTasksDetails{
NumCopTasks: 10,
AvgProcessTime: time.Second,
Expand Down Expand Up @@ -212,7 +217,7 @@ func TestSlowLogFormat(t *testing.T) {
# Index_names: [t1:a,t2:b]
# Is_internal: true
# Digest: 01d00e6e93b28184beae487ac05841145d2a2f6a7b16de32a763bed27967e83d
# Stats: t1:pseudo
# Stats: t1:123[col1:unInitialized]
# Num_cop_tasks: 10
# Cop_proc_avg: 1 Cop_proc_p90: 2 Cop_proc_max: 3 Cop_proc_addr: 10.6.131.78
# Cop_wait_avg: 0.01 Cop_wait_p90: 0.02 Cop_wait_max: 0.03 Cop_wait_addr: 10.6.131.79
Expand Down Expand Up @@ -270,6 +275,7 @@ func TestSlowLogFormat(t *testing.T) {
ExecRetryTime: 5*time.Second + time.Millisecond*100,
IsExplicitTxn: true,
IsWriteCacheTable: true,
StatsLoadStatus: loadStatus,
}
logString := seVar.SlowLogFormat(logItems)
require.Equal(t, resultFields+"\n"+sql, logString)
Expand Down
5 changes: 2 additions & 3 deletions statistics/selectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,11 @@ func (coll *HistColl) Selectivity(ctx sessionctx.Context, exprs []expression.Exp
continue
}

if colHist := coll.Columns[c.UniqueID]; colHist == nil || colHist.IsInvalid(ctx, coll.Pseudo) {
colHist := coll.Columns[c.UniqueID]
if colHist == nil || colHist.IsInvalid(ctx, coll.Pseudo) {
ret *= 1.0 / pseudoEqualRate
continue
}

colHist := coll.Columns[c.UniqueID]
if colHist.Histogram.NDV > 0 {
ret *= 1 / float64(colHist.Histogram.NDV)
} else {
Expand Down
29 changes: 26 additions & 3 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,12 @@ func (t *Table) ColumnEqualRowCount(sctx sessionctx.Context, value types.Datum,
}

// GetRowCountByIntColumnRanges estimates the row count by a slice of IntColumnRange.
func (coll *HistColl) GetRowCountByIntColumnRanges(sctx sessionctx.Context, colID int64, intRanges []*ranger.Range) (float64, error) {
func (coll *HistColl) GetRowCountByIntColumnRanges(sctx sessionctx.Context, colID int64, intRanges []*ranger.Range) (result float64, err error) {
sc := sctx.GetSessionVars().StmtCtx
var result float64
c, ok := coll.Columns[colID]
if c != nil {
recordUsedItemStatsStatus(sctx, c.StatsLoadedStatus, coll.PhysicalID, colID, false)
}
if !ok || c.IsInvalid(sctx, coll.Pseudo) {
if len(intRanges) == 0 {
return 0, nil
Expand All @@ -549,7 +551,7 @@ func (coll *HistColl) GetRowCountByIntColumnRanges(sctx sessionctx.Context, colI
}
return result, nil
}
result, err := c.GetColumnRowCount(sctx, intRanges, coll.Count, true)
result, err = c.GetColumnRowCount(sctx, intRanges, coll.Count, true)
if sc.EnableOptimizerCETrace {
CETraceRange(sctx, coll.PhysicalID, []string{c.Info.Name.O}, intRanges, "Column Stats", uint64(result))
}
Expand All @@ -560,6 +562,9 @@ func (coll *HistColl) GetRowCountByIntColumnRanges(sctx sessionctx.Context, colI
func (coll *HistColl) GetRowCountByColumnRanges(sctx sessionctx.Context, colID int64, colRanges []*ranger.Range) (float64, error) {
sc := sctx.GetSessionVars().StmtCtx
c, ok := coll.Columns[colID]
if c != nil {
recordUsedItemStatsStatus(sctx, c.StatsLoadedStatus, coll.PhysicalID, colID, false)
}
if !ok || c.IsInvalid(sctx, coll.Pseudo) {
result, err := GetPseudoRowCountByColumnRanges(sc, float64(coll.Count), colRanges, 0)
if err == nil && sc.EnableOptimizerCETrace && ok {
Expand All @@ -584,6 +589,9 @@ func (coll *HistColl) GetRowCountByIndexRanges(sctx sessionctx.Context, idxID in
colNames = append(colNames, col.Name.O)
}
}
if idx != nil {
recordUsedItemStatsStatus(sctx, idx.StatsLoadedStatus, coll.PhysicalID, idxID, true)
}
if !ok || idx.IsInvalid(coll.Pseudo) {
colsLen := -1
if idx != nil && idx.Info.Unique {
Expand Down Expand Up @@ -1378,3 +1386,18 @@ func CheckAnalyzeVerOnTable(tbl *Table, version *int) bool {
// This table has no statistics yet. We can directly return true.
return true
}

// recordUsedItemStatsStatus only records un-FullLoad item load status during user query
func recordUsedItemStatsStatus(sctx sessionctx.Context, loadStatus StatsLoadedStatus,
tableID, id int64, isIndex bool) {
if loadStatus.IsFullLoad() {
return
}
stmtCtx := sctx.GetSessionVars().StmtCtx
item := model.TableItemID{TableID: tableID, ID: id, IsIndex: isIndex}
// For some testcases, it skips ResetContextOfStmt to init StatsLoadStatus
if stmtCtx.StatsLoadStatus == nil {
stmtCtx.StatsLoadStatus = make(map[model.TableItemID]string)
}
stmtCtx.StatsLoadStatus[item] = loadStatus.StatusToString()
}
Loading

0 comments on commit 9f00a50

Please sign in to comment.