From 16304c13ce163e33bde88a178459b6b6bc629b0d Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Tue, 3 Jan 2023 12:59:34 +0800 Subject: [PATCH] impl admin check index for mv index Signed-off-by: xiongjiwei --- executor/admin_test.go | 59 ++++++++++++++++++++++++++++++ executor/builder.go | 2 +- executor/distsql.go | 38 ++++++++++++-------- executor/executor.go | 19 +++++++++- table/tables/index.go | 82 +++++++++++++++++++++--------------------- 5 files changed, 143 insertions(+), 57 deletions(-) diff --git a/executor/admin_test.go b/executor/admin_test.go index 0b2530e76d5a3..cd5c0664d031a 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -843,6 +843,65 @@ func TestClusteredAdminCleanupIndex(t *testing.T) { tk.MustExec("admin check table admin_test") } +func TestAdminCheckTableWithMultiValuedIndex(t *testing.T) { + store, domain := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))") + tk.MustExec("insert into t values (0, '[0,1,2]')") + tk.MustExec("insert into t values (1, '[1,2,3]')") + tk.MustExec("insert into t values (2, '[2,3,4]')") + tk.MustExec("insert into t values (3, '[3,4,5]')") + tk.MustExec("insert into t values (4, '[4,5,6]')") + tk.MustExec("admin check table t") + + // Make some corrupted index. Build the index information. + ctx := mock.NewContext() + ctx.Store = store + is := domain.InfoSchema() + dbName := model.NewCIStr("test") + tblName := model.NewCIStr("t") + tbl, err := is.TableByName(dbName, tblName) + require.NoError(t, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.Indices[0] + sc := ctx.GetSessionVars().StmtCtx + tk.Session().GetSessionVars().IndexLookupSize = 3 + tk.Session().GetSessionVars().MaxChunkSize = 3 + + cpIdx := idxInfo.Clone() + cpIdx.MVIndex = false + indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx) + txn, err := store.Begin() + require.NoError(t, err) + err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + err = tk.ExecToErr("admin check table t") + require.Error(t, err) + require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err)) + + txn, err = store.Begin() + require.NoError(t, err) + _, err = indexOpr.Create(ctx, txn, types.MakeDatums(0), kv.IntHandle(0), nil) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + tk.MustExec("admin check table t") + + txn, err = store.Begin() + require.NoError(t, err) + _, err = indexOpr.Create(ctx, txn, types.MakeDatums(9), kv.IntHandle(9), nil) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + err = tk.ExecToErr("admin check table t") + require.Error(t, err) +} + func TestAdminCheckPartitionTableFailed(t *testing.T) { store, domain := testkit.CreateMockStoreAndDomain(t) diff --git a/executor/builder.go b/executor/builder.go index f771c706bfa9b..dd40276a9b397 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -433,7 +433,7 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo tps := make([]*types.FieldType, 0, fullColLen) for _, col := range is.Columns { - tps = append(tps, &(col.FieldType)) + tps = append(tps, col.FieldType.ArrayType()) } if !e.isCommonHandle() { diff --git a/executor/distsql.go b/executor/distsql.go index aab5067a81b6a..95ec8eb37093e 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -1254,36 +1254,44 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta sctx := w.idxLookup.ctx.GetSessionVars().StmtCtx for i := range vals { col := w.idxTblCols[i] - tp := &col.FieldType - idxVal := idxRow.GetDatum(i, tp) + idxVal := idxRow.GetDatum(i, w.idxColTps[i]) tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo) - cmpRes, err := idxVal.Compare(sctx, &vals[i], collators[i]) - if err != nil { - fts := make([]*types.FieldType, 0, len(w.idxTblCols)) - for _, c := range w.idxTblCols { - fts = append(fts, &c.FieldType) + var cmpRes int + if col.FieldType.IsArray() { + // If it is multi-valued index, we should check the JSON contains the indexed value. + bj := vals[i].GetMysqlJSON() + count := bj.GetElemCount() + for elemIdx := 0; elemIdx < count; elemIdx++ { + jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx)) + cmpRes, err = jsonDatum.Compare(sctx, &idxVal, collate.GetBinaryCollator()) + if err != nil { + return errors.Trace(err) + } + if cmpRes == 0 { + break + } } + } else { + cmpRes, err = idxVal.Compare(sctx, &vals[i], collators[i]) + } + if err != nil { return ir().ReportAdminCheckInconsistentWithColInfo(ctx, handle, col.Name.O, - idxRow.GetDatum(i, tp), + idxVal, vals[i], err, - &consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)}, + &consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)}, ) } if cmpRes != 0 { - fts := make([]*types.FieldType, 0, len(w.idxTblCols)) - for _, c := range w.idxTblCols { - fts = append(fts, &c.FieldType) - } return ir().ReportAdminCheckInconsistentWithColInfo(ctx, handle, col.Name.O, - idxRow.GetDatum(i, tp), + idxRow.GetDatum(i, w.idxColTps[i]), vals[i], err, - &consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)}, + &consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)}, ) } } diff --git a/executor/executor.go b/executor/executor.go index 603996ad7764f..1679ed9e57e12 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -959,6 +959,9 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { idxNames := make([]string, 0, len(e.indexInfos)) for _, idx := range e.indexInfos { + if idx.MVIndex { + continue + } idxNames = append(idxNames, idx.Name.O) } greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames) @@ -978,7 +981,13 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { // The number of table rows is equal to the number of index rows. // TODO: Make the value of concurrency adjustable. And we can consider the number of records. if len(e.srcs) == 1 { - return e.checkIndexHandle(ctx, e.srcs[0]) + err = e.checkIndexHandle(ctx, e.srcs[0]) + if err == nil && e.srcs[0].index.MVIndex { + err = e.checkTableRecord(ctx, 0) + } + if err != nil { + return err + } } taskCh := make(chan *IndexLookUpExecutor, len(e.srcs)) failure := atomicutil.NewBool(false) @@ -997,6 +1006,14 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { select { case src := <-taskCh: err1 := e.checkIndexHandle(ctx, src) + if err1 == nil && src.index.MVIndex { + for offset, idx := range e.indexInfos { + if idx.ID == src.index.ID { + err1 = e.checkTableRecord(ctx, offset) + break + } + } + } if err1 != nil { failure.Store(true) logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1)) diff --git a/table/tables/index.go b/table/tables/index.go index 607afb9640aad..527a9a1863dfc 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -438,55 +438,57 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem return indexKey, nil, TempIndexKeyTypeNone } -func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) { - key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil) - if err != nil { - return false, nil, err - } - - var ( - tempKey []byte - keyVer byte - ) - // If index current is in creating status and using ingest mode, we need first - // check key exist status in temp index. - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer != TempIndexKeyTypeNone { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { +func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) { + indexedValues := c.getIndexedValue(indexedValue) + for _, val := range indexedValues { + key, distinct, err := c.GenIndexKey(sc, val, h, nil) + if err != nil { return false, nil, err } - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - return true, h, nil - } - } - value, err := txn.Get(context.TODO(), key) - if kv.IsErrNotFound(err) { - return false, nil, nil - } - if err != nil { - return false, nil, err - } + var ( + tempKey []byte + keyVer byte + ) + // If index current is in creating status and using ingest mode, we need first + // check key exist status in temp index. + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) + if keyVer != TempIndexKeyTypeNone { + KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err1 != nil { + return false, nil, err + } + switch KeyExistInfo { + case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: + return false, nil, nil + case KeyInTempIndexConflict: + return true, h1, kv.ErrKeyExists + case KeyInTempIndexIsItself: + return true, h, nil + } + } - // For distinct index, the value of key is handle. - if distinct { - var handle kv.Handle - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) + value, err := txn.Get(context.TODO(), key) + if kv.IsErrNotFound(err) { + return false, nil, nil + } if err != nil { return false, nil, err } - if !handle.Equal(h) { - return true, handle, kv.ErrKeyExists + + // For distinct index, the value of key is handle. + if distinct { + var handle kv.Handle + handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return false, nil, err + } + if !handle.Equal(h) { + return true, handle, kv.ErrKeyExists + } + return true, handle, nil } - return true, handle, nil } - return true, h, nil }