From 7917e4a6eedaef2723394b5f0037b7bd7c609f16 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 4 Nov 2024 15:18:48 +0800
Subject: [PATCH] ttl: use a pessimistic transaction to finish the job (#56516) (#56967)

close pingcap/tidb#56422
---
 pkg/ttl/ttlworker/BUILD.bazel                 |   1 +
 pkg/ttl/ttlworker/job.go                      |  23 ++-
 pkg/ttl/ttlworker/job_manager.go              |  31 +++-
 .../ttlworker/job_manager_integration_test.go | 151 ++++++++++++++++++
 pkg/ttl/ttlworker/job_manager_test.go         |   9 +-
 pkg/ttl/ttlworker/task_manager_test.go        |   5 +
 6 files changed, 202 insertions(+), 18 deletions(-)

diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel
index 65b914a9ed8d9..51cb7cd2554f3 100644
--- a/pkg/ttl/ttlworker/BUILD.bazel
+++ b/pkg/ttl/ttlworker/BUILD.bazel
@@ -36,6 +36,7 @@ go_library(
         "//pkg/types",
         "//pkg/util",
         "//pkg/util/chunk",
+        "//pkg/util/intest",
         "//pkg/util/logutil",
         "//pkg/util/sqlexec",
         "//pkg/util/timeutil",
diff --git a/pkg/ttl/ttlworker/job.go b/pkg/ttl/ttlworker/job.go
index c937fa9037802..21386e8dd9633 100644
--- a/pkg/ttl/ttlworker/job.go
+++ b/pkg/ttl/ttlworker/job.go
@@ -20,10 +20,10 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
 	"github.com/pingcap/tidb/pkg/ttl/session"
-	"github.com/pingcap/tidb/pkg/util/logutil"
-	"go.uber.org/zap"
+	"github.com/pingcap/tidb/pkg/util/intest"
 )
 
 const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = %? WHERE table_id = %? AND current_job_status = %? AND current_job_id = %?"
@@ -126,7 +126,8 @@ type ttlJob struct {
 }
 
 // finish turns current job into last job, and update the error message and statistics summary
-func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
+func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
+	intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
 	// at this time, the job.ctx may have been canceled (to cancel this job)
 	// even when it's canceled, we'll need to update the states, so use another context
 	err := se.RunInTxn(context.TODO(), func() error {
@@ -148,10 +149,16 @@
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}
 
-		return nil
-	}, session.TxnModeOptimistic)
+		failpoint.Inject("ttl-finish", func(val failpoint.Value) {
+			if val.(bool) {
+				TTLJobFinishFailpointHook(&err)
+			}
+		})
+		return err
+	}, session.TxnModePessimistic)
 
-	if err != nil {
-		logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
-	}
+	return err
 }
+
+// TTLJobFinishFailpointHook is a failpoint hook for testing purposes
+var TTLJobFinishFailpointHook func(err *error)
diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go
index eb9a34e4ad568..9dd7518567d2e 100644
--- a/pkg/ttl/ttlworker/job_manager.go
+++ b/pkg/ttl/ttlworker/job_manager.go
@@ -550,8 +550,12 @@ j:
 		if err != nil {
 			logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
 		}
+		err = job.finish(se, se.Now(), summary)
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+			continue
+		}
 		m.removeJob(job)
-		job.finish(se, se.Now(), summary)
 	}
 	cancel()
 }
@@ -565,10 +569,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 
 			summary, err := summarizeErr(errors.New("ttl job is disabled"))
 			if err != nil {
-				logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+				logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+			}
+			err = job.finish(se, now, summary)
+			if err != nil {
+				logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+				continue
 			}
 			m.removeJob(job)
-			job.finish(se, now, summary)
 		}
 	}
 	return
@@ -585,10 +593,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
 		summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
 		if err != nil {
-			logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+			logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+		}
+		err = job.finish(se, now, summary)
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+			continue
 		}
 		m.removeJob(job)
-		job.finish(se, now, summary)
 	}
 
 	jobTables := m.readyForLockHBTimeoutJobTables(now)
@@ -852,11 +864,14 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, no
 			logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
 			summary, err := summarizeErr(errors.New("job is timeout"))
 			if err != nil {
-				logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+				logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
+			}
+			err = job.finish(se, now, summary)
+			if err != nil {
+				logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
+				continue
 			}
 			m.removeJob(job)
-			job.finish(se, now, summary)
-			continue
 		}
 
 		sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go
index a8b9a81ba10ff..8202cb01620ab 100644
--- a/pkg/ttl/ttlworker/job_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	dbsession "github.com/pingcap/tidb/pkg/session"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/statistics"
 	"github.com/pingcap/tidb/pkg/testkit"
 	timerapi "github.com/pingcap/tidb/pkg/timer/api"
@@ -1242,3 +1243,153 @@ func TestManagerJobAdapterGetJob(t *testing.T) {
 		tk.MustExec("delete from mysql.tidb_ttl_job_history")
 	}
 }
+
+func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
+	// Finishing a `TTLJob` will remove all the `TTLTask`s of the job, and at the same time
+	// the `task_manager` may update the owner of the task, which may cause a write conflict.
+	// This test simulates that scenario.
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	se := sessionFactory()
+
+	tk.MustExec("use test")
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	testTimes := 1000
+	now := se.Now()
+	for i := 0; i < testTimes; i++ {
+		now = now.Add(time.Hour * 48)
+
+		m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+		require.NoError(t, m.InfoSchemaCache().Update(se))
+
+		se := sessionFactory()
+		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+		require.NoError(t, err)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			se := sessionFactory()
+
+			m.TaskManager().LockScanTask(se, &cache.TTLTask{
+				ScanID:  0,
+				JobID:   job.ID(),
+				TableID: testTable.Meta().ID,
+			}, now)
+		}()
+
+		go func() {
+			defer wg.Done()
+			se := sessionFactory()
+
+			job.Finish(se, se.Now(), &ttlworker.TTLSummary{})
+		}()
+
+		wg.Wait()
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	}
+}
+
+func TestFinishError(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	se := sessionFactory()
+
+	tk.MustExec("use test")
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	errCount := 5
+	ttlworker.TTLJobFinishFailpointHook = func(err *error) {
+		errCount -= 1
+		if errCount > 0 {
+			*err = errors.New("mock error")
+		}
+	}
+	failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", "return(true)")
+	defer failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish")
+
+	now := se.Now()
+
+	m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+
+	initializeTest := func() {
+		errCount = 5
+		now = now.Add(time.Hour * 48)
+		job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
+		require.NoError(t, err)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+		task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
+			ScanID:  0,
+			JobID:   job.ID(),
+			TableID: testTable.Meta().ID,
+		}, now)
+		require.NoError(t, err)
+		task.SetResult(nil)
+		err = m.TaskManager().ReportTaskFinished(se, now, task)
+		require.NoError(t, err)
+		tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
+	}
+
+	// Test that `CheckFinishedJob` can tolerate the `job.finish` error
+	initializeTest()
+	for i := 0; i < 4; i++ {
+		m.CheckFinishedJob(se)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.CheckFinishedJob(se)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+
+	// Test that `rescheduleJobs` can tolerate the `job.finish` error
+	// cancel job branch
+	initializeTest()
+	variable.EnableTTLJob.Store(false)
+	t.Cleanup(func() {
+		variable.EnableTTLJob.Store(true)
+	})
+	for i := 0; i < 4; i++ {
+		m.RescheduleJobs(se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.RescheduleJobs(se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	variable.EnableTTLJob.Store(true)
+	// remove table branch
+	initializeTest()
+	tk.MustExec("drop table t")
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+	for i := 0; i < 4; i++ {
+		m.RescheduleJobs(se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.RescheduleJobs(se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+	tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+	testTable, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	// Test that `updateHeartBeat` can tolerate the `job.finish` error
+	initializeTest()
+	for i := 0; i < 4; i++ {
+		// timeout is 6h
+		now = now.Add(time.Hour * 8)
+		m.UpdateHeartBeat(context.Background(), se, now)
+		tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+	}
+	m.UpdateHeartBeat(context.Background(), se, now)
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
+}
diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go
index 1fe7b48e476a3..ff88047a09c4e 100644
--- a/pkg/ttl/ttlworker/job_manager_test.go
+++ b/pkg/ttl/ttlworker/job_manager_test.go
@@ -195,8 +195,13 @@ func (m *JobManager) ReportMetrics(se session.Session) {
 	m.reportMetrics(se)
 }
 
-func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
-	j.finish(se, now, summary)
+// CheckFinishedJob is an exported version of checkFinishedJob
+func (m *JobManager) CheckFinishedJob(se session.Session) {
+	m.checkFinishedJob(se)
+}
+
+func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) error {
+	return j.finish(se, now, summary)
 }
 
 func (j *ttlJob) ID() string {
diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go
index cb6765f5dcea9..064c68f86ed40 100644
--- a/pkg/ttl/ttlworker/task_manager_test.go
+++ b/pkg/ttl/ttlworker/task_manager_test.go
@@ -77,6 +77,11 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus
 }
 
 // ReportTaskFinished is an exported version of reportTaskFinished
+func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
+	return m.reportTaskFinished(se, now, task)
+}
+
+// SetResult sets the result of the task
 func (t *runningScanTask) SetResult(err error) {
 	t.result = t.ttlScanTask.result(err)
 }
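
Reviewer note on why the switch from `session.TxnModeOptimistic` to `session.TxnModePessimistic` fixes pingcap/tidb#56422: an optimistic transaction only detects the competing write (the task manager updating a task's owner while `job.finish` deletes the job's tasks) at commit time and fails with a write-conflict error, while a pessimistic transaction locks the rows as the DML executes, so the racing session waits and then proceeds. The following standalone sketch reproduces that difference with two plain MySQL connections against a throwaway TiDB instance. It is illustrative only and not part of the patch; the `ttl_demo` table, the DSN, and the short sleep are invented test scaffolding, and only the `tidb_txn_mode` session variable is real TiDB surface area.

// Sketch (not patch code): contrast optimistic and pessimistic write conflicts in TiDB.
package main

import (
	"database/sql"
	"fmt"
	"time"

	_ "github.com/go-sql-driver/mysql" // MySQL-protocol driver; TiDB speaks the same protocol
)

// mustExec keeps the happy-path flow below readable.
func mustExec(db *sql.DB, query string) {
	if _, err := db.Exec(query); err != nil {
		panic(err)
	}
}

func main() {
	// Placeholder DSN: any disposable TiDB instance (e.g. `tiup playground`).
	dsn := "root@tcp(127.0.0.1:4000)/test"
	db1, err := sql.Open("mysql", dsn)
	if err != nil {
		panic(err)
	}
	db2, err := sql.Open("mysql", dsn)
	if err != nil {
		panic(err)
	}
	defer db1.Close()
	defer db2.Close()
	// Pin each pool to one connection so SET SESSION applies to the same
	// connection that later runs BEGIN.
	db1.SetMaxOpenConns(1)
	db2.SetMaxOpenConns(1)

	mustExec(db1, "CREATE TABLE IF NOT EXISTS ttl_demo (id INT PRIMARY KEY, owner VARCHAR(64))")
	mustExec(db1, "REPLACE INTO ttl_demo VALUES (1, 'nobody')")

	// Optimistic: both UPDATEs succeed immediately; the conflict only
	// surfaces when the second transaction tries to commit.
	mustExec(db1, "SET SESSION tidb_txn_mode = 'optimistic'")
	mustExec(db2, "SET SESSION tidb_txn_mode = 'optimistic'")
	tx1, _ := db1.Begin()
	tx2, _ := db2.Begin()
	_, _ = tx1.Exec("UPDATE ttl_demo SET owner = 'job-finish' WHERE id = 1")
	_, _ = tx2.Exec("UPDATE ttl_demo SET owner = 'task-manager' WHERE id = 1") // does not block
	_ = tx1.Commit()
	fmt.Println("optimistic commit:", tx2.Commit()) // expect a write-conflict error

	// Pessimistic: the second UPDATE blocks on the row lock and proceeds
	// once the first transaction commits, so neither side fails.
	mustExec(db1, "SET SESSION tidb_txn_mode = 'pessimistic'")
	mustExec(db2, "SET SESSION tidb_txn_mode = 'pessimistic'")
	tx1, _ = db1.Begin()
	tx2, _ = db2.Begin()
	_, _ = tx1.Exec("UPDATE ttl_demo SET owner = 'job-finish' WHERE id = 1")
	done := make(chan error, 1)
	go func() {
		// Blocks here until tx1 releases the row lock by committing.
		_, err := tx2.Exec("UPDATE ttl_demo SET owner = 'task-manager' WHERE id = 1")
		done <- err
	}()
	time.Sleep(100 * time.Millisecond) // let tx2 reach the lock wait
	_ = tx1.Commit()
	fmt.Println("pessimistic update:", <-done)       // expect <nil>: it waited instead of failing
	fmt.Println("pessimistic commit:", tx2.Commit()) // expect <nil>
}

This is also why the patch has `job.finish` return its error and lets `checkFinishedJob`, `rescheduleJobs`, and `updateHeartBeat` log a warning and retry on a later round instead of dropping the failure: with pessimistic locking the owner-update race is serialized away, so any error that still escapes is transient and safe to retry.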