Skip to content

Commit

Permalink
ddl: fix unstable test TestCreateDropCreateTable (#50076)
Browse files Browse the repository at this point in the history
close #50061
  • Loading branch information
tangenta authored Mar 7, 2024
1 parent 54cca35 commit bc84197
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
d.runningJobs = newRunningJobs()
})

return nil
Expand Down
92 changes: 56 additions & 36 deletions pkg/ddl/ddl_running_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,57 +28,96 @@ import (

type runningJobs struct {
sync.RWMutex
ids map[int64]struct{}
runningSchema map[string]map[string]struct{} // database -> table -> struct{}
runningJobIDs string
// processingIDs records the IDs of the jobs that are being processed by a worker.
processingIDs map[int64]struct{}
processingIDsStr string

// unfinishedIDs records the IDs of the jobs that are not finished yet.
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
}

func newRunningJobs() *runningJobs {
return &runningJobs{
ids: make(map[int64]struct{}),
runningSchema: make(map[string]map[string]struct{}),
processingIDs: make(map[int64]struct{}),
unfinishedSchema: make(map[string]map[string]struct{}),
unfinishedIDs: make(map[int64]struct{}),
}
}

func (j *runningJobs) add(job *model.Job) {
j.Lock()
defer j.Unlock()
j.ids[job.ID] = struct{}{}
j.processingIDs[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
return
}
j.unfinishedIDs[job.ID] = struct{}{}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[info.Database]; !ok {
j.runningSchema[info.Database] = make(map[string]struct{})
if _, ok := j.unfinishedSchema[info.Database]; !ok {
j.unfinishedSchema[info.Database] = make(map[string]struct{})
}
j.runningSchema[info.Database][info.Table] = struct{}{}
j.unfinishedSchema[info.Database][info.Table] = struct{}{}
}
}

func (j *runningJobs) remove(job *model.Job) {
j.Lock()
defer j.Unlock()
delete(j.ids, job.ID)
delete(j.processingIDs, job.ID)
j.updateInternalRunningJobIDs()
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.runningSchema[info.Database]; ok {
delete(db, info.Table)

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.unfinishedSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.unfinishedSchema[info.Database]) == 0 {
delete(j.unfinishedSchema, info.Database)
}
}
if len(j.runningSchema[info.Database]) == 0 {
delete(j.runningSchema, info.Database)
}
}

func (j *runningJobs) allIDs() string {
j.RLock()
defer j.RUnlock()
return j.processingIDsStr
}

func (j *runningJobs) updateInternalRunningJobIDs() {
var sb strings.Builder
i := 0
for id := range j.processingIDs {
sb.WriteString(strconv.Itoa(int(id)))
if i != len(j.processingIDs)-1 {
sb.WriteString(",")
}
i++
}
j.processingIDsStr = sb.String()
}

func (j *runningJobs) checkRunnable(job *model.Job) bool {
j.RLock()
defer j.RUnlock()
if _, ok := j.processingIDs[job.ID]; ok {
// Already processing by a worker. Skip running it again.
return false
}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[model.InvolvingAll]; ok {
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
return false
}
if info.Database == model.InvolvingNone {
continue
}
if tbls, ok := j.runningSchema[info.Database]; ok {
if tbls, ok := j.unfinishedSchema[info.Database]; ok {
if _, ok := tbls[model.InvolvingAll]; ok {
return false
}
Expand All @@ -92,22 +131,3 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
}
return true
}

func (j *runningJobs) allIDs() string {
j.RLock()
defer j.RUnlock()
return j.runningJobIDs
}

func (j *runningJobs) updateInternalRunningJobIDs() {
var sb strings.Builder
i := 0
for id := range j.ids {
sb.WriteString(strconv.Itoa(int(id)))
if i != len(j.ids)-1 {
sb.WriteString(",")
}
i++
}
j.runningJobIDs = sb.String()
}
3 changes: 3 additions & 0 deletions pkg/ddl/ddl_running_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,19 @@ func TestRunningJobs(t *testing.T) {
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
require.False(t, runnable)

job5.State = model.JobStateDone
j.remove(job5)
require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
require.True(t, runnable)

job3.State = model.JobStateDone
j.remove(job3)
require.Equal(t, "1,2,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t100"))
require.True(t, runnable)

job1.State = model.JobStateDone
j.remove(job1)
require.Equal(t, "2,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t1"))
Expand Down

0 comments on commit bc84197

Please sign in to comment.