Skip to content

Commit

Permalink
remove txnScope
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Feb 26, 2025
1 parent e6f14ff commit cf6688e
Show file tree
Hide file tree
Showing 28 changed files with 152 additions and 279 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type Config struct {
Path string
EnableForwarding bool
// FIXME: rename
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
// RegionsRefreshInterval indicates the interval of loading regions info, the unit is second, if RegionsRefreshInterval == 0, it will be disabled.
RegionsRefreshInterval uint64
// EnablePreload indicates whether to preload region info when initializing the client.
Expand Down
29 changes: 3 additions & 26 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *testOnePCSuite) Test1PC() {
// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5}
values := [][]byte{v1, v2, v3, v4, v5New}
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp()
s.Nil(err)
snap := s.store.GetSnapshot(ver)
for i, k := range keys {
Expand All @@ -161,7 +161,7 @@ func (s *testOnePCSuite) Test1PCIsolation() {
// Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs
// calculation.
for i := 0; i < 10; i++ {
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -215,7 +215,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0))
s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS())

ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp()
s.Nil(err)
snap := s.store.GetSnapshot(ver)
for i, k := range keys {
Expand Down Expand Up @@ -244,29 +244,6 @@ func (s *testOnePCSuite) Test1PCLinearizability() {
s.Less(commitTS2, commitTS1)
}

func (s *testOnePCSuite) Test1PCWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.begin1PC()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
err = localTxn.Commit(context.Background())
s.Nil(err)
s.False(localTxn.GetCommitter().IsOnePC())

globalTxn := s.begin1PC()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(context.Background())
s.Nil(err)
s.True(globalTxn.GetCommitter().IsOnePC())
}

func (s *testOnePCSuite) TestTxnCommitCounter() {
initial := metrics.GetTxnCommitCounter()

Expand Down
14 changes: 7 additions & 7 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (s *testCommitterSuite) TestPrewriteRollback() {
err = committer.PrewriteAllMutations(ctx)
s.Nil(err)
}
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
committer.SetCommitTS(commitTS)
err = committer.CommitMutations(ctx)
Expand Down Expand Up @@ -372,7 +372,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 {
}

func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool {
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp()
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), 500, nil)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Expand Down Expand Up @@ -733,7 +733,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
err = txn.LockKeys(context.Background(), lockCtx, key2)
s.Nil(err)
lockInfo := s.getLockInfo(key)
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{})
s.GreaterOrEqual(msBeforeLockExpired, int64(100))

lr := s.store.NewLockResolver()
Expand All @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
check := func() bool {
lockInfoNew := s.getLockInfo(key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{})
s.Nil(err)
// Check that the TTL is update to a reasonable range.
expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl)
Expand Down Expand Up @@ -1272,7 +1272,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
// Update forUpdateTS to simulate a pessimistic retry.
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
newForUpdateTS, err := s.store.CurrentTimestamp()
s.Nil(err)
s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS())
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
Expand Down Expand Up @@ -1583,7 +1583,7 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
s.True(txn.GetCommitter().IsTTLRunning())

// Get a new ts as the new forUpdateTS.
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
Expand Down Expand Up @@ -1662,7 +1662,7 @@ func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfter
return
}

forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
key := []byte("k1")
Expand Down
5 changes: 2 additions & 3 deletions integration_tests/assertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/suite"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
)
Expand Down Expand Up @@ -187,7 +186,7 @@ func (s *testAssertionSuite) TestPrewriteAssertion() {
// When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by
// previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a
// key prefix with a timestamp to ensure the keys to be unique.
ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ts, err := s.store.CurrentTimestamp()
s.Nil(err)
prefix := fmt.Sprintf("test-prewrite-assertion-%d-", ts)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Strict)
Expand All @@ -199,7 +198,7 @@ func (s *testAssertionSuite) TestFastAssertion() {
// When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by
// previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a
// key prefix with a timestamp to ensure the keys to be unique.
ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ts, err := s.store.CurrentTimestamp()
s.Nil(err)
prefix := fmt.Sprintf("test-fast-assertion-%d-", ts)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Fast)
Expand Down
41 changes: 8 additions & 33 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, ex
}

func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock {
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
ver, err := s.store.CurrentTimestamp()
s.Nil(err)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: key,
Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr
s.Nil(err)

if commitPrimary {
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
tpc.SetCommitTS(commitTS)
err = tpc.CommitMutations(ctx)
Expand All @@ -257,23 +257,23 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false)
lock := s.mustGetLock([]byte("z"))
lock.UseAsyncCommit = true
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
var lockutil txnlock.LockProbe
status := lockutil.NewLockStatus(nil, true, ts)

resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
s.Nil(err)
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil)
s.Nil(err)
s.True(status.IsCommitted())
s.Equal(status.CommitTS(), ts)

// One key is committed (i), one key is locked (a). Should get committed.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs := ts + 10

Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.Equal(gotResolve, int64(1))

// One key has been rolled back (b), one is locked (a). Should be rolled back.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs = ts + 10

Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead() {
txn1.Set([]byte("k1"), []byte("v2"))

for i := 0; i < 20; i++ {
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -445,31 +445,6 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Less(commitTS2, commitTS1)
}

// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions
func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.beginAsyncCommit()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
err = localTxn.Commit(ctx)
s.Nil(err)
s.False(localTxn.IsAsyncCommit())

globalTxn := s.beginAsyncCommit()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(ctx)
s.Nil(err)
s.True(globalTxn.IsAsyncCommit())
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
Expand Down Expand Up @@ -620,7 +595,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
lock := s.mustGetLock([]byte("a"))
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
for {
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
s.Nil(err)
Expand Down
Loading

0 comments on commit cf6688e

Please sign in to comment.