ttl: fix the issue that TTL job may hang some time when shrink the delete worker count (#55572) (#55614)

close #55561
ti-chi-bot authored Aug 27, 2024
1 parent a7df4f9 commit b22d1ea
Showing 5 changed files with 299 additions and 74 deletions.
20 changes: 20 additions & 0 deletions pkg/ttl/ttlworker/del.go
@@ -91,6 +91,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
tracer.EnterPhase(metrics.PhaseOther)

leftRows := t.rows
defer func() {
if len(leftRows) > 0 {
t.statistics.IncErrorRows(len(leftRows))
}
}()

se := newTableSession(rawSe, t.tbl, t.expire)
for len(leftRows) > 0 {
maxBatch := variable.TTLDeleteBatchSize.Load()
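The new defer above is the first half of the fix: if doDelete returns while rows are still pending (for example because the delete loop observed a cancelled context), those rows are immediately counted as error rows instead of being silently dropped. Below is a minimal, self-contained sketch of that pattern; the stats type and deleteAll function are simplified stand-ins for illustration and are not part of the commit.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// stats is a stand-in for ttlStatistics; only the error-row counter is kept.
type stats struct {
	errorRows atomic.Uint64
}

// deleteAll sketches the doDelete pattern: whatever is still in leftRows when
// the function returns (normally, on error, or on cancellation) is counted as
// error rows by the deferred closure.
func deleteAll(ctx context.Context, rows []int, st *stats) {
	leftRows := rows
	defer func() {
		if len(leftRows) > 0 {
			st.errorRows.Add(uint64(len(leftRows)))
		}
	}()

	for len(leftRows) > 0 {
		select {
		case <-ctx.Done():
			return // leftRows is non-empty, so the defer records it as errors
		default:
		}
		leftRows = leftRows[1:] // pretend the first row was deleted successfully
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately: nothing gets deleted

	st := &stats{}
	deleteAll(ctx, []int{1, 2, 3}, st)
	fmt.Println(st.errorRows.Load()) // 3
}
```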
@@ -207,6 +213,18 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
return b.retryInterval
}

// Drain drains a retry buffer.
func (b *ttlDelRetryBuffer) Drain() {
for ele := b.list.Front(); ele != nil; ele = ele.Next() {
if item, ok := ele.Value.(*ttlDelRetryItem); ok {
item.task.statistics.IncErrorRows(len(item.task.rows))
} else {
logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele))
}
}
b.list = list.New()
}

func (b *ttlDelRetryBuffer) recordRetryItem(task *ttlDeleteTask, retryRows [][]types.Datum, retryCnt int) bool {
if len(retryRows) == 0 {
return false
@@ -276,6 +294,8 @@ func (w *ttlDeleteWorker) loop() error {
timer := time.NewTimer(w.retryBuffer.retryInterval)
defer timer.Stop()

// drain retry buffer to make sure the statistics are correct
defer w.retryBuffer.Drain()
for w.Status() == workerStatusRunning {
tracer.EnterPhase(metrics.PhaseIdle)
select {
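Drain and the deferred call to it in the worker loop are the second half of the fix: when a delete worker stops (for example because the TTL delete worker count is shrunk), everything still sitting in its retry buffer is flushed into the error-row statistics, so the TTL job sees every row accounted for and does not hang waiting on them. Below is a small standalone sketch of that drain pattern; retryBuffer and retryItem are simplified stand-ins, not the real ttlworker types.

```go
package main

import (
	"container/list"
	"fmt"
)

// retryItem and retryBuffer are simplified stand-ins for ttlDelRetryItem and
// ttlDelRetryBuffer; only what drain needs is kept.
type retryItem struct {
	rows int
}

type retryBuffer struct {
	l         *list.List
	errorRows int
}

func newRetryBuffer() *retryBuffer {
	return &retryBuffer{l: list.New()}
}

func (b *retryBuffer) record(rows int) {
	b.l.PushBack(&retryItem{rows: rows})
}

// drain counts every pending item's rows as errors and resets the list,
// mirroring what ttlDelRetryBuffer.Drain does when the worker stops.
func (b *retryBuffer) drain() {
	for ele := b.l.Front(); ele != nil; ele = ele.Next() {
		if item, ok := ele.Value.(*retryItem); ok {
			b.errorRows += item.rows
		}
	}
	b.l = list.New()
}

func main() {
	b := newRetryBuffer()
	b.record(7)
	b.record(3)

	// A worker would typically `defer b.drain()` next to its retry timer, so
	// stopping between retries still flushes the pending rows into statistics.
	b.drain()
	fmt.Println(b.errorRows, b.l.Len()) // 10 0
}
```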
233 changes: 169 additions & 64 deletions pkg/ttl/ttlworker/del_test.go
@@ -17,6 +17,10 @@ package ttlworker
import (
"context"
"errors"
"fmt"
"math"
"slices"
"strconv"
"strings"
"testing"
"time"
@@ -163,48 +167,56 @@ func TestTTLDelRetryBuffer(t *testing.T) {

// test task should be immutable
require.Equal(t, 10, len(task5.rows))

// test drain
require.Equal(t, 0, buffer.Len())
task6, rows6, statics6 := createTask("t6")
buffer.RecordTaskResult(task6, rows6[:7])
require.Equal(t, 1, buffer.Len())
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
require.Equal(t, uint64(0), statics6.ErrorRows.Load())
buffer.Drain()
require.Equal(t, 0, buffer.Len())
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
require.Equal(t, uint64(7), statics6.ErrorRows.Load())
}

func TestTTLDeleteTaskDoDelete(t *testing.T) {
origBatchSize := variable.TTLDeleteBatchSize.Load()
variable.TTLDeleteBatchSize.Store(3)
delBatch := 3
variable.TTLDeleteBatchSize.Store(int64(delBatch))
defer variable.TTLDeleteBatchSize.Store(origBatchSize)

t1 := newMockTTLTbl(t, "t1")
t2 := newMockTTLTbl(t, "t2")
t3 := newMockTTLTbl(t, "t3")
t4 := newMockTTLTbl(t, "t4")
s := newMockSession(t)
invokes := 0
var sqls []string
var retryErrBatches []int
var nonRetryBatches []int
var afterExecuteSQL func()
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
invokes++
s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
if strings.Contains(sql, "`t1`") {
return nil, nil
s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo)
sqls = append(sqls, sql)

if !strings.Contains(sql, "`t1`") {
require.FailNow(t, "")
}

if strings.Contains(sql, "`t2`") {
defer func() {
if afterExecuteSQL != nil {
afterExecuteSQL()
}
}()

if slices.Contains(retryErrBatches, len(sqls)-1) {
return nil, errors.New("mockErr")
}

if strings.Contains(sql, "`t3`") {
if slices.Contains(nonRetryBatches, len(sqls)-1) {
// set an infoschema that contains no tables to trigger an error that cannot be retried
s.sessionInfoSchema = newMockInfoSchema()
return nil, nil
}

if strings.Contains(sql, "`t4`") {
switch invokes {
case 1:
return nil, nil
case 2, 4:
return nil, errors.New("mockErr")
case 3:
s.sessionInfoSchema = newMockInfoSchema()
return nil, nil
}
}

require.FailNow(t, "")
return nil, nil
}
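The rewritten mock classifies each batch by index: some batches fail with a transient error that doDelete will retry, some are made to fail permanently (the real test does this by swapping in an infoschema that no longer contains the table), and the rest succeed. A loose stand-in for that classification, with the infoschema trick collapsed into a plain error for brevity (execResult and its arguments are hypothetical names, not part of the commit):

```go
package main

import (
	"errors"
	"fmt"
	"slices"
)

// execResult classifies a batch the way the mock session does: transient
// errors get retried by doDelete, permanent errors turn the batch into error
// rows, and everything else succeeds. In the real test the permanent case is
// produced by an empty infoschema rather than a direct error.
func execResult(batch int, retryErrBatches, noRetryErrBatches []int) error {
	switch {
	case slices.Contains(retryErrBatches, batch):
		return errors.New("mockErr") // transient: rows go to the retry buffer
	case slices.Contains(noRetryErrBatches, batch):
		return errors.New("table dropped") // permanent: rows become error rows
	default:
		return nil // batch deleted successfully
	}
}

func main() {
	for batch := 0; batch < 5; batch++ {
		fmt.Println(batch, execResult(batch, []int{1, 2}, []int{3}))
	}
}
```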

@@ -218,63 +230,117 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
return rows
}

delTask := func(t *cache.PhysicalTable) *ttlDeleteTask {
delTask := func(batchCnt int) *ttlDeleteTask {
task := &ttlDeleteTask{
tbl: t,
tbl: t1,
expire: time.UnixMilli(0),
rows: nRows(10),
rows: nRows(batchCnt * delBatch),
statistics: &ttlStatistics{},
}
task.statistics.TotalRows.Add(10)
task.statistics.TotalRows.Add(uint64(batchCnt * delBatch))
return task
}

cases := []struct {
task *ttlDeleteTask
retryRows []int
successRows int
errorRows int
batchCnt int
retryErrBatches []int
noRetryErrBatches []int
cancelCtx bool
cancelCtxBatch int
}{
{
task: delTask(t1),
retryRows: nil,
successRows: 10,
errorRows: 0,
// all success
batchCnt: 10,
},
{
task: delTask(t2),
retryRows: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
successRows: 0,
errorRows: 0,
// all retries
batchCnt: 10,
retryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
{
task: delTask(t3),
retryRows: nil,
successRows: 0,
errorRows: 10,
// all errors without retry
batchCnt: 10,
noRetryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
{
task: delTask(t4),
retryRows: []int{3, 4, 5, 9},
successRows: 3,
errorRows: 3,
// some retries and some not
batchCnt: 10,
noRetryErrBatches: []int{3, 8, 9},
retryErrBatches: []int{1, 2, 4},
},
{
// some retries and some not
batchCnt: 10,
noRetryErrBatches: []int{3, 8, 9},
retryErrBatches: []int{1, 2, 4},
cancelCtx: true,
cancelCtxBatch: 6,
},
}

for _, c := range cases {
invokes = 0
retryRows := c.task.doDelete(context.Background(), s)
require.Equal(t, 4, invokes)
if c.retryRows == nil {
require.Nil(t, retryRows)
ctx, cancel := context.WithCancel(context.Background())
if c.cancelCtx && c.cancelCtxBatch == 0 {
cancel()
}
require.Equal(t, len(c.retryRows), len(retryRows))
for i, row := range retryRows {
require.Equal(t, int64(c.retryRows[i]), row[0].GetInt64())

afterExecuteSQL = func() {
if c.cancelCtx {
if len(sqls) == c.cancelCtxBatch {
cancel()
}
}
}

task := delTask(c.batchCnt)
require.Equal(t, len(task.rows), c.batchCnt*delBatch)
sqls = make([]string, 0, c.batchCnt)
retryErrBatches = c.retryErrBatches
nonRetryBatches = c.noRetryErrBatches
retryRows := task.doDelete(ctx, s)
realBatchCnt := c.batchCnt
if c.cancelCtx {
realBatchCnt = c.cancelCtxBatch
}
require.Equal(t, uint64(10), c.task.statistics.TotalRows.Load())
require.Equal(t, uint64(c.successRows), c.task.statistics.SuccessRows.Load())
require.Equal(t, uint64(c.errorRows), c.task.statistics.ErrorRows.Load())
require.LessOrEqual(t, realBatchCnt, c.batchCnt)

// check SQLs
require.Equal(t, realBatchCnt, len(sqls))
expectedSQLs := make([]string, 0, len(sqls))
for i := 0; i < realBatchCnt; i++ {
batch := task.rows[i*delBatch : (i+1)*delBatch]
idList := make([]string, 0, delBatch)
for _, row := range batch {
idList = append(idList, strconv.FormatInt(row[0].GetInt64(), 10))
}
sql := fmt.Sprintf("DELETE LOW_PRIORITY FROM `test`.`t1` "+
"WHERE `_tidb_rowid` IN (%s) AND `time` < FROM_UNIXTIME(0) LIMIT %d",
strings.Join(idList, ", "),
delBatch,
)
expectedSQLs = append(expectedSQLs, sql)
}
require.Equal(t, strings.Join(expectedSQLs, "\n"), strings.Join(sqls, "\n"))

// check retry rows
var expectedRetryRows [][]types.Datum
for i := 0; i < realBatchCnt; i++ {
if slices.Contains(c.retryErrBatches, i) {
expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...)
}
}
require.Equal(t, expectedRetryRows, retryRows)

// check statistics
var expectedErrRows uint64
for i := 0; i < c.batchCnt; i++ {
if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) {
expectedErrRows += uint64(delBatch)
}
}
expectedSuccessRows := uint64(len(task.rows)) - expectedErrRows - uint64(len(expectedRetryRows))
require.Equal(t, expectedSuccessRows, task.statistics.SuccessRows.Load())
require.Equal(t, expectedErrRows, task.statistics.ErrorRows.Load())
}
}
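The statistics checks at the end of the loop boil down to a simple accounting rule: a batch's rows end up in ErrorRows if the batch hit a non-retryable error or was never executed because the context had already been cancelled; rows queued for retry are excluded, and everything else counts as a success. A rough sketch of that arithmetic, using the numbers from the cancellation case above (the function and its parameter names are illustrative only):

```go
package main

import (
	"fmt"
	"slices"
)

// expectedErrorRows mirrors the accounting at the end of the test: a batch
// contributes to ErrorRows if it hit a non-retryable error, or if it was never
// executed because the context had already been cancelled.
func expectedErrorRows(batchCnt, realBatchCnt, delBatch int, noRetryErrBatches []int) int {
	errRows := 0
	for i := 0; i < batchCnt; i++ {
		if i >= realBatchCnt || slices.Contains(noRetryErrBatches, i) {
			errRows += delBatch
		}
	}
	return errRows
}

func main() {
	// The cancellation case above: 10 batches of 3 rows, cancelled after 6
	// batches, with batch 3 failing hard and batches 6-9 never reached,
	// giving 5 * 3 = 15 error rows.
	fmt.Println(expectedErrorRows(10, 6, 3, []int{3, 8, 9}))
}
```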

@@ -317,37 +383,57 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
t1 := newMockTTLTbl(t, "t1")
t2 := newMockTTLTbl(t, "t2")
t3 := newMockTTLTbl(t, "t3")
t4 := newMockTTLTbl(t, "t4")
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s

sqlMap := make(map[string]struct{})
sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
t4Retried := make(chan struct{})
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo)
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
if strings.Contains(sql, "`t1`") {
// success
return nil, nil
}

if strings.Contains(sql, "`t2`") {
// first error, retry success
if _, ok := sqlMap[sql]; ok {
close(t3Retried)
return nil, nil
}
sqlMap[sql] = struct{}{}
sqlMap[sql] = 1
return nil, errors.New("mockErr")
}

if strings.Contains(sql, "`t3`") {
// error no retry
pool.lastSession.sessionInfoSchema = newMockInfoSchema()
return nil, nil
}

if strings.Contains(sql, "`t4`") {
// always errors, even on retry
// this is to test that the retry buffer is drained after the delete worker stops
i := sqlMap[sql]
if i >= 2 {
// i >= 2 means t4 has been retried at least once and has records in the retry buffer
close(t4Retried)
}
sqlMap[sql] = i + 1
return nil, errors.New("mockErr")
}

require.FailNow(t, "")
return nil, nil
}

delCh := make(chan *ttlDeleteTask)
w := newDeleteWorker(delCh, pool)
w.retryBuffer.retryInterval = time.Millisecond
w.retryBuffer.maxRetry = math.MaxInt
require.Equal(t, workerStatusCreated, w.Status())
w.Start()
require.Equal(t, workerStatusRunning, w.Status())
@@ -357,7 +443,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
}()

tasks := make([]*ttlDeleteTask, 0)
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3} {
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
task := &ttlDeleteTask{
tbl: tbl,
expire: time.UnixMilli(0),
@@ -377,7 +463,23 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
}
}

time.Sleep(time.Millisecond * 100)
select {
case <-t3Retried:
case <-time.After(time.Second):
require.FailNow(t, "")
}

select {
case <-t4Retried:
case <-time.After(time.Second):
require.FailNow(t, "")
}

// before stop, t4 should always retry without any error rows
require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
w.Stop()
require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))

require.Equal(t, uint64(3), tasks[0].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[0].statistics.ErrorRows.Load())

Expand All @@ -386,4 +488,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {

require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())

require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())
}
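Instead of the fixed time.Sleep the old test used, the reworked test above waits on signal channels (t3Retried, t4Retried) with a timeout, so it proceeds as soon as the worker has made the expected progress and fails loudly on a hang. A generic sketch of that select pattern, with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitOrFail sketches the select pattern the test uses instead of a fixed
// sleep: block until the worker signals progress, but give up after a deadline
// so a hang fails the test instead of freezing it.
func waitOrFail(signal <-chan struct{}, timeout time.Duration) error {
	select {
	case <-signal:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for signal")
	}
}

func main() {
	retried := make(chan struct{})

	// In the test, the mock SQL hook closes the channel once it has observed
	// the retry it is waiting for.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(retried)
	}()

	fmt.Println(waitOrFail(retried, time.Second)) // <nil>
}
```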