From ddaf3e737fb1b9aed7404f37a834f1dc322f6770 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 17 Jan 2023 16:27:32 +0800 Subject: [PATCH 01/10] ttl: add table `mysql.ttl_job_history` to record histories --- session/bootstrap.go | 25 +++++++- ttl/cache/ttlstatus.go | 10 ++-- ttl/ttlworker/job.go | 57 ++++++++++++++++++- ttl/ttlworker/job_manager.go | 3 +- ttl/ttlworker/job_manager_integration_test.go | 26 +++++++-- ttl/ttlworker/job_manager_test.go | 4 ++ 6 files changed, 114 insertions(+), 11 deletions(-) diff --git a/session/bootstrap.go b/session/bootstrap.go index 74eecb28a68a4..9ac20fe55e491 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -516,6 +516,26 @@ const ( created_time timestamp NOT NULL, primary key(job_id, scan_id), key(created_time));` + + // CreateTTLJobHistory is a table that stores ttl job's history + CreateTTLJobHistory = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_job_history ( + job_id varchar(64) PRIMARY KEY, + table_id bigint(64) NOT NULL, + parent_table_id bigint(64) NOT NULL, + table_schema varchar(64) NOT NULL, + table_name varchar(64) NOT NULL, + partition_name varchar(64) DEFAULT NULL, + create_time timestamp NOT NULL, + finish_time timestamp NOT NULL, + ttl_expire timestamp NOT NULL, + summary_text text, + expired_rows bigint(64) DEFAULT NULL, + deleted_rows bigint(64) DEFAULT NULL, + error_delete_rows bigint(64) DEFAULT NULL, + status varchar(64) NOT NULL, + key(create_time), + key(finish_time) + );` ) // bootstrap initiates system DB for a store. @@ -757,7 +777,7 @@ const ( version109 = 109 // version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0. version110 = 110 - // version111 adds the table tidb_ttl_task + // version111 adds the table tidb_ttl_task and tidb_ttl_job_history version111 = 111 ) @@ -2239,6 +2259,7 @@ func upgradeToVer111(s Session, ver int64) { return } doReentrantDDL(s, CreateTTLTask) + doReentrantDDL(s, CreateTTLJobHistory) } func writeOOMAction(s Session) { @@ -2349,6 +2370,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateTTLTableStatus) // Create tidb_ttl_task table mustExecute(s, CreateTTLTask) + // Create tidb_ttl_job_history table + mustExecute(s, CreateTTLJobHistory) } // doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap. diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go index d28bafa5a76c8..b21a50a161f79 100644 --- a/ttl/cache/ttlstatus.go +++ b/ttl/cache/ttlstatus.go @@ -30,13 +30,15 @@ const ( // JobStatusWaiting means the job hasn't started JobStatusWaiting JobStatus = "waiting" // JobStatusRunning means this job is running - JobStatusRunning = "running" + JobStatusRunning JobStatus = "running" // JobStatusCancelling means this job is being canceled, but not canceled yet - JobStatusCancelling = "cancelling" + JobStatusCancelling JobStatus = "cancelling" // JobStatusCancelled means this job has been canceled successfully - JobStatusCancelled = "cancelled" + JobStatusCancelled JobStatus = "cancelled" // JobStatusTimeout means this job has timeout - JobStatusTimeout = "timeout" + JobStatusTimeout JobStatus = "timeout" + // JobStatusFinished means job has been finished + JobStatusFinished JobStatus = "finished" ) const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 4b5ffb147bf1d..b15f04f8b03a5 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -46,6 +46,25 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status WHERE table_id = %? AND current_job_id = %?` const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?" const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?" +const addJobHistoryTemplate = `INSERT INTO + mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + summary_text, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) +VALUES + (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)` func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) { return updateJobCurrentStatusTemplate, []interface{}{string(newStatus), tableID, string(oldStatus), jobID} @@ -63,6 +82,35 @@ func removeTaskForJob(jobID string) (string, []interface{}) { return removeTaskForJobTemplate, []interface{}{jobID} } +func addJobHistorySQL(job *ttlJob, finishTime time.Time, summaryText string) (string, []interface{}) { + status := cache.JobStatusFinished + if job.status == cache.JobStatusTimeout || job.status == cache.JobStatusCancelled { + status = job.status + } + + var partitionName interface{} + if job.tbl.Partition.L != "" { + partitionName = job.tbl.Partition.O + } + + return addJobHistoryTemplate, []interface{}{ + job.id, + job.tbl.ID, + job.tbl.TableInfo.ID, + job.tbl.Schema.O, + job.tbl.Name.O, + partitionName, + job.createTime.Format(timeFormat), + finishTime.Format(timeFormat), + job.ttlExpireTime.Format(timeFormat), + summaryText, + job.statistics.TotalRows.Load(), + job.statistics.SuccessRows.Load(), + job.statistics.ErrorRows.Load(), + string(status), + } +} + type ttlJob struct { id string ownerID string @@ -70,7 +118,8 @@ type ttlJob struct { ctx context.Context cancel func() - createTime time.Time + createTime time.Time + ttlExpireTime time.Time tbl *cache.PhysicalTable @@ -149,6 +198,12 @@ func (job *ttlJob) finish(se session.Session, now time.Time) { return errors.Wrapf(err, "execute sql: %s", sql) } + sql, args = addJobHistorySQL(job, now, summary) + _, err = se.ExecuteSQL(context.TODO(), sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + return nil }) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 910038666c1b6..25139cd7d6c3a 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -782,7 +782,8 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca ctx: jobCtx, cancel: cancel, - createTime: now, + createTime: now, + ttlExpireTime: expireTime, // at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table // information from schema cache directly tbl: table, diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index a1e284540a594..4495ce3668c16 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "strconv" + "strings" "sync" "testing" "time" @@ -112,7 +113,7 @@ func TestFinishJob(t *testing.T) { sessionFactory := sessionFactory(t, store) - testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} + testTable := &cache.PhysicalTable{ID: 2, Schema: model.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: model.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)") @@ -120,13 +121,30 @@ func TestFinishJob(t *testing.T) { m := ttlworker.NewJobManager("test-id", nil, store, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() - job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) + startTime := time.Now() + job, err := m.LockNewJob(context.Background(), se, testTable, startTime, false) require.NoError(t, err) job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`)) - job.Finish(se, time.Now()) + job.Statistics().TotalRows.Add(128) + job.Statistics().SuccessRows.Add(120) + job.Statistics().ErrorRows.Add(8) + time.Sleep(time.Second) + endTime := time.Now() + job.Finish(se, endTime) + + expireTime, err := testTable.EvalExpireTime(context.Background(), se, startTime) + require.NoError(t, err) - tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}")) + timeFormat := "2006-01-02 15:04:05" + lastJobSummary := "{\"total_rows\":128,\"success_rows\":120,\"error_rows\":8,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}" + tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 " + lastJobSummary)) tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows()) + expectedRow := []string{ + job.ID(), "2", "1", "db1", "t1", "", + startTime.Format(timeFormat), endTime.Format(timeFormat), expireTime.Format(timeFormat), + lastJobSummary, "128", "120", "8", "finished", + } + tk.MustQuery("select * from mysql.tidb_ttl_job_history").Check(testkit.Rows(strings.Join(expectedRow, " "))) } func TestTTLAutoAnalyze(t *testing.T) { diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 97a84b3cb82a2..bbfa83fb1111d 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -164,6 +164,10 @@ func (j *ttlJob) Finish(se session.Session, now time.Time) { j.finish(se, now) } +func (j *ttlJob) Statistics() *ttlStatistics { + return j.statistics +} + func (j *ttlJob) ID() string { return j.id } From a606d5822c502db350c0ab9aae2b64c49c538d85 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 17 Jan 2023 17:24:49 +0800 Subject: [PATCH 02/10] fix ut --- ttl/ttlworker/job_manager_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index bbfa83fb1111d..df67e957ce0b4 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -16,6 +16,7 @@ package ttlworker import ( "context" + "strings" "testing" "time" @@ -582,7 +583,7 @@ func TestCheckFinishedJob(t *testing.T) { now := se.Now() jobID := m.runningJobs[0].id se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { - if len(args) > 1 { + if strings.Contains(sql, "tidb_ttl_table_status") { meetArg = true expectedSQL, expectedArgs := finishJobSQL(tbl.ID, now, "{\"total_rows\":1,\"success_rows\":1,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":1,\"finished_scan_task\":1}", jobID) assert.Equal(t, expectedSQL, sql) From da71974c1c4e5858324a6a9124f12e4e47e532cd Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 17 Jan 2023 18:19:54 +0800 Subject: [PATCH 03/10] update time --- executor/infoschema_cluster_table_test.go | 2 +- session/bootstrap.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index be2f04cb5c6ac..b1a6d4c57f4f8 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 45 + result := 46 require.Len(t, rows, result) // More tests about the privileges. diff --git a/session/bootstrap.go b/session/bootstrap.go index 9ac20fe55e491..af856acb268ad 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -533,8 +533,8 @@ const ( deleted_rows bigint(64) DEFAULT NULL, error_delete_rows bigint(64) DEFAULT NULL, status varchar(64) NOT NULL, - key(create_time), - key(finish_time) + key(parent_table_id, create_time), + key(create_time) );` ) From b8ff918be284f4315a69e26b892e6048ad8e2a72 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 17 Jan 2023 18:28:27 +0800 Subject: [PATCH 04/10] update --- session/bootstrap.go | 1 + 1 file changed, 1 insertion(+) diff --git a/session/bootstrap.go b/session/bootstrap.go index af856acb268ad..ed65bb0720cf0 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -533,6 +533,7 @@ const ( deleted_rows bigint(64) DEFAULT NULL, error_delete_rows bigint(64) DEFAULT NULL, status varchar(64) NOT NULL, + key(table_schema, table_name, create_time), key(parent_table_id, create_time), key(create_time) );` From 30e88b2367f573de0f8eb878282abfd688b45fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 18 Jan 2023 10:04:23 +0800 Subject: [PATCH 05/10] Update ttl/ttlworker/job.go Co-authored-by: bb7133 --- ttl/ttlworker/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index b15f04f8b03a5..d3d919cb89d80 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -89,7 +89,7 @@ func addJobHistorySQL(job *ttlJob, finishTime time.Time, summaryText string) (st } var partitionName interface{} - if job.tbl.Partition.L != "" { + if job.tbl.Partition.O != "" { partitionName = job.tbl.Partition.O } From 224815ddd1ac0264d39648574212248138b8410f Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Wed, 18 Jan 2023 20:45:39 +0800 Subject: [PATCH 06/10] ttl: add telemetry for ttl --- telemetry/data_feature_usage.go | 3 + telemetry/data_feature_usage_test.go | 133 +++++++++++++++++ telemetry/main_test.go | 2 + telemetry/ttl.go | 207 +++++++++++++++++++++++++++ 4 files changed, 345 insertions(+) create mode 100644 telemetry/ttl.go diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 6dec8edcb023a..14195994bc3ab 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -59,6 +59,7 @@ type featureUsage struct { EnableGlobalMemoryControl bool `json:"enableGlobalMemoryControl"` AutoIDNoCache bool `json:"autoIDNoCache"` IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` + TTLUsage *ttlUsageCounter `json:"ttlUsage"` } type placementPolicyUsage struct { @@ -111,6 +112,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.IndexMergeUsageCounter = getIndexMergeUsageInfo() + usage.TTLUsage = getTTLUsageInfo(ctx, sctx) + return &usage, nil } diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index c303c53f3006b..8dec5356e41a9 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -15,12 +15,16 @@ package telemetry_test import ( + "encoding/json" "fmt" + "strings" "testing" + "time" _ "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/testkit" @@ -580,3 +584,132 @@ func TestIndexMergeUsage(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed) } + +func TestTTLTelemetry(t *testing.T) { + timeFormat := "2006-01-02 15:04:05" + dateFormat := "2006-01-02" + + now := time.Now() + curDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + if interval := curDate.Add(time.Hour * 24).Sub(now); interval > 0 && interval < 5*time.Minute { + // make sure testing is not running one the end of one day + time.Sleep(interval) + } + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_ttl_job_enable=0") + + getTTLTable := func(name string) *model.TableInfo { + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(name)) + require.NoError(t, err) + require.NotNil(t, tbl.Meta().TTLInfo) + return tbl.Meta() + } + + jobIDIdx := 1 + insertTTLHistory := func(tblName string, partitionName string, createTime, finishTime, ttlExpire time.Time, scanError string, totalRows, errorRows int64, status string) { + defer func() { + jobIDIdx++ + }() + + tbl := getTTLTable(tblName) + tblID := tbl.ID + partitionID := tbl.ID + if partitionName != "" { + for _, def := range tbl.Partition.Definitions { + if def.Name.L == strings.ToLower(partitionName) { + partitionID = def.ID + } + } + require.NotEqual(t, tblID, partitionID) + } + + summary := make(map[string]interface{}) + summary["total_rows"] = totalRows + summary["success_rows"] = totalRows - errorRows + summary["error_rows"] = errorRows + summary["total_scan_task"] = 1 + summary["scheduled_scan_task"] = 1 + summary["finished_scan_task"] = 1 + if scanError != "" { + summary["scan_task_err"] = scanError + } + + summaryText, err := json.Marshal(summary) + require.NoError(t, err) + + tk.MustExec("insert into "+ + "mysql.tidb_ttl_job_history ("+ + " job_id, table_id, parent_table_id, table_schema, table_name, partition_name, "+ + " create_time, finish_time, ttl_expire, summary_text, "+ + " expired_rows, deleted_rows, error_delete_rows, status) "+ + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + jobIDIdx, partitionID, tblID, "test", tblName, partitionName, + createTime.Format(timeFormat), finishTime.Format(timeFormat), ttlExpire.Format(timeFormat), summaryText, + totalRows, totalRows-errorRows, errorRows, status, + ) + } + + oneDayAgoDate := curDate.Add(-24 * time.Hour) + // start today, end today + times11 := []time.Time{curDate.Add(time.Hour), curDate.Add(2 * time.Hour), curDate} + // start yesterday, end today + times21 := []time.Time{curDate.Add(-2 * time.Hour), curDate, curDate.Add(-3 * time.Hour)} + // start yesterday, end yesterday + times31 := []time.Time{oneDayAgoDate, oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-time.Hour)} + times32 := []time.Time{oneDayAgoDate.Add(2 * time.Hour), oneDayAgoDate.Add(3 * time.Hour), oneDayAgoDate.Add(time.Hour)} + times33 := []time.Time{oneDayAgoDate.Add(4 * time.Hour), oneDayAgoDate.Add(5 * time.Hour), oneDayAgoDate.Add(3 * time.Hour)} + // start 2 day ago, end yesterday + times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(-time.Hour), oneDayAgoDate.Add(-3 * time.Hour)} + + tk.MustExec("create table t1 (t timestamp) TTL=`t` + interval 1 hour") + insertTTLHistory("t1", "", times11[0], times11[1], times11[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times21[0], times21[1], times21[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 100, 1, "finished") + insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 100, 1, "timeout") + insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 100, 1, "finished") + insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 100, 1, "finished") + + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + checkTableHistWithDeleteRows := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDeleteRows)) + require.Equal(t, int64(10*1000), *usage.TTLUsage.TableHistWithDeleteRows[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDeleteRows[0].Count) + require.Equal(t, int64(100*1000), *usage.TTLUsage.TableHistWithDeleteRows[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDeleteRows[1].Count) + require.Equal(t, int64(1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDeleteRows[2].Count) + require.Equal(t, int64(10*1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDeleteRows[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDeleteRows[4].Count) + } + + checkTableHistWithDelay := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDelayTime)) + require.Equal(t, int64(1), *usage.TTLUsage.TableHistWithDelayTime[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDelayTime[0].Count) + require.Equal(t, int64(6), *usage.TTLUsage.TableHistWithDelayTime[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDelayTime[1].Count) + require.Equal(t, int64(24), *usage.TTLUsage.TableHistWithDelayTime[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDelayTime[2].Count) + require.Equal(t, int64(72), *usage.TTLUsage.TableHistWithDelayTime[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDelayTime[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDelayTime[4].Count) + } + + require.False(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(1), usage.TTLUsage.TTLTables) + require.Equal(t, int64(1), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 0, 0, 0, 0) + checkTableHistWithDelay(0, 0, 1, 0, 0) +} diff --git a/telemetry/main_test.go b/telemetry/main_test.go index 0e8d98b2a4f6c..8478a3ead4084 100644 --- a/telemetry/main_test.go +++ b/telemetry/main_test.go @@ -41,6 +41,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), } goleak.VerifyTestMain(m, opts...) diff --git a/telemetry/ttl.go b/telemetry/ttl.go new file mode 100644 index 0000000000000..1e0dd799a2f99 --- /dev/null +++ b/telemetry/ttl.go @@ -0,0 +1,207 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" +) + +const ( + selectDeletedRowsOneDaySQL = `SELECT parent_table_id, CAST(SUM(deleted_rows) AS SIGNED) + FROM + mysql.tidb_ttl_job_history + WHERE + create_time >= CURDATE() - INTERVAL 1 DAY + AND finish_time < CURDATE() + GROUP BY parent_table_id;` + selectDelaySQL = `SELECT + parent_table_id, TIMESTAMPDIFF(MINUTE, MIN(tm), CURDATE()) AS ttl_minutes + FROM + ( + SELECT + table_id, + parent_table_id, + MAX(ttl_expire) AS tm + FROM + mysql.tidb_ttl_job_history + WHERE + create_time < CURDATE() + AND create_time > CURDATE() - INTERVAL 7 DAY + AND finish_time < CURDATE() + AND status = 'finished' + AND JSON_VALID(summary_text) + AND summary_text ->> "$.scan_task_err" IS NULL + GROUP BY + table_id, parent_table_id + ) t + GROUP BY parent_table_id;` +) + +type ttlHistItem struct { + LessThan *int64 `json:"less_than,omitempty"` + LessThanMax bool `json:"less_than_max,omitempty"` + Count int64 `json:"count"` +} + +type ttlUsageCounter struct { + TTLJobEnabled bool `json:"ttl_job_enabled"` + TTLTables int64 `json:"ttl_table_count"` + TTLJobEnabledTables int64 `json:"ttl_job_enabled_tables"` + TTLHistDate string `json:"ttl_hist_date"` + TableHistWithDeleteRows []*ttlHistItem `json:"table_hist_with_delete_rows"` + TableHistWithDelayTime []*ttlHistItem `json:"table_hist_with_delay_time"` +} + +func int64Pointer(val int64) *int64 { + v := val + return &v +} + +func (c *ttlUsageCounter) UpdateTableHistWithDeleteRows(rows int64) { + for _, item := range c.TableHistWithDeleteRows { + if item.LessThanMax || rows < *item.LessThan { + item.Count++ + return + } + } +} + +func (c *ttlUsageCounter) UpdateTableHistWithDelayTime(tblCnt int, hours int64) { + for _, item := range c.TableHistWithDelayTime { + if item.LessThanMax || hours < *item.LessThan { + item.Count += int64(tblCnt) + return + } + } +} + +func getTTLUsageInfo(ctx context.Context, sctx sessionctx.Context) (counter *ttlUsageCounter) { + counter = &ttlUsageCounter{ + TTLJobEnabled: variable.EnableTTLJob.Load(), + TTLHistDate: time.Now().Add(-24 * time.Hour).Format("2006-01-02"), + TableHistWithDeleteRows: []*ttlHistItem{ + { + LessThan: int64Pointer(10 * 1000), + }, + { + LessThan: int64Pointer(100 * 1000), + }, + { + LessThan: int64Pointer(1000 * 1000), + }, + { + LessThan: int64Pointer(10000 * 1000), + }, + { + LessThanMax: true, + }, + }, + TableHistWithDelayTime: []*ttlHistItem{ + { + LessThan: int64Pointer(1), + }, + { + LessThan: int64Pointer(6), + }, + { + LessThan: int64Pointer(24), + }, + { + LessThan: int64Pointer(72), + }, + { + LessThanMax: true, + }, + }, + } + + is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + if !ok { + // it should never happen + logutil.BgLogger().Error(fmt.Sprintf("GetDomainInfoSchema returns a invalid type: %T", is)) + return + } + + ttlTables := make(map[int64]*model.TableInfo) + for _, db := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(db.Name) { + tblInfo := tbl.Meta() + if tblInfo.State != model.StatePublic || tblInfo.TTLInfo == nil { + continue + } + + counter.TTLTables++ + if tblInfo.TTLInfo.Enable { + counter.TTLJobEnabledTables++ + } + ttlTables[tblInfo.ID] = tblInfo + } + } + + exec := sctx.(sqlexec.RestrictedSQLExecutor) + rows, _, err := exec.ExecRestrictedSQL(ctx, nil, selectDeletedRowsOneDaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDeletedRowsOneDaySQL), zap.Error(err)) + } else { + for _, row := range rows { + counter.UpdateTableHistWithDeleteRows(row.GetInt64(1)) + } + } + + rows, _, err = exec.ExecRestrictedSQL(ctx, nil, selectDelaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDelaySQL), zap.Error(err)) + } else { + for _, row := range rows { + tblID := row.GetInt64(0) + tbl, ok := ttlTables[tblID] + if !ok { + // table not exist, truncated for deleted + continue + } + delete(ttlTables, tblID) + + evalIntervalSQL := fmt.Sprintf( + "SELECT TIMESTAMPDIFF(HOUR, CURDATE() - INTERVAL %d MINUTE, CURDATE() - INTERVAL %s %s)", + row.GetInt64(1), tbl.TTLInfo.IntervalExprStr, ast.TimeUnitType(tbl.TTLInfo.IntervalTimeUnit).String(), + ) + + innerRows, _, err := exec.ExecRestrictedSQL(ctx, nil, evalIntervalSQL) + if err != nil || len(innerRows) == 0 { + logutil.BgLogger().Error("exec sql error or empty rows returned", zap.String("SQL", evalIntervalSQL), zap.Error(err)) + continue + } + + hours := innerRows[0].GetInt64(0) + counter.UpdateTableHistWithDelayTime(1, hours) + } + + // When no history found for a table, use max delay + counter.UpdateTableHistWithDelayTime(len(ttlTables), math.MaxInt64) + } + return +} From 623944b7f01a206666c1c5bd881051371037372f Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Sat, 28 Jan 2023 12:45:49 +0800 Subject: [PATCH 07/10] update --- telemetry/data_feature_usage_test.go | 39 +++++++++++++++++++++++----- telemetry/ttl.go | 15 ++++++----- 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 681fc512b8e24..5c632fb350f50 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -700,16 +700,19 @@ func TestTTLTelemetry(t *testing.T) { times31 := []time.Time{oneDayAgoDate, oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-time.Hour)} times32 := []time.Time{oneDayAgoDate.Add(2 * time.Hour), oneDayAgoDate.Add(3 * time.Hour), oneDayAgoDate.Add(time.Hour)} times33 := []time.Time{oneDayAgoDate.Add(4 * time.Hour), oneDayAgoDate.Add(5 * time.Hour), oneDayAgoDate.Add(3 * time.Hour)} - // start 2 day ago, end yesterday - times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(-time.Hour), oneDayAgoDate.Add(-3 * time.Hour)} + // start 2 days ago, end yesterday + times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-3 * time.Hour)} + // start two days ago, end two days ago + times51 := []time.Time{oneDayAgoDate.Add(-5 * time.Hour), oneDayAgoDate.Add(-4 * time.Hour), oneDayAgoDate.Add(-6 * time.Hour)} tk.MustExec("create table t1 (t timestamp) TTL=`t` + interval 1 hour") insertTTLHistory("t1", "", times11[0], times11[1], times11[2], "", 100000000, 0, "finished") insertTTLHistory("t1", "", times21[0], times21[1], times21[2], "", 100000000, 0, "finished") - insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 100, 1, "finished") - insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 100, 1, "timeout") - insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 100, 1, "finished") - insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 100, 1, "finished") + insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 112600, 110000, "finished") + insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 2600, 0, "timeout") + insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times51[0], times51[1], times51[2], "", 100000000, 1, "finished") usage, err := telemetry.GetFeatureUsage(tk.Session()) require.NoError(t, err) @@ -749,6 +752,28 @@ func TestTTLTelemetry(t *testing.T) { require.Equal(t, int64(1), usage.TTLUsage.TTLTables) require.Equal(t, int64(1), usage.TTLUsage.TTLJobEnabledTables) require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) - checkTableHistWithDeleteRows(1, 0, 0, 0, 0) + checkTableHistWithDeleteRows(0, 1, 0, 0, 0) checkTableHistWithDelay(0, 0, 1, 0, 0) + + tk.MustExec("create table t2 (t timestamp) TTL=`t` + interval 20 hour") + tk.MustExec("set @@global.tidb_ttl_job_enable=1") + insertTTLHistory("t2", "", times31[0], times31[1], times31[2], "", 9999, 0, "finished") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(2), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 0) + + tk.MustExec("create table t3 (t timestamp) TTL=`t` + interval 1 hour TTL_ENABLE='OFF'") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(3), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 1) } diff --git a/telemetry/ttl.go b/telemetry/ttl.go index 1e0dd799a2f99..cb02d51bcece5 100644 --- a/telemetry/ttl.go +++ b/telemetry/ttl.go @@ -31,13 +31,16 @@ import ( ) const ( + // selectDeletedRowsOneDaySQL selects the deleted rows for each table of last day selectDeletedRowsOneDaySQL = `SELECT parent_table_id, CAST(SUM(deleted_rows) AS SIGNED) FROM mysql.tidb_ttl_job_history WHERE - create_time >= CURDATE() - INTERVAL 1 DAY + create_time >= CURDATE() - INTERVAL 7 DAY + AND finish_time >= CURDATE() - INTERVAL 1 DAY AND finish_time < CURDATE() GROUP BY parent_table_id;` + // selectDelaySQL selects the deletion delay in minute for each table at the end of last day selectDelaySQL = `SELECT parent_table_id, TIMESTAMPDIFF(MINUTE, MIN(tm), CURDATE()) AS ttl_minutes FROM @@ -49,8 +52,7 @@ const ( FROM mysql.tidb_ttl_job_history WHERE - create_time < CURDATE() - AND create_time > CURDATE() - INTERVAL 7 DAY + create_time > CURDATE() - INTERVAL 7 DAY AND finish_time < CURDATE() AND status = 'finished' AND JSON_VALID(summary_text) @@ -176,14 +178,15 @@ func getTTLUsageInfo(ctx context.Context, sctx sessionctx.Context) (counter *ttl if err != nil { logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDelaySQL), zap.Error(err)) } else { + noHistoryTables := len(ttlTables) for _, row := range rows { tblID := row.GetInt64(0) tbl, ok := ttlTables[tblID] if !ok { - // table not exist, truncated for deleted + // table not exist, maybe truncated or deleted continue } - delete(ttlTables, tblID) + noHistoryTables-- evalIntervalSQL := fmt.Sprintf( "SELECT TIMESTAMPDIFF(HOUR, CURDATE() - INTERVAL %d MINUTE, CURDATE() - INTERVAL %s %s)", @@ -201,7 +204,7 @@ func getTTLUsageInfo(ctx context.Context, sctx sessionctx.Context) (counter *ttl } // When no history found for a table, use max delay - counter.UpdateTableHistWithDelayTime(len(ttlTables), math.MaxInt64) + counter.UpdateTableHistWithDelayTime(noHistoryTables, math.MaxInt64) } return } From fa9e45ad16d703e2f8a6b13f89d191e45704125c Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Sun, 29 Jan 2023 10:44:41 +0800 Subject: [PATCH 08/10] update --- telemetry/BUILD.bazel | 2 ++ 1 file changed, 2 insertions(+) diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index 1f032aa3f237a..a6c79f7de596f 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "id.go", "status.go", "telemetry.go", + "ttl.go", "util.go", ], importpath = "github.com/pingcap/tidb/telemetry", @@ -24,6 +25,7 @@ go_library( "//infoschema", "//kv", "//metrics", + "//parser/ast", "//parser/model", "//parser/mysql", "//sessionctx", From 6db1c6d73901993e0cb3f344c0b408cdcb3120d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Sun, 29 Jan 2023 17:19:19 +0800 Subject: [PATCH 09/10] Update telemetry/data_feature_usage_test.go Co-authored-by: bb7133 --- telemetry/data_feature_usage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 5c632fb350f50..a667219ba50a8 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -631,7 +631,7 @@ func TestTTLTelemetry(t *testing.T) { now := time.Now() curDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) if interval := curDate.Add(time.Hour * 24).Sub(now); interval > 0 && interval < 5*time.Minute { - // make sure testing is not running one the end of one day + // make sure testing is not running at the end of one day time.Sleep(interval) } From 086511b7e7fef3fe5320f1ba3fa4841acc3062fd Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Sun, 29 Jan 2023 18:20:51 +0800 Subject: [PATCH 10/10] comments --- telemetry/ttl.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/telemetry/ttl.go b/telemetry/ttl.go index cb02d51bcece5..b9c8c0210fb0c 100644 --- a/telemetry/ttl.go +++ b/telemetry/ttl.go @@ -64,9 +64,13 @@ const ( ) type ttlHistItem struct { - LessThan *int64 `json:"less_than,omitempty"` - LessThanMax bool `json:"less_than_max,omitempty"` - Count int64 `json:"count"` + // LessThan is not null means it collects the count of items with condition [prevLessThan, LessThan) + // Notice that it's type is an int64 pointer to forbid serializing it when it is not set. + LessThan *int64 `json:"less_than,omitempty"` + // LessThanMax is true means the condition is [prevLessThan, MAX) + LessThanMax bool `json:"less_than_max,omitempty"` + // Count is the count of items that fit the condition + Count int64 `json:"count"` } type ttlUsageCounter struct {