diff --git a/config/config.go b/config/config.go index bafc164aaa..566ef9d9fd 100644 --- a/config/config.go +++ b/config/config.go @@ -76,9 +76,9 @@ type Config struct { Path string EnableForwarding bool // FIXME: rename - TxnScope string - EnableAsyncCommit bool - Enable1PC bool + TxnScope string + EnableAsyncCommit bool + Enable1PC bool // RegionsRefreshInterval indicates the interval of loading regions info, the unit is second, if RegionsRefreshInterval == 0, it will be disabled. RegionsRefreshInterval uint64 // EnablePreload indicates whether to preload region info when initializing the client. diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index c69bbdbcfe..67224322cc 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -135,7 +135,7 @@ func (s *testOnePCSuite) Test1PC() { // Check all keys keys := [][]byte{k1, k2, k3, k4, k5} values := [][]byte{v1, v2, v3, v4, v5New} - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { @@ -161,7 +161,7 @@ func (s *testOnePCSuite) Test1PCIsolation() { // Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs // calculation. for i := 0; i < 10; i++ { - _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) } @@ -215,7 +215,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { @@ -244,29 +244,6 @@ func (s *testOnePCSuite) Test1PCLinearizability() { s.Less(commitTS2, commitTS1) } -func (s *testOnePCSuite) Test1PCWithMultiDC() { - // It requires setting placement rules to run with TiKV - if *withTiKV { - return - } - - localTxn := s.begin1PC() - err := localTxn.Set([]byte("a"), []byte("a1")) - localTxn.SetScope("bj") - s.Nil(err) - err = localTxn.Commit(context.Background()) - s.Nil(err) - s.False(localTxn.GetCommitter().IsOnePC()) - - globalTxn := s.begin1PC() - err = globalTxn.Set([]byte("b"), []byte("b1")) - globalTxn.SetScope(oracle.GlobalTxnScope) - s.Nil(err) - err = globalTxn.Commit(context.Background()) - s.Nil(err) - s.True(globalTxn.GetCommitter().IsOnePC()) -} - func (s *testOnePCSuite) TestTxnCommitCounter() { initial := metrics.GetTxnCommitCounter() diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index fdad3b2969..2d19949b90 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -272,7 +272,7 @@ func (s *testCommitterSuite) TestPrewriteRollback() { err = committer.PrewriteAllMutations(ctx) s.Nil(err) } - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) committer.SetCommitTS(commitTS) err = committer.CommitMutations(ctx) @@ -372,7 +372,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 { } func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ @@ -733,7 +733,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() { err = txn.LockKeys(context.Background(), lockCtx, key2) s.Nil(err) lockInfo := s.getLockInfo(key) - msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{}) s.GreaterOrEqual(msBeforeLockExpired, int64(100)) lr := s.store.NewLockResolver() @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() { check := func() bool { lockInfoNew := s.getLockInfo(key) if lockInfoNew.LockTtl > lockInfo.LockTtl { - currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{}) s.Nil(err) // Check that the TTL is update to a reasonable range. expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl) @@ -1272,7 +1272,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS) s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) // Update forUpdateTS to simulate a pessimistic retry. - newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + newForUpdateTS, err := s.store.CurrentTimestamp() s.Nil(err) s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS()) lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} @@ -1583,7 +1583,7 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() { s.True(txn.GetCommitter().IsTTLRunning()) // Get a new ts as the new forUpdateTS. - forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.NoError(err) lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) @@ -1662,7 +1662,7 @@ func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfter return } - forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.NoError(err) lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} key := []byte("k1") diff --git a/integration_tests/assertion_test.go b/integration_tests/assertion_test.go index f5699d53e2..2eea4e2300 100644 --- a/integration_tests/assertion_test.go +++ b/integration_tests/assertion_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" ) @@ -187,7 +186,7 @@ func (s *testAssertionSuite) TestPrewriteAssertion() { // When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by // previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a // key prefix with a timestamp to ensure the keys to be unique. - ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ts, err := s.store.CurrentTimestamp() s.Nil(err) prefix := fmt.Sprintf("test-prewrite-assertion-%d-", ts) s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Strict) @@ -199,7 +198,7 @@ func (s *testAssertionSuite) TestFastAssertion() { // When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by // previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a // key prefix with a timestamp to ensure the keys to be unique. - ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ts, err := s.store.CurrentTimestamp() s.Nil(err) prefix := fmt.Sprintf("test-fast-assertion-%d-", ts) s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Fast) diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 4c478f45a9..feb96cb437 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -122,7 +122,7 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, ex } func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, @@ -230,7 +230,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr s.Nil(err) if commitPrimary { - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) tpc.SetCommitTS(commitTS) err = tpc.CommitMutations(ctx) @@ -257,7 +257,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false) lock := s.mustGetLock([]byte("z")) lock.UseAsyncCommit = true - ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) var lockutil txnlock.LockProbe status := lockutil.NewLockStatus(nil, true, ts) @@ -265,7 +265,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) err = resolver.ResolveAsyncCommitLock(s.bo, lock, status) s.Nil(err) - currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil) s.Nil(err) @@ -273,7 +273,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.Equal(status.CommitTS(), ts) // One key is committed (i), one key is locked (a). Should get committed. - ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) commitTs := ts + 10 @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.Equal(gotResolve, int64(1)) // One key has been rolled back (b), one is locked (a). Should be rolled back. - ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) commitTs = ts + 10 @@ -400,7 +400,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead() { txn1.Set([]byte("k1"), []byte("v2")) for i := 0; i < 20; i++ { - _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) } @@ -445,31 +445,6 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() { s.Less(commitTS2, commitTS1) } -// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions -func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() { - // It requires setting placement rules to run with TiKV - if *withTiKV { - return - } - - localTxn := s.beginAsyncCommit() - err := localTxn.Set([]byte("a"), []byte("a1")) - localTxn.SetScope("bj") - s.Nil(err) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = localTxn.Commit(ctx) - s.Nil(err) - s.False(localTxn.IsAsyncCommit()) - - globalTxn := s.beginAsyncCommit() - err = globalTxn.Set([]byte("b"), []byte("b1")) - globalTxn.SetScope(oracle.GlobalTxnScope) - s.Nil(err) - err = globalTxn.Commit(ctx) - s.Nil(err) - s.True(globalTxn.IsAsyncCommit()) -} - func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() { keys := [][]byte{[]byte("k0"), []byte("k1")} values := [][]byte{[]byte("v00"), []byte("v10")} @@ -620,7 +595,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() { lock := s.mustGetLock([]byte("a")) resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) for { - currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil) s.Nil(err) diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index 0019027849..b55bd36310 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -120,7 +120,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl s.Nil(err) if commitPrimary { - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) tpc.SetCommitTS(commitTS) err = tpc.CommitMutations(ctx) @@ -262,7 +262,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() { bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) lr := s.store.NewLockResolver() - callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{}) s.Nil(err) // Check the lock TTL of a transaction. @@ -324,7 +324,7 @@ func (s *testLockSuite) TestCheckTxnStatus() { s.prewriteTxnWithTTL(txn, 1000) o := s.store.GetOracle() - currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) s.Greater(currentTS, txn.StartTS()) @@ -351,7 +351,7 @@ func (s *testLockSuite) TestCheckTxnStatus() { s.Equal(timeBeforeExpire, int64(0)) // Then call getTxnStatus again and check the lock status. - currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil) s.Nil(err) @@ -383,7 +383,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() { s.Nil(err) o := s.store.GetOracle() - currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) resolver := s.store.NewLockResolver() @@ -412,7 +412,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() { s.Nil(committer.CleanupMutations(context.Background())) // Call getTxnStatusFromLock to cover TxnNotFound and retry timeout. - startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) lock = &txnkv.Lock{ Key: []byte("second"), @@ -443,7 +443,7 @@ func (s *testLockSuite) prewriteTxnWithTTL(txn transaction.TxnProbe, ttl uint64) } func (s *testLockSuite) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ @@ -538,9 +538,9 @@ func (s *testLockSuite) TestBatchResolveLocks() { } // Locks may not expired - msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{}) s.Greater(msBeforeLockExpired, int64(0)) - msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{}) s.Greater(msBeforeLockExpired, int64(0)) lr := s.store.NewLockResolver() @@ -961,10 +961,10 @@ func (s *testLockSuite) TestResolveLocksForRead() { s.Nil(committer.PrewriteAllMutations(ctx)) committer.SetPrimaryKey([]byte("k66")) - readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) s.Greater(commitTS, readStartTS) committer.SetCommitTS(commitTS) @@ -1044,7 +1044,7 @@ func (s *testLockWithTiKVSuite) cleanupLocks() { // Cleanup possible left locks. bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() s.NoError(err) remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) s.NoError(err) @@ -1358,7 +1358,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { // There must be a new tso allocation before committing if any key is locked with conflict, otherwise // async commit will become unsafe. // In TiDB, there must be a tso allocation for updating forUpdateTS. - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(currentTS, 200, lockTime) // k4: non-conflicting key but forUpdateTS updated @@ -1429,7 +1429,7 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { // * k1: stale pessimistic lock, primary // * k2: stale pessimistic lock, primary -> k1 - forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + forUpdateTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) err = txn.LockKeys(ctx, lockCtx, k3) // k3 becomes primary @@ -1467,7 +1467,7 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { } // Check data consistency - readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + readTS, err := s.store.CurrentTimestamp() s.NoError(err) snapshot := s.store.GetSnapshot(readTS) v, err := snapshot.Get(ctx, k3) @@ -1520,7 +1520,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { // k1 has txn1's stale pessimistic lock now. - forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + forUpdateTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3)) @@ -1540,7 +1540,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { s.NoError(txn2.Rollback()) // k4 has txn2's stale primary pessimistic lock now. - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) s.NoError(err) @@ -1564,7 +1564,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { s.Empty(remainingLocks) // Check data consistency - readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + readTS, err := s.store.CurrentTimestamp() snapshot := s.store.GetSnapshot(readTS) _, err = snapshot.Get(ctx, k1) s.Equal(tikverr.ErrNotExist, err) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 0266f1aa51..def7f0f152 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -126,13 +126,13 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) s.waitForMinSafeTS(dcLabel, 100) require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) - require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) { s.Eventually(func() bool { - return s.store.GetMinSafeTS(txnScope) == ts + return s.store.GetMinSafeTS() == ts }, time.Second, 200*time.Millisecond) } @@ -145,7 +145,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Set DC label for store 1. @@ -168,7 +168,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { // Try to get the minimum resolved timestamp of the store from TiKV. s.waitForMinSafeTS(dcLabel, 150) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) + require.Equal(uint64(150), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } @@ -184,14 +184,14 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) // Make sure the store's min resolved ts is not initialized. s.waitForMinSafeTS(oracle.GlobalTxnScope, 0) - require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(0), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) // Make sure the store's min resolved ts is not regarded as MaxUint64. s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) - require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Fallback to KV Request when PD server not support get min resolved ts. @@ -199,7 +199,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { mockClient.SetKVSafeTS(150) // Make sure the minSafeTS can advance. s.waitForMinSafeTS(oracle.GlobalTxnScope, 150) - require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(150), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index 025d9df224..f05e191d9c 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -76,7 +76,7 @@ func (s *testPipelinedMemDBSuite) TearDownTest() { } func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ diff --git a/integration_tests/snapshot_fail_test.go b/integration_tests/snapshot_fail_test.go index 17ac9436a1..896e04fc28 100644 --- a/integration_tests/snapshot_fail_test.go +++ b/integration_tests/snapshot_fail_test.go @@ -296,7 +296,7 @@ func (s *testSnapshotFailSuite) TestResetSnapshotTS() { } func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 3f0ef9716f..4453a6fbf9 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -47,7 +47,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv" @@ -411,7 +410,7 @@ func (s *testSnapshotSuite) TestSnapshotCacheBypassMaxUint64() { s.Nil(txn.Set([]byte("z"), []byte("z"))) s.Nil(txn.Commit(context.Background())) // cache version < math.MaxUint64 - startTS, err := s.store.GetTimestampWithRetry(tikv.NewNoopBackoff(context.Background()), oracle.GlobalTxnScope) + startTS, err := s.store.GetTimestampWithRetry(tikv.NewNoopBackoff(context.Background())) s.Nil(err) snapshot := s.store.GetSnapshot(startTS) snapshot.Get(context.Background(), []byte("x")) diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index 8f61b42896..c03fe7f73c 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -73,9 +73,9 @@ func (s *testStoreSuite) TestOracle() { s.store.SetOracle(o) ctx := context.Background() - t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) + t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil)) s.Nil(err) - t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) + t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil)) s.Nil(err) s.Less(t1, t2) @@ -101,7 +101,7 @@ func (s *testStoreSuite) TestOracle() { go func() { defer wg.Done() - t3, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 5000, nil), oracle.GlobalTxnScope) + t3, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 5000, nil)) s.Nil(err) s.Less(t2, t3) expired := s.store.GetOracle().IsExpired(t2, 50, &oracle.Option{}) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e7a5070e9f..46760669fa 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1237,7 +1237,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() req.ReplicaReadType = kv.ReplicaReadFollower @@ -1420,7 +1419,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() bo := retry.NewBackoffer(context.Background(), -1) resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) @@ -1443,7 +1441,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestPreferLeader() { // make request req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadPreferLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope // setup mock client s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { @@ -1606,7 +1603,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope downStore := s.cluster.GetStore(s.storeIDs[2]) s.cluster.MarkPeerDown(s.peerIDs[2]) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 0719c2c22f..589aa63d89 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -949,7 +949,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() } getTS := func() uint64 { - ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{}) s.NoError(err) return ts } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 980a4cf3aa..86701601ae 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -2836,7 +2836,6 @@ func (ca *replicaSelectorAccessPathCase) buildRequest(s *testReplicaSelectorSuit if ca.staleRead { req.EnableStaleWithMixedReplicaRead() req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope } else { req.ReplicaReadType = ca.readType req.ReplicaRead = ca.readType.IsFollowerRead() diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 7d4583ff22..bc237d979c 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -299,16 +299,6 @@ func (s *Store) IsStoreMatch(stores []uint64) bool { return false } -// GetLabelValue returns the value of the label -func (s *Store) GetLabelValue(key string) (string, bool) { - for _, label := range s.labels { - if label.Key == key { - return label.Value, true - } - } - return "", false -} - // IsSameLabels returns whether the store have the same labels with target labels func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool { if len(s.labels) != len(labels) { diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index fee5e0087c..68af63bf90 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -146,7 +146,7 @@ type pdOracle struct { state adaptiveUpdateTSIntervalState } - // When the low resolution ts is not new enough and there are many concurrent stane read / snapshot read + // When the low resolution ts is not new enough and there are many concurrent stale read / snapshot read // operations that needs to validate the read ts, we can use this to avoid too many concurrent GetTS calls by // reusing a result for different `ValidateReadTS` calls. This can be done because that // we don't require the ts for validation to be strictly the latest one. @@ -192,7 +192,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e go o.updateTS(ctx) } // Initialize the timestamp of the global txnScope by Get. - _, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := o.GetTimestamp(ctx, &oracle.Option{}) if err != nil { o.Close() return nil, err @@ -203,7 +203,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e // IsExpired returns whether lockTS+TTL is expired, both are ms. It uses `lastTS` // to compare, may return false negative result temporarily. func (o *pdOracle) IsExpired(lockTS, TTL uint64, opt *oracle.Option) bool { - lastTS, exist := o.getLastTS(opt.TxnScope) + lastTS, exist := o.getLastTS() if !exist { return true } @@ -212,11 +212,11 @@ func (o *pdOracle) IsExpired(lockTS, TTL uint64, opt *oracle.Option) bool { // GetTimestamp gets a new increasing time. func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) { - ts, err := o.getTimestamp(ctx, opt.TxnScope) + ts, err := o.getTimestamp(ctx) if err != nil { return 0, err } - o.setLastTS(ts, opt.TxnScope) + o.setLastTS(ts) return ts, nil } @@ -227,8 +227,7 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err type tsFuture struct { tso.TSFuture - o *pdOracle - txnScope string + o *pdOracle } // Wait implements the oracle.Future interface. @@ -240,12 +239,12 @@ func (f *tsFuture) Wait() (uint64, error) { return 0, errors.WithStack(err) } ts := oracle.ComposeTS(physical, logical) - f.o.setLastTS(ts, f.txnScope) + f.o.setLastTS(ts) return ts, nil } func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope} + return &tsFuture{o.c.GetTSAsync(ctx), o} } func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) { @@ -300,8 +299,8 @@ func (o *pdOracle) setLastTS(ts uint64) { } } -func (o *pdOracle) getLastTS(txnScope string) (uint64, bool) { - last, exist := o.getLastTSWithArrivalTS(txnScope) +func (o *pdOracle) getLastTS() (uint64, bool) { + last, exist := o.getLastTSWithArrivalTS() if !exist { return 0, false } @@ -452,7 +451,7 @@ func (o *pdOracle) updateTS(ctx context.Context) { doUpdate := func(now time.Time) { ts, err := o.getTimestamp(ctx) if err != nil { - logutil.Logger(ctx).Error("updateTS error", zap.String("txnScope", txnScope), zap.Error(err)) + logutil.Logger(ctx).Error("updateTS error", zap.Error(err)) } o.setLastTS(ts) @@ -493,7 +492,7 @@ func (o *pdOracle) updateTS(ctx context.Context) { // UntilExpired implement oracle.Oracle interface. func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64, opt *oracle.Option) int64 { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { return 0 } @@ -556,19 +555,19 @@ func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(newUpdateInterval tim // GetLowResolutionTimestamp gets a new increasing time. func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { - return 0, errors.Errorf("get low resolution timestamp fail, invalid txnScope = %s", opt.TxnScope) + return 0, errors.New("get low resolution timestamp fail") } return lastTS, nil } func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { return lowResolutionTsFuture{ ts: 0, - err: errors.Errorf("get low resolution timestamp async fail, invalid txnScope = %s", opt.TxnScope), + err: errors.New("get low resolution timestamp async fail"), } } return lowResolutionTsFuture{ @@ -598,11 +597,11 @@ func (o *pdOracle) getStaleTimestampWithLastTS(last *lastTSO, prevSecond uint64) // GetStaleTimestamp generate a TSO which represents for the TSO prevSecond secs ago. func (o *pdOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) { - ts, err = o.getStaleTimestamp(txnScope, prevSecond) + ts, err = o.getStaleTimestamp(prevSecond) if err != nil { if !strings.HasPrefix(err.Error(), "invalid prevSecond") { // If any error happened, we will try to fetch tso and set it as last ts. - _, tErr := o.GetTimestamp(ctx, &oracle.Option{TxnScope: txnScope}) + _, tErr := o.GetTimestamp(ctx, &oracle.Option{}) if tErr != nil { return 0, tErr } @@ -621,7 +620,8 @@ func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) { } func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Option) (uint64, error) { - ch := o.tsForValidation.DoChan(opt.TxnScope, func() (interface{}, error) { + // TODO: do we still need the group after the deprecation of TxnScope? + ch := o.tsForValidation.DoChan(oracle.GlobalTxnScope, func() (interface{}, error) { metrics.TiKVValidateReadTSFromPDCount.Inc() // If the call that triggers the execution of this function is canceled by the context, other calls that are @@ -649,7 +649,7 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea return nil } - latestTSInfo, exists := o.getLastTSWithArrivalTS(opt.TxnScope) + latestTSInfo, exists := o.getLastTSWithArrivalTS() // If we fail to get latestTSInfo or the readTS exceeds it, get a timestamp from PD to double-check. // But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function // loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls. @@ -671,7 +671,7 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea estimatedCurrentTS, err := o.getStaleTimestampWithLastTS(latestTSInfo, 0) if err != nil { logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval", - zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope)) + zap.Error(err), zap.Uint64("readTS", readTS)) } else { o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now()) } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 01c67c5e46..0c9fc5c0bc 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -53,7 +53,7 @@ func TestPDOracle_UntilExpired(t *testing.T) { start := time.Now() SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) lockTs := oracle.GoTimeToTS(start.Add(time.Duration(lockAfter)*time.Millisecond)) + 1 - waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{}) assert.Equal(t, int64(lockAfter+lockExp), waitTs) } @@ -62,15 +62,15 @@ func TestPdOracle_GetStaleTimestamp(t *testing.T) { start := time.Now() SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) - ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 10) + ts, err := o.GetStaleTimestamp(context.Background(), 10) assert.Nil(t, err) assert.WithinDuration(t, start.Add(-10*time.Second), oracle.GetTimeFromTS(ts), 2*time.Second) - _, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 1e12) + _, err = o.GetStaleTimestamp(context.Background(), 1e12) assert.NotNil(t, err) assert.Regexp(t, ".*invalid prevSecond.*", err.Error()) - _, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, math.MaxUint64) + _, err = o.GetStaleTimestamp(context.Background(), math.MaxUint64) assert.NotNil(t, err) assert.Regexp(t, ".*invalid prevSecond.*", err.Error()) } @@ -168,7 +168,7 @@ func TestNonFutureStaleTSO(t *testing.T) { case <-closeCh: break CHECK default: - ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 0) + ts, err := o.GetStaleTimestamp(context.Background(), 0) assert.Nil(t, err) staleTime := oracle.GetTimeFromTS(ts) if staleTime.After(upperBound) && time.Since(now) < time.Millisecond /* only check staleTime within 1ms */ { @@ -352,7 +352,7 @@ func TestValidateReadTS(t *testing.T) { defer o.Close() ctx := context.Background() - opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} + opt := &oracle.Option{} // Always returns error for MaxUint64 err = o.ValidateReadTS(ctx, math.MaxUint64, staleRead, opt) @@ -423,7 +423,7 @@ func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) { asyncValidate := func(ctx context.Context, readTS uint64) chan error { ch := make(chan error, 1) go func() { - err := o.ValidateReadTS(ctx, readTS, true, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + err := o.ValidateReadTS(ctx, readTS, true, &oracle.Option{}) ch <- err }() return ch @@ -521,7 +521,7 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) { defer o.Close() ctx := context.Background() - opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} + opt := &oracle.Option{} // Validating read ts for non-stale-read requests must not trigger updating the adaptive update interval of // low resolution ts. @@ -574,9 +574,9 @@ func TestSetLastTSAlwaysPushTS(t *testing.T) { return default: } - ts, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := o.GetTimestamp(ctx, &oracle.Option{}) assert.NoError(t, err) - lastTS, found := o.getLastTS(oracle.GlobalTxnScope) + lastTS, found := o.getLastTS() assert.True(t, found) assert.GreaterOrEqual(t, lastTS, ts) } diff --git a/tikv/interface.go b/tikv/interface.go index f1c674334e..9373e7dfc4 100644 --- a/tikv/interface.go +++ b/tikv/interface.go @@ -80,8 +80,8 @@ type Storage interface { Close() error // UUID return a unique ID which represents a Storage. UUID() string - // CurrentTimestamp returns current timestamp with the given txnScope (local or global). - CurrentTimestamp(txnScope string) (uint64, error) + // CurrentTimestamp returns current timestamp. + CurrentTimestamp() (uint64, error) // GetOracle gets a timestamp oracle client. GetOracle() oracle.Oracle // SupportDeleteRange gets the storage support delete range or not. diff --git a/tikv/kv.go b/tikv/kv.go index cd5f17d7ba..4510eaed91 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -143,8 +143,8 @@ type KVStore struct { // it indicates the safe timestamp point that can be used to read consistent but may not the latest data. safeTSMap sync.Map - // MinSafeTs stores the minimum ts value for each txnScope - minSafeTS sync.Map + // MinSafeTs stores the minimum ts value + minSafeTS atomic.Uint64 replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled @@ -377,9 +377,6 @@ func (s *KVStore) Begin(opts ...TxnOption) (txn *transaction.KVTxn, err error) { opt(options) } - if options.TxnScope == "" { - options.TxnScope = oracle.GlobalTxnScope - } var ( startTS uint64 ) @@ -387,7 +384,7 @@ func (s *KVStore) Begin(opts ...TxnOption) (txn *transaction.KVTxn, err error) { startTS = *options.StartTS } else { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err = s.getTimestampWithRetry(bo, options.TxnScope) + startTS, err = s.getTimestampWithRetry(bo) if err != nil { return nil, err } @@ -454,10 +451,10 @@ func (s *KVStore) UUID() string { return s.uuid } -// CurrentTimestamp returns current timestamp with the given txnScope (local or global). -func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { +// CurrentTimestamp returns current timestamp. +func (s *KVStore) CurrentTimestamp() (uint64, error) { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err := s.getTimestampWithRetry(bo, txnScope) + startTS, err := s.getTimestampWithRetry(bo) if err != nil { return 0, err } @@ -475,11 +472,11 @@ func (s *KVStore) CurrentAllTSOKeyspaceGroupMinTs() (uint64, error) { } // GetTimestampWithRetry returns latest timestamp. -func (s *KVStore) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) { - return s.getTimestampWithRetry(bo, scope) +func (s *KVStore) GetTimestampWithRetry(bo *Backoffer) (uint64, error) { + return s.getTimestampWithRetry(bo) } -func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) { +func (s *KVStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("TiKVStore.getTimestampWithRetry", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -487,7 +484,7 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, } for { - startTS, err := s.oracle.GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: txnScope}) + startTS, err := s.oracle.GetTimestamp(bo.GetCtx(), &oracle.Option{}) // mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic. // Then mockGetTSErrorInRetry will return retryable error when first retry. // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout @@ -603,21 +600,18 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.clientMu.client } -// GetMinSafeTS return the minimal safeTS of the storage with given txnScope. -func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { - if val, ok := s.minSafeTS.Load(txnScope); ok { - return val.(uint64) - } - return 0 +// GetMinSafeTS return the minimal safeTS of the storage. +func (s *KVStore) GetMinSafeTS() uint64 { + return s.minSafeTS.Load() } -func (s *KVStore) setMinSafeTS(txnScope string, safeTS uint64) { +func (s *KVStore) setMinSafeTS(safeTS uint64) { // ensure safeTS is not set to max uint64 if safeTS == math.MaxUint64 { - logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.String("txnScope", txnScope), zap.Stack("stack")) + logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.Stack("stack")) return } - s.minSafeTS.Store(txnScope, safeTS) + s.minSafeTS.Store(safeTS) } // Ctx returns ctx. @@ -663,13 +657,13 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } -func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { +func (s *KVStore) updateMinSafeTS(storeIDs []uint64) { minSafeTS := uint64(math.MaxUint64) // when there is no store, return 0 in order to let minStartTS become startTS directly // actually storeIDs won't be empty since updateMinSafeTS is only called by updateSafeTS and updateSafeTS builds // txnScopeMap with non-empty values. here we check it to make the logic more robust. if len(storeIDs) < 1 { - s.setMinSafeTS(txnScope, 0) + s.setMinSafeTS(0) return } for _, store := range storeIDs { @@ -687,7 +681,7 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { if minSafeTS == math.MaxUint64 { minSafeTS = 0 } - s.setMinSafeTS(txnScope, minSafeTS) + s.setMinSafeTS(minSafeTS) } func (s *KVStore) safeTSUpdater() { @@ -712,7 +706,7 @@ func (s *KVStore) safeTSUpdater() { func (s *KVStore) updateSafeTS(ctx context.Context) { // Try to get the cluster-level minimum resolved timestamp from PD first. - if s.updateGlobalTxnScopeTSFromPD(ctx) { + if s.updateTSFromPD(ctx) { return } @@ -788,18 +782,12 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) }(ctx, wg, storeID, storeAddr) } - - txnScopeMap := make(map[string][]uint64) + storeIDs = make([]uint64, 0, len(stores)) for _, store := range stores { - txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID()) - - if label, ok := store.GetLabelValue(DCLabelKey); ok { - txnScopeMap[label] = append(txnScopeMap[label], store.StoreID()) - } - } - for txnScope, storeIDs := range txnScopeMap { - s.updateMinSafeTS(txnScope, storeIDs) + storeIDs = append(storeIDs, store.StoreID()) } + s.updateMinSafeTS(storeIDs) + wg.Wait() } @@ -837,9 +825,11 @@ var ( clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster") ) -// updateGlobalTxnScopeTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD -// to update min safe ts for global txn scope. -func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { +// updateTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD +// to update min safe ts +func (s *KVStore) updateTSFromPD(ctx context.Context) bool { + // TODO: confirm the logic here. Is the check condition txn_scope or "zone" label? + //isGlobal := true isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope // Try to get the minimum resolved timestamp of the cluster from PD. if s.pdHttpClient != nil && isGlobal { @@ -848,7 +838,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) } else if isValidSafeTS(clusterMinSafeTS) { // Update ts and metrics. - preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) + preClusterMinSafeTS := s.GetMinSafeTS() // preClusterMinSafeTS is guaranteed to be less than math.MaxUint64 (by this method and setMinSafeTS) // related to https://github.com/tikv/client-go/issues/991 if preClusterMinSafeTS > clusterMinSafeTS { @@ -856,7 +846,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds()) } else { - s.setMinSafeTS(oracle.GlobalTxnScope, clusterMinSafeTS) + s.setMinSafeTS(clusterMinSafeTS) successSafeTSUpdateCounter.Inc() safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds()) @@ -944,13 +934,6 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C // TxnOption configures Transaction type TxnOption func(*transaction.TxnOptions) -// WithTxnScope sets the TxnScope to txnScope -func WithTxnScope(txnScope string) TxnOption { - return func(st *transaction.TxnOptions) { - st.TxnScope = txnScope - } -} - // WithStartTS sets the StartTS to startTS func WithStartTS(startTS uint64) TxnOption { return func(st *transaction.TxnOptions) { diff --git a/tikv/kv_test.go b/tikv/kv_test.go index 1a9a4618fe..d2de5c5477 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -152,12 +152,12 @@ func (s *testKVSuite) TestMinSafeTsFromStores() { s.store.SetTiKVClient(mockClient) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == mockClient.tiflashSafeTs }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) - s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS()) ok, ts := s.store.getSafeTS(s.tikvStoreID) s.Require().True(ok) s.Require().Equal(mockClient.tikvSafeTs, ts) @@ -174,7 +174,7 @@ func (s *testKVSuite) TestMinSafeTsFromStoresWithAllZeros() { return atomic.LoadInt32(&mockClient.requestCount) >= 4 }, 15*time.Second, time.Second) - s.Require().Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(0), s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromStoresWithSomeZeros() { @@ -187,7 +187,7 @@ func (s *testKVSuite) TestMinSafeTsFromStoresWithSomeZeros() { return atomic.LoadInt32(&mockClient.requestCount) >= 4 }, 15*time.Second, time.Second) - s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromPD() { @@ -197,12 +197,12 @@ func (s *testKVSuite) TestMinSafeTsFromPD() { return 90, nil, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == 90 }, 15*time.Second, time.Second) s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - s.Require().Equal(uint64(90), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(90), s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromPDByStores() { @@ -216,12 +216,12 @@ func (s *testKVSuite) TestMinSafeTsFromPDByStores() { return math.MaxUint64, m, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == uint64(100)+s.tikvStoreID }, 15*time.Second, time.Second) s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - s.Require().Equal(uint64(100)+s.tikvStoreID, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(100)+s.tikvStoreID, s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromMixed1() { @@ -241,10 +241,10 @@ func (s *testKVSuite) TestMinSafeTsFromMixed1() { s.Eventually(func() bool { ts := s.store.GetMinSafeTS("z1") s.Require().False(math.MaxUint64 == ts) - return ts == uint64(10) && s.store.GetMinSafeTS(oracle.GlobalTxnScope) == uint64(10) + return ts == uint64(10) && s.store.GetMinSafeTS() == uint64(10) }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS()) s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z1")) s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS("z2")) } @@ -266,10 +266,10 @@ func (s *testKVSuite) TestMinSafeTsFromMixed2() { s.Eventually(func() bool { ts := s.store.GetMinSafeTS("z2") s.Require().False(math.MaxUint64 == ts) - return ts == uint64(10) && s.store.GetMinSafeTS(oracle.GlobalTxnScope) == uint64(10) + return ts == uint64(10) && s.store.GetMinSafeTS() == uint64(10) }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS()) s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS("z1")) s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z2")) } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 95228aa08d..6a445abd98 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/oracle" ) // CmdType represents the concrete request type in Request or response type in Response. @@ -248,11 +247,9 @@ type Request struct { Req interface{} kvrpcpb.Context ReadReplicaScope string - // remove txnScope after tidb removed txnScope - TxnScope string - ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead` - ReplicaReadSeed *uint32 // pointer to follower read seed in snapshot/coprocessor - StoreTp EndpointType + ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead` + ReplicaReadSeed *uint32 // pointer to follower read seed in snapshot/coprocessor + StoreTp EndpointType // ForwardedHost is the address of a store which will handle the request. It's different from // the address the request sent to. // If it's not empty, the store which receive the request will forward it to @@ -315,14 +312,6 @@ func (req *Request) DisableStaleReadMeetLock() { req.ReplicaReadType = kv.ReplicaReadLeader } -// IsGlobalStaleRead checks if the request is a global stale read request. -func (req *Request) IsGlobalStaleRead() bool { - return req.ReadReplicaScope == oracle.GlobalTxnScope && - // remove txnScope after tidb remove it - req.TxnScope == oracle.GlobalTxnScope && - req.GetStaleRead() -} - // IsDebugReq check whether the req is debug req. func (req *Request) IsDebugReq() bool { switch req.Type { diff --git a/txnkv/client.go b/txnkv/client.go index aa37333598..d277780ac5 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" "github.com/tikv/client-go/v2/util" @@ -122,7 +121,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { // GetTimestamp returns the current global timestamp. func (c *Client) GetTimestamp(ctx context.Context) (uint64, error) { bo := retry.NewBackofferWithVars(ctx, transaction.TsoMaxBackoff, nil) - startTS, err := c.GetTimestampWithRetry(bo, oracle.GlobalTxnScope) + startTS, err := c.GetTimestampWithRetry(bo) if err != nil { return 0, err } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 6a739b3616..3a689d2138 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -102,10 +102,10 @@ type kvstore interface { WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error // GetTimestampWithRetry returns latest timestamp. - GetTimestampWithRetry(bo *retry.Backoffer, scope string) (uint64, error) + GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) // GetOracle gets a timestamp oracle client. GetOracle() oracle.Oracle - CurrentTimestamp(txnScope string) (uint64, error) + CurrentTimestamp() (uint64, error) // SendReq sends a request to TiKV. SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) // GetTiKVClient gets the client instance. @@ -535,7 +535,7 @@ func (c *twoPhaseCommitter) checkAssertionByPessimisticLockResults(ctx context.C func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, assertionFailed *tikverr.ErrAssertionFailed) error { // If the schema has changed, it might be a false-positive. In this case we should return schema changed, which // is a usual case, instead of assertion failed. - ts, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + ts, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars)) if err != nil { return err } @@ -1279,7 +1279,7 @@ func keepAlive( return } bo := retry.NewBackofferWithVars(context.Background(), keepAliveMaxBackoff, c.txn.vars) - now, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + now, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", zap.Error(err)) @@ -1515,11 +1515,6 @@ func sendTxnHeartBeat( // checkAsyncCommit checks if async commit protocol is available for current transaction commit, true is returned if possible. func (c *twoPhaseCommitter) checkAsyncCommit() bool { - // Disable async commit in local transactions - if c.txn.GetScope() != oracle.GlobalTxnScope { - return false - } - // Don't use async commit when commitTSUpperBoundCheck is set. // For TiDB, this is used by cached table. if c.txn.commitTSUpperBoundCheck != nil { @@ -1546,10 +1541,6 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { // checkOnePC checks if 1PC protocol is available for current transaction. func (c *twoPhaseCommitter) checkOnePC() bool { - // Disable 1PC in local transactions - if c.txn.GetScope() != oracle.GlobalTxnScope { - return false - } // Disable 1PC for transaction when commitTSUpperBoundCheck is set. if c.txn.commitTSUpperBoundCheck != nil { return false @@ -1761,7 +1752,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if commitTSMayBeCalculated && c.needLinearizability() { util.EvalFailpoint("getMinCommitTSFromTSO") start := time.Now() - latestTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + latestTS, err := c.store.GetTimestampWithRetry(bo) // If we fail to get a timestamp from PD, we just propagate the failure // instead of falling back to the normal 2PC because a normal 2PC will // also be likely to fail due to the same timestamp issue. @@ -1896,7 +1887,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } else { start = time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err = c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + commitTS, err = c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars)) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), @@ -1916,7 +1907,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } atomic.StoreUint64(&c.commitTS, commitTS) - if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { + if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{}) { err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d", c.sessionID, c.startTS, c.commitTS) return err diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 749b85a5b2..4dbcbfd5a3 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -179,7 +179,7 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac } // Update commit ts and retry. - commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + commitTS, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed", zap.Error(err), diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index d2fe35cf9e..76fc0c2bc7 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -311,7 +311,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))), zap.Uint64("startTS", c.startTS), ) - commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + commitTS, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed", zap.Error(err), diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 49fd67493f..aea0418043 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -122,7 +122,6 @@ type PipelinedTxnOptions struct { // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string StartTS *uint64 PipelinedTxn PipelinedTxnOptions } @@ -183,7 +182,6 @@ type KVTxn struct { enableAsyncCommit bool enable1PC bool causalConsistency bool - scope string kvFilter KVFilter resourceGroupTag []byte resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil @@ -223,7 +221,6 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, startTime: time.Now(), valid: true, vars: tikv.DefaultVars, - scope: options.TxnScope, enableAsyncCommit: cfg.EnableAsyncCommit, enable1PC: cfg.Enable1PC, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, @@ -466,11 +463,6 @@ func (txn *KVTxn) SetCausalConsistency(b bool) { txn.causalConsistency = b } -// SetScope sets the geographical scope of the transaction. -func (txn *KVTxn) SetScope(scope string) { - txn.scope = scope -} - // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { txn.kvFilter = filter @@ -737,11 +729,6 @@ func (txn *KVTxn) IsCasualConsistency() bool { return txn.causalConsistency } -// GetScope returns the geographical scope of the transaction. -func (txn *KVTxn) GetScope() string { - return txn.scope -} - // Commit commits the transaction operations to KV store. func (txn *KVTxn) Commit(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -1007,7 +994,6 @@ func (txn *KVTxn) isInternal() bool { // TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing) type TxnInfo struct { - TxnScope string `json:"txn_scope"` StartTS uint64 `json:"start_ts"` CommitTS uint64 `json:"commit_ts"` TxnCommitMode string `json:"txn_commit_mode"` @@ -1031,7 +1017,6 @@ func (txn *KVTxn) onCommitted(err error) { } info := TxnInfo{ - TxnScope: txn.GetScope(), StartTS: txn.startTS, CommitTS: txn.commitTS, TxnCommitMode: commitMode, @@ -1055,7 +1040,7 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, forUpdateTs := txn.startTS if txn.IsPessimistic() { bo := retry.NewBackofferWithVars(context.Background(), TsoMaxBackoff, nil) - forUpdateTs, err = txn.store.GetTimestampWithRetry(bo, txn.scope) + forUpdateTs, err = txn.store.GetTimestampWithRetry(bo) if err != nil { return err } diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 3d1508e9f3..5a1b676159 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -554,7 +554,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio if !forRead { if status.ttl != 0 { metrics.LockResolverCountWithNotExpired.Inc() - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{}) msBeforeTxnExpired.update(msBeforeLockExpired) continue } @@ -572,7 +572,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio canAccess = append(canAccess, l.TxnID) } else { metrics.LockResolverCountWithNotExpired.Inc() - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{}) msBeforeTxnExpired.update(msBeforeLockExpired) } } @@ -639,7 +639,7 @@ func (t *txnExpireTime) value() int64 { func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) { var status TxnStatus bo := retry.NewBackoffer(context.Background(), getTxnStatusMaxBackoff) - currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{}) if err != nil { return status, err } @@ -658,7 +658,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle // Set currentTS to max uint64 to make the lock expired. currentTS = math.MaxUint64 } else { - currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{}) if err != nil { return TxnStatus{}, err } @@ -683,7 +683,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle return TxnStatus{ttl: l.TTL, action: kvrpcpb.Action_NoAction}, nil } - if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 { + if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{}) <= 0 { logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired", zap.Uint64("CallerStartTs", callerStartTS), zap.Stringer("lock str", l), @@ -816,7 +816,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary status.primaryLock = cmdResp.LockInfo if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit { - if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { + if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{}) { status.ttl = cmdResp.LockTtl } } else if cmdResp.LockTtl != 0 { diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 57b91bd821..d903c51ff1 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -457,7 +457,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, matchStoreLabels := s.mu.matchStoreLabels replicaAdjuster := s.mu.replicaReadAdjuster s.mu.RUnlock() - req.TxnScope = scope req.ReadReplicaScope = scope if isStaleness { req.EnableStaleWithMixedReplicaRead() @@ -703,7 +702,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] scope := s.mu.readReplicaScope replicaAdjuster := s.mu.replicaReadAdjuster s.mu.RUnlock() - req.TxnScope = scope req.ReadReplicaScope = scope var ops []locate.StoreSelectorOption if isStaleness { @@ -897,13 +895,6 @@ func (s *KVSnapshot) SetRuntimeStats(stats *SnapshotRuntimeStats) { s.mu.stats = stats } -// SetTxnScope is same as SetReadReplicaScope, keep it in order to keep compatible for now. -func (s *KVSnapshot) SetTxnScope(scope string) { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.readReplicaScope = scope -} - // SetReadReplicaScope set read replica scope func (s *KVSnapshot) SetReadReplicaScope(scope string) { s.mu.Lock()