Skip to content

Commit

Permalink
executor: fix data race of "admin check table" (pingcap#11568)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Aug 8, 2019
1 parent 295b495 commit 96b1137
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 29 deletions.
42 changes: 21 additions & 21 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
_, err = tk.Exec("admin check index admin_test c2")
err = tk.ExecToErr("admin check index admin_test c2")
c.Assert(err, NotNil)

r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)")
Expand All @@ -125,7 +125,7 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

_, err = tk.Exec("admin check index admin_test c2")
err = tk.ExecToErr("admin check index admin_test c2")
c.Assert(err, NotNil)
r = tk.MustQuery("admin recover index admin_test c2")
r.Check(testkit.Rows("1 5"))
Expand All @@ -147,9 +147,9 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test c2")
err = tk.ExecToErr("admin check index admin_test c2")
c.Assert(err, NotNil)

r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)")
Expand Down Expand Up @@ -271,9 +271,9 @@ func (s *testSuite) TestAdminCleanupIndex(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test c2")
err = tk.ExecToErr("admin check index admin_test c2")
c.Assert(err, NotNil)
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)")
r.Check(testkit.Rows("11"))
Expand All @@ -283,9 +283,9 @@ func (s *testSuite) TestAdminCleanupIndex(c *C) {
r.Check(testkit.Rows("6"))
tk.MustExec("admin check index admin_test c2")

_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test c3")
err = tk.ExecToErr("admin check index admin_test c3")
c.Assert(err, NotNil)
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c3)")
r.Check(testkit.Rows("9"))
Expand Down Expand Up @@ -332,9 +332,9 @@ func (s *testSuite) TestAdminCleanupIndexPKNotHandle(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test `primary`")
err = tk.ExecToErr("admin check index admin_test `primary`")
c.Assert(err, NotNil)
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(`primary`)")
r.Check(testkit.Rows("6"))
Expand Down Expand Up @@ -384,11 +384,11 @@ func (s *testSuite) TestAdminCleanupIndexMore(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test c1")
err = tk.ExecToErr("admin check index admin_test c1")
c.Assert(err, NotNil)
_, err = tk.Exec("admin check index admin_test c2")
err = tk.ExecToErr("admin check index admin_test c2")
c.Assert(err, NotNil)
r := tk.MustQuery("SELECT COUNT(*) FROM admin_test")
r.Check(testkit.Rows("3"))
Expand Down Expand Up @@ -440,7 +440,7 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err.Error(), Equals,
"[executor:8003]admin_test err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:-1, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:-10, b:[]uint8(nil), x:interface {}(nil)}}}")
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
Expand All @@ -457,7 +457,7 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err.Error(), Equals, "handle 0, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:0, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>")

// Add one row of index.
Expand All @@ -474,7 +474,7 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 2, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:13, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:12, b:[]uint8(nil), x:interface {}(nil)}")

// Table count = index count.
Expand All @@ -487,7 +487,7 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}")

// Table count = index count.
Expand All @@ -500,7 +500,7 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
_, err = tk.Exec("admin check table admin_test")
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}")

// Recover records.
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *testSuite) TestAdminCheckTable(c *C) {

tk.MustExec("use mysql")
tk.MustExec(`admin check table test.t;`)
_, err := tk.Exec("admin check table t")
err := tk.ExecToErr("admin check table t")
c.Assert(err, NotNil)

// test add index on time type column which have default value
Expand Down Expand Up @@ -627,7 +627,7 @@ func (s *testSuite) TestAdminCheckTable(c *C) {
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1 (a decimal(2,1), index(a))`)
tk.MustExec(`insert into t1 set a='1.9'`)
_, err = tk.Exec(`alter table t1 modify column a decimal(3,2);`)
err = tk.ExecToErr(`alter table t1 modify column a decimal(3,2);`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:203]unsupported modify decimal column precision")
tk.MustExec(`delete from t1;`)
Expand Down
8 changes: 0 additions & 8 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package executor

import (
"fmt"
"math"
"runtime"
"sort"
Expand Down Expand Up @@ -830,10 +829,6 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
if err != nil {
return errors.Trace(err)
}
x := fmt.Sprintf("xxx ------------------------------------------- idx %v, row len %v, cols len %v, col %v, rows %v, handles %v, expr %v, val %#v, tp %v",
w.idxLookup.index.Name, row.Len(), len(w.idxLookup.columns), col, chk.NumRows(), len(task.handles), expr, val,
col.FieldType.String())
logutil.Logger(context.Background()).Warn("111 ----", zap.String("xxx", x))
vals = append(vals, val)
} else {
vals = append(vals, row.GetDatum(i, &col.FieldType))
Expand All @@ -843,9 +838,6 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
for i, val := range vals {
col := w.idxTblCols[i]
tp := &col.FieldType
d := idxRow.GetDatum(i, tp)
x := fmt.Sprintf("col tp %v, val %v, is null %v, len %v, no.%d", tp, (&d).Kind(), (&d).IsNull(), idxRow.Len(), i)
logutil.Logger(context.Background()).Warn("-------", zap.String("xx", x))
ret := chunk.Compare(idxRow, i, &val)
if ret != 0 {
return errors.Errorf("col %s, handle %#v, index:%#v != record:%#v", col.Name, handle, idxRow.GetDatum(i, tp), val)
Expand Down
11 changes: 11 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,17 @@ func (e *CheckTableExec) Open(ctx context.Context) error {
return nil
}

// Close implements the Executor Close interface.
func (e *CheckTableExec) Close() error {
var firstErr error
for _, src := range e.srcs {
if err := src.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error {
cols := src.schema.Columns
retFieldTypes := make([]*types.FieldType, len(cols))
Expand Down
9 changes: 9 additions & 0 deletions util/testkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ func (tk *TestKit) QueryToErr(sql string, args ...interface{}) error {
return resErr
}

// ExecToErr executes a sql statement and discard results.
func (tk *TestKit) ExecToErr(sql string, args ...interface{}) error {
res, err := tk.Exec(sql, args...)
if res != nil {
tk.c.Assert(res.Close(), check.IsNil)
}
return err
}

// ResultSetToResult converts sqlexec.RecordSet to testkit.Result.
// It is used to check results of execute statement in binary mode.
func (tk *TestKit) ResultSetToResult(rs sqlexec.RecordSet, comment check.CommentInterface) *Result {
Expand Down

0 comments on commit 96b1137

Please sign in to comment.