Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add a priority queue API #57385

Merged
merged 21 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/server/handler/optimizor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ go_test(
"statistics_handler_test.go",
],
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
":optimizor",
"//pkg/config",
Expand All @@ -56,6 +56,7 @@ go_test(
"//pkg/server/internal/testutil",
"//pkg/server/internal/util",
"//pkg/session",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/store/mockstore/unistore",
"//pkg/testkit",
Expand Down
23 changes: 23 additions & 0 deletions pkg/server/handler/optimizor/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,26 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s
}
return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName))
}

// StatsPriorityQueueHandler is the handler for dumping the stats priority queue snapshot.
type StatsPriorityQueueHandler struct {
	// do is the domain whose stats handle is asked for the priority queue snapshot.
	do *domain.Domain
}

// NewStatsPriorityQueueHandler returns a StatsPriorityQueueHandler that reads
// the priority queue snapshot through the given domain.
func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler {
	h := new(StatsPriorityQueueHandler)
	h.do = do
	return h
}

// ServeHTTP responds with the current stats priority queue snapshot.
// The snapshot is obtained from the domain's stats handle; when that fails
// the error is written with handler.WriteError, otherwise the snapshot is
// written with handler.WriteData.
func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	snapshot, err := sh.do.StatsHandle().GetPriorityQueueSnapshot()
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	handler.WriteData(w, snapshot)
}
56 changes: 56 additions & 0 deletions pkg/server/handler/optimizor/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/server/internal/testutil"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
util2 "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -289,3 +290,58 @@ func checkData(t *testing.T, path string, client *testserverclient.TestServerCli
require.Equal(t, int64(4), count)
require.NoError(t, rows.Close())
}

// TestStatsPriorityQueueAPI exercises the /stats/priority-queue status endpoint
// against a running test server.
//
// It first checks that the endpoint reports "priority queue not initialized"
// before auto-analyze has run, then calls HandleAutoAnalyze once and checks
// that the endpoint returns a decodable snapshot with no current jobs and no
// must-retry tables.
func TestStatsPriorityQueueAPI(t *testing.T) {
	store := testkit.CreateMockStore(t)
	driver := server2.NewTiDBDriver(store)
	client := testserverclient.NewTestServerClient()
	cfg := util.NewTestConfig()
	cfg.Port = client.Port
	cfg.Status.StatusPort = client.StatusPort
	cfg.Status.ReportStatus = true
	cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano())

	server, err := server2.NewServer(cfg, driver)
	require.NoError(t, err)
	defer server.Close()

	dom, err := session.GetDomain(store)
	require.NoError(t, err)
	server.SetDomain(dom)
	go func() {
		// require.* ends in t.FailNow, which must only be called from the
		// goroutine running the test. Report the failure with t.Errorf instead.
		if runErr := server.Run(nil); runErr != nil {
			t.Errorf("server.Run: %v", runErr)
		}
	}()
	<-server2.RunInGoTestChan
	client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
	client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
	client.WaitUntilServerOnline()

	// The server registers the /stats/priority-queue route itself when it
	// starts its HTTP status server, so no separate router is needed here.

	// Before the queue is initialized the endpoint returns the error text.
	resp, err := client.FetchStatus("/stats/priority-queue")
	require.NoError(t, err)
	defer resp.Body.Close()

	js, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	require.Equal(t, "priority queue not initialized", string(js))

	// Init the queue.
	handle := dom.StatsHandle()
	require.False(t, handle.HandleAutoAnalyze())

	// After initialization the endpoint returns a JSON snapshot.
	resp, err = client.FetchStatus("/stats/priority-queue")
	require.NoError(t, err)
	defer resp.Body.Close()

	js, err = io.ReadAll(resp.Body)
	require.NoError(t, err)
	var snapshot types.PriorityQueueSnapshot
	err = json.Unmarshal(js, &snapshot)
	require.NoError(t, err)
	require.Empty(t, snapshot.CurrentJobs)
	require.Empty(t, snapshot.MustRetryTables)
}
16 changes: 16 additions & 0 deletions pkg/server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (s *Server) startHTTPServer() {
Name("StatsDump")
router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).
Name("StatsHistoryDump")
router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()).
Name("StatsPriorityQueue")

router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump")
router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump")
Expand Down Expand Up @@ -621,3 +623,17 @@ func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler {
}
return optimizor.NewStatsHistoryHandler(do)
}

// newStatsPriorityQueueHandler builds the handler served at the stats priority
// queue route. It panics if the server's driver is not a *TiDBDriver or if the
// domain for the driver's store cannot be obtained.
func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler {
	tidbDriver, isTiDBDriver := s.driver.(*TiDBDriver)
	if !isTiDBDriver {
		panic("Illegal driver")
	}

	dom, domErr := session.GetDomain(tidbDriver.store)
	if domErr != nil {
		panic("Failed to get domain")
	}

	return optimizor.NewStatsPriorityQueueHandler(dom)
}
5 changes: 5 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// GetPriorityQueueSnapshot returns the stats priority queue snapshot.
// It delegates to the refresher and returns its snapshot and error unchanged.
func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
	return sa.refresher.GetPriorityQueueSnapshot()
}

func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
defer func() {
if r := recover(); r != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,7 @@ func (j *TestJob) SetIndicators(indicators priorityqueue.Indicators) {
func (j *TestJob) HasNewlyAddedIndex() bool {
	// Test stub: always reports that there is no newly added index.
	return false
}

// AsJSON is not implemented for TestJob; calling it panics.
func (j *TestJob) AsJSON() types.AnalysisJobJSON {
	panic("unimplemented")
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,20 @@ func getPartitionNames(partitionIndexes map[string][]string) []string {
}
return names
}

// AsJSON builds the JSON representation of this dynamic partitioned table job.
// TableID carries the global table ID. The partition IDs are collected by
// ranging over j.PartitionIDs and are not sorted here.
func (j *DynamicPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
	ids := make([]int64, 0, len(j.PartitionIDs))
	for id := range j.PartitionIDs {
		ids = append(ids, id)
	}

	var out statstypes.AnalysisJobJSON
	out.Type = string(j.getAnalyzeType())
	out.TableID = j.GlobalTableID
	out.PartitionIDs = ids
	out.PartitionIndexIDs = j.PartitionIndexIDs
	out.Weight = j.Weight
	out.Indicators = asJSONIndicators(j.Indicators)
	out.HasNewlyAddedIndex = j.HasNewlyAddedIndex()
	return out
}
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) {
func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) {
	// Test stub: not implemented, calling it panics.
	panic("implement me")
}
func (t testHeapObject) AsJSON() statstypes.AnalysisJobJSON {
	// Test stub: not implemented, calling it panics.
	panic("implement me")
}
func (t testHeapObject) String() string {
	// Test stub: not implemented, calling it panics.
	panic("implement me")
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type AnalysisJob interface {
// RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed.
RegisterFailureHook(hook FailureJobHook)

// AsJSON converts the job to its JSON-serializable representation.
AsJSON() statstypes.AnalysisJobJSON

fmt.Stringer
}

Expand Down Expand Up @@ -186,3 +189,12 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool {
_, ok := job.(*DynamicPartitionedTableAnalysisJob)
return ok
}

// asJSONIndicators renders the indicators as display strings: the change
// percentage is multiplied by 100 and printed with two decimals and a percent
// sign, the table size is printed with two decimals, and the last analysis
// duration is printed with its default formatting.
func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON {
	var out statstypes.IndicatorsJSON
	out.ChangePercentage = fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100)
	out.TableSize = fmt.Sprintf("%.2f", indicators.TableSize)
	out.LastAnalysisDuration = fmt.Sprintf("%v", indicators.LastAnalysisDuration)
	return out
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,19 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st

return sql, params
}

// AsJSON builds the JSON representation of this non-partitioned table job.
// The index IDs are collected by ranging over j.IndexIDs and are not sorted
// here.
func (j *NonPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
	indexIDs := make([]int64, 0, len(j.IndexIDs))
	for id := range j.IndexIDs {
		indexIDs = append(indexIDs, id)
	}

	var out statstypes.AnalysisJobJSON
	out.Type = string(j.getAnalyzeType())
	out.TableID = j.TableID
	out.IndexIDs = indexIDs
	out.Weight = j.Weight
	out.Indicators = asJSONIndicators(j.Indicators)
	out.HasNewlyAddedIndex = j.HasNewlyAddedIndex()
	return out
}
20 changes: 20 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,26 @@ func (pq *AnalysisPriorityQueue) Len() (int, error) {
return pq.syncFields.inner.len(), nil
}

// Snapshot returns the jobs currently listed by the inner queue together with
// the table IDs recorded in mustRetryJobs.
// The read lock is held for the whole call. An error is returned if the queue
// has not been initialized.
func (pq *AnalysisPriorityQueue) Snapshot() (
	currentJobs []AnalysisJob,
	mustRetryTables []int64,
	err error) {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()

	if initialized := pq.syncFields.initialized; !initialized {
		return nil, nil, errors.New(notInitializedErrMsg)
	}

	retryIDs := make([]int64, 0, len(pq.syncFields.mustRetryJobs))
	for id := range pq.syncFields.mustRetryJobs {
		retryIDs = append(retryIDs, id)
	}

	return pq.syncFields.inner.list(), retryIDs, nil
}

// Close closes the priority queue.
// Note: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,19 @@ func (j *StaticPartitionedTableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex

return sql, params
}

// AsJSON builds the JSON representation of this static partitioned table job.
// TableID carries the static partition ID. The index IDs are collected by
// ranging over j.IndexIDs and are not sorted here.
func (j *StaticPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
	indexIDs := make([]int64, 0, len(j.IndexIDs))
	for id := range j.IndexIDs {
		indexIDs = append(indexIDs, id)
	}

	var out statstypes.AnalysisJobJSON
	out.Type = string(j.getAnalyzeType())
	out.TableID = j.StaticPartitionID
	out.IndexIDs = indexIDs
	out.Weight = j.Weight
	out.Indicators = asJSONIndicators(j.Indicators)
	out.HasNewlyAddedIndex = j.HasNewlyAddedIndex()
	return out
}
22 changes: 22 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package refresher

import (
stderrors "errors"
"sort"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -188,6 +189,27 @@ func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
return false
}

// GetPriorityQueueSnapshot takes a snapshot of the job queue, converts every
// job to its JSON form, and returns the jobs ordered by weight from highest to
// lowest together with the must-retry table IDs.
// If the queue snapshot fails, a zero-value snapshot and the error are returned.
func (r *Refresher) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
	jobs, retryTables, err := r.jobs.Snapshot()
	if err != nil {
		return statstypes.PriorityQueueSnapshot{}, err
	}

	converted := make([]statstypes.AnalysisJobJSON, 0, len(jobs))
	for _, job := range jobs {
		converted = append(converted, job.AsJSON())
	}
	// Highest weight first.
	sort.Slice(converted, func(a, b int) bool {
		return converted[b].Weight < converted[a].Weight
	})

	var snapshot statstypes.PriorityQueueSnapshot
	snapshot.CurrentJobs = converted
	snapshot.MustRetryTables = retryTables
	return snapshot, nil
}

func (r *Refresher) setAutoAnalysisTimeWindow(
parameters map[string]string,
) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (m *mockAnalysisJob) GetIndicators() priorityqueue.Indicators {
func (m *mockAnalysisJob) SetIndicators(indicators priorityqueue.Indicators) {
	// Mock stub: not implemented, calling it panics.
	panic("not implemented")
}
func (m *mockAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
	// Mock stub: not implemented, calling it panics.
	panic("not implemented")
}

func TestWorker(t *testing.T) {
_, dom := testkit.CreateMockStoreAndDomain(t)
Expand Down
30 changes: 30 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,33 @@ type StatsHistory interface {
RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error)
}

// PriorityQueueSnapshot is the snapshot of the stats priority queue.
// Fields are encoded to JSON under the names given in their tags.
type PriorityQueueSnapshot struct {
	// CurrentJobs holds the JSON form of the jobs in the queue at snapshot time.
	CurrentJobs []AnalysisJobJSON `json:"current_jobs"`
	// MustRetryTables holds the IDs of the tables in the queue's must-retry set.
	MustRetryTables []int64 `json:"must_retry_tables"`
}

// AnalysisJobJSON represents the JSON format of an AnalysisJob.
// Which of the ID fields are populated depends on the kind of job that
// produced the value; fields a job does not set keep their zero value.
//
//nolint:fieldalignment
type AnalysisJobJSON struct {
	// Type is the string form of the job's analyze type.
	Type string `json:"type"`
	// TableID identifies the job's target. The dynamic partitioned job stores
	// its global table ID here and the static partitioned job stores its
	// static partition ID.
	TableID int64 `json:"table_id"`
	// Weight is the job's weight.
	Weight float64 `json:"weight"`
	// PartitionIDs lists partition IDs; set by the dynamic partitioned job.
	PartitionIDs []int64 `json:"partition_ids"`
	// IndexIDs lists index IDs; set by the non-partitioned and static
	// partitioned jobs.
	IndexIDs []int64 `json:"index_ids"`
	// PartitionIndexIDs is copied from the dynamic partitioned job's field of
	// the same name.
	PartitionIndexIDs map[int64][]int64 `json:"partition_index_ids"`
	// Indicators holds the job's indicators rendered as strings.
	Indicators IndicatorsJSON `json:"indicators"`
	// HasNewlyAddedIndex is the result of the job's HasNewlyAddedIndex method.
	HasNewlyAddedIndex bool `json:"has_newly_added_index"`
}

// IndicatorsJSON represents the JSON format of Indicators.
// All values are pre-formatted strings rather than numbers.
type IndicatorsJSON struct {
	// ChangePercentage is the change ratio times 100, with two decimals and a
	// trailing percent sign.
	ChangePercentage string `json:"change_percentage"`
	// TableSize is the table size printed with two decimals.
	TableSize string `json:"table_size"`
	// LastAnalysisDuration is the last analysis duration in its default
	// formatting.
	LastAnalysisDuration string `json:"last_analysis_duration"`
}

// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
type StatsAnalyze interface {
owner.Listener
Expand Down Expand Up @@ -161,6 +188,9 @@ type StatsAnalyze interface {
// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool

// GetPriorityQueueSnapshot returns a snapshot of the stats priority queue.
GetPriorityQueueSnapshot() (PriorityQueueSnapshot, error)

// Close closes the analyze worker.
Close()
}
Expand Down