Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): track skipped ddls when using dmctl binlog to skip some ddls (#4178) #4225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// revert currentLocation to startLocation
currentLocation = startLocation
} else if op == pb.ErrorOp_Skip {
ec := eventContext{
tctx: tctx,
header: e.Header,
startLocation: &startLocation,
currentLocation: &currentLocation,
lastLocation: &lastLocation,
}
var sourceTbls map[string]map[string]struct{}
sourceTbls, err = s.trackOriginDDL(ev, ec)
if err != nil {
tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query))
}

s.saveGlobalPoint(currentLocation)
for sourceSchema, tableMap := range sourceTbls {
if sourceSchema == "" {
continue
}
for sourceTable := range tableMap {
s.saveTablePoint(&filter.Table{Schema: sourceSchema, Name: sourceTable}, currentLocation)
}
}
err = s.flushJobs()
if err != nil {
tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err))
Expand Down Expand Up @@ -2774,6 +2795,70 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex
return nil
}

// trackOriginDDL replays the DDL carried by a query event through s.trackDDL
// and reports which source tables it touched.
//
// The query text is trimmed first; "BEGIN", an empty statement, anything
// matched by utils.IsBuildInSkipDDL, and any statement that is not an
// ast.DDLNode yield (nil, nil). Otherwise the statement is split with
// parserpkg.SplitDDL and every resulting DDL is passed to s.genDDLInfo and
// then s.trackDDL, in order.
//
// The returned map is keyed by source schema, then by source table name.
// Only the first source table of each split DDL is recorded. A DROP DATABASE
// removes the whole schema entry collected so far and a DROP TABLE removes
// the single table entry, so the result only names tables that were not
// dropped again later in the same event.
//
// Any error from parsing, splitting, generating DDL info or tracking aborts
// the call and is returned with a nil map.
func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (map[string]map[string]struct{}, error) {
	query := strings.TrimSpace(string(ev.Query))
	switch {
	case query == "BEGIN", query == "", utils.IsBuildInSkipDDL(query):
		// nothing to track for these statements
		return nil, nil
	}

	qec := &queryEventContext{
		eventContext: &ec,
		ddlSchema:    string(ev.Schema),
		originSQL:    utils.TrimCtrlChars(query),
		splitDDLs:    []string{},
		appliedDDLs:  []string{},
		sourceTbls:   map[string]map[string]struct{}{},
	}

	var err error
	if qec.p, err = event.GetParserForStatusVars(ev.StatusVars); err != nil {
		// a failure here is only logged; parsing is still attempted below
		log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err))
	}

	stmt, err := parseOneStmt(qec)
	if err != nil {
		// the original SQL can't be parsed, so the schema tracker can't track it;
		// operate-schema can be used afterwards to set a compatible schema
		return nil, err
	}
	if _, isDDL := stmt.(ast.DDLNode); !isDDL {
		return nil, nil
	}

	// TiDB can't handle multi schema change DDL, so we split it here.
	if qec.splitDDLs, err = parserpkg.SplitDDL(stmt, qec.ddlSchema); err != nil {
		return nil, err
	}

	tracked := make(map[string]map[string]struct{})
	for _, ddl := range qec.splitDDLs {
		info, genErr := s.genDDLInfo(qec.p, qec.ddlSchema, ddl)
		if genErr != nil {
			return nil, genErr
		}

		first := info.sourceTables[0]
		switch info.originStmt.(type) {
		case *ast.DropDatabaseStmt:
			delete(tracked, first.Schema)
		case *ast.DropTableStmt:
			// deleting from a missing (nil) inner map is a no-op
			delete(tracked[first.Schema], first.Name)
		default:
			tables, known := tracked[first.Schema]
			if !known {
				tables = make(map[string]struct{})
				tracked[first.Schema] = tables
			}
			tables[first.Name] = struct{}{}
		}

		if trackErr := s.trackDDL(qec.ddlSchema, info, qec.eventContext); trackErr != nil {
			return nil, trackErr
		}
	}

	return tracked, nil
}

func (s *Syncer) genRouter() error {
s.tableRouter, _ = router.NewTableRouter(s.cfg.CaseSensitive, []*router.TableRule{})
for _, rule := range s.cfg.RouteRules {
Expand Down
12 changes: 4 additions & 8 deletions dm/tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,11 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
# dmls fail
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Paused" 2
#"Error 1054: Unknown column 'a' in 'field list'" 2 // may more than 2 dml error
"Paused" 1 \
"Unknown column 'a' in 'field list'" 1

# third, set the schema to be the same as upstream
# TODO: support setting the schema automatically based on the upstream schema
echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1' >${WORK_DIR}/schema1.sql
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update -s mysql-replica-01 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
"\"result\": true" 2
echo 'CREATE TABLE `tb1` ( `c` int NOT NULL, `b` varchar(10) DEFAULT NULL, PRIMARY KEY (`c`)) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema1.sql
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update -s mysql-replica-02 test ${shardddl1} ${tb1} ${WORK_DIR}/schema1.sql --flush --sync" \
"\"result\": true" 2
Expand All @@ -169,7 +165,7 @@ function DM_RENAME_COLUMN_OPTIMISTIC_CASE() {
# source2.table2's dml fails
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Error 1054: Unknown column 'a' in 'field list'" 1
"Unknown column 'a' in 'field list'" 1

# WARN: set schema of source2.table2
# Actually it should be tb2(a,b), dml is {a: 9, b: 'iii'}
Expand Down
6 changes: 0 additions & 6 deletions dm/tests/shardddl2/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,10 @@ function DM_DropAddColumn_CASE() {
"\"result\": true" 2 \
"\"source 'mysql-replica-02' has no error\"" 1

# after we skip ADD COLUMN, we should fix the table structure
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test" \
"\"result\": true" 3

echo 'CREATE TABLE `tb1` ( `a` int(11) NOT NULL, `b` int(11) DEFAULT NULL, `c` int(11) DEFAULT NULL, PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' >${WORK_DIR}/schema.sql
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog-schema update test ${shardddl1} ${tb1} ${WORK_DIR}/schema.sql -s mysql-replica-01" \
"\"result\": true" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test" \
"\"result\": true" 3
Expand Down