Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix 2nd index dup check after insert ignore on dup update primary (#23814) #23826

Merged
merged 2 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if handleChanged {
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
// If the new handle exists, this will avoid to remove the record.
err = tables.CheckHandleExists(ctx, sctx, t, newHandle, newData)
// If the new handle or unique index exists, this will avoid to remove the record.
err = tables.CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx, sctx, t, newHandle, newData)
if err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
return false, nil
Expand All @@ -210,7 +210,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
} else {
_, err = t.AddRecord(sctx, newData, table.IsUpdate, table.WithCtx(ctx))
}

if err != nil {
return false, err
}
Expand Down
21 changes: 21 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,27 @@ func (s *testSuite4) TestInsertIgnoreOnDup(c *C) {
testSQL = `select * from t;`
r = tk.MustQuery(testSQL)
r.Check(testkit.Rows("1 1", "2 2"))

tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2(`col_25` set('Alice','Bob','Charlie','David') NOT NULL,`col_26` date NOT NULL DEFAULT '2016-04-15', PRIMARY KEY (`col_26`) clustered, UNIQUE KEY `idx_9` (`col_25`,`col_26`),UNIQUE KEY `idx_10` (`col_25`))")
tk.MustExec("insert into t2(col_25, col_26) values('Bob', '1989-03-23'),('Alice', '2023-11-24'), ('Charlie', '2023-12-05')")
tk.MustExec("insert ignore into t2 (col_25,col_26) values ( 'Bob','1977-11-23' ) on duplicate key update col_25 = 'Alice', col_26 = '2036-12-13'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry 'Alice' for key 'idx_10'"))
tk.MustQuery("select * from t2").Check(testkit.Rows("Bob 1989-03-23", "Alice 2023-11-24", "Charlie 2023-12-05"))

tk.MustExec("drop table if exists t4")
tk.MustExec("create table t4(id int primary key clustered, k int, v int, unique key uk1(k))")
tk.MustExec("insert into t4 values (1, 10, 100), (3, 30, 300)")
tk.MustExec("insert ignore into t4 (id, k, v) values(1, 0, 0) on duplicate key update id = 2, k = 30")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '30' for key 'uk1'"))
tk.MustQuery("select * from t4").Check(testkit.Rows("1 10 100", "3 30 300"))

tk.MustExec("drop table if exists t5")
tk.MustExec("create table t5(k1 varchar(100), k2 varchar(100), uk1 int, v int, primary key(k1, k2) clustered, unique key ukk1(uk1), unique key ukk2(v))")
tk.MustExec("insert into t5(k1, k2, uk1, v) values('1', '1', 1, '100'), ('1', '3', 2, '200')")
tk.MustExec("update ignore t5 set k2 = '2', uk1 = 2 where k1 = '1' and k2 = '1'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '2' for key 'ukk1'"))
tk.MustQuery("select * from t5").Check(testkit.Rows("1 1 1 100", "1 3 2 200"))
}

func (s *testSuite4) TestInsertSetWithDefault(c *C) {
Expand Down
63 changes: 49 additions & 14 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
}

// genIndexKeyStr generates index content string representation.
func (t *TableCommon) genIndexKeyStr(colVals []types.Datum) (string, error) {
func genIndexKeyStr(colVals []types.Datum) (string, error) {
// Pass pre-composed error to txn.
strVals := make([]string, 0, len(colVals))
for _, cv := range colVals {
Expand Down Expand Up @@ -839,7 +839,7 @@ func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r
}
var dupErr error
if !skipCheck && v.Meta().Unique {
entryKey, err := t.genIndexKeyStr(indexVals)
entryKey, err := genIndexKeyStr(indexVals)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1159,7 +1159,7 @@ func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals
if _, err := idx.Create(ctx, txn, vals, h, rsData, opts...); err != nil {
if kv.ErrKeyExists.Equal(err) {
// Make error message consistent with MySQL.
entryKey, err1 := t.genIndexKeyStr(vals)
entryKey, err1 := genIndexKeyStr(vals)
if err1 != nil {
// if genIndexKeyStr failed, return the original error.
return err
Expand Down Expand Up @@ -1447,9 +1447,9 @@ func FindIndexByColName(t table.Table, name string) table.Index {
return nil
}

// CheckHandleExists check whether recordID key exists. if not exists, return nil,
// CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore check whether recordID key or unique index key exists. if not exists, return nil,
// otherwise return kv.ErrKeyExists error.
func CheckHandleExists(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, data []types.Datum) error {
func CheckHandleOrUniqueKeyExistForUpdateIgnoreOrInsertOnDupIgnore(ctx context.Context, sctx sessionctx.Context, t table.Table, recordID kv.Handle, data []types.Datum) error {
physicalTableID := t.Meta().ID
if pt, ok := t.(*partitionedTable); ok {
info := t.Meta().GetPartitionInfo()
Expand All @@ -1464,15 +1464,50 @@ func CheckHandleExists(ctx context.Context, sctx sessionctx.Context, t table.Tab
if err != nil {
return err
}
// Check key exists.
prefix := tablecodec.GenTableRecordPrefix(physicalTableID)
recordKey := tablecodec.EncodeRecordKey(prefix, recordID)
_, err = txn.Get(ctx, recordKey)
if err == nil {
handleStr := getDuplicateErrorHandleString(t, recordID, data)
return kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
} else if !kv.ErrNotExist.Equal(err) {
return err

// Check primary key exists.
{
prefix := tablecodec.GenTableRecordPrefix(physicalTableID)
recordKey := tablecodec.EncodeRecordKey(prefix, recordID)
_, err = txn.Get(ctx, recordKey)
if err == nil {
handleStr := getDuplicateErrorHandleString(t, recordID, data)
return kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
} else if !kv.ErrNotExist.Equal(err) {
return err
}
}

// Check unique key exists.
{
for _, idx := range t.Indices() {
if !IsIndexWritable(idx) {
continue
}
if !idx.Meta().Unique {
continue
}
vals, err := idx.FetchValues(data, nil)
if err != nil {
return err
}
key, _, err := idx.GenIndexKey(sctx.GetSessionVars().StmtCtx, vals, recordID, nil)
if err != nil {
return err
}
_, err = txn.Get(ctx, key)
if kv.IsErrNotFound(err) {
continue
}
if err != nil {
return err
}
entryKey, err := genIndexKeyStr(vals)
if err != nil {
return err
}
return kv.ErrKeyExists.FastGenByArgs(entryKey, idx.Meta().Name.String())
}
}
return nil
}
Expand Down