Skip to content

Commit

Permalink
mysql.go: data may not consistent when no pk but has uk (#421)
Browse files Browse the repository at this point in the history
* mysql.go: data may not consistent when no pk but has uk

cause by drainer using uk as where to update (where a1 = * and a3 is
	NULL), may update the wrong row
when any column is null, it will not be index, so we can't use it unless
all column of index is not null
  • Loading branch information
july2993 authored Dec 24, 2018
1 parent dc577ef commit 20b3e2a
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 21 deletions.
50 changes: 42 additions & 8 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,30 @@ func (m *mysqlTranslator) GenDDLSQL(sql string, schema string, commitTS int64) (

func (m *mysqlTranslator) genWhere(table *model.TableInfo, columns []*model.ColumnInfo, data []interface{}) (string, []interface{}, error) {
var kvs bytes.Buffer

check := func(ucs []*model.ColumnInfo) bool {
ucsMap := make(map[int64]*model.ColumnInfo)
for _, col := range ucs {
ucsMap[col.ID] = col
}

for i, col := range columns {
_, ok := ucsMap[col.ID]
if !ok {
continue
}

// set to false, so we use all column as where condition
if data[i] == nil {
return false
}
}
return true
}

// if has unique key, use it to construct where condition
ucs, err := m.uniqueIndexColumns(table, false)
ucs, err := m.uniqueIndexColumns(table, check)

if err != nil {
return "", nil, errors.Trace(err)
}
Expand Down Expand Up @@ -384,7 +406,8 @@ func (m *mysqlTranslator) pkHandleColumn(table *model.TableInfo) *model.ColumnIn
return nil
}

func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll bool) ([]*model.ColumnInfo, error) {
// return primary key columns or any unique index columns which check return true
func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, check func([]*model.ColumnInfo) bool) ([]*model.ColumnInfo, error) {
// pkHandleCol may in table.Indices, use map to keep olny one same key.
uniqueColsMap := make(map[string]interface{})
uniqueCols := make([]*model.ColumnInfo, 0, 2)
Expand All @@ -394,18 +417,29 @@ func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll boo
uniqueColsMap[pkHandleCol.Name.O] = pkHandleCol
uniqueCols = append(uniqueCols, pkHandleCol)

if !findAll {
return uniqueCols, nil
}
return uniqueCols, nil
}

columns := make(map[string]*model.ColumnInfo)
for _, col := range table.Columns {
columns[col.Name.O] = col
}

for _, idx := range table.Indices {
// put primary key at [0], so we get primary key first if table has primary key
indices := make([]*model.IndexInfo, len(table.Indices))
copy(indices, table.Indices)
for i := 0; i < len(indices); i++ {
if indices[i].Primary {
indices[i], indices[0] = indices[0], indices[i]
break
}
}

for _, idx := range indices {
if idx.Primary || idx.Unique {
uniqueCols = uniqueCols[:0]
// why need this? unique index should has no duplicate column
uniqueColsMap = make(map[string]interface{})
for _, col := range idx.Columns {
if column, ok := columns[col.Name.O]; ok {
if _, ok := uniqueColsMap[col.Name.O]; !ok {
Expand All @@ -419,13 +453,13 @@ func (m *mysqlTranslator) uniqueIndexColumns(table *model.TableInfo, findAll boo
return nil, errors.New("primay/unique index is empty, but should not be empty")
}

if !findAll {
if check == nil || check(uniqueCols) {
return uniqueCols, nil
}
}
}

return uniqueCols, nil
return uniqueCols[:0], nil
}

func (m *mysqlTranslator) getIndexColumns(table *model.TableInfo) (indexColumns [][]*model.ColumnInfo, err error) {
Expand Down
6 changes: 3 additions & 3 deletions drainer/translator/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func (t *testTranslatorSuite) TestPkHandleColumn(c *C) {
func (t *testTranslatorSuite) TestPkIndexColumns(c *C) {
m := testGenMysqlTranslator(c)
table := testGenTable("hasPK")
cols, err := m.uniqueIndexColumns(table, true)
cols, err := m.uniqueIndexColumns(table, nil)
c.Assert(err, IsNil)
c.Assert(len(cols), Equals, 2)

table = testGenTable("hasID")
cols, err = m.uniqueIndexColumns(table, true)
cols, err = m.uniqueIndexColumns(table, nil)
c.Assert(err, IsNil)
c.Assert(len(cols), Equals, 1)

table = testGenTable("normal")
cols, err = m.uniqueIndexColumns(table, true)
cols, err = m.uniqueIndexColumns(table, nil)
c.Assert(err, IsNil)
c.Assert(len(cols), Equals, 0)
}
Expand Down
38 changes: 38 additions & 0 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ var case1Clean = []string{`
drop table binlog_case1`,
}

// https://internal.pingcap.net/jira/browse/TOOL-714
var case2 = []string{`
create table binlog_case2 (id int, a1 int, a3 int, unique key dex1(a1, a3));
`,
`
insert into binlog_case2(id, a1, a3) values(1, 1, NULL);
`,
`
insert into binlog_case2(id, a1, a3) values(2, 1, NULL);
`,
`
update binlog_case2 set id = 10 where id = 1;
`,
`
update binlog_case2 set id = 100 where id = 10;
`,
}

var case2Clean = []string{`
drop table binlog_case2`,
}

// RunCase run some simple test case
func RunCase(cfg *diff.Config, src *sql.DB, dst *sql.DB) {
RunTest(cfg, src, dst, func(src *sql.DB) {
Expand All @@ -82,6 +104,22 @@ func RunCase(cfg *diff.Config, src *sql.DB, dst *sql.DB) {
}
})

// run case2
RunTest(cfg, src, dst, func(src *sql.DB) {
err := execSQLs(src, case2)
if err != nil {
log.Fatal(err)
}
})

// clean table
RunTest(cfg, src, dst, func(src *sql.DB) {
err := execSQLs(src, case2Clean)
if err != nil {
log.Fatal(err)
}
})

// test big binlog msg
RunTest(cfg, src, dst, func(src *sql.DB) {
_, err := src.Query("create table binlog_big(id int primary key, data longtext);")
Expand Down
37 changes: 27 additions & 10 deletions tests/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,21 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
sqlArgs = append(sqlArgs, args)
}

constructWhere := func() (sql string, usePK bool) {
constructWhere := func(args []interface{}) (sql string, usePK bool) {
var whereColumns []string
for _, col := range table.GetColumnInfo() {
var whereArgs []interface{}
for i, col := range table.GetColumnInfo() {
if col.GetIsPrimaryKey() {
whereColumns = append(whereColumns, col.GetName())
whereArgs = append(whereArgs, args[i])
usePK = true
}
}
// no primary key
if len(whereColumns) == 0 {
for _, col := range table.GetColumnInfo() {
for i, col := range table.GetColumnInfo() {
whereColumns = append(whereColumns, col.GetName())
whereArgs = append(whereArgs, args[i])
}
}

Expand All @@ -168,16 +171,18 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
sql += " and "
}

sql += fmt.Sprintf("%s = ? ", col)
if whereArgs[i] == nil {
sql += fmt.Sprintf("%s IS NULL ", col)
} else {
sql += fmt.Sprintf("%s = ? ", col)
}
}

sql += " limit 1"

return
}

where, usePK := constructWhere()

for _, mutation := range table.Mutations {
switch mutation.GetType() {
case pb.MutationType_Insert:
Expand All @@ -193,8 +198,6 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
sql += fmt.Sprintf("%s = ? ", col.Name)
}

sql += where

row := mutation.Row
changedRow := mutation.ChangeRow

Expand All @@ -204,8 +207,14 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {
args = append(args, columnToArg(col))
}

where, usePK := constructWhere(args)
sql += where

// for where
for i, col := range changedRow.GetColumns() {
if columnToArg(col) == nil {
continue
}
if !usePK || columnInfo[i].GetIsPrimaryKey() {
args = append(args, columnToArg(col))
}
Expand All @@ -216,13 +225,21 @@ func tableToSQL(table *pb.Table) (sqls []string, sqlArgs [][]interface{}) {

case pb.MutationType_Delete:
columnInfo := table.GetColumnInfo()
where, usePK := constructWhere()
row := mutation.Row

var values []interface{}
for _, col := range row.GetColumns() {
values = append(values, columnToArg(col))
}
where, usePK := constructWhere(values)

sql := fmt.Sprintf("delete from `%s`.`%s` %s", table.GetSchemaName(), table.GetTableName(), where)

row := mutation.Row
var args []interface{}
for i, col := range row.GetColumns() {
if columnToArg(col) == nil {
continue
}
if !usePK || columnInfo[i].GetIsPrimaryKey() {
args = append(args, columnToArg(col))
}
Expand Down

0 comments on commit 20b3e2a

Please sign in to comment.