Skip to content

Commit

Permalink
executor: fix 2nd index dup check after insert ignore on dup update p…
Browse files Browse the repository at this point in the history
…rimary (#23814)
  • Loading branch information
lysu authored Apr 2, 2021
1 parent fd7a331 commit 096281e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
5 changes: 2 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if handleChanged {
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
// If the new handle exists, this will avoid to remove the record.
err = tables.CheckHandleExists(ctx, sctx, t, newHandle, newData)
// If the new handle or unique index exists, this will avoid to remove the record.
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData)
if err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
return false, nil
Expand All @@ -210,7 +210,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
} else {
_, err = t.AddRecord(sctx, newData, table.IsUpdate, table.WithCtx(ctx))
}

if err != nil {
return false, err
}
Expand Down
21 changes: 21 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,27 @@ func (s *testSuite4) TestInsertIgnoreOnDup(c *C) {
testSQL = `select * from t;`
r = tk.MustQuery(testSQL)
r.Check(testkit.Rows("1 1", "2 2"))

tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2(`col_25` set('Alice','Bob','Charlie','David') NOT NULL,`col_26` date NOT NULL DEFAULT '2016-04-15', PRIMARY KEY (`col_26`) clustered, UNIQUE KEY `idx_9` (`col_25`,`col_26`),UNIQUE KEY `idx_10` (`col_25`))")
tk.MustExec("insert into t2(col_25, col_26) values('Bob', '1989-03-23'),('Alice', '2023-11-24'), ('Charlie', '2023-12-05')")
tk.MustExec("insert ignore into t2 (col_25,col_26) values ( 'Bob','1977-11-23' ) on duplicate key update col_25 = 'Alice', col_26 = '2036-12-13'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry 'Alice' for key 'idx_10'"))
tk.MustQuery("select * from t2").Check(testkit.Rows("Bob 1989-03-23", "Alice 2023-11-24", "Charlie 2023-12-05"))

tk.MustExec("drop table if exists t4")
tk.MustExec("create table t4(id int primary key clustered, k int, v int, unique key uk1(k))")
tk.MustExec("insert into t4 values (1, 10, 100), (3, 30, 300)")
tk.MustExec("insert ignore into t4 (id, k, v) values(1, 0, 0) on duplicate key update id = 2, k = 30")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '30' for key 'uk1'"))
tk.MustQuery("select * from t4").Check(testkit.Rows("1 10 100", "3 30 300"))

tk.MustExec("drop table if exists t5")
tk.MustExec("create table t5(k1 varchar(100), k2 varchar(100), uk1 int, v int, primary key(k1, k2) clustered, unique key ukk1(uk1), unique key ukk2(v))")
tk.MustExec("insert into t5(k1, k2, uk1, v) values('1', '1', 1, '100'), ('1', '3', 2, '200')")
tk.MustExec("update ignore t5 set k2 = '2', uk1 = 2 where k1 = '1' and k2 = '1'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '2' for key 'ukk1'"))
tk.MustQuery("select * from t5").Check(testkit.Rows("1 1 1 100", "1 3 2 200"))
}

func (s *testSuite4) TestInsertSetWithDefault(c *C) {
Expand Down
63 changes: 49 additions & 14 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
}

// genIndexKeyStr generates index content string representation.
func (t *TableCommon) genIndexKeyStr(colVals []types.Datum) (string, error) {
func genIndexKeyStr(colVals []types.Datum) (string, error) {
// Pass pre-composed error to txn.
strVals := make([]string, 0, len(colVals))
for _, cv := range colVals {
Expand Down Expand Up @@ -840,7 +840,7 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r
}
var dupErr error
if !skipCheck && v.Meta().Unique {
entryKey, err := t.genIndexKeyStr(indexVals)
entryKey, err := genIndexKeyStr(indexVals)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals
if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil {
if kv.ErrKeyExists.Equal(err) {
// Make error message consistent with MySQL.
entryKey, err1 := t.genIndexKeyStr(vals)
entryKey, err1 := genIndexKeyStr(vals)
if err1 != nil {
// if genIndexKeyStr failed, return the original error.
return err
Expand Down Expand Up @@ -1448,9 +1448,9 @@ func FindIndexByColName(t table.Table, name string) table.Index {
return nil
}

// CheckHandleExists check whether recordID key exists. if not exists, return nil,
// CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore check whether recordID key or unique index key exists. if not exists, return nil,
// otherwise return kv.ErrKeyExists error.
func CheckHandleExists(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, data []types.Datum) error {
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, data []types.Datum) error {
physicalTableID := t.Meta().ID
if pt, ok := t.(*partitionedTable); ok {
info := t.Meta().GetPartitionInfo()
Expand All @@ -1465,15 +1465,50 @@ func CheckHandleExists(ctx context.Context, sctx sessionctx.Context, t table.Tab
if err != nil {
return err
}
// Check key exists.
prefix := tablecodec.GenTableRecordPrefix(physicalTableID)
recordKey := tablecodec.EncodeRecordKey(prefix, recordID)
_, err = txn.Get(ctx, recordKey)
if err == nil {
handleStr := getDuplicateErrorHandleString(t, recordID, data)
return kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
} else if !kv.ErrNotExist.Equal(err) {
return err

// Check primary key exists.
{
prefix := tablecodec.GenTableRecordPrefix(physicalTableID)
recordKey := tablecodec.EncodeRecordKey(prefix, recordID)
_, err = txn.Get(ctx, recordKey)
if err == nil {
handleStr := getDuplicateErrorHandleString(t, recordID, data)
return kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
} else if !kv.ErrNotExist.Equal(err) {
return err
}
}

// Check unique key exists.
{
for _, idx := range t.Indices() {
if !IsIndexWritable(idx) {
continue
}
if !idx.Meta().Unique {
continue
}
vals, err := idx.FetchValues(data, nil)
if err != nil {
return err
}
key, _, err := idx.GenIndexKey(sctx.GetSessionVars().StmtCtx, vals, recordID, nil)
if err != nil {
return err
}
_, err = txn.Get(ctx, key)
if kv.IsErrNotFound(err) {
continue
}
if err != nil {
return err
}
entryKey, err := genIndexKeyStr(vals)
if err != nil {
return err
}
return kv.ErrKeyExists.FastGenByArgs(entryKey, idx.Meta().Name.String())
}
}
return nil
}
Expand Down

0 comments on commit 096281e

Please sign in to comment.