Skip to content

Commit

Permalink
ttl: use a pessimistic transaction to finish the job (#56516) (#56967)
Browse files Browse the repository at this point in the history
close #56422
  • Loading branch information
ti-chi-bot authored Nov 4, 2024
1 parent 2f2bf22 commit 7917e4a
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"//pkg/util/timeutil",
Expand Down
23 changes: 15 additions & 8 deletions pkg/ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"github.com/pingcap/tidb/pkg/util/intest"
)

const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = %? WHERE table_id = %? AND current_job_status = %? AND current_job_id = %?"
Expand Down Expand Up @@ -126,7 +126,8 @@ type ttlJob struct {
}

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) error {
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err := se.RunInTxn(context.TODO(), func() error {
Expand All @@ -148,10 +149,16 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}, session.TxnModeOptimistic)
failpoint.Inject("ttl-finish", func(val failpoint.Value) {
if val.(bool) {
TTLJobFinishFailpointHook(&err)
}
})
return err
}, session.TxnModePessimistic)

if err != nil {
logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
return err
}

// TTLJobFinishFailpointHook is a failpoint hook for testing purpose
var TTLJobFinishFailpointHook func(err *error)
31 changes: 23 additions & 8 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,12 @@ j:
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
}
err = job.finish(se, se.Now(), summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, se.Now(), summary)
}
cancel()
}
Expand All @@ -565,10 +569,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

summary, err := summarizeErr(errors.New("ttl job is disabled"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}
}
return
Expand All @@ -585,10 +593,14 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
Expand Down Expand Up @@ -852,11 +864,14 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, no
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
job.finish(se, now, summary)
continue
}

sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
Expand Down
151 changes: 151 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
dbsession "github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/testkit"
timerapi "github.com/pingcap/tidb/pkg/timer/api"
Expand Down Expand Up @@ -1242,3 +1243,153 @@ func TestManagerJobAdapterGetJob(t *testing.T) {
tk.MustExec("delete from mysql.tidb_ttl_job_history")
}
}

func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
// Finishing a `TTLJob` will remove all the `TTLTask` of the job, and at the same time
// the `task_manager` may update the owner of the task, which may cause a write conflict.
// This test is to simulate this scenario.
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

testTimes := 1000
now := se.Now()
for i := 0; i < testTimes; i++ {
now = now.Add(time.Hour * 48)

m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

se := sessionFactory()
job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
se := sessionFactory()

m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
}()

go func() {
defer wg.Done()
se := sessionFactory()

job.Finish(se, se.Now(), &ttlworker.TTLSummary{})
}()

wg.Wait()
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
}

func TestFinishError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

errCount := 5
ttlworker.TTLJobFinishFailpointHook = func(err *error) {
errCount -= 1
if errCount > 0 {
*err = errors.New("mock error")
}
}
failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish", "return(true)")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/ttl-finish")

now := se.Now()

m := ttlworker.NewJobManager("test-ttl-job-manager", nil, store, nil, nil)
require.NoError(t, m.InfoSchemaCache().Update(se))

initializeTest := func() {
errCount = 5
now = now.Add(time.Hour * 48)
job, err := m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
task, err := m.TaskManager().LockScanTask(se, &cache.TTLTask{
ScanID: 0,
JobID: job.ID(),
TableID: testTable.Meta().ID,
}, now)
require.NoError(t, err)
task.SetResult(nil)
err = m.TaskManager().ReportTaskFinished(se, now, task)
require.NoError(t, err)
tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("finished"))
}

// Test the `CheckFinishedJob` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.CheckFinishedJob(se)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))

// Test the `rescheduleJobs` can tolerate the `job.finish` error
// cancel job branch
initializeTest()
variable.EnableTTLJob.Store(false)
t.Cleanup(func() {
variable.EnableTTLJob.Store(true)
})
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
variable.EnableTTLJob.Store(true)
// remove table branch
initializeTest()
tk.MustExec("drop table t")
require.NoError(t, m.InfoSchemaCache().Update(se))
for i := 0; i < 4; i++ {
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.RescheduleJobs(se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
require.NoError(t, m.InfoSchemaCache().Update(se))
testTable, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// Teset the `updateHeartBeat` can tolerate the `job.finish` error
initializeTest()
for i := 0; i < 4; i++ {
// timeout is 6h
now = now.Add(time.Hour * 8)
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
}
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
9 changes: 7 additions & 2 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,13 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
// CheckFinishedJob is an exported version of checkFinishedJob
func (m *JobManager) CheckFinishedJob(se session.Session) {
m.checkFinishedJob(se)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) error {
return j.finish(se, now, summary)
}

func (j *ttlJob) ID() string {
Expand Down
5 changes: 5 additions & 0 deletions pkg/ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus
}

// ReportTaskFinished is an exported version of reportTaskFinished
func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
return m.reportTaskFinished(se, now, task)
}

// SetResult sets the result of the task
func (t *runningScanTask) SetResult(err error) {
t.result = t.ttlScanTask.result(err)
}
Expand Down

0 comments on commit 7917e4a

Please sign in to comment.