Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default value when generate the update rows #1006

Merged
merged 11 commits into from
Nov 19, 2020
80 changes: 67 additions & 13 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ type Schema struct {
schemaNameToID map[string]int64

schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo
tables map[int64][]schemaVersionTableInfo

truncateTableID map[int64]struct{}
tblsDroppingCol map[int64]bool

tableSchemaVersion map[int64]int64

schemaMetaVersion int64

hasImplicitCol bool
Expand All @@ -51,20 +53,26 @@ type Schema struct {
// TableName stores the table and schema name
type TableName = filter.TableName

type schemaVersionTableInfo struct {
SchemaVersion int64
TableInfo *model.TableInfo
}

// NewSchema returns the Schema object
func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
tblsDroppingCol: make(map[int64]bool),
tableSchemaVersion: make(map[int64]int64),
jobs: jobs,
}

s.tableIDToName = make(map[int64]TableName)
s.schemas = make(map[int64]*model.DBInfo)
s.schemaNameToID = make(map[string]int64)
s.tables = make(map[int64]*model.TableInfo)
s.tables = make(map[int64][]schemaVersionTableInfo)

return s, nil
}
Expand Down Expand Up @@ -120,8 +128,11 @@ func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool) {

// TableByID returns the TableInfo by table id
func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool) {
val, ok = s.tables[id]
return
tbls := s.tables[id]
if len(tbls) == 0 {
return nil, false
}
return tbls[len(tbls)-1].TableInfo, true
}

// DropSchema deletes the given DBInfo
Expand Down Expand Up @@ -157,10 +168,11 @@ func (s *Schema) CreateSchema(db *model.DBInfo) error {

// DropTable deletes the given TableInfo
func (s *Schema) DropTable(id int64) (string, error) {
table, ok := s.tables[id]
tables, ok := s.tables[id]
if !ok {
return "", errors.NotFoundf("table %d", id)
}
table := tables[len(tables)-1].TableInfo
err := s.removeTable(id)
if err != nil {
return "", errors.Trace(err)
Expand All @@ -173,8 +185,33 @@ func (s *Schema) DropTable(id int64) (string, error) {
return table.Name.O, nil
}

func (s *Schema) appendTableInfo(schemaVersion int64, table *model.TableInfo) {
tbls := s.tables[table.ID]
tbls = append(tbls, schemaVersionTableInfo{SchemaVersion: schemaVersion, TableInfo: table})
if len(tbls) > 2 {
tbls = tbls[len(tbls)-2:]
}
s.tables[table.ID] = tbls
}

// TableBySchemaVersion get the table info according the schemaVersion and table id.
func (s *Schema) TableBySchemaVersion(schemaVersion int64, id int64) (table *model.TableInfo, ok bool) {
tbls, ok := s.tables[id]
if !ok {
return nil, false
}

for _, t := range tbls {
if t.SchemaVersion >= schemaVersion {
return t.TableInfo, true
}
}

return nil, false
}

// CreateTable creates new TableInfo
func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error {
func (s *Schema) CreateTable(schemaVersion int64, schema *model.DBInfo, table *model.TableInfo) error {
_, ok := s.tables[table.ID]
if ok {
return errors.AlreadyExistsf("table %s.%s", schema.Name, table.Name)
Expand All @@ -185,15 +222,15 @@ func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error
}

schema.Tables = append(schema.Tables, table)
s.tables[table.ID] = table
s.appendTableInfo(schemaVersion, table)
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}

log.Debug("create table success", zap.String("name", schema.Name.O+"."+table.Name.O), zap.Int64("id", table.ID))
return nil
}

// ReplaceTable replace the table by new tableInfo
func (s *Schema) ReplaceTable(table *model.TableInfo) error {
func (s *Schema) ReplaceTable(schemaVersion int64, table *model.TableInfo) error {
_, ok := s.tables[table.ID]
if !ok {
return errors.NotFoundf("table %s(%d)", table.Name, table.ID)
Expand All @@ -203,7 +240,7 @@ func (s *Schema) ReplaceTable(table *model.TableInfo) error {
addImplicitColumn(table)
}

s.tables[table.ID] = table
s.appendTableInfo(schemaVersion, table)

return nil
}
Expand Down Expand Up @@ -260,6 +297,8 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}

s.tableSchemaVersion[job.TableID] = job.BinlogInfo.SchemaVersion
}

s.jobs = s.jobs[i:]
Expand Down Expand Up @@ -349,7 +388,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err = s.CreateTable(schema, table)
err = s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
Expand All @@ -370,7 +409,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err := s.CreateTable(schema, table)
err := s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
Expand Down Expand Up @@ -412,7 +451,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("table %d", job.TableID)
}

err = s.CreateTable(schema, table)
err = s.CreateTable(job.BinlogInfo.SchemaVersion, schema, table)
if err != nil {
return "", "", "", errors.Trace(err)
}
Expand All @@ -437,7 +476,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

err := s.ReplaceTable(tbInfo)
err := s.ReplaceTable(job.BinlogInfo.SchemaVersion, tbInfo)
if err != nil {
return "", "", "", errors.Trace(err)
}
Expand All @@ -456,6 +495,21 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
return
}

// CanAppendDefaultValue means we can safely add the default value to the column if missing the value.
func (s *Schema) CanAppendDefaultValue(id int64, schemaVersion int64) bool {
if s.IsDroppingColumn(id) {
return true
}

if v, ok := s.tableSchemaVersion[id]; ok {
if schemaVersion < v {
return true
}
}

return false
}

// IsDroppingColumn returns true if the table is in the middle of dropping a column
func (s *Schema) IsDroppingColumn(id int64) bool {
return s.tblsDroppingCol[id]
Expand Down
34 changes: 18 additions & 16 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func TiBinlogToSecondaryBinlog(
if !ok {
return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}
isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId())
canAppendDefaultValue := infoGetter.CanAppendDefaultValue(mut.GetTableId(), pv.SchemaVersion)

pinfo, _ := infoGetter.TableBySchemaVersion(pv.SchemaVersion, mut.GetTableId())

schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
if !ok {
Expand All @@ -76,7 +78,7 @@ func TiBinlogToSecondaryBinlog(
secondaryBinlog.DmlData.Tables = append(secondaryBinlog.DmlData.Tables, table)

for {
tableMutation, err := nextRow(schema, info, isTblDroppingCol, iter)
tableMutation, err := nextRow(schema, pinfo, info, canAppendDefaultValue, iter)
if err != nil {
if errors.Cause(err) == io.EOF {
break
Expand Down Expand Up @@ -143,7 +145,7 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table)
return
}

func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
func insertRowToRow(ptableInfo, tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
_, columnValues, err := insertRowToDatums(tableInfo, raw)
columns := tableInfo.Columns

Expand All @@ -152,7 +154,7 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
for _, col := range columns {
val, ok := columnValues[col.ID]
if !ok {
val = getDefaultOrZeroValue(col)
val = getDefaultOrZeroValue(ptableInfo, col)
}

column := DatumToColumn(col, val)
Expand All @@ -162,7 +164,7 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
return
}

func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
func deleteRowToRow(ptableinfo, tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
columns := tableInfo.Columns

colsTypeMap := util.ToColumnTypeMap(tableInfo.Columns)
Expand All @@ -178,7 +180,7 @@ func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
for _, col := range columns {
val, ok := columnValues[col.ID]
if !ok {
val = getDefaultOrZeroValue(col)
val = getDefaultOrZeroValue(ptableinfo, col)
}

column := DatumToColumn(col, val)
Expand All @@ -188,8 +190,8 @@ func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
return
}

func updateRowToRow(tableInfo *model.TableInfo, raw []byte, isTblDroppingCol bool) (row *obinlog.Row, changedRow *obinlog.Row, err error) {
updtDecoder := newUpdateDecoder(tableInfo, isTblDroppingCol)
func updateRowToRow(ptableinfo, tableInfo *model.TableInfo, raw []byte, canAppendDefaultValue bool) (row *obinlog.Row, changedRow *obinlog.Row, err error) {
updtDecoder := newUpdateDecoder(ptableinfo, tableInfo, canAppendDefaultValue)
oldDatums, newDatums, err := updtDecoder.decode(raw, time.Local)
if err != nil {
return
Expand All @@ -202,13 +204,13 @@ func updateRowToRow(tableInfo *model.TableInfo, raw []byte, isTblDroppingCol boo
var ok bool

if val, ok = newDatums[col.ID]; !ok {
getDefaultOrZeroValue(col)
getDefaultOrZeroValue(ptableinfo, col)
}
column := DatumToColumn(col, val)
row.Columns = append(row.Columns, column)

if val, ok = oldDatums[col.ID]; !ok {
getDefaultOrZeroValue(col)
getDefaultOrZeroValue(ptableinfo, col)
}
column = DatumToColumn(col, val)
changedRow.Columns = append(changedRow.Columns, column)
Expand Down Expand Up @@ -287,25 +289,25 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C
return
}

func createTableMutation(tp pb.MutationType, info *model.TableInfo, isTblDroppingCol bool, row []byte) (*obinlog.TableMutation, error) {
func createTableMutation(tp pb.MutationType, pinfo, info *model.TableInfo, canAppendDefaultValue bool, row []byte) (*obinlog.TableMutation, error) {
var err error
mut := new(obinlog.TableMutation)
switch tp {
case pb.MutationType_Insert:
mut.Type = obinlog.MutationType_Insert.Enum()
mut.Row, err = insertRowToRow(info, row)
mut.Row, err = insertRowToRow(pinfo, info, row)
if err != nil {
return nil, err
}
case pb.MutationType_Update:
mut.Type = obinlog.MutationType_Update.Enum()
mut.Row, mut.ChangeRow, err = updateRowToRow(info, row, isTblDroppingCol)
mut.Row, mut.ChangeRow, err = updateRowToRow(pinfo, info, row, canAppendDefaultValue)
if err != nil {
return nil, err
}
case pb.MutationType_DeleteRow:
mut.Type = obinlog.MutationType_Delete.Enum()
mut.Row, err = deleteRowToRow(info, row)
mut.Row, err = deleteRowToRow(pinfo, info, row)
if err != nil {
return nil, err
}
Expand All @@ -315,13 +317,13 @@ func createTableMutation(tp pb.MutationType, info *model.TableInfo, isTblDroppin
return mut, nil
}

func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.TableMutation, error) {
func nextRow(schema string, pinfo, info *model.TableInfo, canAppendDefaultValue bool, iter *sequenceIterator) (*obinlog.TableMutation, error) {
mutType, row, err := iter.next()
if err != nil {
return nil, errors.Trace(err)
}

tableMutation, err := createTableMutation(mutType, info, isTblDroppingCol, row)
tableMutation, err := createTableMutation(mutType, pinfo, info, canAppendDefaultValue, row)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
16 changes: 9 additions & 7 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

const implicitColID = -1

func genMysqlInsert(schema string, table *model.TableInfo, row []byte) (names []string, args []interface{}, err error) {
func genMysqlInsert(schema string, ptable, table *model.TableInfo, row []byte) (names []string, args []interface{}, err error) {
columns := writableColumns(table)

_, columnValues, err := insertRowToDatums(table, row)
Expand All @@ -41,7 +41,7 @@ func genMysqlInsert(schema string, table *model.TableInfo, row []byte) (names []
for _, col := range columns {
val, ok := columnValues[col.ID]
if !ok {
val = getDefaultOrZeroValue(col)
val = getDefaultOrZeroValue(ptable, col)
}

value, err := formatData(val, col.FieldType)
Expand All @@ -56,9 +56,9 @@ func genMysqlInsert(schema string, table *model.TableInfo, row []byte) (names []
return names, args, nil
}

func genMysqlUpdate(schema string, table *model.TableInfo, row []byte, isTblDroppingCol bool) (names []string, values []interface{}, oldValues []interface{}, err error) {
func genMysqlUpdate(schema string, ptable, table *model.TableInfo, row []byte, canAppendDefaultValue bool) (names []string, values []interface{}, oldValues []interface{}, err error) {
columns := writableColumns(table)
updtDecoder := newUpdateDecoder(table, isTblDroppingCol)
updtDecoder := newUpdateDecoder(ptable, table, canAppendDefaultValue)

var updateColumns []*model.ColumnInfo

Expand Down Expand Up @@ -121,7 +121,9 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi
return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}

isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId())
pinfo, _ := infoGetter.TableBySchemaVersion(pv.SchemaVersion, mut.GetTableId())

canAppendDefaultValue := infoGetter.CanAppendDefaultValue(mut.GetTableId(), pv.SchemaVersion)

schema, table, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
if !ok {
Expand All @@ -140,7 +142,7 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi

switch mutType {
case tipb.MutationType_Insert:
names, args, err := genMysqlInsert(schema, info, row)
names, args, err := genMysqlInsert(schema, pinfo, info, row)
if err != nil {
return nil, errors.Annotate(err, "gen insert fail")
}
Expand All @@ -156,7 +158,7 @@ func TiBinlogToTxn(infoGetter TableInfoGetter, schema string, table string, tiBi
dml.Values[name] = args[i]
}
case tipb.MutationType_Update:
names, args, oldArgs, err := genMysqlUpdate(schema, info, row, isTblDroppingCol)
names, args, oldArgs, err := genMysqlUpdate(schema, pinfo, info, row, canAppendDefaultValue)
if err != nil {
return nil, errors.Annotate(err, "gen update fail")
}
Expand Down
Loading