From bbb7c38b3590eeb73b3a09bf5ee6d4d6ca3823a8 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Thu, 24 Nov 2022 17:44:42 +0800 Subject: [PATCH 01/15] fix tempidx key check exist logic. --- ddl/db_change_test.go | 2 - ddl/index_change_test.go | 27 ++++++++--- executor/batch_checker.go | 3 ++ executor/insert.go | 6 +++ table/tables/index.go | 96 ++++++++++++++++++++++++++++++++++++--- 5 files changed, 120 insertions(+), 14 deletions(-) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index ee0634215c465..d865c970e7f42 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1719,8 +1719,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t(a int default 0, b int default 0)") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 85352cb6d08d1..f9dcc99154dc5 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -37,8 +37,6 @@ func TestIndexChange(t *testing.T) { ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t (c1 int primary key, c2 int)") tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);") @@ -221,6 +219,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT } func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table.Table) error { + var err1 error // WriteOnlyTable: insert t values (6, 6) err := sessiontxn.NewTxn(context.Background(), ctx) if err != nil { @@ -231,7 +230,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table return errors.Trace(err) } err = checkIndexExists(ctx, publicTbl, 6, 6, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Need check temp index also. + err1 = checkIndexExists(ctx, writeTbl, 6, 6, true) + } + if err != nil && err1 != nil { return errors.Trace(err) } // PublicTable: insert t values (7, 7) @@ -250,10 +253,18 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table return errors.Trace(err) } err = checkIndexExists(ctx, publicTbl, 5, 7, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Need check temp index also. + err1 = checkIndexExists(ctx, writeTbl, 5, 7, true) + } + if err != nil && err1 != nil { return errors.Trace(err) } - err = checkIndexExists(ctx, publicTbl, 7, 7, false) + if ddl.IsEnableFastReorg() { + err = checkIndexExists(ctx, writeTbl, 7, 7, false) + } else { + err = checkIndexExists(ctx, publicTbl, 7, 7, false) + } if err != nil { return errors.Trace(err) } @@ -283,7 +294,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table idxVal := row[1].GetInt64() handle := row[0].GetInt64() err = checkIndexExists(ctx, publicTbl, idxVal, handle, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Need check temp index also. + err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true) + } + if err != nil && err1 != nil { return errors.Trace(err) } } diff --git a/executor/batch_checker.go b/executor/batch_checker.go index d3820ecb0d08c..4a93fd028bf24 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -180,6 +180,9 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D if !distinct { continue } + if v.Meta().BackfillState == model.BackfillStateRunning { + _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) + } colValStr, err1 := formatDataForDupError(colVals) if err1 != nil { return nil, err1 diff --git a/executor/insert.go b/executor/insert.go index 9b286297351b9..a77cf6ed3cfda 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -15,6 +15,7 @@ package executor import ( + "bytes" "context" "encoding/hex" "fmt" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -264,6 +266,10 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } return err } + rowVal := val[:len(val)-1] + if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { + continue + } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { return err diff --git a/table/tables/index.go b/table/tables/index.go index 9fc1042a110fd..6bc1063fe9588 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -15,6 +15,7 @@ package tables import ( + "bytes" "context" "sync" @@ -127,7 +128,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue keyIsTempIdxKey bool ) if !opt.FromBackFill { - key, tempKey, keyVer = genTempIdxKeyByState(c.idxInfo, key) + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) if keyVer == TempIndexKeyTypeBackfill { key, tempKey = tempKey, nil keyIsTempIdxKey = true @@ -229,8 +230,20 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) } + var needPresumeKey int if lazyCheck { - flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists} + var ( + flags []kv.FlagsOp + ) + if !opt.FromBackFill { + needPresumeKey, _, err = KeyExistInTempIndex(txn, key, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + if needPresumeKey != KeyInTempIndexIsDeleted { + flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) @@ -244,7 +257,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if len(tempKey) > 0 { idxVal = append(idxVal, keyVer) - if lazyCheck { + if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) } else { err = txn.GetMemBuffer().Set(tempKey, idxVal) @@ -285,7 +298,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return err } - key, tempKey, tempKeyVer := genTempIdxKeyByState(c.idxInfo, key) + key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) if distinct { if len(key) > 0 { @@ -336,9 +349,9 @@ const ( TempIndexKeyTypeMerge byte = 'm' ) -// genTempIdxKeyByState is used to get the key version and the temporary key. +// GenTempIdxKeyByState is used to get the key version and the temporary key. // The tempKeyVer means the temp index key/value version. -func genTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) { +func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) { if indexInfo.State != model.StatePublic { switch indexInfo.BackfillState { case model.BackfillStateInapplicable: @@ -364,6 +377,28 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV return false, nil, err } + var ( + tempKey []byte + keyVer byte + ) + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) + if keyVer != TempIndexKeyTypeNone { + if len(tempKey) > 0 { + KeyExistInfo, h1, err1 := KeyExistInTempIndex(txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err1 != nil { + return false, nil, err + } + switch KeyExistInfo { + case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: + return false, nil, nil + case KeyInTempIndexConflict: + return true, h1, kv.ErrKeyExists + case KeyInTempIndexIsItself: + return true, h, nil + } + } + } + value, err := txn.Get(context.TODO(), key) if kv.IsErrNotFound(err) { return false, nil, nil @@ -463,3 +498,52 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } + +const ( + // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. + KeyInTempIndexUnknown = 0 + // KeyInTempIndexNotExist the key is not exist in temp index. + KeyInTempIndexNotExist = 1 + // KeyInTempIndexIsDeleted the key is marked deleted in temp index. + KeyInTempIndexIsDeleted = 2 + // KeyInTempIndexIsItself the key is correlated to itself in temp index. + KeyInTempIndexIsItself = 3 + // KeyInTempIndexConflict the key is conflict in temp index. + KeyInTempIndexConflict = 4 +) + +// KeyExistInTempIndex is used to check if there is unique key is marked delete in temp index. +func KeyExistInTempIndex(txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (int, kv.Handle, error) { + value, err := txn.Get(context.TODO(), key) + if kv.IsErrNotFound(err) { + return KeyInTempIndexNotExist, nil, nil + } + if err != nil { + return KeyInTempIndexUnknown, nil, err + } + + length := len(value) + value = value[:length-1] + if distinct { + if bytes.Equal(value, DeleteMarkerUnique) { + return KeyInTempIndexIsDeleted, nil, nil + } + } else { + if bytes.Equal(value, DeleteMarker) { + return KeyInTempIndexIsDeleted, nil, nil + } + } + + // Check if handle equal? + var handle kv.Handle + if distinct { + handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle) + if err != nil { + return KeyInTempIndexUnknown, nil, err + } + if !handle.Equal(h) { + return KeyInTempIndexConflict, handle, kv.ErrKeyExists + } + } + return KeyInTempIndexIsItself, handle, nil +} From ab562022ec44f67c67ffc2f21f0827572ec3cfc4 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Thu, 24 Nov 2022 18:38:02 +0800 Subject: [PATCH 02/15] only check temp idnex key. --- table/tables/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table/tables/index.go b/table/tables/index.go index 6bc1063fe9588..2b77c578088f1 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -235,7 +235,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue var ( flags []kv.FlagsOp ) - if !opt.FromBackFill { + if !opt.FromBackFill && keyIsTempIdxKey { needPresumeKey, _, err = KeyExistInTempIndex(txn, key, distinct, h, c.tblInfo.IsCommonHandle) if err != nil { return nil, err From 9d2b14242427625798b43dc588a88d499214cc63 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 13:41:11 +0800 Subject: [PATCH 03/15] refactor code and add comments. --- executor/batch_checker.go | 3 ++- executor/insert.go | 2 ++ table/tables/index.go | 21 +++++++++++++-------- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 4a93fd028bf24..79a6748b2d5c3 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -180,7 +180,8 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D if !distinct { continue } - if v.Meta().BackfillState == model.BackfillStateRunning { + // If index is used ingest ways, then we should check key from temp index. + if v.Meta().BackfillState != model.BackfillStateInapplicable { _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) } colValStr, err1 := formatDataForDupError(colVals) diff --git a/executor/insert.go b/executor/insert.go index a77cf6ed3cfda..b15220bf201d2 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -266,6 +266,8 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } return err } + // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end + // of value, So if return a key we check and skip deleted key. rowVal := val[:len(val)-1] if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { continue diff --git a/table/tables/index.go b/table/tables/index.go index 2b77c578088f1..818aa82f1fad3 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -227,20 +227,23 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if err != nil || len(value) == 0 { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil + var needPresumeKey int if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) - } - var needPresumeKey int - if lazyCheck { - var ( - flags []kv.FlagsOp - ) - if !opt.FromBackFill && keyIsTempIdxKey { - needPresumeKey, _, err = KeyExistInTempIndex(txn, key, distinct, h, c.tblInfo.IsCommonHandle) + needPresumeKey, _, err = KeyExistInTempIndex(txn, key, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } else { + if len(tempKey) > 0 { + needPresumeKey, _, err = KeyExistInTempIndex(txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) if err != nil { return nil, err } } + } + if lazyCheck { + var flags []kv.FlagsOp if needPresumeKey != KeyInTempIndexIsDeleted { flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} } @@ -381,6 +384,8 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV tempKey []byte keyVer byte ) + // If index current is in creating status and using ingest mode, we need first + // check key exist status in temp index. key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) if keyVer != TempIndexKeyTypeNone { if len(tempKey) > 0 { From 410417614baa8fcd2341f08237e80a93d9bd798e Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 16:12:05 +0800 Subject: [PATCH 04/15] add test case. --- ddl/index_merge_tmp_test.go | 87 +++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 8dae95e590438..33961ca5c8cd1 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -259,3 +259,90 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;", "amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg") } + +// TestCreateUniqueIndexKeyExist this case will test below things: +// Create one unique index idx((a*b+1)); +// insert (0, 6) and delete it; +// insert (0, 9), it should be successful; +func TestCreateUniqueIndexKeyExist(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + stateDeleteOnlySQLs := []string{"insert into t values (5, 5)", "begin pessimistic;", "insert into t select * from t", "rollback", "insert into t set b = 6", "update t set b = 7 where a = 1", "delete from t where b = 4"} + + // If waitReorg timeout, the worker may enter writeReorg more than 2 times. + reorgTime := 0 + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &ddl.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if checkErr != nil { + return + } + err := originalCallback.OnChanged(nil) + require.NoError(t, err) + switch job.SchemaState { + case model.StateDeleteOnly: + for _, sql := range stateDeleteOnlySQLs { + _, checkErr = tk1.Exec(sql) + if checkErr != nil { + return + } + } + // (1, 7), (2, 2), (3, 3), (5, 5), (0, 6) + case model.StateWriteOnly: + _, checkErr = tk1.Exec("insert into t values (8, 8)") + if checkErr != nil { + return + } + + _, checkErr = tk1.Exec("update t set b = 7 where a = 2") + if checkErr != nil { + return + } + _, checkErr = tk1.Exec("delete from t where b = 3") + if checkErr != nil { + return + } + // (1, 7), (2, 7), (5, 5), (0, 6), (8, 8) + case model.StateWriteReorganization: + if reorgTime < 1 { + reorgTime++ + } else { + return + } + _, checkErr = tk1.Exec("insert into t values (10, 10)") + if checkErr != nil { + return + } + _, checkErr = tk1.Exec("delete from t where b = 6") + if checkErr != nil { + return + } + _, checkErr = tk1.Exec("insert into t set b = 9") + if checkErr != nil { + return + } + _, checkErr = tk1.Exec("update t set b = 7 where a = 5") + if checkErr != nil { + return + } + // (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + tk.MustExec("alter table t add unique index idx((a*b+1))") + require.NoError(t, checkErr) + tk.MustExec("admin check table t") + tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10")) +} From 43aae4c140326405b22e66afac8df9d16608976f Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 16:33:24 +0800 Subject: [PATCH 05/15] add error information. --- ddl/index_merge_tmp_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 33961ca5c8cd1..def3f9e1aef50 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -264,6 +264,12 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { // Create one unique index idx((a*b+1)); // insert (0, 6) and delete it; // insert (0, 9), it should be successful; +// Should check temp key exist and skip deleted mark +// The error returned below: +// Error Trace: /tidb/ddl/index_merge_tmp_test.go:345 +// Error: Received unexpected error: +// +// [kv:1062]Duplicate entry '1' for key 't.idx' func TestCreateUniqueIndexKeyExist(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) From 5c73ab31da578bf0278a8b1323957ac5f8888382 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <benjamin2037@sina.com> Date: Fri, 25 Nov 2022 17:27:00 +0800 Subject: [PATCH 06/15] Update ddl/index_merge_tmp_test.go Co-authored-by: tangenta <tangenta@126.com> --- ddl/index_merge_tmp_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index def3f9e1aef50..55496199f1736 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -266,7 +266,6 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { // insert (0, 9), it should be successful; // Should check temp key exist and skip deleted mark // The error returned below: -// Error Trace: /tidb/ddl/index_merge_tmp_test.go:345 // Error: Received unexpected error: // // [kv:1062]Duplicate entry '1' for key 't.idx' From 87e1d0912378f45a265c20720cb7035d8848954b Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 17:45:46 +0800 Subject: [PATCH 07/15] remove reduent err check --- ddl/index_merge_tmp_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 55496199f1736..bfd717c45292d 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -290,9 +290,6 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { - if checkErr != nil { - return - } err := originalCallback.OnChanged(nil) require.NoError(t, err) switch job.SchemaState { From 456b2bf1f7348a0a7eb320bcbe2174176ff715fc Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 18:31:20 +0800 Subject: [PATCH 08/15] refactor code --- ddl/index_merge_tmp_test.go | 43 +++++++------------------------------ table/tables/index.go | 14 ++++++------ 2 files changed, 16 insertions(+), 41 deletions(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index bfd717c45292d..4524e5d6b4054 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -284,7 +284,6 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { // If waitReorg timeout, the worker may enter writeReorg more than 2 times. reorgTime := 0 - var checkErr error d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) @@ -295,26 +294,13 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { - _, checkErr = tk1.Exec(sql) - if checkErr != nil { - return - } + tk1.MustExec(sql) } // (1, 7), (2, 2), (3, 3), (5, 5), (0, 6) case model.StateWriteOnly: - _, checkErr = tk1.Exec("insert into t values (8, 8)") - if checkErr != nil { - return - } - - _, checkErr = tk1.Exec("update t set b = 7 where a = 2") - if checkErr != nil { - return - } - _, checkErr = tk1.Exec("delete from t where b = 3") - if checkErr != nil { - return - } + tk1.MustExec("insert into t values (8, 8)") + tk1.MustExec("update t set b = 7 where a = 2") + tk1.MustExec("delete from t where b = 3") // (1, 7), (2, 7), (5, 5), (0, 6), (8, 8) case model.StateWriteReorganization: if reorgTime < 1 { @@ -322,29 +308,16 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { } else { return } - _, checkErr = tk1.Exec("insert into t values (10, 10)") - if checkErr != nil { - return - } - _, checkErr = tk1.Exec("delete from t where b = 6") - if checkErr != nil { - return - } - _, checkErr = tk1.Exec("insert into t set b = 9") - if checkErr != nil { - return - } - _, checkErr = tk1.Exec("update t set b = 7 where a = 5") - if checkErr != nil { - return - } + tk1.MustExec("insert into t values (10, 10)") + tk1.MustExec("delete from t where b = 6") + tk1.MustExec("insert into t set b = 9") + tk1.MustExec("update t set b = 7 where a = 5") // (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9) } } callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) d.SetHook(callback) tk.MustExec("alter table t add unique index idx((a*b+1))") - require.NoError(t, checkErr) tk.MustExec("admin check table t") tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10")) } diff --git a/table/tables/index.go b/table/tables/index.go index 818aa82f1fad3..e5be31b6b6ae8 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -230,13 +230,13 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue var needPresumeKey int if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(txn, key, distinct, h, c.tblInfo.IsCommonHandle) + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) if err != nil { return nil, err } } else { if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) if err != nil { return nil, err } @@ -389,7 +389,7 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) if keyVer != TempIndexKeyTypeNone { if len(tempKey) > 0 { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) if err1 != nil { return false, nil, err } @@ -518,8 +518,8 @@ const ( ) // KeyExistInTempIndex is used to check if there is unique key is marked delete in temp index. -func KeyExistInTempIndex(txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (int, kv.Handle, error) { - value, err := txn.Get(context.TODO(), key) +func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (int, kv.Handle, error) { + value, err := txn.Get(ctx, key) if kv.IsErrNotFound(err) { return KeyInTempIndexNotExist, nil, nil } @@ -528,6 +528,8 @@ func KeyExistInTempIndex(txn kv.Transaction, key kv.Key, distinct bool, h kv.Han } length := len(value) + // Firstly, we will remove the last byte of key version. + // It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge. value = value[:length-1] if distinct { if bytes.Equal(value, DeleteMarkerUnique) { @@ -539,7 +541,7 @@ func KeyExistInTempIndex(txn kv.Transaction, key kv.Key, distinct bool, h kv.Han } } - // Check if handle equal? + // Check if handle equal. var handle kv.Handle if distinct { handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle) From 1f810bfea9ba8556f4a26b83cd18e31a2b88b107 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 21:46:54 +0800 Subject: [PATCH 09/15] refactor the code --- table/tables/index.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index e5be31b6b6ae8..dd97187555648 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -227,7 +227,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if err != nil || len(value) == 0 { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey int + var needPresumeKey KeyInTempIndex if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) @@ -388,19 +388,17 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV // check key exist status in temp index. key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) if keyVer != TempIndexKeyTypeNone { - if len(tempKey) > 0 { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { - return false, nil, err - } - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - return true, h, nil - } + KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err1 != nil { + return false, nil, err + } + switch KeyExistInfo { + case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: + return false, nil, nil + case KeyInTempIndexConflict: + return true, h1, kv.ErrKeyExists + case KeyInTempIndexIsItself: + return true, h, nil } } @@ -504,21 +502,23 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * return colInfo } +type KeyInTempIndex byte + const ( // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown = 0 + KeyInTempIndexUnknown KeyInTempIndex = iota // KeyInTempIndexNotExist the key is not exist in temp index. - KeyInTempIndexNotExist = 1 + KeyInTempIndexNotExist // KeyInTempIndexIsDeleted the key is marked deleted in temp index. - KeyInTempIndexIsDeleted = 2 + KeyInTempIndexIsDeleted // KeyInTempIndexIsItself the key is correlated to itself in temp index. - KeyInTempIndexIsItself = 3 + KeyInTempIndexIsItself // KeyInTempIndexConflict the key is conflict in temp index. - KeyInTempIndexConflict = 4 + KeyInTempIndexConflict ) -// KeyExistInTempIndex is used to check if there is unique key is marked delete in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (int, kv.Handle, error) { +// KeyExistInTempIndex is used to check the unique key exist status in temp index. +func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (KeyInTempIndex, kv.Handle, error) { value, err := txn.Get(ctx, key) if kv.IsErrNotFound(err) { return KeyInTempIndexNotExist, nil, nil From 10ecd3ce145cc81ffd53ef3172fb68902ac68008 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 21:56:45 +0800 Subject: [PATCH 10/15] unexported KeyInTempIndeix to keyInTempIndex --- table/tables/index.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index dd97187555648..7a4ba5d0b1c70 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -227,7 +227,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if err != nil || len(value) == 0 { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey KeyInTempIndex + var needPresumeKey keyInTempIndex if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) @@ -502,11 +502,11 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * return colInfo } -type KeyInTempIndex byte +type keyInTempIndex byte const ( // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown KeyInTempIndex = iota + KeyInTempIndexUnknown keyInTempIndex = iota // KeyInTempIndexNotExist the key is not exist in temp index. KeyInTempIndexNotExist // KeyInTempIndexIsDeleted the key is marked deleted in temp index. @@ -518,7 +518,7 @@ const ( ) // KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (KeyInTempIndex, kv.Handle, error) { +func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (keyInTempIndex, kv.Handle, error) { value, err := txn.Get(ctx, key) if kv.IsErrNotFound(err) { return KeyInTempIndexNotExist, nil, nil From 928a07b0b18d5e33c26ad8ef84d934771a76df8b Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Fri, 25 Nov 2022 22:01:59 +0800 Subject: [PATCH 11/15] name a enum tempIndexKeyState. --- table/tables/index.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index 7a4ba5d0b1c70..aedd85bc184e0 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -227,7 +227,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if err != nil || len(value) == 0 { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey keyInTempIndex + var needPresumeKey tempIndexKeyState if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) @@ -502,11 +502,11 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * return colInfo } -type keyInTempIndex byte +type tempIndexKeyState byte const ( // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown keyInTempIndex = iota + KeyInTempIndexUnknown tempIndexKeyState = iota // KeyInTempIndexNotExist the key is not exist in temp index. KeyInTempIndexNotExist // KeyInTempIndexIsDeleted the key is marked deleted in temp index. @@ -518,7 +518,7 @@ const ( ) // KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (keyInTempIndex, kv.Handle, error) { +func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) { value, err := txn.Get(ctx, key) if kv.IsErrNotFound(err) { return KeyInTempIndexNotExist, nil, nil From 3778505780d628c6c4fd1b10d72c51ba1fd9113a Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Mon, 28 Nov 2022 15:00:31 +0800 Subject: [PATCH 12/15] Edit hook test case code. --- ddl/db_change_test.go | 3 ++- ddl/index_merge_tmp_test.go | 26 +++++++++++++++++--------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index d865c970e7f42..ead939c009236 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1739,7 +1740,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { return } err := originalCallback.OnChanged(nil) - require.NoError(t, err) + assert.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 4524e5d6b4054..5f94c51d3b182 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -290,17 +290,21 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { err := originalCallback.OnChanged(nil) - require.NoError(t, err) + assert.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { - tk1.MustExec(sql) + _, err = tk1.Exec(sql) + assert.NoError(t, err) } // (1, 7), (2, 2), (3, 3), (5, 5), (0, 6) case model.StateWriteOnly: - tk1.MustExec("insert into t values (8, 8)") - tk1.MustExec("update t set b = 7 where a = 2") - tk1.MustExec("delete from t where b = 3") + _, err = tk1.Exec("insert into t values (8, 8)") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = 7 where a = 2") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 3") + assert.NoError(t, err) // (1, 7), (2, 7), (5, 5), (0, 6), (8, 8) case model.StateWriteReorganization: if reorgTime < 1 { @@ -308,10 +312,14 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { } else { return } - tk1.MustExec("insert into t values (10, 10)") - tk1.MustExec("delete from t where b = 6") - tk1.MustExec("insert into t set b = 9") - tk1.MustExec("update t set b = 7 where a = 5") + _, err = tk1.Exec("insert into t values (10, 10)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 6") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t set b = 9") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = 7 where a = 5") + assert.NoError(t, err) // (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9) } } From e37a7552bedbe237f2c198dc95ecbbc3f4e96a11 Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Mon, 28 Nov 2022 16:45:58 +0800 Subject: [PATCH 13/15] Add check logic to make sure the uk.newkey is from temp key, then apply check deleteu logic --- ddl/db_change_test.go | 3 --- ddl/index_merge_tmp_test.go | 3 +-- executor/insert.go | 8 +++++--- tablecodec/tablecodec.go | 14 ++++++++++++++ 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index ead939c009236..cf11036a9935e 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -41,7 +41,6 @@ import ( "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/sqlexec" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1739,8 +1738,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { if checkErr != nil { return } - err := originalCallback.OnChanged(nil) - assert.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 5f94c51d3b182..4714709b6d4e1 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -289,8 +289,7 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { - err := originalCallback.OnChanged(nil) - assert.NoError(t, err) + var err error switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { diff --git a/executor/insert.go b/executor/insert.go index b15220bf201d2..bcd8341668767 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -268,9 +268,11 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end // of value, So if return a key we check and skip deleted key. - rowVal := val[:len(val)-1] - if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { - continue + if tablecodec.IsTempIndexKey(uk.newKey) { + rowVal := val[:len(val)-1] + if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { + continue + } } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index b9400b0271d41..e45576b9d0674 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1143,6 +1143,20 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } +// IsTempIndexKey check whether the input key is for a temp index. +func IsTempIndexKey(indexKey []byte) bool { + var ( + indexIDKey []byte + indexID int64 + tempIndexID int64 + ) + // Get encoded indexID from key, Add uint64 8 byte length. + indexIDKey = indexKey[prefixLen : prefixLen+8] + indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) + tempIndexID = int64(TempIndexPrefix) | indexID + return tempIndexID == indexID +} + // GenIndexValuePortal is the portal for generating index value. // Value layout: // From 72886e7c813ec2818962e85ab49d9c756e3bb67d Mon Sep 17 00:00:00 2001 From: Benjamin2037 <benjamin2037@sina.com> Date: Mon, 28 Nov 2022 17:01:42 +0800 Subject: [PATCH 14/15] Update ddl/index_merge_tmp_test.go Co-authored-by: tangenta <tangenta@126.com> --- ddl/index_merge_tmp_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 4714709b6d4e1..389339ac15ad4 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -289,6 +289,9 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) { defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } var err error switch job.SchemaState { case model.StateDeleteOnly: From 42f0e3e762582c2431370889798aac710ca4f1bb Mon Sep 17 00:00:00 2001 From: Benjamin2037 <bear.c@pingcap.com> Date: Mon, 28 Nov 2022 17:09:17 +0800 Subject: [PATCH 15/15] Add check logic for input key and value. --- table/tables/index.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/table/tables/index.go b/table/tables/index.go index aedd85bc184e0..b3a481efba29f 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -17,6 +17,7 @@ package tables import ( "bytes" "context" + "errors" "sync" "github.com/opentracing/opentracing-go" @@ -519,6 +520,10 @@ const ( // KeyExistInTempIndex is used to check the unique key exist status in temp index. func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) { + // Only check temp index key. + if !tablecodec.IsTempIndexKey(key) { + return KeyInTempIndexUnknown, nil, nil + } value, err := txn.Get(ctx, key) if kv.IsErrNotFound(err) { return KeyInTempIndexNotExist, nil, nil @@ -527,6 +532,10 @@ func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, di return KeyInTempIndexUnknown, nil, err } + // Since KeyExistInTempIndex only accept temp index key, so the value length should great than 1 for key version. + if len(value) < 1 { + return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") + } length := len(value) // Firstly, we will remove the last byte of key version. // It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge.