Skip to content

Commit

Permalink
admin: impl admin check index for mv index (#40270)
Browse files Browse the repository at this point in the history
close #40272
  • Loading branch information
xiongjiwei authored Jan 3, 2023
1 parent be8caa6 commit 494672c
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 78 deletions.
59 changes: 59 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,65 @@ func TestClusteredAdminCleanupIndex(t *testing.T) {
tk.MustExec("admin check table admin_test")
}

func TestAdminCheckTableWithMultiValuedIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
tk.MustExec("insert into t values (0, '[0,1,2]')")
tk.MustExec("insert into t values (1, '[1,2,3]')")
tk.MustExec("insert into t values (2, '[2,3,4]')")
tk.MustExec("insert into t values (3, '[3,4,5]')")
tk.MustExec("insert into t values (4, '[4,5,6]')")
tk.MustExec("admin check table t")

// Make some corrupted index. Build the index information.
ctx := mock.NewContext()
ctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := ctx.GetSessionVars().StmtCtx
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

cpIdx := idxInfo.Clone()
cpIdx.MVIndex = false
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)
txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(0), kv.IntHandle(0), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table t")

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(9), kv.IntHandle(9), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
}

func TestAdminCheckPartitionTableFailed(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo

tps := make([]*types.FieldType, 0, fullColLen)
for _, col := range is.Columns {
tps = append(tps, &(col.FieldType))
// tps is used to decode the index, we should use the element type of the array if any.
tps = append(tps, col.FieldType.ArrayType())
}

if !e.isCommonHandle() {
Expand Down
22 changes: 7 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -1254,36 +1255,27 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
sctx := w.idxLookup.ctx.GetSessionVars().StmtCtx
for i := range vals {
col := w.idxTblCols[i]
tp := &col.FieldType
idxVal := idxRow.GetDatum(i, tp)
idxVal := idxRow.GetDatum(i, w.idxColTps[i])
tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo)
cmpRes, err := idxVal.Compare(sctx, &vals[i], collators[i])
cmpRes, err := tables.CompareIndexAndVal(sctx, vals[i], idxVal, collators[i], col.FieldType.IsArray() && vals[i].Kind() == types.KindMysqlJSON)
if err != nil {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
}
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxVal,
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
if cmpRes != 0 {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
}
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxRow.GetDatum(i, w.idxColTps[i]),
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
}
Expand Down
19 changes: 18 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {

idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
if idx.MVIndex {
continue
}
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
Expand All @@ -978,7 +981,13 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
// The number of table rows is equal to the number of index rows.
// TODO: Make the value of concurrency adjustable. And we can consider the number of records.
if len(e.srcs) == 1 {
return e.checkIndexHandle(ctx, e.srcs[0])
err = e.checkIndexHandle(ctx, e.srcs[0])
if err == nil && e.srcs[0].index.MVIndex {
err = e.checkTableRecord(ctx, 0)
}
if err != nil {
return err
}
}
taskCh := make(chan *IndexLookUpExecutor, len(e.srcs))
failure := atomicutil.NewBool(false)
Expand All @@ -997,6 +1006,14 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
select {
case src := <-taskCh:
err1 := e.checkIndexHandle(ctx, src)
if err1 == nil && src.index.MVIndex {
for offset, idx := range e.indexInfos {
if idx.ID == src.index.ID {
err1 = e.checkTableRecord(ctx, offset)
break
}
}
}
if err1 != nil {
failure.Store(true)
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
Expand Down
3 changes: 3 additions & 0 deletions parser/types/field_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (ft *FieldType) IsArray() bool {

// ArrayType return the type of the array.
func (ft *FieldType) ArrayType() *FieldType {
if !ft.array {
return ft
}
clone := ft.Clone()
clone.SetArray(false)
return clone
Expand Down
81 changes: 41 additions & 40 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,55 +438,56 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
return indexKey, nil, TempIndexKeyTypeNone
}

func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return false, nil, err
}

var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
indexedValues := c.getIndexedValue(indexedValue)
for _, val := range indexedValues {
key, distinct, err := c.GenIndexKey(sc, val, h, nil)
if err != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}

value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
continue
}
}

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists
}
}
return true, handle, nil
}

return true, h, nil
}

Expand Down
50 changes: 29 additions & 21 deletions table/tables/mutation_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,11 @@ func compareIndexData(
cols[indexInfo.Columns[i].Offset].ColumnInfo,
)

var comparison int
var err error
// If it is multi-valued index, we should check the JSON contains the indexed value.
if cols[indexInfo.Columns[i].Offset].ColumnInfo.FieldType.IsArray() && expectedDatum.Kind() == types.KindMysqlJSON {
bj := expectedDatum.GetMysqlJSON()
count := bj.GetElemCount()
for elemIdx := 0; elemIdx < count; elemIdx++ {
jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx))
comparison, err = jsonDatum.Compare(sc, &decodedMutationDatum, collate.GetBinaryCollator())
if err != nil {
return errors.Trace(err)
}
if comparison == 0 {
break
}
}
} else {
comparison, err = decodedMutationDatum.Compare(sc, &expectedDatum, collate.GetCollator(decodedMutationDatum.Collation()))
if err != nil {
return errors.Trace(err)
}
comparison, err := CompareIndexAndVal(sc, expectedDatum, decodedMutationDatum,
collate.GetCollator(decodedMutationDatum.Collation()),
cols[indexInfo.Columns[i].Offset].ColumnInfo.FieldType.IsArray() && expectedDatum.Kind() == types.KindMysqlJSON)
if err != nil {
return errors.Trace(err)
}

if comparison != 0 {
Expand All @@ -382,6 +366,30 @@ func compareIndexData(
return nil
}

// CompareIndexAndVal compare index valued and row value.
func CompareIndexAndVal(sctx *stmtctx.StatementContext, rowVal types.Datum, idxVal types.Datum, collator collate.Collator, cmpMVIndex bool) (int, error) {
var cmpRes int
var err error
if cmpMVIndex {
// If it is multi-valued index, we should check the JSON contains the indexed value.
bj := rowVal.GetMysqlJSON()
count := bj.GetElemCount()
for elemIdx := 0; elemIdx < count; elemIdx++ {
jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx))
cmpRes, err = jsonDatum.Compare(sctx, &idxVal, collate.GetBinaryCollator())
if err != nil {
return 0, errors.Trace(err)
}
if cmpRes == 0 {
break
}
}
} else {
cmpRes, err = idxVal.Compare(sctx, &rowVal, collator)
}
return cmpRes, err
}

// getColumnMaps tries to get the columnMaps from transaction options. If there isn't one, it builds one and stores it.
// It saves redundant computations of the map.
func getColumnMaps(txn kv.Transaction, t *TableCommon) columnMaps {
Expand Down

0 comments on commit 494672c

Please sign in to comment.