Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv,session: invalidate snapshot cache under pessimistic transaction #12147

Merged
merged 3 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
errStr := err.Error()
conflictCommitTS := extractConflictCommitTS(errStr)
if conflictCommitTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS when write conflicted")
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS when write conflicted", zap.String("err", errStr))
}
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
Expand Down
17 changes: 17 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,21 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk2.MustExec("commit")
_, err := tk.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
tk.MustExec("truncate table conflict")
tk.MustExec("insert into conflict values (1, 2)")
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")

tk2.MustExec("update conflict set c = c - 1")

// Make the txn update its forUpdateTS.
tk.MustQuery("select * from conflict where id = 1 for update").Check(testkit.Rows("1 1"))
// Cover a bug that the txn snapshot doesn't invalidate cache after ts change.
tk.MustExec("insert into conflict values (1, 999) on duplicate key update c = c + 2")
tk.MustExec("commit")
tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3"))
}
6 changes: 6 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *
}
}

func (s *tikvSnapshot) setSnapshotTS(ts uint64) {
// Invalidate cache if the snapshotTS change!
s.version.Ver = ts
s.cached = nil
}

func (s *tikvSnapshot) SetPriority(priority int) {
s.priority = pb.CommandPri(priority)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
case kv.KeyOnly:
txn.snapshot.keyOnly = val.(bool)
case kv.SnapshotTS:
txn.snapshot.version.Ver = val.(uint64)
txn.snapshot.setSnapshotTS(val.(uint64))
}
}

Expand Down