diff --git a/drainer/schema.go b/drainer/schema.go index b40e8921a..09a985ea5 100644 --- a/drainer/schema.go +++ b/drainer/schema.go @@ -134,6 +134,7 @@ func (s *Schema) CreateSchema(db *model.DBInfo) error { s.schemas[db.ID] = db s.schemaNameToID[db.Name.O] = db.ID + log.Debugf("create schema %s, schema id %d", db.Name.O, db.ID) return nil } @@ -150,6 +151,8 @@ func (s *Schema) DropTable(id int64) (string, error) { delete(s.tables, id) delete(s.tableIDToName, id) + + log.Debugf("drop table %s, table id %d", table.Name.O, id) return table.Name.O, nil } @@ -168,6 +171,7 @@ func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error s.tables[table.ID] = table s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O} + log.Debugf("create table %s.%s, table id %d", schema.Name.O, table.Name.O, table.ID) return nil } @@ -213,11 +217,13 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { var i int for i = 0; i < len(s.jobs); i++ { if skipJob(s.jobs[i]) { + log.Debugf("skip ddl job %v", s.jobs[i]) continue } if s.jobs[i].BinlogInfo.SchemaVersion <= version { if s.jobs[i].BinlogInfo.SchemaVersion <= s.currentVersion { + log.Warnf("ddl job %v schema version is less than current version %d, skip this ddl job", s.jobs[i], s.currentVersion) continue } @@ -230,7 +236,7 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { _, _, _, err = s.handleDDL(s.jobs[i]) if err != nil { - return errors.Trace(err) + return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s) } } else { break @@ -252,7 +258,6 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { return "", "", "", nil } - // log.Infof("ddl query %s", job.Query) sql := job.Query if sql == "" { return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job) @@ -370,7 +375,6 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { return schema.Name.O, table.Name.O, sql, nil default: - log.Infof("get unknown ddl type %v", job.Type) binlogInfo := job.BinlogInfo if binlogInfo == nil { return "", "", "", errors.NotFoundf("table %d", job.TableID) diff --git a/drainer/util.go b/drainer/util.go index d946b154d..b60d0a3ca 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -7,6 +7,7 @@ import ( "net/url" "os" "path" + "sort" "strings" "time" @@ -101,6 +102,11 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) { if err != nil { return nil, errors.Trace(err) } + + // jobs from GetAllHistoryDDLJobs are sorted by job id, need sorted by schema version + sorter := &jobsSorter{jobs: jobs} + sort.Sort(sorter) + return jobs, nil } @@ -179,3 +185,20 @@ func filterIgnoreSchema(schema *model.DBInfo, ignoreSchemaNames map[string]struc _, ok := ignoreSchemaNames[schema.Name.L] return ok } + +// jobsSorter implements the sort.Interface interface. +type jobsSorter struct { + jobs []*model.Job +} + +func (s *jobsSorter) Swap(i, j int) { + s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i] +} + +func (s *jobsSorter) Len() int { + return len(s.jobs) +} + +func (s *jobsSorter) Less(i, j int) bool { + return s.jobs[i].BinlogInfo.SchemaVersion < s.jobs[j].BinlogInfo.SchemaVersion +} diff --git a/tests/_utils/check_status b/tests/_utils/check_status index 4af30eccf..2f59e3cbb 100755 --- a/tests/_utils/check_status +++ b/tests/_utils/check_status @@ -15,9 +15,9 @@ do count=`grep -c "$2" $STATUS_LOG` || true if [ $i -eq 1 ]; then - max_commit_ts_old=`cat $STATUS_LOG | sed 's/.*MaxCommitTS:\([0-9]*\) .*/\1/g'` + max_commit_ts_old=`cat $STATUS_LOG | sed 's/.*MaxCommitTS: \([0-9]*\), .*/\1/g'` else - max_commit_ts_new=`cat $STATUS_LOG | sed 's/.*MaxCommitTS:\([0-9]*\) .*/\1/g'` + max_commit_ts_new=`cat $STATUS_LOG | sed 's/.*MaxCommitTS: \([0-9]*\), .*/\1/g'` fi # if status is online, will check the max commit ts, the new max commit ts should greater than the old one. diff --git a/tests/status/run.sh b/tests/status/run.sh index 49fb7c511..6f1dac85b 100755 --- a/tests/status/run.sh +++ b/tests/status/run.sh @@ -12,7 +12,7 @@ run_drainer & echo "check drainer's status, should be online" check_status drainers online -drainerNodeID=`cat $STATUS_LOG | sed 's/.*NodeID:\([a-zA-Z0-9\-]*:[0-9]*\) .*/\1/g'` +drainerNodeID=`cat $STATUS_LOG | sed 's/.*NodeID: \([a-zA-Z0-9\-]*:[0-9]*\),.*/\1/g'` # pump's state should be online echo "check pump's status, should be online"