Skip to content

Commit

Permalink
impl admin check index for mv index
Browse files Browse the repository at this point in the history
Signed-off-by: xiongjiwei <[email protected]>
  • Loading branch information
xiongjiwei committed Jan 3, 2023
1 parent 702a559 commit 16304c1
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 57 deletions.
59 changes: 59 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,65 @@ func TestClusteredAdminCleanupIndex(t *testing.T) {
tk.MustExec("admin check table admin_test")
}

func TestAdminCheckTableWithMultiValuedIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
tk.MustExec("insert into t values (0, '[0,1,2]')")
tk.MustExec("insert into t values (1, '[1,2,3]')")
tk.MustExec("insert into t values (2, '[2,3,4]')")
tk.MustExec("insert into t values (3, '[3,4,5]')")
tk.MustExec("insert into t values (4, '[4,5,6]')")
tk.MustExec("admin check table t")

// Make some corrupted index. Build the index information.
ctx := mock.NewContext()
ctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := ctx.GetSessionVars().StmtCtx
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

cpIdx := idxInfo.Clone()
cpIdx.MVIndex = false
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)
txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(0), kv.IntHandle(0), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table t")

txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(9), kv.IntHandle(9), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
}

func TestAdminCheckPartitionTableFailed(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo

tps := make([]*types.FieldType, 0, fullColLen)
for _, col := range is.Columns {
tps = append(tps, &(col.FieldType))
tps = append(tps, col.FieldType.ArrayType())
}

if !e.isCommonHandle() {
Expand Down
38 changes: 23 additions & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,36 +1254,44 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
sctx := w.idxLookup.ctx.GetSessionVars().StmtCtx
for i := range vals {
col := w.idxTblCols[i]
tp := &col.FieldType
idxVal := idxRow.GetDatum(i, tp)
idxVal := idxRow.GetDatum(i, w.idxColTps[i])
tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo)
cmpRes, err := idxVal.Compare(sctx, &vals[i], collators[i])
if err != nil {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
var cmpRes int
if col.FieldType.IsArray() {
// If it is multi-valued index, we should check the JSON contains the indexed value.
bj := vals[i].GetMysqlJSON()
count := bj.GetElemCount()
for elemIdx := 0; elemIdx < count; elemIdx++ {
jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx))
cmpRes, err = jsonDatum.Compare(sctx, &idxVal, collate.GetBinaryCollator())
if err != nil {
return errors.Trace(err)
}
if cmpRes == 0 {
break
}
}
} else {
cmpRes, err = idxVal.Compare(sctx, &vals[i], collators[i])
}
if err != nil {
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxVal,
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
if cmpRes != 0 {
fts := make([]*types.FieldType, 0, len(w.idxTblCols))
for _, c := range w.idxTblCols {
fts = append(fts, &c.FieldType)
}
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
col.Name.O,
idxRow.GetDatum(i, tp),
idxRow.GetDatum(i, w.idxColTps[i]),
vals[i],
err,
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, fts)},
&consistency.RecordData{Handle: handle, Values: getDatumRow(&idxRow, w.idxColTps)},
)
}
}
Expand Down
19 changes: 18 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {

idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
if idx.MVIndex {
continue
}
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
Expand All @@ -978,7 +981,13 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
// The number of table rows is equal to the number of index rows.
// TODO: Make the value of concurrency adjustable. And we can consider the number of records.
if len(e.srcs) == 1 {
return e.checkIndexHandle(ctx, e.srcs[0])
err = e.checkIndexHandle(ctx, e.srcs[0])
if err == nil && e.srcs[0].index.MVIndex {
err = e.checkTableRecord(ctx, 0)
}
if err != nil {
return err
}
}
taskCh := make(chan *IndexLookUpExecutor, len(e.srcs))
failure := atomicutil.NewBool(false)
Expand All @@ -997,6 +1006,14 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
select {
case src := <-taskCh:
err1 := e.checkIndexHandle(ctx, src)
if err1 == nil && src.index.MVIndex {
for offset, idx := range e.indexInfos {
if idx.ID == src.index.ID {
err1 = e.checkTableRecord(ctx, offset)
break
}
}
}
if err1 != nil {
failure.Store(true)
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
Expand Down
82 changes: 42 additions & 40 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,55 +438,57 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
return indexKey, nil, TempIndexKeyTypeNone
}

func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
return false, nil, err
}

var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
indexedValues := c.getIndexedValue(indexedValue)
for _, val := range indexedValues {
key, distinct, err := c.GenIndexKey(sc, val, h, nil)
if err != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}

value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
}
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists

// For distinct index, the value of key is handle.
if distinct {
var handle kv.Handle
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
if err != nil {
return false, nil, err
}
if !handle.Equal(h) {
return true, handle, kv.ErrKeyExists
}
return true, handle, nil
}
return true, handle, nil
}

return true, h, nil
}

Expand Down

0 comments on commit 16304c1

Please sign in to comment.