From 3587a508ac753e015988234187746de1d3196c86 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:01:22 +0800 Subject: [PATCH 01/20] statistics: add a priority queue API Signed-off-by: Rustin170506 --- .../handler/optimizor/statistics_handler.go | 20 +++++++++++++++++++ pkg/server/http_status.go | 16 +++++++++++++++ .../handle/autoanalyze/autoanalyze.go | 5 +++++ .../calculator_analysis_test.go | 4 ++++ .../dynamic_partitioned_table_analysis_job.go | 14 +++++++++++++ .../autoanalyze/priorityqueue/heap_test.go | 3 +++ .../handle/autoanalyze/priorityqueue/job.go | 3 +++ .../non_partitioned_table_analysis_job.go | 14 +++++++++++++ .../handle/autoanalyze/priorityqueue/queue.go | 11 ++++++++++ .../static_partitioned_table_analysis_job.go | 14 +++++++++++++ .../handle/autoanalyze/refresher/refresher.go | 17 ++++++++++++++++ pkg/statistics/handle/types/interfaces.go | 19 ++++++++++++++++++ 12 files changed, 140 insertions(+) diff --git a/pkg/server/handler/optimizor/statistics_handler.go b/pkg/server/handler/optimizor/statistics_handler.go index 897b91309ea46..d9083a8fd46d2 100644 --- a/pkg/server/handler/optimizor/statistics_handler.go +++ b/pkg/server/handler/optimizor/statistics_handler.go @@ -144,3 +144,23 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s } return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName)) } + +type StatsPriorityQueueHandler struct { + do *domain.Domain +} + +func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler { + return &StatsPriorityQueueHandler{do: do} +} + +func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + + h := sh.do.StatsHandle() + tables, err := h.GetStatsPriorityQueue() + if err != nil { + handler.WriteError(w, err) + } else { + handler.WriteData(w, tables) + } +} diff --git a/pkg/server/http_status.go b/pkg/server/http_status.go index cfaa51d7cbf5f..ef861efd86498 100644 --- a/pkg/server/http_status.go +++ b/pkg/server/http_status.go @@ -217,6 +217,8 @@ func (s *Server) startHTTPServer() { Name("StatsDump") router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()). Name("StatsHistoryDump") + router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()). + Name("StatsPriorityQueue") router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump") router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump") @@ -621,3 +623,17 @@ func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler { } return optimizor.NewStatsHistoryHandler(do) } + +func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler { + store, ok := s.driver.(*TiDBDriver) + if !ok { + panic("Illegal driver") + } + + do, err := session.GetDomain(store.store) + if err != nil { + panic("Failed to get domain") + } + + return optimizor.NewStatsPriorityQueueHandler(do) +} diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index b25ac73977643..22925c25b5dd1 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -317,6 +317,11 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID return statistics.CheckAnalyzeVerOnTable(tbl, version) } +// GetStatsPriorityQueue returns the stats priority queue. +func (sa *statsAnalyze) GetStatsPriorityQueue() ([]statstypes.AnalysisJobJSON, error) { + return sa.refresher.GetStatsPriorityQueue() +} + func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool { defer func() { if r := recover(); r != nil { diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go index bef3ee8929ef9..7ce301cdb8fca 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go @@ -289,3 +289,7 @@ func (j *TestJob) SetIndicators(indicators priorityqueue.Indicators) { func (j *TestJob) HasNewlyAddedIndex() bool { return false } + +func (j *TestJob) ToJSON() types.AnalysisJobJSON { + panic("unimplemented") +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index 0fa00e5626060..543f80fb38a02 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -364,3 +364,17 @@ func getPartitionNames(partitionIndexes map[string][]string) []string { } return names } + +func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + return statstypes.AnalysisJobJSON{ + Type: string(j.getAnalyzeType()), + TableID: j.GlobalTableID, + Weight: j.Weight, + Indicators: statstypes.IndicatorsJSON{ + ChangePercentage: j.ChangePercentage, + TableSize: j.TableSize, + LastAnalysisDuration: j.LastAnalysisDuration, + }, + HasNewlyAddedIndex: j.HasNewlyAddedIndex(), + } +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go index 53ecd941403e0..db95b91b2a04f 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go @@ -67,6 +67,9 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) { func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) { panic("implement me") } +func (t testHeapObject) ToJSON() statstypes.AnalysisJobJSON { + panic("implement me") +} func (t testHeapObject) String() string { panic("implement me") } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index a75c8ce5fd50d..e917a2d664ec3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -91,6 +91,9 @@ type AnalysisJob interface { // RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed. RegisterFailureHook(hook FailureJobHook) + // ToJSON converts the job to a JSON format. + ToJSON() statstypes.AnalysisJobJSON + fmt.Stringer } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index a8dff7f0921ad..91e873dd4ff7f 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -270,3 +270,17 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st return sql, params } + +func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + return statstypes.AnalysisJobJSON{ + Type: string(j.getAnalyzeType()), + TableID: j.TableID, + Weight: j.Weight, + Indicators: statstypes.IndicatorsJSON{ + ChangePercentage: j.ChangePercentage, + TableSize: j.TableSize, + LastAnalysisDuration: j.LastAnalysisDuration, + }, + HasNewlyAddedIndex: j.HasNewlyAddedIndex(), + } +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 8d25714f283c5..994bf4d39a0c7 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -800,6 +800,17 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) { return pq.syncFields.inner.len(), nil } +// List returns all the jobs in the priority queue. +func (pq *AnalysisPriorityQueue) List() ([]AnalysisJob, error) { + pq.syncFields.mu.RLock() + defer pq.syncFields.mu.RUnlock() + if !pq.syncFields.initialized { + return nil, errors.New(notInitializedErrMsg) + } + + return pq.syncFields.inner.list(), nil +} + // Close closes the priority queue. // Note: This function is thread-safe. func (pq *AnalysisPriorityQueue) Close() { diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index 7de6df698cedc..a0529c4041b5e 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -306,3 +306,17 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex return sql, params } + +func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + return statstypes.AnalysisJobJSON{ + Type: string(j.getAnalyzeType()), + TableID: j.StaticPartitionID, + Weight: j.Weight, + Indicators: statstypes.IndicatorsJSON{ + ChangePercentage: j.ChangePercentage, + TableSize: j.TableSize, + LastAnalysisDuration: j.LastAnalysisDuration, + }, + HasNewlyAddedIndex: j.HasNewlyAddedIndex(), + } +} diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 6d58c7c01be86..99c64c6bf09df 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -16,6 +16,7 @@ package refresher import ( stderrors "errors" + "sort" "time" "github.com/pingcap/errors" @@ -188,6 +189,22 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool { return false } +func (r *Refresher) GetStatsPriorityQueue() ([]statstypes.AnalysisJobJSON, error) { + jobs, err := r.jobs.List() + if err != nil { + return nil, err + } + jsonJobs := make([]statstypes.AnalysisJobJSON, len(jobs)) + for i, job := range jobs { + jsonJobs[i] = job.ToJSON() + } + // Sort by the weight in descending order. + sort.Slice(jsonJobs, func(i, j int) bool { + return jsonJobs[i].Weight > jsonJobs[j].Weight + }) + return jsonJobs, nil +} + func (r *Refresher) setAutoAnalysisTimeWindow( parameters map[string]string, ) error { diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index c6112bb08bc14..543ca3e746345 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -118,6 +118,22 @@ type StatsHistory interface { RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) } +// AnalysisJobJSON represents the JSON format of an AnalysisJob. +type AnalysisJobJSON struct { + Type string `json:"type"` + TableID int64 `json:"table_id"` + Weight float64 `json:"weight"` + Indicators IndicatorsJSON `json:"indicators"` + HasNewlyAddedIndex bool `json:"has_newly_added_index"` +} + +// IndicatorsJSON represents the JSON format of Indicators. +type IndicatorsJSON struct { + ChangePercentage float64 `json:"change_percentage"` + TableSize float64 `json:"table_size"` + LastAnalysisDuration time.Duration `json:"last_analysis_duration"` +} + // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. type StatsAnalyze interface { owner.Listener @@ -161,6 +177,9 @@ type StatsAnalyze interface { // CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool + // GetStatsPriorityQueue returns the stats priority queue. + GetStatsPriorityQueue() ([]AnalysisJobJSON, error) + // Close closes the analyze worker. Close() } From abe51e151875245cd6e19dfdb8b43db5497c5c4a Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:27:04 +0800 Subject: [PATCH 02/20] statistics: include must retry tables Signed-off-by: Rustin170506 --- .../handler/optimizor/statistics_handler.go | 2 +- .../handle/autoanalyze/autoanalyze.go | 6 +-- .../handle/autoanalyze/priorityqueue/queue.go | 37 ++++++++++++------- .../handle/autoanalyze/refresher/refresher.go | 17 ++++++--- .../autoanalyze/refresher/worker_test.go | 3 ++ pkg/statistics/handle/types/interfaces.go | 9 ++++- 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/pkg/server/handler/optimizor/statistics_handler.go b/pkg/server/handler/optimizor/statistics_handler.go index d9083a8fd46d2..d0de2f928a68c 100644 --- a/pkg/server/handler/optimizor/statistics_handler.go +++ b/pkg/server/handler/optimizor/statistics_handler.go @@ -157,7 +157,7 @@ func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, req *http.R w.Header().Set("Content-Type", "application/json") h := sh.do.StatsHandle() - tables, err := h.GetStatsPriorityQueue() + tables, err := h.GetPriorityQueueSnapshot() if err != nil { handler.WriteError(w, err) } else { diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 22925c25b5dd1..0d28a30ee6836 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -317,9 +317,9 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID return statistics.CheckAnalyzeVerOnTable(tbl, version) } -// GetStatsPriorityQueue returns the stats priority queue. -func (sa *statsAnalyze) GetStatsPriorityQueue() ([]statstypes.AnalysisJobJSON, error) { - return sa.refresher.GetStatsPriorityQueue() +// GetPriorityQueueSnapshot returns the stats priority queue. +func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) { + return sa.refresher.GetPriorityQueueSnapshot() } func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool { diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index 994bf4d39a0c7..c53fb28ffde96 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -187,7 +187,9 @@ func (pq *AnalysisPriorityQueue) rebuildWithoutLock() error { // This will guarantee that we will not miss any DML changes. But it may cause some DML changes to be processed twice. // It is acceptable since the DML changes operation is idempotent. nextCheckVersionWithOffset := pq.statsHandle.GetNextCheckVersionWithOffset() - err := pq.fetchAllTablesAndBuildAnalysisJobs() + err := fetchAllTablesAndBuildAnalysisJobs(pq.statsHandle, func(job AnalysisJob) error { + return pq.pushWithoutLock(job) + }) if err != nil { return errors.Trace(err) } @@ -199,8 +201,8 @@ func (pq *AnalysisPriorityQueue) rebuildWithoutLock() error { // fetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions. // Note: Please hold the lock before calling this function. -func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { - return statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error { +func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push func(job AnalysisJob) error) error { + return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error { parameters := exec.GetAutoAnalyzeParameters(sctx) autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) @@ -246,9 +248,9 @@ func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { if pi == nil { job := jobFactory.CreateNonPartitionedTableAnalysisJob( tblInfo, - pq.statsHandle.GetTableStatsForAutoAnalyze(tblInfo), + statsHandle.GetTableStatsForAutoAnalyze(tblInfo), ) - err := pq.pushWithoutLock(job) + err := push(job) if err != nil { return err } @@ -262,7 +264,7 @@ func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { partitionDefs = append(partitionDefs, def) } } - partitionStats := GetPartitionStats(pq.statsHandle, tblInfo, partitionDefs) + partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs) // If the prune mode is static, we need to analyze every partition as a separate table. if pruneMode == variable.Static { for pIDAndName, stats := range partitionStats { @@ -271,7 +273,7 @@ func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { pIDAndName.ID, stats, ) - err := pq.pushWithoutLock(job) + err := push(job) if err != nil { return err } @@ -279,10 +281,10 @@ func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { } else { job := jobFactory.CreateDynamicPartitionedTableAnalysisJob( tblInfo, - pq.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID), + statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID), partitionStats, ) - err := pq.pushWithoutLock(job) + err := push(job) if err != nil { return err } @@ -800,15 +802,24 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) { return pq.syncFields.inner.len(), nil } -// List returns all the jobs in the priority queue. -func (pq *AnalysisPriorityQueue) List() ([]AnalysisJob, error) { +// Snapshot returns a snapshot of all the jobs in the priority queue. +func (pq *AnalysisPriorityQueue) Snapshot() ( + currentJobs []AnalysisJob, + mustTables []int64, + err error) { pq.syncFields.mu.RLock() defer pq.syncFields.mu.RUnlock() if !pq.syncFields.initialized { - return nil, errors.New(notInitializedErrMsg) + return nil, nil, errors.New(notInitializedErrMsg) + } + + currentJobs = pq.syncFields.inner.list() + mustTables = make([]int64, 0, len(pq.syncFields.mustRetryJobs)) + for tableID := range pq.syncFields.mustRetryJobs { + mustTables = append(mustTables, tableID) } - return pq.syncFields.inner.list(), nil + return currentJobs, mustTables, nil } // Close closes the priority queue. diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 99c64c6bf09df..dd6b54884aaf8 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -189,20 +189,25 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool { return false } -func (r *Refresher) GetStatsPriorityQueue() ([]statstypes.AnalysisJobJSON, error) { - jobs, err := r.jobs.List() +// GetPriorityQueueSnapshot returns the stats priority queue. +func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) { + currentJobs, mustTables, err := r.jobs.Snapshot() if err != nil { - return nil, err + return statstypes.PriorityQueueSnapshot{}, err } - jsonJobs := make([]statstypes.AnalysisJobJSON, len(jobs)) - for i, job := range jobs { + jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs)) + for i, job := range currentJobs { jsonJobs[i] = job.ToJSON() } // Sort by the weight in descending order. sort.Slice(jsonJobs, func(i, j int) bool { return jsonJobs[i].Weight > jsonJobs[j].Weight }) - return jsonJobs, nil + + return statstypes.PriorityQueueSnapshot{ + CurrentJobs: jsonJobs, + MustRetryTables: mustTables, + }, nil } func (r *Refresher) setAutoAnalysisTimeWindow( diff --git a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go index 179b1f7f37789..1a1fc4a84f1f3 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go @@ -67,6 +67,9 @@ func (m *mockAnalysisJob) GetIndicators() priorityqueue.Indicators { func (m *mockAnalysisJob) SetIndicators(indicators priorityqueue.Indicators) { panic("not implemented") } +func (m *mockAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + panic("not implemented") +} func TestWorker(t *testing.T) { _, dom := testkit.CreateMockStoreAndDomain(t) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 543ca3e746345..25af62f07cf03 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -118,6 +118,11 @@ type StatsHistory interface { RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) } +type PriorityQueueSnapshot struct { + CurrentJobs []AnalysisJobJSON `json:"current_jobs"` + MustRetryTables []int64 `json:"must_retry_tables"` +} + // AnalysisJobJSON represents the JSON format of an AnalysisJob. type AnalysisJobJSON struct { Type string `json:"type"` @@ -177,8 +182,8 @@ type StatsAnalyze interface { // CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool - // GetStatsPriorityQueue returns the stats priority queue. - GetStatsPriorityQueue() ([]AnalysisJobJSON, error) + // GetPriorityQueueSnapshot returns the stats priority queue. + GetPriorityQueueSnapshot() (PriorityQueueSnapshot, error) // Close closes the analyze worker. Close() From 2df91fa47c022f69ca252b1b24fe1c008d204a19 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:31:29 +0800 Subject: [PATCH 03/20] fix: revert change Signed-off-by: Rustin170506 --- .../handler/optimizor/statistics_handler.go | 3 +++ .../handle/autoanalyze/autoanalyze.go | 2 +- .../handle/autoanalyze/priorityqueue/queue.go | 20 +++++++++---------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/server/handler/optimizor/statistics_handler.go b/pkg/server/handler/optimizor/statistics_handler.go index d0de2f928a68c..35eb896d5b4c2 100644 --- a/pkg/server/handler/optimizor/statistics_handler.go +++ b/pkg/server/handler/optimizor/statistics_handler.go @@ -145,14 +145,17 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName)) } +// StatsPriorityQueueHandler is the handler for dumping the stats priority queue snapshot. type StatsPriorityQueueHandler struct { do *domain.Domain } +// NewStatsPriorityQueueHandler creates a new StatsPriorityQueueHandler. func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler { return &StatsPriorityQueueHandler{do: do} } +// ServeHTTP dumps the stats priority queue snapshot to json. func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json") diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 0d28a30ee6836..d752ffc143094 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -317,7 +317,7 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID return statistics.CheckAnalyzeVerOnTable(tbl, version) } -// GetPriorityQueueSnapshot returns the stats priority queue. +// GetPriorityQueueSnapshot returns the stats priority queue snapshot. func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) { return sa.refresher.GetPriorityQueueSnapshot() } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index c53fb28ffde96..f4c3113bf966e 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -187,9 +187,7 @@ func (pq *AnalysisPriorityQueue) rebuildWithoutLock() error { // This will guarantee that we will not miss any DML changes. But it may cause some DML changes to be processed twice. // It is acceptable since the DML changes operation is idempotent. nextCheckVersionWithOffset := pq.statsHandle.GetNextCheckVersionWithOffset() - err := fetchAllTablesAndBuildAnalysisJobs(pq.statsHandle, func(job AnalysisJob) error { - return pq.pushWithoutLock(job) - }) + err := pq.fetchAllTablesAndBuildAnalysisJobs() if err != nil { return errors.Trace(err) } @@ -201,8 +199,8 @@ func (pq *AnalysisPriorityQueue) rebuildWithoutLock() error { // fetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions. // Note: Please hold the lock before calling this function. -func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push func(job AnalysisJob) error) error { - return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error { +func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs() error { + return statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error { parameters := exec.GetAutoAnalyzeParameters(sctx) autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) @@ -248,9 +246,9 @@ func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push if pi == nil { job := jobFactory.CreateNonPartitionedTableAnalysisJob( tblInfo, - statsHandle.GetTableStatsForAutoAnalyze(tblInfo), + pq.statsHandle.GetTableStatsForAutoAnalyze(tblInfo), ) - err := push(job) + err := pq.pushWithoutLock(job) if err != nil { return err } @@ -264,7 +262,7 @@ func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push partitionDefs = append(partitionDefs, def) } } - partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs) + partitionStats := GetPartitionStats(pq.statsHandle, tblInfo, partitionDefs) // If the prune mode is static, we need to analyze every partition as a separate table. if pruneMode == variable.Static { for pIDAndName, stats := range partitionStats { @@ -273,7 +271,7 @@ func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push pIDAndName.ID, stats, ) - err := push(job) + err := pq.pushWithoutLock(job) if err != nil { return err } @@ -281,10 +279,10 @@ func fetchAllTablesAndBuildAnalysisJobs(statsHandle statstypes.StatsHandle, push } else { job := jobFactory.CreateDynamicPartitionedTableAnalysisJob( tblInfo, - statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID), + pq.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID), partitionStats, ) - err := push(job) + err := pq.pushWithoutLock(job) if err != nil { return err } From 9591a213853010b26230726b852618d218f48cb3 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:35:43 +0800 Subject: [PATCH 04/20] fix: rename Signed-off-by: Rustin170506 --- pkg/statistics/handle/autoanalyze/priorityqueue/queue.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index f4c3113bf966e..dae8879412af2 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -803,7 +803,7 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) { // Snapshot returns a snapshot of all the jobs in the priority queue. func (pq *AnalysisPriorityQueue) Snapshot() ( currentJobs []AnalysisJob, - mustTables []int64, + mustRetryTables []int64, err error) { pq.syncFields.mu.RLock() defer pq.syncFields.mu.RUnlock() @@ -812,12 +812,12 @@ func (pq *AnalysisPriorityQueue) Snapshot() ( } currentJobs = pq.syncFields.inner.list() - mustTables = make([]int64, 0, len(pq.syncFields.mustRetryJobs)) + mustRetryTables = make([]int64, 0, len(pq.syncFields.mustRetryJobs)) for tableID := range pq.syncFields.mustRetryJobs { - mustTables = append(mustTables, tableID) + mustRetryTables = append(mustRetryTables, tableID) } - return currentJobs, mustTables, nil + return currentJobs, mustRetryTables, nil } // Close closes the priority queue. From 71383cb3b7131d55445127b39011fe733aa13562 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:44:22 +0800 Subject: [PATCH 05/20] fix: make lint happy Signed-off-by: Rustin170506 --- pkg/statistics/handle/types/interfaces.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 25af62f07cf03..704bd4343a18c 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -118,6 +118,7 @@ type StatsHistory interface { RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) } +// PriorityQueueSnapshot is the snapshot of the stats priority queue. type PriorityQueueSnapshot struct { CurrentJobs []AnalysisJobJSON `json:"current_jobs"` MustRetryTables []int64 `json:"must_retry_tables"` From 6867bee93adfc482f3dd5b59bf64e15723047f52 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 16:55:56 +0800 Subject: [PATCH 06/20] fix: update comments Signed-off-by: Rustin170506 --- .../priorityqueue/dynamic_partitioned_table_analysis_job.go | 1 + .../priorityqueue/non_partitioned_table_analysis_job.go | 1 + .../priorityqueue/static_partitioned_table_analysis_job.go | 1 + 3 files changed, 3 insertions(+) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index 543f80fb38a02..c10ae8c0e3eb5 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -365,6 +365,7 @@ func getPartitionNames(partitionIndexes map[string][]string) []string { return names } +// ToJSON converts the job to a JSON object. func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index 91e873dd4ff7f..7306966b37c1e 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -271,6 +271,7 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st return sql, params } +// ToJSON converts the job to a JSON object. func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index a0529c4041b5e..8ea736256cd05 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -307,6 +307,7 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex return sql, params } +// ToJSON converts the job to a JSON object. func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), From 963ab7dfdcf64013ccf2e5244ce9fba367c0ea17 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 17:06:04 +0800 Subject: [PATCH 07/20] fix: better format Signed-off-by: Rustin170506 --- .../priorityqueue/dynamic_partitioned_table_analysis_job.go | 2 +- .../priorityqueue/non_partitioned_table_analysis_job.go | 2 +- .../priorityqueue/static_partitioned_table_analysis_job.go | 2 +- pkg/statistics/handle/types/interfaces.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index c10ae8c0e3eb5..b76a49e69704a 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -374,7 +374,7 @@ func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON Indicators: statstypes.IndicatorsJSON{ ChangePercentage: j.ChangePercentage, TableSize: j.TableSize, - LastAnalysisDuration: j.LastAnalysisDuration, + LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), }, HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index 7306966b37c1e..14ddac87596ef 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -280,7 +280,7 @@ func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { Indicators: statstypes.IndicatorsJSON{ ChangePercentage: j.ChangePercentage, TableSize: j.TableSize, - LastAnalysisDuration: j.LastAnalysisDuration, + LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), }, HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index 8ea736256cd05..e3aaa657e26bd 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -316,7 +316,7 @@ func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON Indicators: statstypes.IndicatorsJSON{ ChangePercentage: j.ChangePercentage, TableSize: j.TableSize, - LastAnalysisDuration: j.LastAnalysisDuration, + LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), }, HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 704bd4343a18c..8190e2a4d8d1e 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -135,9 +135,9 @@ type AnalysisJobJSON struct { // IndicatorsJSON represents the JSON format of Indicators. type IndicatorsJSON struct { - ChangePercentage float64 `json:"change_percentage"` - TableSize float64 `json:"table_size"` - LastAnalysisDuration time.Duration `json:"last_analysis_duration"` + ChangePercentage float64 `json:"change_percentage"` + TableSize float64 `json:"table_size"` + LastAnalysisDuration string `json:"last_analysis_duration"` } // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. From 4f0f9fa2b78dbbb975acb1b9287b5baa0898ef0e Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Thu, 14 Nov 2024 17:15:16 +0800 Subject: [PATCH 08/20] fix: better format Signed-off-by: Rustin170506 --- .../dynamic_partitioned_table_analysis_job.go | 12 ++++-------- .../handle/autoanalyze/priorityqueue/job.go | 9 +++++++++ .../non_partitioned_table_analysis_job.go | 12 ++++-------- .../static_partitioned_table_analysis_job.go | 12 ++++-------- pkg/statistics/handle/types/interfaces.go | 6 +++--- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index b76a49e69704a..01ce3bdcc8c42 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -368,14 +368,10 @@ func getPartitionNames(partitionIndexes map[string][]string) []string { // ToJSON converts the job to a JSON object. func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ - Type: string(j.getAnalyzeType()), - TableID: j.GlobalTableID, - Weight: j.Weight, - Indicators: statstypes.IndicatorsJSON{ - ChangePercentage: j.ChangePercentage, - TableSize: j.TableSize, - LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), - }, + Type: string(j.getAnalyzeType()), + TableID: j.GlobalTableID, + Weight: j.Weight, + Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index e917a2d664ec3..277be4897c3b6 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -189,3 +189,12 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool { _, ok := job.(*DynamicPartitionedTableAnalysisJob) return ok } + +// toJSONIndicators converts the indicators to a JSON format. +func toJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { + return statstypes.IndicatorsJSON{ + ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100), + TableSize: fmt.Sprintf("%.2frows", indicators.TableSize), + LastAnalysisDuration: fmt.Sprintf("%.2fs", indicators.LastAnalysisDuration.Seconds()), + } +} diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index 14ddac87596ef..df2728dcd73f3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -274,14 +274,10 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st // ToJSON converts the job to a JSON object. func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ - Type: string(j.getAnalyzeType()), - TableID: j.TableID, - Weight: j.Weight, - Indicators: statstypes.IndicatorsJSON{ - ChangePercentage: j.ChangePercentage, - TableSize: j.TableSize, - LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), - }, + Type: string(j.getAnalyzeType()), + TableID: j.TableID, + Weight: j.Weight, + Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index e3aaa657e26bd..d74bdc044b66c 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -310,14 +310,10 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex // ToJSON converts the job to a JSON object. func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { return statstypes.AnalysisJobJSON{ - Type: string(j.getAnalyzeType()), - TableID: j.StaticPartitionID, - Weight: j.Weight, - Indicators: statstypes.IndicatorsJSON{ - ChangePercentage: j.ChangePercentage, - TableSize: j.TableSize, - LastAnalysisDuration: fmt.Sprintf("%.2fs", j.LastAnalysisDuration.Seconds()), - }, + Type: string(j.getAnalyzeType()), + TableID: j.StaticPartitionID, + Weight: j.Weight, + Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 8190e2a4d8d1e..d6e6a20da35b9 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -135,9 +135,9 @@ type AnalysisJobJSON struct { // IndicatorsJSON represents the JSON format of Indicators. type IndicatorsJSON struct { - ChangePercentage float64 `json:"change_percentage"` - TableSize float64 `json:"table_size"` - LastAnalysisDuration string `json:"last_analysis_duration"` + ChangePercentage string `json:"change_percentage"` + TableSize string `json:"table_size"` + LastAnalysisDuration string `json:"last_analysis_duration"` } // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. From 2bf024ede7d1e4101ad56b7133e6ffe093d49e51 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 14:16:22 +0800 Subject: [PATCH 09/20] fix: include more fields Signed-off-by: Rustin170506 --- .../dynamic_partitioned_table_analysis_job.go | 6 ++++++ .../non_partitioned_table_analysis_job.go | 5 +++++ .../static_partitioned_table_analysis_job.go | 6 ++++++ pkg/statistics/handle/types/interfaces.go | 13 ++++++++----- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index 01ce3bdcc8c42..289a0acede44d 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -367,9 +367,15 @@ func getPartitionNames(partitionIndexes map[string][]string) []string { // ToJSON converts the job to a JSON object. func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + partitionIDs := make([]int64, 0, len(j.PartitionIDs)) + for partition := range j.PartitionIDs { + partitionIDs = append(partitionIDs, partition) + } return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), TableID: j.GlobalTableID, + PartitionIDs: partitionIDs, + PartitionIndexIDs: j.PartitionIndexIDs, Weight: j.Weight, Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index df2728dcd73f3..cce5b1b44def4 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -273,9 +273,14 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st // ToJSON converts the job to a JSON object. func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + indexes := make([]int64, 0, len(j.IndexIDs)) + for index := range j.IndexIDs { + indexes = append(indexes, index) + } return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), TableID: j.TableID, + IndexIDs: indexes, Weight: j.Weight, Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index d74bdc044b66c..66c5238a593f5 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -309,9 +309,15 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex // ToJSON converts the job to a JSON object. func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { + indexes := make([]int64, 0, len(j.IndexIDs)) + for index := range j.IndexIDs { + indexes = append(indexes, index) + } return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), TableID: j.StaticPartitionID, + PartitionIDs: []int64{j.StaticPartitionID}, + IndexIDs: indexes, Weight: j.Weight, Indicators: toJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index d6e6a20da35b9..57dffc049cab1 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -126,11 +126,14 @@ type PriorityQueueSnapshot struct { // AnalysisJobJSON represents the JSON format of an AnalysisJob. type AnalysisJobJSON struct { - Type string `json:"type"` - TableID int64 `json:"table_id"` - Weight float64 `json:"weight"` - Indicators IndicatorsJSON `json:"indicators"` - HasNewlyAddedIndex bool `json:"has_newly_added_index"` + Type string `json:"type"` + TableID int64 `json:"table_id"` + PartitionIDs []int64 `json:"partition_ids"` + IndexIDs []int64 `json:"index_ids"` + PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"` + Weight float64 `json:"weight"` + Indicators IndicatorsJSON `json:"indicators"` + HasNewlyAddedIndex bool `json:"has_newly_added_index"` } // IndicatorsJSON represents the JSON format of Indicators. From 61e83e3ef8322c1bde78bdd57146e389d2efa25b Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 14:21:02 +0800 Subject: [PATCH 10/20] fix: make lint happy Signed-off-by: Rustin170506 --- pkg/statistics/handle/types/interfaces.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 57dffc049cab1..cfc7d63e8f4b1 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -126,13 +126,13 @@ type PriorityQueueSnapshot struct { // AnalysisJobJSON represents the JSON format of an AnalysisJob. type AnalysisJobJSON struct { + PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"` + Indicators IndicatorsJSON `json:"indicators"` Type string `json:"type"` - TableID int64 `json:"table_id"` PartitionIDs []int64 `json:"partition_ids"` IndexIDs []int64 `json:"index_ids"` - PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"` + TableID int64 `json:"table_id"` Weight float64 `json:"weight"` - Indicators IndicatorsJSON `json:"indicators"` HasNewlyAddedIndex bool `json:"has_newly_added_index"` } From e5c19ebc55ede089d71fce397be7f6a647ca24bf Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 14:28:05 +0800 Subject: [PATCH 11/20] fix: rename Signed-off-by: Rustin170506 --- .../calculatoranalysis/calculator_analysis_test.go | 2 +- .../dynamic_partitioned_table_analysis_job.go | 6 +++--- .../handle/autoanalyze/priorityqueue/heap_test.go | 2 +- pkg/statistics/handle/autoanalyze/priorityqueue/job.go | 8 ++++---- .../priorityqueue/non_partitioned_table_analysis_job.go | 6 +++--- .../static_partitioned_table_analysis_job.go | 6 +++--- pkg/statistics/handle/autoanalyze/refresher/refresher.go | 2 +- .../handle/autoanalyze/refresher/worker_test.go | 2 +- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go index 7ce301cdb8fca..870e13bee326e 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go @@ -290,6 +290,6 @@ func (j *TestJob) HasNewlyAddedIndex() bool { return false } -func (j *TestJob) ToJSON() types.AnalysisJobJSON { +func (j *TestJob) AsJSON() types.AnalysisJobJSON { panic("unimplemented") } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go index 289a0acede44d..3a550309119c6 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go @@ -365,8 +365,8 @@ func getPartitionNames(partitionIndexes map[string][]string) []string { return names } -// ToJSON converts the job to a JSON object. -func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { +// AsJSON converts the job to a JSON object. +func (j *DynamicPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON { partitionIDs := make([]int64, 0, len(j.PartitionIDs)) for partition := range j.PartitionIDs { partitionIDs = append(partitionIDs, partition) @@ -377,7 +377,7 @@ func (j *DynamicPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON PartitionIDs: partitionIDs, PartitionIndexIDs: j.PartitionIndexIDs, Weight: j.Weight, - Indicators: toJSONIndicators(j.Indicators), + Indicators: asJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go index db95b91b2a04f..13590237bd6a3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go @@ -67,7 +67,7 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) { func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) { panic("implement me") } -func (t testHeapObject) ToJSON() statstypes.AnalysisJobJSON { +func (t testHeapObject) AsJSON() statstypes.AnalysisJobJSON { panic("implement me") } func (t testHeapObject) String() string { diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 277be4897c3b6..d62334fd048d5 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -91,8 +91,8 @@ type AnalysisJob interface { // RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed. RegisterFailureHook(hook FailureJobHook) - // ToJSON converts the job to a JSON format. - ToJSON() statstypes.AnalysisJobJSON + // AsJSON converts the job to a JSON format. + AsJSON() statstypes.AnalysisJobJSON fmt.Stringer } @@ -190,8 +190,8 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool { return ok } -// toJSONIndicators converts the indicators to a JSON format. -func toJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { +// asJSONIndicators converts the indicators to a JSON format. +func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { return statstypes.IndicatorsJSON{ ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100), TableSize: fmt.Sprintf("%.2frows", indicators.TableSize), diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go index cce5b1b44def4..ad4b27e5f80d3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go @@ -271,8 +271,8 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st return sql, params } -// ToJSON converts the job to a JSON object. -func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { +// AsJSON converts the job to a JSON object. +func (j *NonPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON { indexes := make([]int64, 0, len(j.IndexIDs)) for index := range j.IndexIDs { indexes = append(indexes, index) @@ -282,7 +282,7 @@ func (j *NonPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { TableID: j.TableID, IndexIDs: indexes, Weight: j.Weight, - Indicators: toJSONIndicators(j.Indicators), + Indicators: asJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index 66c5238a593f5..1aedc5cb4751d 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -307,8 +307,8 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex return sql, params } -// ToJSON converts the job to a JSON object. -func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { +// AsJSON converts the job to a JSON object. +func (j *StaticPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON { indexes := make([]int64, 0, len(j.IndexIDs)) for index := range j.IndexIDs { indexes = append(indexes, index) @@ -319,7 +319,7 @@ func (j *StaticPartitionedTableAnalysisJob) ToJSON() statstypes.AnalysisJobJSON PartitionIDs: []int64{j.StaticPartitionID}, IndexIDs: indexes, Weight: j.Weight, - Indicators: toJSONIndicators(j.Indicators), + Indicators: asJSONIndicators(j.Indicators), HasNewlyAddedIndex: j.HasNewlyAddedIndex(), } } diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index dd6b54884aaf8..9e94d1002d045 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -197,7 +197,7 @@ func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot } jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs)) for i, job := range currentJobs { - jsonJobs[i] = job.ToJSON() + jsonJobs[i] = job.AsJSON() } // Sort by the weight in descending order. sort.Slice(jsonJobs, func(i, j int) bool { diff --git a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go index 1a1fc4a84f1f3..40c4c87e28c21 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/worker_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/worker_test.go @@ -67,7 +67,7 @@ func (m *mockAnalysisJob) GetIndicators() priorityqueue.Indicators { func (m *mockAnalysisJob) SetIndicators(indicators priorityqueue.Indicators) { panic("not implemented") } -func (m *mockAnalysisJob) ToJSON() statstypes.AnalysisJobJSON { +func (m *mockAnalysisJob) AsJSON() statstypes.AnalysisJobJSON { panic("not implemented") } From 90fd39793393d57b5dedb242f54be8351dc87ee2 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 14:43:02 +0800 Subject: [PATCH 12/20] fix: better comment Signed-off-by: Rustin170506 --- pkg/statistics/handle/autoanalyze/priorityqueue/job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index d62334fd048d5..53997846a9195 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -91,7 +91,7 @@ type AnalysisJob interface { // RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed. RegisterFailureHook(hook FailureJobHook) - // AsJSON converts the job to a JSON format. + // AsJSON converts the job to a JSON object. AsJSON() statstypes.AnalysisJobJSON fmt.Stringer @@ -190,7 +190,7 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool { return ok } -// asJSONIndicators converts the indicators to a JSON format. +// asJSONIndicators converts the indicators to a JSON object. func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { return statstypes.IndicatorsJSON{ ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100), From 4a83ec419b2a4ba075656177294caa92c4442ff4 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 14:45:08 +0800 Subject: [PATCH 13/20] fix: remove useless partition ids Signed-off-by: Rustin170506 --- .../priorityqueue/static_partitioned_table_analysis_job.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go index 1aedc5cb4751d..7e1ec3983709d 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/static_partitioned_table_analysis_job.go @@ -316,7 +316,6 @@ func (j *StaticPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON return statstypes.AnalysisJobJSON{ Type: string(j.getAnalyzeType()), TableID: j.StaticPartitionID, - PartitionIDs: []int64{j.StaticPartitionID}, IndexIDs: indexes, Weight: j.Weight, Indicators: asJSONIndicators(j.Indicators), From b63cd5e773a18b47f318f058f2b7fcc8e563c479 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 15:14:46 +0800 Subject: [PATCH 14/20] fix: reorder felids Signed-off-by: Rustin170506 --- pkg/statistics/handle/types/interfaces.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index cfc7d63e8f4b1..3fd8a4711fa08 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -125,14 +125,16 @@ type PriorityQueueSnapshot struct { } // AnalysisJobJSON represents the JSON format of an AnalysisJob. +// +//nolint:fieldalignment type AnalysisJobJSON struct { - PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"` - Indicators IndicatorsJSON `json:"indicators"` Type string `json:"type"` - PartitionIDs []int64 `json:"partition_ids"` - IndexIDs []int64 `json:"index_ids"` TableID int64 `json:"table_id"` Weight float64 `json:"weight"` + PartitionIDs []int64 `json:"partition_ids"` + IndexIDs []int64 `json:"index_ids"` + PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"` + Indicators IndicatorsJSON `json:"indicators"` HasNewlyAddedIndex bool `json:"has_newly_added_index"` } From c18d00be6499397924e56038bcf7c03fa9030818 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Fri, 15 Nov 2024 17:15:19 +0800 Subject: [PATCH 15/20] fix: make lint happy Signed-off-by: Rustin170506 --- pkg/server/handler/optimizor/statistics_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/handler/optimizor/statistics_handler.go b/pkg/server/handler/optimizor/statistics_handler.go index 35eb896d5b4c2..8a0fe4264f4ef 100644 --- a/pkg/server/handler/optimizor/statistics_handler.go +++ b/pkg/server/handler/optimizor/statistics_handler.go @@ -156,7 +156,7 @@ func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler } // ServeHTTP dumps the stats priority queue snapshot to json. -func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") h := sh.do.StatsHandle() From 7c56507431df188b6eec21f95825e21b57b6a142 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Mon, 18 Nov 2024 14:40:23 +0800 Subject: [PATCH 16/20] test: add an unit test Signed-off-by: Rustin170506 --- pkg/server/handler/optimizor/BUILD.bazel | 1 + .../optimizor/statistics_handler_test.go | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/pkg/server/handler/optimizor/BUILD.bazel b/pkg/server/handler/optimizor/BUILD.bazel index cccf9b5498c63..d93154f4abfaf 100644 --- a/pkg/server/handler/optimizor/BUILD.bazel +++ b/pkg/server/handler/optimizor/BUILD.bazel @@ -56,6 +56,7 @@ go_test( "//pkg/server/internal/testutil", "//pkg/server/internal/util", "//pkg/session", + "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", "//pkg/store/mockstore/unistore", "//pkg/testkit", diff --git a/pkg/server/handler/optimizor/statistics_handler_test.go b/pkg/server/handler/optimizor/statistics_handler_test.go index d91fa92121cdb..2c54cb2a83146 100644 --- a/pkg/server/handler/optimizor/statistics_handler_test.go +++ b/pkg/server/handler/optimizor/statistics_handler_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/statistics/handle/types" util2 "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -289,3 +290,58 @@ func checkData(t *testing.T, path string, client *testserverclient.TestServerCli require.Equal(t, int64(4), count) require.NoError(t, rows.Close()) } + +func TestStatsPriorityQueueAPI(t *testing.T) { + store := testkit.CreateMockStore(t) + driver := server2.NewTiDBDriver(store) + client := testserverclient.NewTestServerClient() + cfg := util.NewTestConfig() + cfg.Port = client.Port + cfg.Status.StatusPort = client.StatusPort + cfg.Status.ReportStatus = true + cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano()) + + server, err := server2.NewServer(cfg, driver) + require.NoError(t, err) + defer server.Close() + + dom, err := session.GetDomain(store) + require.NoError(t, err) + server.SetDomain(dom) + go func() { + err := server.Run(nil) + require.NoError(t, err) + }() + <-server2.RunInGoTestChan + client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr()) + client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr()) + client.WaitUntilServerOnline() + + router := mux.NewRouter() + handler := optimizor.NewStatsPriorityQueueHandler(dom) + router.Handle("/stats/priority-queue", handler) + + resp, err := client.FetchStatus("/stats/priority-queue") + require.NoError(t, err) + defer resp.Body.Close() + + js, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "priority queue not initialized", string(js)) + + // Init the queue. + handle := dom.StatsHandle() + require.False(t, handle.HandleAutoAnalyze()) + + resp, err = client.FetchStatus("/stats/priority-queue") + require.NoError(t, err) + defer resp.Body.Close() + + js, err = io.ReadAll(resp.Body) + require.NoError(t, err) + var snapshot types.PriorityQueueSnapshot + err = json.Unmarshal(js, &snapshot) + require.NoError(t, err) + require.Empty(t, snapshot.CurrentJobs) + require.Empty(t, snapshot.MustRetryTables) +} From c493692120acb6ff1a782967d507e14a6399ce33 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Mon, 18 Nov 2024 14:42:17 +0800 Subject: [PATCH 17/20] fix: use the right unit Signed-off-by: Rustin170506 --- pkg/statistics/handle/autoanalyze/priorityqueue/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 53997846a9195..13b121ab53a45 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -194,7 +194,7 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool { func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { return statstypes.IndicatorsJSON{ ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100), - TableSize: fmt.Sprintf("%.2frows", indicators.TableSize), + TableSize: fmt.Sprintf("%.2f", indicators.TableSize), LastAnalysisDuration: fmt.Sprintf("%.2fs", indicators.LastAnalysisDuration.Seconds()), } } From d2d606e0f61bdf6f404ef87179ae693407b6fc6c Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Mon, 18 Nov 2024 14:44:22 +0800 Subject: [PATCH 18/20] fix: better duration format Signed-off-by: Rustin170506 --- pkg/statistics/handle/autoanalyze/priorityqueue/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 13b121ab53a45..8a2cc918cb5a4 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -195,6 +195,6 @@ func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON { return statstypes.IndicatorsJSON{ ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100), TableSize: fmt.Sprintf("%.2f", indicators.TableSize), - LastAnalysisDuration: fmt.Sprintf("%.2fs", indicators.LastAnalysisDuration.Seconds()), + LastAnalysisDuration: fmt.Sprintf("%v", indicators.LastAnalysisDuration), } } From 96f42964a36e9a8ea0afe5003e7b3262a71ed25c Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Mon, 18 Nov 2024 14:56:18 +0800 Subject: [PATCH 19/20] fix: make bazel happy Signed-off-by: Rustin170506 --- pkg/server/handler/optimizor/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/handler/optimizor/BUILD.bazel b/pkg/server/handler/optimizor/BUILD.bazel index d93154f4abfaf..97b7c232c5b49 100644 --- a/pkg/server/handler/optimizor/BUILD.bazel +++ b/pkg/server/handler/optimizor/BUILD.bazel @@ -43,7 +43,7 @@ go_test( "statistics_handler_test.go", ], flaky = True, - shard_count = 6, + shard_count = 7, deps = [ ":optimizor", "//pkg/config", From f0dd712c57a19bb511edae76ab077f261cbd8337 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Tue, 19 Nov 2024 14:08:16 +0800 Subject: [PATCH 20/20] refactor: convert within the queue Signed-off-by: Rustin170506 --- .../handle/autoanalyze/priorityqueue/queue.go | 27 ++++++++++++++----- .../handle/autoanalyze/refresher/refresher.go | 19 +------------ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go index dae8879412af2..a64ef48933669 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go @@ -16,6 +16,7 @@ package priorityqueue import ( "context" + "sort" "sync" "time" @@ -802,22 +803,34 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) { // Snapshot returns a snapshot of all the jobs in the priority queue. func (pq *AnalysisPriorityQueue) Snapshot() ( - currentJobs []AnalysisJob, - mustRetryTables []int64, - err error) { + snapshot statstypes.PriorityQueueSnapshot, + err error, +) { pq.syncFields.mu.RLock() defer pq.syncFields.mu.RUnlock() if !pq.syncFields.initialized { - return nil, nil, errors.New(notInitializedErrMsg) + return statstypes.PriorityQueueSnapshot{}, errors.New(notInitializedErrMsg) } - currentJobs = pq.syncFields.inner.list() - mustRetryTables = make([]int64, 0, len(pq.syncFields.mustRetryJobs)) + currentJobs := pq.syncFields.inner.list() + mustRetryTables := make([]int64, 0, len(pq.syncFields.mustRetryJobs)) for tableID := range pq.syncFields.mustRetryJobs { mustRetryTables = append(mustRetryTables, tableID) } - return currentJobs, mustRetryTables, nil + jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs)) + for i, job := range currentJobs { + jsonJobs[i] = job.AsJSON() + } + // Sort by the weight in descending order. + sort.Slice(jsonJobs, func(i, j int) bool { + return jsonJobs[i].Weight > jsonJobs[j].Weight + }) + + return statstypes.PriorityQueueSnapshot{ + CurrentJobs: jsonJobs, + MustRetryTables: mustRetryTables, + }, nil } // Close closes the priority queue. diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 9e94d1002d045..4bbb0d975a665 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -16,7 +16,6 @@ package refresher import ( stderrors "errors" - "sort" "time" "github.com/pingcap/errors" @@ -191,23 +190,7 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool { // GetPriorityQueueSnapshot returns the stats priority queue. func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) { - currentJobs, mustTables, err := r.jobs.Snapshot() - if err != nil { - return statstypes.PriorityQueueSnapshot{}, err - } - jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs)) - for i, job := range currentJobs { - jsonJobs[i] = job.AsJSON() - } - // Sort by the weight in descending order. - sort.Slice(jsonJobs, func(i, j int) bool { - return jsonJobs[i].Weight > jsonJobs[j].Weight - }) - - return statstypes.PriorityQueueSnapshot{ - CurrentJobs: jsonJobs, - MustRetryTables: mustTables, - }, nil + return r.jobs.Snapshot() } func (r *Refresher) setAutoAnalysisTimeWindow(