ddl: consider paused job when check runnable #54419

Merged · 4 commits · Jul 4, 2024
8 changes: 4 additions & 4 deletions pkg/ddl/callback.go
@@ -60,9 +60,9 @@ type Callback interface {
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
// OnGetJobBefore is called before getting job.
OnGetJobBefore(jobType string)
OnGetJobBefore()
// OnGetJobAfter is called after getting job.
OnGetJobAfter(jobType string, job *model.Job)
OnGetJobAfter(job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
@@ -100,12 +100,12 @@ func (*BaseCallback) OnWatched(_ context.Context) {
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (*BaseCallback) OnGetJobBefore(_ string) {
func (*BaseCallback) OnGetJobBefore() {
// Nothing to do.
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (*BaseCallback) OnGetJobAfter(_ string, _ *model.Job) {
func (*BaseCallback) OnGetJobAfter(_ *model.Job) {
// Nothing to do.
}

1 change: 1 addition & 0 deletions pkg/ddl/column.go
@@ -752,6 +752,7 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)
case model.StateWriteOnly:
// write only -> reorganization
updateChangingObjState(changingCol, changingIdxs, model.StateWriteReorganization)
16 changes: 16 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
@@ -233,11 +233,23 @@ func (j *runningJobs) addRunning(jobID int64, involves []model.InvolvingSchemaInfo
}
}

func (j *runningJobs) finishOrPendJob(jobID int64, involves []model.InvolvingSchemaInfo, moveToPending bool) {
j.mu.Lock()
defer j.mu.Unlock()
j.removeRunningWithoutLock(jobID, involves)
if moveToPending {
j.addPendingWithoutLock(involves)
}
}

// removeRunning can be concurrently called with add and checkRunnable.
func (j *runningJobs) removeRunning(jobID int64, involves []model.InvolvingSchemaInfo) {
j.mu.Lock()
defer j.mu.Unlock()
j.removeRunningWithoutLock(jobID, involves)
}

func (j *runningJobs) removeRunningWithoutLock(jobID int64, involves []model.InvolvingSchemaInfo) {
if intest.InTest {
if _, ok := j.ids[jobID]; !ok {
panic(fmt.Sprintf("job %d is not running", jobID))
@@ -296,6 +308,10 @@ func (j *runningJobs) addPending(involves []model.InvolvingSchemaInfo) {
j.mu.Lock()
defer j.mu.Unlock()

j.addPendingWithoutLock(involves)
}

func (j *runningJobs) addPendingWithoutLock(involves []model.InvolvingSchemaInfo) {
for _, info := range involves {
if info.Database != model.InvolvingNone {
if _, ok := j.pending.schemas[info.Database]; !ok {
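
Note on the new helpers: `finishOrPendJob` exists so that removing a job from the running set and, when the job is unfinished (for example paused), re-registering its schema objects as pending both happen under a single lock acquisition; otherwise a concurrent `getJob()` could observe a window in which those objects look free. A minimal, self-contained sketch of the pattern, using invented simplified types rather than the real `runningJobs`/`InvolvingSchemaInfo` structures:

```go
package main

import (
	"fmt"
	"sync"
)

// jobTracker is a stripped-down stand-in for ddl's runningJobs: it keys
// pending ownership by a plain schema-object name instead of the real
// InvolvingSchemaInfo structure.
type jobTracker struct {
	mu      sync.Mutex
	running map[int64][]string // job ID -> schema objects it holds
	pending map[string]struct{}
}

func newJobTracker() *jobTracker {
	return &jobTracker{
		running: make(map[int64][]string),
		pending: make(map[string]struct{}),
	}
}

// finishOrPend mirrors finishOrPendJob: one lock acquisition covers both the
// removal from running and the optional move to pending, so a concurrent
// scheduler never sees the objects as free while the job is merely paused.
func (t *jobTracker) finishOrPend(jobID int64, moveToPending bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	objects := t.running[jobID]
	delete(t.running, jobID)
	if moveToPending {
		for _, obj := range objects {
			t.pending[obj] = struct{}{}
		}
	}
}

func main() {
	t := newJobTracker()
	t.running[1] = []string{"test.t"}
	t.finishOrPend(1, true) // e.g. the job was paused, not finished
	fmt.Println(t.pending)  // map[test.t:{}]
}
```
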
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_worker_test.go
@@ -135,7 +135,7 @@ func TestParallelDDL(t *testing.T) {
}

once1 := sync.Once{}
tc.OnGetJobBeforeExported = func(string) {
tc.OnGetJobBeforeExported = func() {
once1.Do(func() {
for {
tk := testkit.NewTestKit(t, store)
75 changes: 40 additions & 35 deletions pkg/ddl/job_table.go
@@ -185,51 +185,47 @@ func (s *jobScheduler) close() {
}

// getJob reads tidb_ddl_job and returns the first runnable DDL job.
func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error) {
func (s *jobScheduler) getJob(se *sess.Session) (*model.Job, bool, error) {
defer s.runningJobs.resetAllPending()

not := "not"
label := "get_job_general"
if tp == jobTypeReorg {
not = ""
label = "get_job_reorg"
}
const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
const getJobSQL = `select job_meta, processing, reorg from mysql.tidb_ddl_job where job_id in
(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
and %s reorg %s order by processing desc, job_id`
%s order by processing desc, job_id`
var excludedJobIDs string
if ids := s.runningJobs.allIDs(); len(ids) > 0 {
excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
}
sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
rows, err := se.Execute(context.Background(), sql, label)
sql := fmt.Sprintf(getJobSQL, excludedJobIDs)
rows, err := se.Execute(context.Background(), sql, "get_job")
if err != nil {
return nil, errors.Trace(err)
return nil, false, errors.Trace(err)
}
for _, row := range rows {
jobBinary := row.GetBytes(0)
isJobProcessing := row.GetInt64(1) == 1
isReorg := row.GetInt64(2) != 0

job := model.Job{}
err = job.Decode(jobBinary)
if err != nil {
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}

involving := job.GetInvolvingSchemaInfo()
isRunnable, err := s.processJobDuringUpgrade(se, &job)
if err != nil {
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}
if !isRunnable {
s.runningJobs.addPending(involving)
continue
}

// The job has already been picked up, just return to continue it.
if isJobProcessing {
return &job, nil
return &job, isReorg, nil
}

involving := job.GetInvolvingSchemaInfo()
if !s.runningJobs.checkRunnable(job.ID, involving) {
s.runningJobs.addPending(involving)
continue
@@ -241,11 +237,11 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
zap.Error(err),
zap.Stringer("job", &job))
s.runningJobs.addPending(involving)
return nil, errors.Trace(err)
return nil, isReorg, errors.Trace(err)
}
return &job, nil
return &job, isReorg, nil
}
return nil, nil
return nil, false, nil
}

func hasSysDB(job *model.Job) bool {
@@ -394,8 +390,7 @@ func (s *jobScheduler) startDispatch() error {
continue
}
failpoint.InjectCall("beforeAllLoadDDLJobAndRun")
s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, jobTypeGeneral)
s.loadDDLJobAndRun(se, s.reorgWorkerPool, jobTypeReorg)
s.loadDDLJobAndRun(se)
}
}

@@ -436,30 +431,32 @@ func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error {
return nil
}

func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session, pool *workerPool, tp jobType) {
wk, err := pool.get()
if err != nil || wk == nil {
logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
return
}

func (s *jobScheduler) loadDDLJobAndRun(se *sess.Session) {
s.mu.RLock()
s.mu.hook.OnGetJobBefore(pool.tp().String())
s.mu.hook.OnGetJobBefore()
s.mu.RUnlock()

startTime := time.Now()
job, err := s.getJob(se, tp)
job, isReorg, err := s.getJob(se)
if job == nil || err != nil {
if err != nil {
wk.jobLogger(job).Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
logutil.DDLLogger().Warn("get job met error", zap.Duration("take time", time.Since(startTime)), zap.Error(err))
}
pool.put(wk)
return
}
s.mu.RLock()
s.mu.hook.OnGetJobAfter(pool.tp().String(), job)
s.mu.hook.OnGetJobAfter(job)
s.mu.RUnlock()

pool := s.generalDDLWorkerPool
if isReorg {
pool = s.reorgWorkerPool
}
wk, err := pool.get()
if err != nil || wk == nil {
logutil.DDLLogger().Debug(fmt.Sprintf("[ddl] no %v worker available now", pool.tp()), zap.Error(err))
return
}
s.delivery2Worker(wk, pool, job)
}

@@ -526,10 +523,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
s.runningJobs.addRunning(jobID, involvedSchemaInfos)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
s.wg.RunWithLog(func() {
s.wg.Run(func() {
defer func() {
r := recover()
if r != nil {
logutil.DDLLogger().Error("panic in delivery2Worker", zap.Any("recover", r), zap.Stack("stack"))
}
failpoint.InjectCall("afterDelivery2Worker", job)
s.runningJobs.removeRunning(jobID, involvedSchemaInfos)
// Because there is a gap between `allIDs()` and `checkRunnable()`,
// we append the unfinished job to pending atomically to prevent `getJob()`
// from choosing another runnable job that involves the same schema object.
moveRunningJobsToPending := r != nil || (job != nil && !job.IsFinished())
s.runningJobs.finishOrPendJob(jobID, involvedSchemaInfos, moveRunningJobsToPending)
asyncNotify(s.ddlJobNotifyCh)
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
pool.put(wk)
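
Summary of the job_table.go rework, with a rough sketch below: the scheduler no longer runs one query per job type while holding a worker up front. A single query also reads the new `reorg` column, the worker pool is chosen only after a runnable job is in hand, and the deferred cleanup in `delivery2Worker` moves a job that did not finish (paused, delivery error, or panic) back to pending instead of freeing its schema objects. The types and names below are invented for illustration and are not TiDB's:

```go
package main

import "fmt"

// job is a minimal stand-in for model.Job.
type job struct {
	id       int64
	finished bool
}

type pool struct{ name string }

// scheduler sketches the reworked loadDDLJobAndRun: fetch first, then pick a
// pool, then hand the job to a worker and clean up in a deferred step.
type scheduler struct {
	generalPool, reorgPool *pool
	pendingJobs            map[int64]struct{}
}

// getJob stands in for jobScheduler.getJob: one query returns the job plus an
// isReorg flag read from the `reorg` column.
func (s *scheduler) getJob() (*job, bool, error) {
	return &job{id: 1}, true, nil // pretend the row had reorg = 1
}

func (s *scheduler) loadAndRun() error {
	j, isReorg, err := s.getJob()
	if err != nil || j == nil {
		return err
	}
	p := s.generalPool
	if isReorg {
		p = s.reorgPool // the pool is chosen only after a job is found
	}
	defer func() {
		// Mirrors delivery2Worker's deferred cleanup: an unfinished job keeps
		// its schema objects reserved by moving them to pending.
		if !j.finished {
			s.pendingJobs[j.id] = struct{}{}
		}
	}()
	fmt.Printf("run job %d on %s pool\n", j.id, p.name)
	return nil
}

func main() {
	s := &scheduler{
		generalPool: &pool{name: "general"},
		reorgPool:   &pool{name: "reorg"},
		pendingJobs: map[int64]struct{}{},
	}
	_ = s.loadAndRun()
}
```
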
4 changes: 2 additions & 2 deletions pkg/ddl/job_table_test.go
@@ -65,7 +65,7 @@ func TestDDLScheduling(t *testing.T) {
var wg util.WaitGroupWrapper
wg.Add(1)
var once sync.Once
hook.OnGetJobBeforeExported = func(jobType string) {
hook.OnGetJobBeforeExported = func() {
once.Do(func() {
for i, job := range ddlJobs {
wg.Run(func() {
@@ -91,7 +91,7 @@
}

record := make([]int64, 0, 16)
hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
hook.OnGetJobAfterExported = func(job *model.Job) {
// record the job schedule order
record = append(record, job.ID)
}
2 changes: 1 addition & 1 deletion pkg/ddl/tests/adminpause/BUILD.bazel
@@ -29,7 +29,7 @@ go_test(
],
embed = [":adminpause"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
"//pkg/config",
"//pkg/ddl",
61 changes: 60 additions & 1 deletion pkg/ddl/tests/adminpause/pause_resume_test.go
@@ -20,7 +20,9 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
@@ -123,7 +125,7 @@ func pauseResumeAndCancel(t *testing.T, stmtKit *testkit.TestKit, adminCommandKi
var isCancelled = false
var cancelResult []sqlexec.RecordSet
var cancelErr error
var cancelFunc = func(jobType string) {
var cancelFunc = func() {
adminCommandMutex.Lock()
defer adminCommandMutex.Unlock()
if isPaused && isResumed && !isCancelled {
@@ -354,3 +356,60 @@ func TestPauseResumeCancelAndRerunPartitionTableStmt(t *testing.T) {

Logger.Info("TestPauseResumeCancelAndRerunPartitionTableStmt: all cases finished.")
}

func TestPauseJobDependency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")

afterPause := make(chan struct{})
afterAddCol := make(chan struct{})
startAddCol := make(chan struct{})
var (
modifyJobID int64
errModCol error
errAddCol error
)
once := sync.Once{}
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterModifyColumnStateDeleteOnly", func(jobID int64) {
once.Do(func() {
modifyJobID = jobID
tk2.MustExec(fmt.Sprintf("admin pause ddl jobs %d", jobID))
afterPause <- struct{}{}
})
})
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
// Will get stuck because the job is paused.
errModCol = tk.ExecToErr("alter table t modify column b tinyint;")
}()
go func() {
defer wg.Done()
<-afterPause
// This should be blocked because they handle the same table.
startAddCol <- struct{}{}
errAddCol = tk2.ExecToErr("alter table t add column c int;")
afterAddCol <- struct{}{}
}()
<-startAddCol
select {
case <-afterAddCol:
t.Logf("add column DDL on same table should be blocked")
t.FailNow()
case <-time.After(3 * time.Second):
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
tk3.MustExec(fmt.Sprintf("admin resume ddl jobs %d", modifyJobID))
<-afterAddCol
}
wg.Wait()
require.NoError(t, errModCol)
require.NoError(t, errAddCol)
}
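
The test coordinates with the DDL worker through the callback-style failpoints this PR introduces: `failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)` in column.go fires whatever callback a test registered with `failpoint.EnableCall`. A stripped-down sketch of that pairing follows; the failpoint name is invented, error handling and cleanup are omitted (as in the test above), and the real test uses the full package path on the Enable side while the Inject side uses only the short name, a detail the single literal below sidesteps. Markers like this are typically only active in failpoint-enabled builds:

```go
package example

import "github.com/pingcap/failpoint"

// Production side: announce that a step has finished and pass its context (a
// job ID here) to whoever registered a callback under the same name. This
// mirrors the call added to doModifyColumnTypeWithData in column.go.
func afterStep(jobID int64) {
	failpoint.InjectCall("example/afterStep", jobID)
}

// Test side: register a callback for that name before driving the code under
// test, the same shape TestPauseJobDependency uses to pause the job right
// after it reaches StateWriteOnly.
func observeStep(run func()) (got int64) {
	failpoint.EnableCall("example/afterStep", func(jobID int64) {
		got = jobID
	})
	run()
	return got
}
```
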
16 changes: 8 additions & 8 deletions pkg/ddl/util/callback/callback.go
@@ -55,8 +55,8 @@ type TestDDLCallback struct {
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnGetJobBeforeExported func()
OnGetJobAfterExported func(*model.Job)
OnJobSchemaStateChanged func(int64)

OnUpdateReorgInfoExported func(job *model.Job, pid int64)
@@ -146,21 +146,21 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
func (tc *TestDDLCallback) OnGetJobBefore() {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
tc.OnGetJobBeforeExported()
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
tc.BaseCallback.OnGetJobBefore()
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
func (tc *TestDDLCallback) OnGetJobAfter(job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
tc.OnGetJobAfterExported(job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
tc.BaseCallback.OnGetJobAfter(job)
}

// Clone copies the callback and take its reference