diff --git a/drainer/schema.go b/drainer/schema.go
index 05ced1bad..593e9348b 100644
--- a/drainer/schema.go
+++ b/drainer/schema.go
@@ -21,6 +21,8 @@ type Schema struct {
 	schemas map[int64]*model.DBInfo
 	tables  map[int64]*model.TableInfo
 
+	truncateTableID map[int64]struct{}
+
 	schemaMetaVersion int64
 
 	hasImplicitCol bool
@@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
 	s := &Schema{
 		hasImplicitCol:      hasImplicitCol,
 		version2SchemaTable: make(map[int64]TableName),
+		truncateTableID:     make(map[int64]struct{}),
 		jobs:                jobs,
 	}
@@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
 // the second value[string]: the table name
 // the third value[string]: the sql that is corresponding to the job
 // the fourth value[error]: the handleDDL execution's err
-func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
+func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
 	if skipJob(job) {
 		return "", "", "", nil
 	}
 
-	sql := job.Query
+	sql = job.Query
 	if sql == "" {
 		return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
 	}
@@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, "", sql, nil
+		schemaName = schema.Name.O
 
 	case model.ActionDropSchema:
-		schemaName, err := s.DropSchema(job.SchemaID)
+		schemaName, err = s.DropSchema(job.SchemaID)
 		if err != nil {
 			return "", "", "", errors.Trace(err)
 		}
 
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schemaName, "", sql, nil
 
 	case model.ActionRenameTable:
 		// ignore schema doesn't support reanme ddl
@@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, table.Name.O, sql, nil
+		schemaName = schema.Name.O
+		tableName = table.Name.O
 
 	case model.ActionCreateTable:
 		table := job.BinlogInfo.TableInfo
@@ -332,7 +335,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, table.Name.O, sql, nil
+		schemaName = schema.Name.O
+		tableName = table.Name.O
 
 	case model.ActionDropTable:
 		schema, ok := s.SchemaByID(job.SchemaID)
@@ -340,14 +344,14 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 			return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
 		}
 
-		tableName, err := s.DropTable(job.TableID)
+		tableName, err = s.DropTable(job.TableID)
 		if err != nil {
 			return "", "", "", errors.Trace(err)
 		}
 
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, tableName, sql, nil
+		schemaName = schema.Name.O
 
 	case model.ActionTruncateTable:
 		schema, ok := s.SchemaByID(job.SchemaID)
@@ -355,6 +359,7 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 			return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
 		}
 
+		// job.TableID is the old table ID, different from table.ID
 		_, err := s.DropTable(job.TableID)
 		if err != nil {
 			return "", "", "", errors.Trace(err)
@@ -372,7 +377,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, table.Name.O, sql, nil
+		schemaName = schema.Name.O
+		tableName = table.Name.O
+		s.truncateTableID[job.TableID] = struct{}{}
 
 	default:
 		binlogInfo := job.BinlogInfo
@@ -396,8 +403,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
 		s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
 		s.currentVersion = job.BinlogInfo.SchemaVersion
-		return schema.Name.O, tbInfo.Name.O, sql, nil
+		schemaName = schema.Name.O
+		tableName = tbInfo.Name.O
 	}
+
+	return
+}
+
+// IsTruncateTableID returns true if the table ID has been truncated by a truncate-table DDL
+func (s *Schema) IsTruncateTableID(id int64) bool {
+	_, ok := s.truncateTableID[id]
+	return ok
 }
 
 func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
@@ -425,11 +441,10 @@ func addImplicitColumn(table *model.TableInfo) {
 	table.Indices = []*model.IndexInfo{newIndex}
 }
 
-// there's only two status will be in HistoryDDLJob(we fetch at start time):
-// JobStateSynced and JobStateRollbackDone
-// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob),
-// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog),
-// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit
+// TiDB writes a DDL binlog for every DDL job, so we must ignore jobs that are cancelled or rolled back.
+// Older versions of TiDB wrote the DDL binlog in the txn that moved the job state to *synced*;
+// now it is written in the txn that moves the state to *done* (just before *synced*).
+// Once a job is in state *done*, it will always and only be changed to *synced*.
 func skipJob(job *model.Job) bool {
-	return !job.IsSynced()
+	return !job.IsSynced() && !job.IsDone()
 }
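The schema.go hunks above make two coupled changes: handleDDL switches to named results so every case falls through to a single return, and a truncate job records its *old* table ID (job.TableID) in truncateTableID, while the new incarnation of the table continues under table.ID. A usage sketch in the gocheck style of drainer/syncer_test.go (the job literal and oldID/newID are hypothetical):

	// hypothetical truncate job: job.TableID == oldID (pre-truncate),
	// job.BinlogInfo.TableInfo.ID == newID (post-truncate)
	_, _, _, err := s.schema.handleDDL(job)
	c.Assert(err, IsNil)
	c.Assert(s.schema.IsTruncateTableID(oldID), Equals, true)  // stale DML will be dropped
	c.Assert(s.schema.IsTruncateTableID(newID), Equals, false) // new table replicates normally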
diff --git a/drainer/syncer.go b/drainer/syncer.go
index 36c6fa65c..8fb598bec 100644
--- a/drainer/syncer.go
+++ b/drainer/syncer.go
@@ -48,9 +48,6 @@ type Syncer struct {
 	positions    map[string]int64
 	initCommitTS int64
 
-	// because TiDB is case-insensitive, only lower-case here.
-	ignoreSchemaNames map[string]struct{}
-
 	ctx    context.Context
 	cancel context.CancelFunc
 
@@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
 		}
 	}
 	s.schema, err = NewSchema(jobs, false)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
 	if err != nil {
@@ -436,6 +436,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
 		go s.sync(s.executors[i], s.jobCh[i], i)
 	}
 
+	var lastDDLSchemaVersion int64
 	var b *binlogItem
 	for {
 		select {
@@ -463,7 +464,16 @@ func (s *Syncer) run(jobs []*model.Job) error {
 					return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
 				}
 
+				err = s.rewriteForOldVersion(preWrite)
+				if err != nil {
+					return errors.Annotate(err, "rewrite for old version fail")
+				}
+
 				log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
+				if preWrite.SchemaVersion < lastDDLSchemaVersion {
+					log.Debug("encounter older schema dml")
+				}
+
 				err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
 				if err != nil {
 					return errors.Trace(err)
@@ -481,6 +491,8 @@ func (s *Syncer) run(jobs []*model.Job) error {
 			s.schema.addJob(b.job)
 
 			log.Debug("DDL SchemaVersion: ", b.job.BinlogInfo.SchemaVersion)
+			lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion
+
 			err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
 			if err != nil {
 				return errors.Trace(err)
@@ -645,3 +657,23 @@ func (s *Syncer) GetLastSyncTime() time.Time {
 func (s *Syncer) GetLatestCommitTS() int64 {
 	return s.cp.TS()
 }
+
+// see https://github.com/pingcap/tidb/issues/9304
+// Currently we only drop the data whose table ID has been truncated.
+// Because of online DDL, different TiDB instances may see different schemas,
+// so the binlog can't simply be treated as one timeline of both DML and DDL;
+// we must handle every DDL type carefully for now and need to find a better design.
+func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
+	var mutations = make([]pb.TableMutation, 0, len(pv.GetMutations()))
+	for _, mutation := range pv.GetMutations() {
+		if s.schema.IsTruncateTableID(mutation.TableId) {
+			log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
+			continue
+		}
+
+		mutations = append(mutations, mutation)
+	}
+	pv.Mutations = mutations
+
+	return nil
+}
diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go
index 8230b7f56..55d2643e3 100644
--- a/drainer/syncer_test.go
+++ b/drainer/syncer_test.go
@@ -24,7 +24,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
 	c.Assert(sql, Equals, "")
 
 	// check job.Query is empty
-	job = &model.Job{ID: 1, State: model.JobStateSynced}
+	job = &model.Job{ID: 1, State: model.JobStateDone}
 	_, _, sql, err = s.schema.handleDDL(job)
 	c.Assert(sql, Equals, "")
 	c.Assert(err, NotNil, Commentf("should return not found job.Query"))
@@ -82,7 +82,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
 		job = &model.Job{
 			ID:       testCase.jobID,
-			State:    model.JobStateSynced,
+			State:    model.JobStateDone,
 			SchemaID: testCase.schemaID,
 			TableID:  testCase.tableID,
 			Type:     testCase.jobType,
diff --git a/drainer/translator/flash.go b/drainer/translator/flash.go
index 91e4668ca..9ba48926c 100644
--- a/drainer/translator/flash.go
+++ b/drainer/translator/flash.go
@@ -16,7 +16,6 @@ import (
 	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util/codec"
 )
 
 // flashTranslator translates TiDB binlog to flash sqls
@@ -42,7 +41,6 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
 	version := makeInternalVersionValue(uint64(commitTS))
 	delFlag := makeInternalDelmarkValue(false)
 
-	colsTypeMap := toFlashColumnTypeMap(columns)
 	columnList := genColumnList(columns)
 	// addition 2 holder is for del flag and version
 	columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2)
@@ -50,37 +48,19 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
 
 	for _, row := range rows {
 		//decode the pk value
-		remain, pk, err := codec.DecodeOne(row)
-		hashKey := pk.GetInt64()
+		pk, columnValues, err := insertRowToDatums(table, row)
 		if err != nil {
 			return nil, nil, nil, errors.Trace(err)
 		}
 
-		columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local)
-		if err != nil {
-			return nil, nil, nil, errors.Trace(err)
-		}
-
-		if columnValues == nil {
-			columnValues = make(map[int64]types.Datum)
-		}
+		hashKey := pk.GetInt64()
 
 		var vals []interface{}
 		vals = append(vals, hashKey)
 		for _, col := range columns {
-			if IsPKHandleColumn(table, col) {
-				columnValues[col.ID] = pk
-				pkVal, err := formatFlashData(&pk, &col.FieldType)
-				if err != nil {
-					return nil, nil, nil, errors.Trace(err)
-				}
-				vals = append(vals, pkVal)
-				continue
-			}
-
 			val, ok := columnValues[col.ID]
 			if !ok {
-				vals = append(vals, col.DefaultValue)
+				vals = append(vals, col.GetDefaultValue())
 			} else {
 				value, err := formatFlashData(&val, &col.FieldType)
 				if err != nil {
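flash.go above, and kafka.go, mysql.go, and pb.go below, now delegate row decoding to a shared insertRowToDatums helper added in drainer/translator/translator.go later in this diff. The decode path it centralizes looks schematically like this (a sketch of the shared helper, not its verbatim body):

	// binlog insert row layout: [PK handle datum | remaining encoded columns]
	remain, pk, err := codec.DecodeOne(row)                               // split off the PK handle
	datums, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local) // column ID -> Datum
	datums[pkHandleColumnID] = pk                                        // fold the handle back in as a column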
diff --git a/drainer/translator/flash_util.go b/drainer/translator/flash_util.go
index a9c6fb852..b4f97224f 100644
--- a/drainer/translator/flash_util.go
+++ b/drainer/translator/flash_util.go
@@ -87,15 +87,6 @@ func isHandleTypeColumn(colDef *ast.ColumnDef) bool {
 		tp == mysql.TypeLonglong
 }
 
-func toFlashColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
-	colTypeMap := make(map[int64]*types.FieldType)
-	for _, col := range columns {
-		colTypeMap[col.ID] = &col.FieldType
-	}
-
-	return colTypeMap
-}
-
 func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} {
 	var row []interface{}
 	row = append(row, pk)
diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go
index e1e8a1e41..cd8d6da2a 100644
--- a/drainer/translator/kafka.go
+++ b/drainer/translator/kafka.go
@@ -14,7 +14,6 @@ import (
 	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
-	"github.com/pingcap/tidb/util/codec"
 )
 
 // kafkaTranslator translates TiDB binlog to self-description protobuf
@@ -129,54 +128,22 @@ func (p *kafkaTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (
 }
 
 func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
+	_, columnValues, err := insertRowToDatums(tableInfo, raw)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 	columns := tableInfo.Columns
 
-	remain, pk, err := codec.DecodeOne(raw)
-	if err != nil {
-		log.Error(err)
-		err = errors.Trace(err)
-		return
-	}
-
-	log.Debugf("decode pk: %+v", pk)
-
-	colsTypeMap := util.ToColumnTypeMap(tableInfo.Columns)
-	columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
-	if err != nil {
-		log.Error(err)
-		err = errors.Trace(err)
-		return
-	}
-
-	// log.Debugf("decodeRow: %+v\n", columnValues)
-	// maybe only the pk column value
-	if columnValues == nil {
-		columnValues = make(map[int64]types.Datum)
-	}
-
 	row = new(obinlog.Row)
 	for _, col := range columns {
-		if IsPKHandleColumn(tableInfo, col) {
-			columnValues[col.ID] = pk
-		}
-
-		var column *obinlog.Column
 		val, ok := columnValues[col.ID]
-		if ok {
-			column = DatumToColumn(col, val)
-		} else {
-			if col.DefaultValue == nil {
-				column = nullColumn()
-			} else {
-				log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
-			}
+		if !ok {
+			val = getDefaultOrZeroValue(col)
 		}
-		row.Columns = append(row.Columns, column)
-	}
 
-	if len(columnValues) == 0 {
-		panic(errors.New("columnValues is nil"))
+		column := DatumToColumn(col, val)
+		row.Columns = append(row.Columns, column)
 	}
 
 	return
@@ -198,17 +162,12 @@ func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
 	row = new(obinlog.Row)
 	for _, col := range columns {
-		var column *obinlog.Column
 		val, ok := columnValues[col.ID]
-		if ok {
-			column = DatumToColumn(col, val)
-		} else {
-			if col.DefaultValue == nil {
-				column = nullColumn()
-			} else {
-				log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
-			}
+		if !ok {
+			val = getDefaultOrZeroValue(col)
 		}
+
+		column := DatumToColumn(col, val)
 		row.Columns = append(row.Columns, column)
 	}
 
@@ -225,28 +184,20 @@ func updateRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, c
 	row = new(obinlog.Row)
 	changedRow = new(obinlog.Row)
 	for _, col := range tableInfo.Columns {
-		if val, ok := newDatums[col.ID]; ok {
-			column := DatumToColumn(col, val)
-			row.Columns = append(row.Columns, column)
-		} else {
-			if col.DefaultValue == nil {
-				column := nullColumn()
-				row.Columns = append(row.Columns, column)
-			} else {
-				log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
-			}
+		var val types.Datum
+		var ok bool
+
+		if val, ok = newDatums[col.ID]; !ok {
+			val = getDefaultOrZeroValue(col)
 		}
 
-		if val, ok := oldDatums[col.ID]; ok {
-			column := DatumToColumn(col, val)
-			changedRow.Columns = append(changedRow.Columns, column)
-		} else {
-			if col.DefaultValue == nil {
-				column := nullColumn()
-				row.Columns = append(row.Columns, column)
-			} else {
-				log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
-			}
+		column := DatumToColumn(col, val)
+		row.Columns = append(row.Columns, column)
+
+		if val, ok = oldDatums[col.ID]; !ok {
+			val = getDefaultOrZeroValue(col)
 		}
+		column = DatumToColumn(col, val)
+		changedRow.Columns = append(changedRow.Columns, column)
 	}
 
 	return
@@ -320,10 +271,3 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C
 
 	return
 }
-
-func nullColumn() (col *obinlog.Column) {
-	col = new(obinlog.Column)
-	col.IsNull = proto.Bool(true)
-
-	return
-}
diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go
index 9eb9b2515..408fa3da8 100644
--- a/drainer/translator/mysql.go
+++ b/drainer/translator/mysql.go
@@ -48,7 +48,6 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
 	keys := make([][]string, 0, len(rows))
 	values := make([][]interface{}, 0, len(rows))
 
-	colsTypeMap := util.ToColumnTypeMap(columns)
 	columnList := m.genColumnList(columns)
 	columnPlaceholders := dml.GenColumnPlaceholders((len(columns)))
 
@@ -59,44 +58,25 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
 	sql := fmt.Sprintf("%s into `%s`.`%s` (%s) values (%s);", insertStr, schema, table.Name, columnList, columnPlaceholders)
 
 	for _, row := range rows {
-		//decode the pk value
-		remain, pk, err := codec.DecodeOne(row)
-		if err != nil {
-			return nil, nil, nil, errors.Trace(err)
-		}
-
-		columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
+		_, columnValues, err := insertRowToDatums(table, row)
 		if err != nil {
 			return nil, nil, nil, errors.Trace(err)
 		}
 
-		if columnValues == nil {
-			columnValues = make(map[int64]types.Datum)
-		}
-
 		var vals []interface{}
 		for _, col := range columns {
-			if IsPKHandleColumn(table, col) {
-				columnValues[col.ID] = pk
-				vals = append(vals, pk.GetValue())
-				continue
-			}
-
 			val, ok := columnValues[col.ID]
 			if !ok {
-				vals = append(vals, col.DefaultValue)
-			} else {
-				value, err := formatData(val, col.FieldType)
-				if err != nil {
-					return nil, nil, nil, errors.Trace(err)
-				}
+				val = getDefaultOrZeroValue(col)
+			}
 
-				vals = append(vals, value.GetValue())
+			value, err := formatData(val, col.FieldType)
+			if err != nil {
+				return nil, nil, nil, errors.Trace(err)
 			}
-		}
 
-		if len(columnValues) == 0 {
-			panic(errors.New("columnValues is nil"))
+			vals = append(vals, value.GetValue())
 		}
 
 		sqls = append(sqls, sql)
@@ -570,8 +550,8 @@ func extractFingerprint(cols []*model.ColumnInfo, columnValues map[int64]types.D
 			columnsValues = append(columnsValues, fmt.Sprintf("(%s: %v)", col.Name, value.GetValue()))
 		}
 	} else {
-		if col.DefaultValue != nil {
-			columnsValues = append(columnsValues, fmt.Sprintf("(%s: %v)", col.Name, col.DefaultValue))
+		if col.GetDefaultValue() != nil {
+			columnsValues = append(columnsValues, fmt.Sprintf("(%s: %v)", col.Name, col.GetDefaultValue()))
 		}
 	}
 }
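Every translator now falls back to getDefaultOrZeroValue (defined in translator.go below) when a column is absent from the decoded row, replacing the old nullColumn/log.Fatal/getColDefaultValueFromNil paths. The fallback order matters; a summary with hypothetical column definitions:

	// 1. nullable column                 -> NULL (TiDB omits a NULL value whose default is NULL)
	// 2. column with an explicit default -> that default value
	// 3. NOT NULL enum with no default   -> first element of the enum list
	// 4. anything else                   -> the type's zero value via table.GetZeroValue
	//
	// e.g. `v1 int`                     -> NULL
	//      `v1 int not null default 7` -> 7
	//      `e enum('a','b') not null`  -> "a"
	//      `v1 int not null`           -> 0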
diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go
index b2f1a0d2e..528149306 100644
--- a/drainer/translator/pb.go
+++ b/drainer/translator/pb.go
@@ -11,7 +11,6 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb-binlog/pkg/util"
 	pb "github.com/pingcap/tidb-binlog/proto/binlog"
-	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
@@ -35,24 +34,13 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows
 	sqls := make([]string, 0, len(rows))
 	keys := make([][]string, 0, len(rows))
 	values := make([][]interface{}, 0, len(rows))
-	colsTypeMap := util.ToColumnTypeMap(columns)
 
 	for _, row := range rows {
-		//decode the pk value
-		remain, pk, err := codec.DecodeOne(row)
+		_, columnValues, err := insertRowToDatums(table, row)
 		if err != nil {
 			return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
 		}
 
-		columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
-		if err != nil {
-			return nil, nil, nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
-		}
-
-		if columnValues == nil {
-			columnValues = make(map[int64]types.Datum)
-		}
-
 		var (
 			vals       = make([]types.Datum, 0, len(columns))
 			cols       = make([]string, 0, len(columns))
@@ -60,31 +48,19 @@ func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows
 			mysqlTypes = make([]string, 0, len(columns))
 		)
 		for _, col := range columns {
-			if IsPKHandleColumn(table, col) {
-				columnValues[col.ID] = pk
-			}
-
 			cols = append(cols, col.Name.O)
 			tps = append(tps, col.Tp)
 			mysqlTypes = append(mysqlTypes, types.TypeToStr(col.Tp, col.Charset))
 			val, ok := columnValues[col.ID]
-			if ok {
-				value, err := formatData(val, col.FieldType)
-				if err != nil {
-					return nil, nil, nil, errors.Trace(err)
-				}
-				vals = append(vals, value)
-			} else if col.DefaultValue == nil {
-				val, err := getColDefaultValueFromNil(col)
-				if err != nil {
-					return nil, nil, nil, errors.Trace(err)
-				}
-				vals = append(vals, val)
+			if !ok {
+				val = getDefaultOrZeroValue(col)
 			}
-		}
 
-		if len(columnValues) == 0 {
-			panic(errors.New("columnValues is nil"))
+			value, err := formatData(val, col.FieldType)
+			if err != nil {
+				return nil, nil, nil, errors.Trace(err)
+			}
+			vals = append(vals, value)
 		}
 
 		rowData, err := encodeRow(vals, cols, tps, mysqlTypes)
@@ -282,20 +258,3 @@ func packEvent(schemaName, tableName string, tp pb.EventType, rowData [][]byte)
 
 	return []interface{}{event}
 }
-
-func getColDefaultValueFromNil(col *model.ColumnInfo) (types.Datum, error) {
-	if !mysql.HasNotNullFlag(col.Flag) {
-		return types.Datum{}, nil
-	}
-	if col.Tp == mysql.TypeEnum {
-		// For enum type, if no default value and not null is set,
-		// the default value is the first element of the enum list
-		return types.NewDatum(col.FieldType.Elems[0]), nil
-	}
-	if mysql.HasAutoIncrementFlag(col.Flag) {
-		// Auto increment column doesn't has default value and we should not return error.
-		return types.Datum{}, nil
-	}
-
-	return types.Datum{}, errors.Errorf("Field '%s' doesn't have a default value", col.Name)
-}
diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go
index 4e289f886..f4fc95825 100644
--- a/drainer/translator/translator.go
+++ b/drainer/translator/translator.go
@@ -1,9 +1,17 @@
 package translator
 
 import (
+	"time"
+
 	"github.com/ngaut/log"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
+	"github.com/pingcap/tidb-binlog/pkg/util"
+	"github.com/pingcap/tidb/mysql"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/tablecodec"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/codec"
 )
 
 // OpType represents type of the operation
@@ -69,3 +77,53 @@ func New(providerName string) (SQLTranslator, error) {
 
 	return translator, nil
 }
+
+func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datums map[int64]types.Datum, err error) {
+	colsTypeMap := util.ToColumnTypeMap(table.Columns)
+
+	// decode the pk value
+	var remain []byte
+	remain, pk, err = codec.DecodeOne(row)
+	if err != nil {
+		return types.Datum{}, nil, errors.Trace(err)
+	}
+
+	datums, err = tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
+	if err != nil {
+		return types.Datum{}, nil, errors.Trace(err)
+	}
+
+	// if the table has only one column and it is the PK handle, DecodeRow returns nil (no columns)
+	if datums == nil {
+		datums = make(map[int64]types.Datum)
+	}
+
+	for _, col := range table.Columns {
+		if IsPKHandleColumn(table, col) {
+			datums[col.ID] = pk
+		}
+	}
+
+	return
+}
+
+func getDefaultOrZeroValue(col *model.ColumnInfo) types.Datum {
+	// see https://github.com/pingcap/tidb/issues/9304
+	// TiDB does not write the column value into the binlog when the value is null
+	// and the default value is null, so we must use null here.
+	if !mysql.HasNotNullFlag(col.Flag) {
+		return types.NewDatum(nil)
+	}
+
+	if col.GetDefaultValue() != nil {
+		return types.NewDatum(col.GetDefaultValue())
+	}
+
+	if col.Tp == mysql.TypeEnum {
+		// For enum type, if no default value and not null is set,
+		// the default value is the first element of the enum list
+		return types.NewDatum(col.FieldType.Elems[0])
+	}
+
+	return table.GetZeroValue(col)
+}
diff --git a/pkg/loader/load.go b/pkg/loader/load.go
index 29278414c..10e9a4d17 100644
--- a/pkg/loader/load.go
+++ b/pkg/loader/load.go
@@ -13,6 +13,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/pingcap/parser"
+	"github.com/pingcap/parser/ast"
 	pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
 )
 
@@ -206,6 +208,17 @@ func (s *Loader) getTableInfo(schema string, table string) (info *tableInfo, err
 	return s.refreshTableInfo(schema, table)
 }
 
+func isCreateDatabaseDDL(sql string) bool {
+	stmt, err := parser.New().ParseOneStmt(sql, "", "")
+	if err != nil {
+		log.Errorf("parse [%s] err: %v", sql, err)
+		return false
+	}
+
+	_, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt)
+	return isCreateDatabase
+}
+
 func (s *Loader) execDDL(ddl *DDL) error {
 	log.Debug("exec ddl: ", ddl)
 	var err error
@@ -221,7 +234,7 @@ func (s *Loader) execDDL(ddl *DDL) error {
 			continue
 		}
 
-		if len(ddl.Database) > 0 {
+		if len(ddl.Database) > 0 && !isCreateDatabaseDDL(ddl.SQL) {
 			_, err = tx.Exec(fmt.Sprintf("use %s;", quoteName(ddl.Database)))
 			if err != nil {
 				log.Error(err)
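The loader guard above exists because `USE <db>` cannot run before `CREATE DATABASE <db>`: the database does not exist until the DDL itself executes. parser.New().ParseOneStmt and *ast.CreateDatabaseStmt are the real pingcap/parser APIs; the standalone check below is illustrative:

	p := parser.New()
	stmt, err := p.ParseOneStmt("create database foo", "", "")
	if err == nil {
		if _, ok := stmt.(*ast.CreateDatabaseStmt); ok {
			// skip the `use foo;` prelude and execute the DDL directly
		}
	}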
diff --git a/tests/binlog/binlog.go b/tests/binlog/binlog.go
index 0eeb3078f..4e913a861 100644
--- a/tests/binlog/binlog.go
+++ b/tests/binlog/binlog.go
@@ -35,5 +35,12 @@ func main() {
 	}
 	defer util.CloseDB(targetDB)
 
+	sourceDBs, err := util.CreateSourceDBs()
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer util.CloseDBs(sourceDBs)
+
+	dailytest.RunMultiSource(sourceDBs, targetDB, &cfg.DiffConfig)
 	dailytest.Run(sourceDB, targetDB, &cfg.DiffConfig, cfg.WorkerCount, cfg.JobCount, cfg.Batch)
 }
diff --git a/tests/dailytest/dailytest.go b/tests/dailytest/dailytest.go
index 9b43b1527..16b4fab59 100644
--- a/tests/dailytest/dailytest.go
+++ b/tests/dailytest/dailytest.go
@@ -7,6 +7,11 @@ import (
 	"github.com/pingcap/tidb-binlog/diff"
 )
 
+// RunMultiSource runs the tests that need multiple TiDB instances, one instance per *sql.DB* in srcs
+func RunMultiSource(srcs []*sql.DB, targetDB *sql.DB, diffCfg *diff.Config) {
+	runDDLTest(srcs, targetDB, diffCfg)
+}
+
 // Run runs the daily test
 func Run(sourceDB *sql.DB, targetDB *sql.DB, diffCfg *diff.Config, workerCount int, jobCount int, batch int) {
diff --git a/tests/dailytest/ddl.go b/tests/dailytest/ddl.go
new file mode 100644
index 000000000..b288a1b67
--- /dev/null
+++ b/tests/dailytest/ddl.go
@@ -0,0 +1,244 @@
+package dailytest
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"reflect"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/ngaut/log"
+	"github.com/pingcap/tidb-binlog/diff"
+)
+
+func mustCreateTable(db *sql.DB) {
+	conn, err := db.Conn(context.Background())
+	if err != nil {
+		log.Fatal(err)
+	}
+	mustCreateTableWithConn(conn)
+}
+
+func mustCreateTableWithConn(conn *sql.Conn) {
+	var err error
+	_, err = conn.ExecContext(context.Background(), "create database if not exists test")
+	if err != nil {
+		log.Fatal(err)
+	}
+	_, err = conn.ExecContext(context.Background(), "create table if not exists test.test1(id int primary key, v1 int default null)")
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func createDropSchemaDDL(ctx context.Context, db *sql.DB) {
+	/*
+		mysql> use test;
+		Database changed
+		mysql> create table test1(id int);
+		Query OK, 0 rows affected (0.05 sec)
+
+		mysql> drop database test;
+		Query OK, 3 rows affected (0.02 sec)
+
+		mysql> create database test;
+		Query OK, 1 row affected (0.02 sec)
+
+		mysql> create table test1(id int);
+		ERROR 1046 (3D000): No database selected
+	*/
+	// Dropping the database currently in use leaves the session with no database selected,
+	// which makes later code that reuses the *sql.DB* fail as expected,
+	// so we switch back to the used database before closing the conn.
+	conn, err := db.Conn(ctx)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer func() {
+		_, err := conn.ExecContext(context.Background(), "use test")
+		if err != nil {
+			log.Fatal(err)
+		}
+		conn.Close()
+	}()
+
+	for {
+		mustCreateTableWithConn(conn)
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		time.Sleep(time.Millisecond)
+
+		_, err = conn.ExecContext(context.Background(), "drop database test")
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+}
+
+func truncateDDL(ctx context.Context, db *sql.DB) {
+	var err error
+	mustCreateTable(db)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		_, err = db.Exec("truncate table test.test1")
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		time.Sleep(time.Millisecond)
+	}
+}
+
+func dml(ctx context.Context, db *sql.DB, id int) {
+	var err error
+	var i int
+	var success int
+
+	for i = 0; ; i++ {
+		_, err = db.Exec("insert into test.test1(id) values(?)", i+id*100000000)
+		if err == nil {
+			success++
+			if success%100 == 0 {
+				log.Info(id, " success: ", success)
+			}
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+	}
+}
+
+func addDropColumnDDL(ctx context.Context, db *sql.DB) {
+	var err error
+	mustCreateTable(db)
+
+	for value := 1; ; value++ {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		_, err = db.Exec("alter table test.test1 drop column v1")
+		if err != nil {
+			log.Fatal(err)
+		}
+		time.Sleep(time.Millisecond)
+
+		var notNULL string
+		var defaultValue interface{}
+
+		if value%5 == 0 {
+			// use default not null
+			notNULL = "not null"
+			defaultValue = value
+		} else if value%5 == 1 {
+			// use default null
+			defaultValue = nil
+		} else {
+			// use default
+			defaultValue = value
+		}
+
+		_, err = db.Exec(fmt.Sprintf("alter table test.test1 add column v1 int default ? %s", notNULL), defaultValue)
+		if err != nil {
+			log.Fatal(err)
+		}
+		time.Sleep(time.Millisecond)
+	}
+}
+
+func modifyColumnDDL(ctx context.Context, db *sql.DB) {
+	var err error
+
+	mustCreateTable(db)
+
+	for value := 1; ; value++ {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		var defaultValue interface{}
+		// use default null on every fifth modify
+		if value%5 == 0 {
+			defaultValue = nil
+		} else {
+			defaultValue = value
+		}
+
+		_, err = db.Exec("alter table test.test1 modify column v1 int default ?", defaultValue)
+		if err != nil {
+			log.Fatal(err)
+		}
+		time.Sleep(time.Millisecond)
+	}
+}
+
+func getFunctionName(i interface{}) string {
+	return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
+}
+
+// runDDLTest runs every DDL case continuously, with one goroutine per TiDB instance doing DML ops
+func runDDLTest(srcs []*sql.DB, targetDB *sql.DB, diffCfg *diff.Config) {
+	runTime := time.Second * 3
+	start := time.Now()
+	defer func() {
+		log.Infof("runDDLTest take %v", time.Since(start))
+	}()
+
+	for _, ddlFunc := range []func(context.Context, *sql.DB){createDropSchemaDDL, truncateDDL, addDropColumnDDL, modifyColumnDDL} {
+		RunTest(diffCfg, srcs[0], targetDB, func(_ *sql.DB) {
+			log.Info("running ddl test: ", getFunctionName(ddlFunc))
+
+			var wg sync.WaitGroup
+			ctx, cancel := context.WithTimeout(context.Background(), runTime)
+			defer cancel()
+
+			for idx, src := range srcs {
+				wg.Add(1)
+				go func(src *sql.DB, idx int) {
+					dml(ctx, src, idx)
+					wg.Done()
+				}(src, idx)
+			}
+
+			time.Sleep(time.Millisecond)
+
+			wg.Add(1)
+			go func() {
+				ddlFunc(ctx, srcs[0])
+				wg.Done()
+			}()
+
+			wg.Wait()
+		})
+
+		// just cleanup
+		RunTest(diffCfg, srcs[0], targetDB, func(db *sql.DB) {
+			_, err := db.Exec("drop table if exists test1")
+			if err != nil {
+				log.Fatal(err)
+			}
+		})
+	}
+}
diff --git a/tests/kafka/kafka.go b/tests/kafka/kafka.go
index e6a077470..35b9fa7ac 100644
--- a/tests/kafka/kafka.go
+++ b/tests/kafka/kafka.go
@@ -41,7 +41,7 @@ func main() {
 		panic(err)
 	}
 
-	sourceDB, err := util.CreateSourceDB()
+	sourceDBs, err := util.CreateSourceDBs()
 	if err != nil {
 		panic(err)
 	}
@@ -51,6 +51,11 @@ func main() {
 		panic(err)
 	}
 
+	sinkDBForDiff, err := util.CreateSinkDB()
+	if err != nil {
+		panic(err)
+	}
+
 	// start sync to mysql from kafka
 	ld, err := loader.NewLoader(sinkDB, loader.WorkerCount(16), loader.BatchSize(128))
 	if err != nil {
@@ -92,5 +97,6 @@ func main() {
 		EqualRowCount: true,
 		EqualData:     true,
 	}
-	dailytest.Run(sourceDB, sinkDB, diffCfg, 10, 1000, 10)
+	dailytest.RunMultiSource(sourceDBs, sinkDBForDiff, diffCfg)
+	dailytest.Run(sourceDBs[0], sinkDBForDiff, diffCfg, 10, 1000, 10)
 }
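A note on runDDLTest in tests/dailytest/ddl.go above: the DML goroutines receive src and idx as parameters. Capturing the range variables directly, as in the contrasting sketch below, is a bug in Go versions before 1.22 because the loop reuses a single variable per iteration, so most goroutines would observe the last source DB:

	for idx, src := range srcs {
		go func() { dml(ctx, src, idx) }() // broken: goroutines may all see the final idx/src
	}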
diff --git a/tests/run.sh b/tests/run.sh
index e6c21fd21..145ee9366 100755
--- a/tests/run.sh
+++ b/tests/run.sh
@@ -15,11 +15,11 @@ export PATH=$PATH:$(dirname $pwd)/bin
 
 clean_data() {
-	rm -rf $OUT_DIR/pd || true
-	rm -rf $OUT_DIR/tidb || true
-	rm -rf $OUT_DIR/tikv || true
-	rm -rf $OUT_DIR/pump || true
-	rm -rf $OUT_DIR/data.drainer || true
+    rm -rf $OUT_DIR/pd || true
+    rm -rf $OUT_DIR/tidb || true
+    rm -rf $OUT_DIR/tikv || true
+    rm -rf $OUT_DIR/pump || true
+    rm -rf $OUT_DIR/data.drainer || true
 }
 
 stop_services() {
@@ -31,9 +31,32 @@ stop_services() {
 	killall -9 drainer || true
 }
 
+start_upstream_tidb() {
+    port=${1-4000}
+    echo "Starting TiDB at port: $port..."
+    tidb-server \
+        -P $port \
+        --store tikv \
+        --path 127.0.0.1:2379 \
+        --enable-binlog=true \
+        --log-file "$OUT_DIR/tidb.log" &
+
+    echo "Verifying TiDB is started..."
+    i=0
+    while ! mysql -uroot -h127.0.0.1 -P$port --default-character-set utf8 -e 'select * from mysql.tidb;'; do
+        i=$((i+1))
+        if [ "$i" -gt 40 ]; then
+            echo 'Failed to start TiDB'
+            exit 1
+        fi
+        sleep 3
+    done
+
+}
+
 start_services() {
     stop_services
-	clean_data
+    clean_data
 
     echo "Starting PD..."
     pd-server \
@@ -64,35 +87,18 @@ EOF
 
     echo "Starting Pump..."
-	run_pump &
-
-	sleep 5
+    run_pump &
+    sleep 5
 
-	echo "Starting TiDB..."
-	tidb-server \
-		-P 4000 \
-		--store tikv \
-		--path 127.0.0.1:2379 \
-		--enable-binlog=true \
-		--log-file "$OUT_DIR/tidb.log" &
-
-	echo "Verifying TiDB is started..."
-	i=0
-	while ! mysql -uroot -h127.0.0.1 -P4000 --default-character-set utf8 -e 'select * from mysql.tidb;'; do
-		i=$((i+1))
-		if [ "$i" -gt 40 ]; then
-			echo 'Failed to start TiDB'
-			exit 1
-		fi
-		sleep 3
-	done
+    start_upstream_tidb 4000
+    start_upstream_tidb 4001
 
     echo "Starting Downstream TiDB..."
     tidb-server \
         -P 3306 \
-		--path=$OUT_DIR/tidb \
-		--status=20080 \
+        --path=$OUT_DIR/tidb \
+        --status=20080 \
         --log-file "$OUT_DIR/down_tidb.log" &
 
     echo "Verifying Downstream TiDB is started..."
@@ -106,8 +112,8 @@ EOF
         sleep 3
     done
 
-	echo "Starting Drainer..."
-	run_drainer &
+    echo "Starting Drainer..."
+    run_drainer -L debug &
 }
 
 trap stop_services EXIT
@@ -122,16 +128,17 @@ fi
 
 do_case=""
 for script in ./*/run.sh; do
-	test_name="$(basename "$(dirname "$script")")"
-	if [[ $do_case != "" && $test_name != $do_case ]]; then
-		continue
-	fi
+    test_name="$(basename "$(dirname "$script")")"
+    if [[ $do_case != "" && $test_name != $do_case ]]; then
+        continue
+    fi
     echo "Running test $script..."
     PATH="$pwd/../bin:$pwd/_utils:$PATH" \
-	OUT_DIR=$OUT_DIR \
+    OUT_DIR=$OUT_DIR \
     TEST_NAME=$test_name \
     sh "$script"
 done
 
-echo "<<< Run all test success >>>"
+# with color
+echo "\033[0;36m<<< Run all test success >>>\033[0m"
diff --git a/tests/util/db.go b/tests/util/db.go
index e393e7efe..8e7a81130 100644
--- a/tests/util/db.go
+++ b/tests/util/db.go
@@ -52,17 +52,55 @@ func CloseDB(db *sql.DB) error {
 	return errors.Trace(db.Close())
 }
 
+// CloseDBs closes the mysql fds
+func CloseDBs(dbs []*sql.DB) error {
+	for _, db := range dbs {
+		err := db.Close()
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
 // CheckSyncState check if srouceDB and targetDB has the same table and data
 func CheckSyncState(cfg *diff.Config, sourceDB, targetDB *sql.DB) bool {
 	d := diff.New(cfg, sourceDB, targetDB)
 	ok, err := d.Equal()
 	if err != nil {
-		log.Fatal(err)
+		log.Print(err)
+		return false
 	}
 
 	return ok
 }
 
+// CreateSourceDBs returns the source sql.DBs for tests
+// we create two TiDB instances in tests/run.sh now; change this if needed
+func CreateSourceDBs() (dbs []*sql.DB, err error) {
+	cfg := DBConfig{
+		Host:     "127.0.0.1",
+		User:     "root",
+		Password: "",
+		Name:     "test",
+		Port:     4000,
+	}
+
+	src1, err := CreateDB(cfg)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	cfg.Port = 4001
+	src2, err := CreateDB(cfg)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	dbs = append(dbs, src1, src2)
+	return
+}
+
 // CreateSourceDB return source sql.DB for test
 func CreateSourceDB() (db *sql.DB, err error) {
 	cfg := DBConfig{