Skip to content

Commit

Permalink
remove txnScope
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Feb 19, 2025
1 parent e6f14ff commit 72d2742
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 226 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type Config struct {
Path string
EnableForwarding bool
// FIXME: rename
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
TxnScope string
EnableAsyncCommit bool
Enable1PC bool
// RegionsRefreshInterval indicates the interval of loading regions info, the unit is second, if RegionsRefreshInterval == 0, it will be disabled.
RegionsRefreshInterval uint64
// EnablePreload indicates whether to preload region info when initializing the client.
Expand Down
25 changes: 1 addition & 24 deletions integration_tests/1pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *testOnePCSuite) Test1PCIsolation() {
// Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs
// calculation.
for i := 0; i < 10; i++ {
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -244,29 +244,6 @@ func (s *testOnePCSuite) Test1PCLinearizability() {
s.Less(commitTS2, commitTS1)
}

func (s *testOnePCSuite) Test1PCWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.begin1PC()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
err = localTxn.Commit(context.Background())
s.Nil(err)
s.False(localTxn.GetCommitter().IsOnePC())

globalTxn := s.begin1PC()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(context.Background())
s.Nil(err)
s.True(globalTxn.GetCommitter().IsOnePC())
}

func (s *testOnePCSuite) TestTxnCommitCounter() {
initial := metrics.GetTxnCommitCounter()

Expand Down
10 changes: 5 additions & 5 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (s *testCommitterSuite) TestPrewriteRollback() {
err = committer.PrewriteAllMutations(ctx)
s.Nil(err)
}
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
committer.SetCommitTS(commitTS)
err = committer.CommitMutations(ctx)
Expand Down Expand Up @@ -733,7 +733,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
err = txn.LockKeys(context.Background(), lockCtx, key2)
s.Nil(err)
lockInfo := s.getLockInfo(key)
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{})
s.GreaterOrEqual(msBeforeLockExpired, int64(100))

lr := s.store.NewLockResolver()
Expand All @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
check := func() bool {
lockInfoNew := s.getLockInfo(key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{})
s.Nil(err)
// Check that the TTL is update to a reasonable range.
expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl)
Expand Down Expand Up @@ -1583,7 +1583,7 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() {
s.True(txn.GetCommitter().IsTTLRunning())

// Get a new ts as the new forUpdateTS.
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
Expand Down Expand Up @@ -1662,7 +1662,7 @@ func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfter
return
}

forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
key := []byte("k1")
Expand Down
39 changes: 7 additions & 32 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr
s.Nil(err)

if commitPrimary {
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
tpc.SetCommitTS(commitTS)
err = tpc.CommitMutations(ctx)
Expand All @@ -257,23 +257,23 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false)
lock := s.mustGetLock([]byte("z"))
lock.UseAsyncCommit = true
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
var lockutil txnlock.LockProbe
status := lockutil.NewLockStatus(nil, true, ts)

resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
s.Nil(err)
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil)
s.Nil(err)
s.True(status.IsCommitted())
s.Equal(status.CommitTS(), ts)

// One key is committed (i), one key is locked (a). Should get committed.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs := ts + 10

Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.Equal(gotResolve, int64(1))

// One key has been rolled back (b), one is locked (a). Should be rolled back.
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
commitTs = ts + 10

Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead() {
txn1.Set([]byte("k1"), []byte("v2"))

for i := 0; i < 20; i++ {
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
}

Expand Down Expand Up @@ -445,31 +445,6 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Less(commitTS2, commitTS1)
}

// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions
func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
// It requires setting placement rules to run with TiKV
if *withTiKV {
return
}

localTxn := s.beginAsyncCommit()
err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj")
s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
err = localTxn.Commit(ctx)
s.Nil(err)
s.False(localTxn.IsAsyncCommit())

globalTxn := s.beginAsyncCommit()
err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err)
err = globalTxn.Commit(ctx)
s.Nil(err)
s.True(globalTxn.IsAsyncCommit())
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
Expand Down Expand Up @@ -620,7 +595,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
lock := s.mustGetLock([]byte("a"))
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
for {
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
s.Nil(err)
Expand Down
20 changes: 10 additions & 10 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl
s.Nil(err)

if commitPrimary {
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
tpc.SetCommitTS(commitTS)
err = tpc.CommitMutations(ctx)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {

bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
lr := s.store.NewLockResolver()
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{})
s.Nil(err)

// Check the lock TTL of a transaction.
Expand Down Expand Up @@ -324,7 +324,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.prewriteTxnWithTTL(txn, 1000)

o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
s.Greater(currentTS, txn.StartTS())

Expand All @@ -351,7 +351,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.Equal(timeBeforeExpire, int64(0))

// Then call getTxnStatus again and check the lock status.
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
s.Nil(err)
Expand Down Expand Up @@ -383,7 +383,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
s.Nil(err)

o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
resolver := s.store.NewLockResolver()
Expand Down Expand Up @@ -412,7 +412,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
s.Nil(committer.CleanupMutations(context.Background()))

// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{})
s.Nil(err)
lock = &txnkv.Lock{
Key: []byte("second"),
Expand Down Expand Up @@ -538,9 +538,9 @@ func (s *testLockSuite) TestBatchResolveLocks() {
}

// Locks may not expired
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{})
s.Greater(msBeforeLockExpired, int64(0))
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{})
s.Greater(msBeforeLockExpired, int64(0))

lr := s.store.NewLockResolver()
Expand Down Expand Up @@ -961,10 +961,10 @@ func (s *testLockSuite) TestResolveLocksForRead() {
s.Nil(committer.PrewriteAllMutations(ctx))
committer.SetPrimaryKey([]byte("k66"))

readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)

commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{})
s.Nil(err)
s.Greater(commitTS, readStartTS)
committer.SetCommitTS(commitTS)
Expand Down
2 changes: 1 addition & 1 deletion internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS()
}

getTS := func() uint64 {
ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{})
s.NoError(err)
return ts
}
Expand Down
10 changes: 0 additions & 10 deletions internal/locate/store_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,6 @@ func (s *Store) IsStoreMatch(stores []uint64) bool {
return false
}

// GetLabelValue returns the value of the label
func (s *Store) GetLabelValue(key string) (string, bool) {
for _, label := range s.labels {
if label.Key == key {
return label.Value, true
}
}
return "", false
}

// IsSameLabels returns whether the store have the same labels with target labels
func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool {
if len(s.labels) != len(labels) {
Expand Down
Loading

0 comments on commit 72d2742

Please sign in to comment.