Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: impl admin check index for mv index #40270

Merged
merged 4 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,65 @@ func TestClusteredAdminCleanupIndex(t *testing.T) {
tk.MustExec("admin check table admin_test")
}

func TestAdminCheckTableWithMultiValuedIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
tk.MustExec("insert into t values (0, '[0,1,2]')")
tk.MustExec("insert into t values (1, '[1,2,3]')")
tk.MustExec("insert into t values (2, '[2,3,4]')")
tk.MustExec("insert into t values (3, '[3,4,5]')")
tk.MustExec("insert into t values (4, '[4,5,6]')")
tk.MustExec("admin check table t")

// Make some corrupted index. Build the index information.
ctx := mock.NewContext()
ctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := ctx.GetSessionVars().StmtCtx
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

cpIdx := idxInfo.Clone()
cpIdx.MVIndex = false
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)
txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(0), kv.IntHandle(0), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table t")

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(9), kv.IntHandle(9), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
}

func TestAdminCheckPartitionTableFailed(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo

tps := make([]*types.FieldType, 0, fullColLen)
for _, col := range is.Columns {
tps = append(tps, &(col.FieldType))
tps = append(tps, col.FieldType.ArrayType())
}

if !e.isCommonHandle() {
Expand Down
38 changes: 23 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,36 +1254,44 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
sctx := w.idxLookup.ctx.GetSessionVars().StmtCtx
for i := range vals {
col := w.idxTblCols[i]
tp := &col.FieldType
idxVal := idxRow.GetDatum(i, tp)
idxVal := idxRow.GetDatum(i, w.idxColTps[i])
tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo)
cmpRes, err := idxVal.Compare(sctx, &vals[i], collators[i])
if err != nil {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
var cmpRes int
if col.FieldType.IsArray() {
// If it is multi-valued index, we should check the JSON contains the indexed value.
bj := vals[i].GetMysqlJSON()
count := bj.GetElemCount()
for elemIdx := 0; elemIdx < count; elemIdx++ {
jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx))
cmpRes, err = jsonDatum.Compare(sctx, &idxVal, collate.GetBinaryCollator())
if err != nil {
return errors.Trace(err)
}
if cmpRes == 0 {
break
}
}
} else {
cmpRes, err = idxVal.Compare(sctx, &vals[i], collators[i])
}
if err != nil {
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxVal,
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
if cmpRes != 0 {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
}
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxRow.GetDatum(i, w.idxColTps[i]),
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
}
Expand Down
19 changes: 18 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {

idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
if idx.MVIndex {
continue
}
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
Expand All @@ -978,7 +981,13 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
// The number of table rows is equal to the number of index rows.
// TODO: Make the value of concurrency adjustable. And we can consider the number of records.
if len(e.srcs) == 1 {
return e.checkIndexHandle(ctx, e.srcs[0])
err = e.checkIndexHandle(ctx, e.srcs[0])
if err == nil && e.srcs[0].index.MVIndex {
err = e.checkTableRecord(ctx, 0)
}
if err != nil {
return err
}
}
taskCh := make(chan *IndexLookUpExecutor, len(e.srcs))
failure := atomicutil.NewBool(false)
Expand All @@ -997,6 +1006,14 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
select {
case src := <-taskCh:
err1 := e.checkIndexHandle(ctx, src)
if err1 == nil && src.index.MVIndex {
for offset, idx := range e.indexInfos {
if idx.ID == src.index.ID {
err1 = e.checkTableRecord(ctx, offset)
break
}
}
}
if err1 != nil {
failure.Store(true)
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
Expand Down
82 changes: 42 additions & 40 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,55 +438,57 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
return indexKey, nil, TempIndexKeyTypeNone
}

func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return false, nil, err
}

var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
indexedValues := c.getIndexedValue(indexedValue)
for _, val := range indexedValues {
key, distinct, err := c.GenIndexKey(sc, val, h, nil)
if err != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}

value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists
}
return true, handle, nil
}
return true, handle, nil
}

return true, h, nil
}

Expand Down