Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: update table delta map when really lock keys #21996

Merged
12 changes: 12 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
existKeys = make([]kv.Key, 0, len(values))
}
e.values = make([][]byte, 0, len(values))
hasKeys := false
for i, key := range keys {
val := values[string(key)]
if len(val) == 0 {
Expand All @@ -304,6 +305,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
e.values = append(e.values, val)
handles = append(handles, e.handles[i])
if e.lock && rc {
if !hasKeys {
hasKeys = true
}
existKeys = append(existKeys, key)
}
}
Expand All @@ -313,6 +317,14 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err != nil {
return err
}
if hasKeys {
// Update partition table IDs
for _, pid := range e.physIDs {
e.updateDeltaForTableID(pid)
}
// Update table ID
e.updateDeltaForTableID(e.tblInfo.ID)
}
}
e.handles = handles
return nil
Expand Down
26 changes: 25 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func (e *baseExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *baseExecutor) updateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}

func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) baseExecutor {
e := baseExecutor{
children: children,
Expand Down Expand Up @@ -900,6 +905,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

hasKeys := false
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
Expand All @@ -915,6 +921,9 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

for _, col := range cols {
if !hasKeys {
hasKeys = true
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index)))
}
}
Expand All @@ -926,7 +935,22 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
lockWaitTime = kv.LockNoWait
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...)
err = doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...)
if err == nil && hasKeys {
// Just update table delta when there really has keys locked.
if len(e.tblID2Handle) > 0 {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}
}
if len(e.partitionedTable) > 0 {
for _, p := range e.partitionedTable {
pid := p.Meta().ID
e.updateDeltaForTableID(pid)
}
}
}
return err
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
Expand Down
149 changes: 149 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6508,3 +6508,152 @@ func (s *testSuite) TestTxnRetry(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from t").Check(testkit.Rows("10"))
}

func issue20975Prepare(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec("create table t1(id int primary key, c int)")
tk1.MustExec("insert into t1 values(1, 10), (2, 20)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975UpdateNoChange(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdate(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGet(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGet(c *C) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add some cases for pointGetExec and batchPointGetExec

  • Isolation is default RR(SI).
  • Using non-handle unique keys.
  • The correspond handle keys do not exist.
    These unique keys should be locked and thus these lock only transactions will get schema check during 2pc.

tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec(`create table t1(id int primary key, c int) partition by range (id) (
partition p1 values less than (10),
partition p2 values less than (20)
)`)
tk1.MustExec("insert into t1 values(1, 10), (2, 20), (11, 30), (12, 40)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}
9 changes: 9 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,15 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro
if err != nil {
return err
}
// Key need lock get table ID
var tblID int64
if e.partInfo != nil {
tblID = e.partInfo.ID
} else {
tblID = e.tblInfo.ID
}
e.updateDeltaForTableID(tblID)

lockCtx.ValuesLock.Lock()
defer lockCtx.ValuesLock.Unlock()
for key, val := range lockCtx.Values {
Expand Down
7 changes: 5 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,11 @@ func (s *session) doCommit(ctx context.Context) error {
for id := range relatedPhysicalTables {
physicalTableIDs = append(physicalTableIDs, id)
}
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs))
// If table delta is empty means no changes for any table. So do not need SchemaChecker.
if len(physicalTableIDs) > 0 {
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs))
}
s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(kv.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info })
if s.GetSessionVars().EnableAmendPessimisticTxn {
Expand Down