diff --git a/config/config.go b/config/config.go index 8b062645a8..566ef9d9fd 100644 --- a/config/config.go +++ b/config/config.go @@ -75,9 +75,10 @@ type Config struct { OpenTracingEnable bool Path string EnableForwarding bool - TxnScope string - EnableAsyncCommit bool - Enable1PC bool + // FIXME: rename + TxnScope string + EnableAsyncCommit bool + Enable1PC bool // RegionsRefreshInterval indicates the interval of loading regions info, the unit is second, if RegionsRefreshInterval == 0, it will be disabled. RegionsRefreshInterval uint64 // EnablePreload indicates whether to preload region info when initializing the client. diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index c69bbdbcfe..67224322cc 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -135,7 +135,7 @@ func (s *testOnePCSuite) Test1PC() { // Check all keys keys := [][]byte{k1, k2, k3, k4, k5} values := [][]byte{v1, v2, v3, v4, v5New} - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { @@ -161,7 +161,7 @@ func (s *testOnePCSuite) Test1PCIsolation() { // Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs // calculation. for i := 0; i < 10; i++ { - _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) } @@ -215,7 +215,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { @@ -244,29 +244,6 @@ func (s *testOnePCSuite) Test1PCLinearizability() { s.Less(commitTS2, commitTS1) } -func (s *testOnePCSuite) Test1PCWithMultiDC() { - // It requires setting placement rules to run with TiKV - if *withTiKV { - return - } - - localTxn := s.begin1PC() - err := localTxn.Set([]byte("a"), []byte("a1")) - localTxn.SetScope("bj") - s.Nil(err) - err = localTxn.Commit(context.Background()) - s.Nil(err) - s.False(localTxn.GetCommitter().IsOnePC()) - - globalTxn := s.begin1PC() - err = globalTxn.Set([]byte("b"), []byte("b1")) - globalTxn.SetScope(oracle.GlobalTxnScope) - s.Nil(err) - err = globalTxn.Commit(context.Background()) - s.Nil(err) - s.True(globalTxn.GetCommitter().IsOnePC()) -} - func (s *testOnePCSuite) TestTxnCommitCounter() { initial := metrics.GetTxnCommitCounter() diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index fdad3b2969..2d19949b90 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -272,7 +272,7 @@ func (s *testCommitterSuite) TestPrewriteRollback() { err = committer.PrewriteAllMutations(ctx) s.Nil(err) } - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) committer.SetCommitTS(commitTS) err = committer.CommitMutations(ctx) @@ -372,7 +372,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 { } func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ @@ -733,7 +733,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() { err = txn.LockKeys(context.Background(), lockCtx, key2) s.Nil(err) lockInfo := s.getLockInfo(key) - msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl, &oracle.Option{}) s.GreaterOrEqual(msBeforeLockExpired, int64(100)) lr := s.store.NewLockResolver() @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() { check := func() bool { lockInfoNew := s.getLockInfo(key) if lockInfoNew.LockTtl > lockInfo.LockTtl { - currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{}) s.Nil(err) // Check that the TTL is update to a reasonable range. expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl) @@ -1272,7 +1272,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS) s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) // Update forUpdateTS to simulate a pessimistic retry. - newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + newForUpdateTS, err := s.store.CurrentTimestamp() s.Nil(err) s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS()) lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} @@ -1583,7 +1583,7 @@ func (s *testCommitterSuite) TestAggressiveLockingResetTTLManager() { s.True(txn.GetCommitter().IsTTLRunning()) // Get a new ts as the new forUpdateTS. - forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.NoError(err) lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) @@ -1662,7 +1662,7 @@ func (s *testCommitterSuite) testAggressiveLockingResetPrimaryAndTTLManagerAfter return } - forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + forUpdateTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.NoError(err) lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} key := []byte("k1") diff --git a/integration_tests/assertion_test.go b/integration_tests/assertion_test.go index f5699d53e2..2eea4e2300 100644 --- a/integration_tests/assertion_test.go +++ b/integration_tests/assertion_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" ) @@ -187,7 +186,7 @@ func (s *testAssertionSuite) TestPrewriteAssertion() { // When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by // previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a // key prefix with a timestamp to ensure the keys to be unique. - ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ts, err := s.store.CurrentTimestamp() s.Nil(err) prefix := fmt.Sprintf("test-prewrite-assertion-%d-", ts) s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Strict) @@ -199,7 +198,7 @@ func (s *testAssertionSuite) TestFastAssertion() { // When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by // previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a // key prefix with a timestamp to ensure the keys to be unique. - ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ts, err := s.store.CurrentTimestamp() s.Nil(err) prefix := fmt.Sprintf("test-fast-assertion-%d-", ts) s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Fast) diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 4c478f45a9..feb96cb437 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -122,7 +122,7 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, ex } func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, @@ -230,7 +230,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr s.Nil(err) if commitPrimary { - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) tpc.SetCommitTS(commitTS) err = tpc.CommitMutations(ctx) @@ -257,7 +257,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false) lock := s.mustGetLock([]byte("z")) lock.UseAsyncCommit = true - ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) var lockutil txnlock.LockProbe status := lockutil.NewLockStatus(nil, true, ts) @@ -265,7 +265,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) err = resolver.ResolveAsyncCommitLock(s.bo, lock, status) s.Nil(err) - currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil) s.Nil(err) @@ -273,7 +273,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.Equal(status.CommitTS(), ts) // One key is committed (i), one key is locked (a). Should get committed. - ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) commitTs := ts + 10 @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() { s.Equal(gotResolve, int64(1)) // One key has been rolled back (b), one is locked (a). Should be rolled back. - ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) commitTs = ts + 10 @@ -400,7 +400,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead() { txn1.Set([]byte("k1"), []byte("v2")) for i := 0; i < 20; i++ { - _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) } @@ -445,31 +445,6 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() { s.Less(commitTS2, commitTS1) } -// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions -func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() { - // It requires setting placement rules to run with TiKV - if *withTiKV { - return - } - - localTxn := s.beginAsyncCommit() - err := localTxn.Set([]byte("a"), []byte("a1")) - localTxn.SetScope("bj") - s.Nil(err) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = localTxn.Commit(ctx) - s.Nil(err) - s.False(localTxn.IsAsyncCommit()) - - globalTxn := s.beginAsyncCommit() - err = globalTxn.Set([]byte("b"), []byte("b1")) - globalTxn.SetScope(oracle.GlobalTxnScope) - s.Nil(err) - err = globalTxn.Commit(ctx) - s.Nil(err) - s.True(globalTxn.IsAsyncCommit()) -} - func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() { keys := [][]byte{[]byte("k0"), []byte("k1")} values := [][]byte{[]byte("v00"), []byte("v10")} @@ -620,7 +595,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() { lock := s.mustGetLock([]byte("a")) resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) for { - currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil) s.Nil(err) diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 2558f02770..e4e4a7af2f 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,12 +6,12 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 + github.com/pingcap/kvproto v0.0.0-20250205033218-ad14807ace91 github.com/pingcap/tidb v1.1.0-beta.0.20250208075453-ad2c9f464fde github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.14.1 - github.com/tikv/client-go/v2 v2.0.8-0.20250207065327-ec9ad0fd70cf + github.com/tikv/client-go/v2 v2.0.8-0.20250214064015-a805ea84f300 github.com/tikv/pd/client v0.0.0-20250213082949-e8930327be42 go.uber.org/goleak v1.3.0 ) @@ -100,7 +100,7 @@ require ( golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.21.0 // indirect - golang.org/x/time v0.9.0 // indirect + golang.org/x/time v0.10.0 // indirect golang.org/x/tools v0.29.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect @@ -114,5 +114,7 @@ require ( replace ( github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117 + github.com/pingcap/tidb => github.com/ekexium/tidb v1.1.0-beta.0.20250227061106-e2b72ed83f4e + github.com/tikv/client-go/v2 => ../ ) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 479f65609a..b5e9e42fb9 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -974,6 +974,8 @@ github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/ekexium/tidb v1.1.0-beta.0.20250227061106-e2b72ed83f4e h1:pA8jJKkP28hCaRMsbEjIq8DO1gRyiVNVkX8WFyKfjJY= +github.com/ekexium/tidb v1.1.0-beta.0.20250227061106-e2b72ed83f4e/go.mod h1:jpXKt+hLf9exV5W21JwGM774Sx1373pMEQvIB7BAFV0= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -1351,16 +1353,14 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= -github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E= -github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250205033218-ad14807ace91 h1:immgftBDX85+LT5elPSY4AKV3I8yzKsaMQC7wC3rX2Q= +github.com/pingcap/kvproto v0.0.0-20250205033218-ad14807ace91/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/sysutil v1.0.1-0.20241113070546-23b50de46fd3 h1:Q9CMGKUztbM0RWHdQu0pD9b9OC47sbISRiMvf9vJ2RY= github.com/pingcap/sysutil v1.0.1-0.20241113070546-23b50de46fd3/go.mod h1:tyo4AX5P7udiSKN0Mv3nD9DcUnuLLmFfE22+dEs4vbU= -github.com/pingcap/tidb v1.1.0-beta.0.20250208075453-ad2c9f464fde h1:H0TA7G0+QDgvylJN+XpL1I+VY+rhk0fbWWschPOS1ec= -github.com/pingcap/tidb v1.1.0-beta.0.20250208075453-ad2c9f464fde/go.mod h1:c7tIXSA8zZEgobZi3+j6FMGvts7rd6tkjf/m2o9GZCs= github.com/pingcap/tidb/pkg/parser v0.0.0-20250123092444-cf4d252af8aa h1:YTfITllgt53Htbu0gDYOYpeBn6k9ZM8kt1ZmtZHbOlA= github.com/pingcap/tidb/pkg/parser v0.0.0-20250123092444-cf4d252af8aa/go.mod h1:Hju1TEWZvrctQKbztTRwXH7rd41Yq0Pgmq4PrEKcq7o= github.com/pingcap/tipb v0.0.0-20241212101007-246f91188357 h1:s58UXyaWMNeaoeuVPZdrkm5Uk7NcODHqICGCUQ3A9s4= @@ -1943,8 +1943,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index 0019027849..b55bd36310 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -120,7 +120,7 @@ func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, ttl s.Nil(err) if commitPrimary { - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) tpc.SetCommitTS(commitTS) err = tpc.CommitMutations(ctx) @@ -262,7 +262,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() { bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) lr := s.store.NewLockResolver() - callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{}) s.Nil(err) // Check the lock TTL of a transaction. @@ -324,7 +324,7 @@ func (s *testLockSuite) TestCheckTxnStatus() { s.prewriteTxnWithTTL(txn, 1000) o := s.store.GetOracle() - currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) s.Greater(currentTS, txn.StartTS()) @@ -351,7 +351,7 @@ func (s *testLockSuite) TestCheckTxnStatus() { s.Equal(timeBeforeExpire, int64(0)) // Then call getTxnStatus again and check the lock status. - currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil) s.Nil(err) @@ -383,7 +383,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() { s.Nil(err) o := s.store.GetOracle() - currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) resolver := s.store.NewLockResolver() @@ -412,7 +412,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() { s.Nil(committer.CleanupMutations(context.Background())) // Call getTxnStatusFromLock to cover TxnNotFound and retry timeout. - startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{}) s.Nil(err) lock = &txnkv.Lock{ Key: []byte("second"), @@ -443,7 +443,7 @@ func (s *testLockSuite) prewriteTxnWithTTL(txn transaction.TxnProbe, ttl uint64) } func (s *testLockSuite) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ @@ -538,9 +538,9 @@ func (s *testLockSuite) TestBatchResolveLocks() { } // Locks may not expired - msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{}) s.Greater(msBeforeLockExpired, int64(0)) - msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{}) s.Greater(msBeforeLockExpired, int64(0)) lr := s.store.NewLockResolver() @@ -961,10 +961,10 @@ func (s *testLockSuite) TestResolveLocksForRead() { s.Nil(committer.PrewriteAllMutations(ctx)) committer.SetPrimaryKey([]byte("k66")) - readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + readStartTS, err = s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) - commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{}) s.Nil(err) s.Greater(commitTS, readStartTS) committer.SetCommitTS(commitTS) @@ -1044,7 +1044,7 @@ func (s *testLockWithTiKVSuite) cleanupLocks() { // Cleanup possible left locks. bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() s.NoError(err) remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) s.NoError(err) @@ -1358,7 +1358,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { // There must be a new tso allocation before committing if any key is locked with conflict, otherwise // async commit will become unsafe. // In TiDB, there must be a tso allocation for updating forUpdateTS. - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(currentTS, 200, lockTime) // k4: non-conflicting key but forUpdateTS updated @@ -1429,7 +1429,7 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { // * k1: stale pessimistic lock, primary // * k2: stale pessimistic lock, primary -> k1 - forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + forUpdateTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) err = txn.LockKeys(ctx, lockCtx, k3) // k3 becomes primary @@ -1467,7 +1467,7 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { } // Check data consistency - readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + readTS, err := s.store.CurrentTimestamp() s.NoError(err) snapshot := s.store.GetSnapshot(readTS) v, err := snapshot.Get(ctx, k3) @@ -1520,7 +1520,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { // k1 has txn1's stale pessimistic lock now. - forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + forUpdateTS, err := s.store.CurrentTimestamp() s.NoError(err) lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now()) s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3)) @@ -1540,7 +1540,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { s.NoError(txn2.Rollback()) // k4 has txn2's stale primary pessimistic lock now. - currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + currentTS, err := s.store.CurrentTimestamp() remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) s.NoError(err) @@ -1564,7 +1564,7 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { s.Empty(remainingLocks) // Check data consistency - readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + readTS, err := s.store.CurrentTimestamp() snapshot := s.store.GetSnapshot(readTS) _, err = snapshot.Get(ctx, k1) s.Equal(tikverr.ErrNotExist, err) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 0266f1aa51..def7f0f152 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -126,13 +126,13 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) s.waitForMinSafeTS(dcLabel, 100) require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) - require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) { s.Eventually(func() bool { - return s.store.GetMinSafeTS(txnScope) == ts + return s.store.GetMinSafeTS() == ts }, time.Second, 200*time.Millisecond) } @@ -145,7 +145,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Set DC label for store 1. @@ -168,7 +168,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { // Try to get the minimum resolved timestamp of the store from TiKV. s.waitForMinSafeTS(dcLabel, 150) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) + require.Equal(uint64(150), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } @@ -184,14 +184,14 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) // Make sure the store's min resolved ts is not initialized. s.waitForMinSafeTS(oracle.GlobalTxnScope, 0) - require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(0), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) // Make sure the store's min resolved ts is not regarded as MaxUint64. s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) - require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(100), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Fallback to KV Request when PD server not support get min resolved ts. @@ -199,7 +199,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { mockClient.SetKVSafeTS(150) // Make sure the minSafeTS can advance. s.waitForMinSafeTS(oracle.GlobalTxnScope, 150) - require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(150), s.store.GetMinSafeTS()) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index 025d9df224..45031895a9 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -76,7 +75,7 @@ func (s *testPipelinedMemDBSuite) TearDownTest() { } func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ diff --git a/integration_tests/snapshot_fail_test.go b/integration_tests/snapshot_fail_test.go index 17ac9436a1..7eef63fd26 100644 --- a/integration_tests/snapshot_fail_test.go +++ b/integration_tests/snapshot_fail_test.go @@ -44,7 +44,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv" @@ -296,7 +295,7 @@ func (s *testSnapshotFailSuite) TestResetSnapshotTS() { } func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock { - ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + ver, err := s.store.CurrentTimestamp() s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 3f0ef9716f..4453a6fbf9 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -47,7 +47,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv" @@ -411,7 +410,7 @@ func (s *testSnapshotSuite) TestSnapshotCacheBypassMaxUint64() { s.Nil(txn.Set([]byte("z"), []byte("z"))) s.Nil(txn.Commit(context.Background())) // cache version < math.MaxUint64 - startTS, err := s.store.GetTimestampWithRetry(tikv.NewNoopBackoff(context.Background()), oracle.GlobalTxnScope) + startTS, err := s.store.GetTimestampWithRetry(tikv.NewNoopBackoff(context.Background())) s.Nil(err) snapshot := s.store.GetSnapshot(startTS) snapshot.Get(context.Background(), []byte("x")) diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index 8f61b42896..c03fe7f73c 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -73,9 +73,9 @@ func (s *testStoreSuite) TestOracle() { s.store.SetOracle(o) ctx := context.Background() - t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) + t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil)) s.Nil(err) - t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope) + t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil)) s.Nil(err) s.Less(t1, t2) @@ -101,7 +101,7 @@ func (s *testStoreSuite) TestOracle() { go func() { defer wg.Done() - t3, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 5000, nil), oracle.GlobalTxnScope) + t3, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 5000, nil)) s.Nil(err) s.Less(t2, t3) expired := s.store.GetOracle().IsExpired(t2, 50, &oracle.Option{}) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 16e56881ce..bbb50f96dc 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1809,7 +1809,7 @@ func (s *RegionRequestSender) validateReadTS(ctx context.Context, req *tikvrpc.R default: return nil } - return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{TxnScope: req.TxnScope}) + return s.readTSValidator.ValidateReadTS(ctx, readTS, req.StaleRead, &oracle.Option{}) } func patchRequestSource(req *tikvrpc.Request, replicaType string) { diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e7a5070e9f..46760669fa 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1237,7 +1237,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() req.ReplicaReadType = kv.ReplicaReadFollower @@ -1420,7 +1419,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope req.EnableStaleWithMixedReplicaRead() bo := retry.NewBackoffer(context.Background(), -1) resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) @@ -1443,7 +1441,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestPreferLeader() { // make request req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadPreferLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope // setup mock client s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { @@ -1606,7 +1603,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() { req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope downStore := s.cluster.GetStore(s.storeIDs[2]) s.cluster.MarkPeerDown(s.peerIDs[2]) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 0719c2c22f..589aa63d89 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -949,7 +949,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() } getTS := func() uint64 { - ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{}) s.NoError(err) return ts } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 980a4cf3aa..86701601ae 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -2836,7 +2836,6 @@ func (ca *replicaSelectorAccessPathCase) buildRequest(s *testReplicaSelectorSuit if ca.staleRead { req.EnableStaleWithMixedReplicaRead() req.ReadReplicaScope = oracle.GlobalTxnScope - req.TxnScope = oracle.GlobalTxnScope } else { req.ReplicaReadType = ca.readType req.ReplicaRead = ca.readType.IsFollowerRead() diff --git a/oracle/oracle.go b/oracle/oracle.go index cd3833371e..381173dc8f 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -41,8 +41,8 @@ import ( ) // Option represents available options for the oracle.Oracle. +// TODO: remove this struct type Option struct { - TxnScope string } // Oracle is the interface that provides strictly ascending timestamps. @@ -58,7 +58,7 @@ type Oracle interface { // WARNING: This method does not guarantee whether the generated timestamp is legal for accessing the data. // Neither is it safe to use it for verifying the legality of another calculated timestamp. // Be sure to validate the timestamp before using it to access the data. - GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error) + GetStaleTimestamp(ctx context.Context, prevSecond uint64) (uint64, error) IsExpired(lockTimestamp, TTL uint64, opt *Option) bool UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64 Close() diff --git a/oracle/oracles/export_test.go b/oracle/oracles/export_test.go index d18c962436..bc7d791d18 100644 --- a/oracle/oracles/export_test.go +++ b/oracle/oracles/export_test.go @@ -37,7 +37,6 @@ package oracles import ( "context" "sync" - "sync/atomic" "time" "github.com/tikv/client-go/v2/oracle" @@ -85,8 +84,6 @@ func StartTsUpdateLoop(o oracle.Oracle, ctx context.Context, wg *sync.WaitGroup) func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) { switch o := oc.(type) { case *pdOracle: - lastTSInterface, _ := o.lastTSMap.LoadOrStore(oracle.GlobalTxnScope, &atomic.Pointer[lastTSO]{}) - lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO]) - lastTSPointer.Store(&lastTSO{tso: ts, arrival: oracle.GetTimeFromTS(ts)}) + o.lastTS.Store(&lastTSO{tso: ts, arrival: oracle.GetTimeFromTS(ts)}) } } diff --git a/oracle/oracles/local.go b/oracle/oracles/local.go index a980bbddbd..696b1af71c 100644 --- a/oracle/oracles/local.go +++ b/oracle/oracles/local.go @@ -118,7 +118,7 @@ func (l *localOracle) SetLowResolutionTimestampUpdateInterval(time.Duration) err } // GetStaleTimestamp return physical -func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) { +func (l *localOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) { return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil } diff --git a/oracle/oracles/mock.go b/oracle/oracles/mock.go index 757ce3158d..edf9103771 100644 --- a/oracle/oracles/mock.go +++ b/oracle/oracles/mock.go @@ -106,7 +106,7 @@ func (o *MockOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, e } // GetStaleTimestamp implements oracle.Oracle interface. -func (o *MockOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) { +func (o *MockOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) { return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil } diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 60f0ee7cb3..393493e0e9 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -115,10 +115,9 @@ const ( // pdOracle is an Oracle that uses a placement driver client as source. type pdOracle struct { - c pd.Client - // txn_scope (string) -> lastTSPointer (*atomic.Pointer[lastTSO]) - lastTSMap sync.Map - quit chan struct{} + c pd.Client + lastTS atomic.Pointer[lastTSO] + quit chan struct{} // The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval. // For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`. lastTSUpdateInterval atomic.Int64 @@ -147,7 +146,7 @@ type pdOracle struct { state adaptiveUpdateTSIntervalState } - // When the low resolution ts is not new enough and there are many concurrent stane read / snapshot read + // When the low resolution ts is not new enough and there are many concurrent stale read / snapshot read // operations that needs to validate the read ts, we can use this to avoid too many concurrent GetTS calls by // reusing a result for different `ValidateReadTS` calls. This can be done because that // we don't require the ts for validation to be strictly the latest one. @@ -193,7 +192,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e go o.updateTS(ctx) } // Initialize the timestamp of the global txnScope by Get. - _, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := o.GetTimestamp(ctx, &oracle.Option{}) if err != nil { o.Close() return nil, err @@ -204,7 +203,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e // IsExpired returns whether lockTS+TTL is expired, both are ms. It uses `lastTS` // to compare, may return false negative result temporarily. func (o *pdOracle) IsExpired(lockTS, TTL uint64, opt *oracle.Option) bool { - lastTS, exist := o.getLastTS(opt.TxnScope) + lastTS, exist := o.getLastTS() if !exist { return true } @@ -213,11 +212,11 @@ func (o *pdOracle) IsExpired(lockTS, TTL uint64, opt *oracle.Option) bool { // GetTimestamp gets a new increasing time. func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) { - ts, err := o.getTimestamp(ctx, opt.TxnScope) + ts, err := o.getTimestamp(ctx) if err != nil { return 0, err } - o.setLastTS(ts, opt.TxnScope) + o.setLastTS(ts) return ts, nil } @@ -228,8 +227,7 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err type tsFuture struct { tso.TSFuture - o *pdOracle - txnScope string + o *pdOracle } // Wait implements the oracle.Future interface. @@ -241,15 +239,15 @@ func (f *tsFuture) Wait() (uint64, error) { return 0, errors.WithStack(err) } ts := oracle.ComposeTS(physical, logical) - f.o.setLastTS(ts, f.txnScope) + f.o.setLastTS(ts) return ts, nil } func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope} + return &tsFuture{o.c.GetTSAsync(ctx), o} } -func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, error) { +func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) { now := time.Now() physical, logical, err := o.c.GetTS(ctx) if err != nil { @@ -278,54 +276,38 @@ func (o *pdOracle) getMinTimestampInAllTSOGroup(ctx context.Context) (uint64, er return oracle.ComposeTS(physical, logical), nil } -func (o *pdOracle) setLastTS(ts uint64, txnScope string) { - if txnScope == "" { - txnScope = oracle.GlobalTxnScope - } +func (o *pdOracle) setLastTS(ts uint64) { current := &lastTSO{ tso: ts, arrival: time.Now(), } - lastTSInterface, ok := o.lastTSMap.Load(txnScope) - if !ok { - pointer := &atomic.Pointer[lastTSO]{} - pointer.Store(current) - // do not handle the stored case, because it only runs once. - lastTSInterface, _ = o.lastTSMap.LoadOrStore(txnScope, pointer) + if o.lastTS.Load() == nil { + o.lastTS.Store(current) } - lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO]) for { - last := lastTSPointer.Load() + last := o.lastTS.Load() if current.tso <= last.tso { return } if last.arrival.After(current.arrival) { current.arrival = last.arrival } - if lastTSPointer.CompareAndSwap(last, current) { + if o.lastTS.CompareAndSwap(last, current) { return } } } -func (o *pdOracle) getLastTS(txnScope string) (uint64, bool) { - last, exist := o.getLastTSWithArrivalTS(txnScope) +func (o *pdOracle) getLastTS() (uint64, bool) { + last, exist := o.getLastTSWithArrivalTS() if !exist { return 0, false } return last.tso, true } -func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) { - if txnScope == "" { - txnScope = oracle.GlobalTxnScope - } - lastTSInterface, ok := o.lastTSMap.Load(txnScope) - if !ok { - return nil, false - } - lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO]) - last := lastTSPointer.Load() +func (o *pdOracle) getLastTSWithArrivalTS() (*lastTSO, bool) { + last := o.lastTS.Load() if last == nil { return nil, false } @@ -463,17 +445,11 @@ func (o *pdOracle) updateTS(ctx context.Context) { // Note that as `doUpdate` updates last tick time while `nextUpdateInterval` may perform calculation depending on the // last tick time, `doUpdate` should be called after finishing calculating the next interval. doUpdate := func(now time.Time) { - // Update the timestamp for each txnScope - o.lastTSMap.Range(func(key, _ interface{}) bool { - txnScope := key.(string) - ts, err := o.getTimestamp(ctx, txnScope) - if err != nil { - logutil.Logger(ctx).Error("updateTS error", zap.String("txnScope", txnScope), zap.Error(err)) - return true - } - o.setLastTS(ts, txnScope) - return true - }) + ts, err := o.getTimestamp(ctx) + if err != nil { + logutil.Logger(ctx).Error("updateTS error", zap.Error(err)) + } + o.setLastTS(ts) o.adaptiveUpdateIntervalState.lastTick = now } @@ -512,7 +488,7 @@ func (o *pdOracle) updateTS(ctx context.Context) { // UntilExpired implement oracle.Oracle interface. func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64, opt *oracle.Option) int64 { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { return 0 } @@ -575,19 +551,19 @@ func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(newUpdateInterval tim // GetLowResolutionTimestamp gets a new increasing time. func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { - return 0, errors.Errorf("get low resolution timestamp fail, invalid txnScope = %s", opt.TxnScope) + return 0, errors.New("get low resolution timestamp fail") } return lastTS, nil } func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - lastTS, ok := o.getLastTS(opt.TxnScope) + lastTS, ok := o.getLastTS() if !ok { return lowResolutionTsFuture{ ts: 0, - err: errors.Errorf("get low resolution timestamp async fail, invalid txnScope = %s", opt.TxnScope), + err: errors.New("get low resolution timestamp async fail"), } } return lowResolutionTsFuture{ @@ -596,10 +572,10 @@ func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *orac } } -func (o *pdOracle) getStaleTimestamp(txnScope string, prevSecond uint64) (uint64, error) { - last, ok := o.getLastTSWithArrivalTS(txnScope) +func (o *pdOracle) getStaleTimestamp(prevSecond uint64) (uint64, error) { + last, ok := o.getLastTSWithArrivalTS() if !ok { - return 0, errors.Errorf("get stale timestamp fail, txnScope: %s", txnScope) + return 0, errors.Errorf("get stale timestamp fail") } return o.getStaleTimestampWithLastTS(last, prevSecond) } @@ -616,12 +592,12 @@ func (o *pdOracle) getStaleTimestampWithLastTS(last *lastTSO, prevSecond uint64) } // GetStaleTimestamp generate a TSO which represents for the TSO prevSecond secs ago. -func (o *pdOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) { - ts, err = o.getStaleTimestamp(txnScope, prevSecond) +func (o *pdOracle) GetStaleTimestamp(ctx context.Context, prevSecond uint64) (ts uint64, err error) { + ts, err = o.getStaleTimestamp(prevSecond) if err != nil { if !strings.HasPrefix(err.Error(), "invalid prevSecond") { // If any error happened, we will try to fetch tso and set it as last ts. - _, tErr := o.GetTimestamp(ctx, &oracle.Option{TxnScope: txnScope}) + _, tErr := o.GetTimestamp(ctx, &oracle.Option{}) if tErr != nil { return 0, tErr } @@ -640,7 +616,8 @@ func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) { } func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Option) (uint64, error) { - ch := o.tsForValidation.DoChan(opt.TxnScope, func() (interface{}, error) { + // TODO: do we still need the group after the deprecation of TxnScope? + ch := o.tsForValidation.DoChan(oracle.GlobalTxnScope, func() (interface{}, error) { metrics.TiKVValidateReadTSFromPDCount.Inc() // If the call that triggers the execution of this function is canceled by the context, other calls that are @@ -668,7 +645,7 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea return nil } - latestTSInfo, exists := o.getLastTSWithArrivalTS(opt.TxnScope) + latestTSInfo, exists := o.getLastTSWithArrivalTS() // If we fail to get latestTSInfo or the readTS exceeds it, get a timestamp from PD to double-check. // But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function // loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls. @@ -690,7 +667,7 @@ func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRea estimatedCurrentTS, err := o.getStaleTimestampWithLastTS(latestTSInfo, 0) if err != nil { logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval", - zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope)) + zap.Error(err), zap.Uint64("readTS", readTS)) } else { o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now()) } diff --git a/oracle/oracles/pd_test.go b/oracle/oracles/pd_test.go index 01c67c5e46..0c9fc5c0bc 100644 --- a/oracle/oracles/pd_test.go +++ b/oracle/oracles/pd_test.go @@ -53,7 +53,7 @@ func TestPDOracle_UntilExpired(t *testing.T) { start := time.Now() SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) lockTs := oracle.GoTimeToTS(start.Add(time.Duration(lockAfter)*time.Millisecond)) + 1 - waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{}) assert.Equal(t, int64(lockAfter+lockExp), waitTs) } @@ -62,15 +62,15 @@ func TestPdOracle_GetStaleTimestamp(t *testing.T) { start := time.Now() SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start)) - ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 10) + ts, err := o.GetStaleTimestamp(context.Background(), 10) assert.Nil(t, err) assert.WithinDuration(t, start.Add(-10*time.Second), oracle.GetTimeFromTS(ts), 2*time.Second) - _, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 1e12) + _, err = o.GetStaleTimestamp(context.Background(), 1e12) assert.NotNil(t, err) assert.Regexp(t, ".*invalid prevSecond.*", err.Error()) - _, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, math.MaxUint64) + _, err = o.GetStaleTimestamp(context.Background(), math.MaxUint64) assert.NotNil(t, err) assert.Regexp(t, ".*invalid prevSecond.*", err.Error()) } @@ -168,7 +168,7 @@ func TestNonFutureStaleTSO(t *testing.T) { case <-closeCh: break CHECK default: - ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 0) + ts, err := o.GetStaleTimestamp(context.Background(), 0) assert.Nil(t, err) staleTime := oracle.GetTimeFromTS(ts) if staleTime.After(upperBound) && time.Since(now) < time.Millisecond /* only check staleTime within 1ms */ { @@ -352,7 +352,7 @@ func TestValidateReadTS(t *testing.T) { defer o.Close() ctx := context.Background() - opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} + opt := &oracle.Option{} // Always returns error for MaxUint64 err = o.ValidateReadTS(ctx, math.MaxUint64, staleRead, opt) @@ -423,7 +423,7 @@ func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) { asyncValidate := func(ctx context.Context, readTS uint64) chan error { ch := make(chan error, 1) go func() { - err := o.ValidateReadTS(ctx, readTS, true, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + err := o.ValidateReadTS(ctx, readTS, true, &oracle.Option{}) ch <- err }() return ch @@ -521,7 +521,7 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) { defer o.Close() ctx := context.Background() - opt := &oracle.Option{TxnScope: oracle.GlobalTxnScope} + opt := &oracle.Option{} // Validating read ts for non-stale-read requests must not trigger updating the adaptive update interval of // low resolution ts. @@ -574,9 +574,9 @@ func TestSetLastTSAlwaysPushTS(t *testing.T) { return default: } - ts, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + ts, err := o.GetTimestamp(ctx, &oracle.Option{}) assert.NoError(t, err) - lastTS, found := o.getLastTS(oracle.GlobalTxnScope) + lastTS, found := o.getLastTS() assert.True(t, found) assert.GreaterOrEqual(t, lastTS, ts) } diff --git a/tikv/interface.go b/tikv/interface.go index f1c674334e..9373e7dfc4 100644 --- a/tikv/interface.go +++ b/tikv/interface.go @@ -80,8 +80,8 @@ type Storage interface { Close() error // UUID return a unique ID which represents a Storage. UUID() string - // CurrentTimestamp returns current timestamp with the given txnScope (local or global). - CurrentTimestamp(txnScope string) (uint64, error) + // CurrentTimestamp returns current timestamp. + CurrentTimestamp() (uint64, error) // GetOracle gets a timestamp oracle client. GetOracle() oracle.Oracle // SupportDeleteRange gets the storage support delete range or not. diff --git a/tikv/kv.go b/tikv/kv.go index cd5f17d7ba..4510eaed91 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -143,8 +143,8 @@ type KVStore struct { // it indicates the safe timestamp point that can be used to read consistent but may not the latest data. safeTSMap sync.Map - // MinSafeTs stores the minimum ts value for each txnScope - minSafeTS sync.Map + // MinSafeTs stores the minimum ts value + minSafeTS atomic.Uint64 replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled @@ -377,9 +377,6 @@ func (s *KVStore) Begin(opts ...TxnOption) (txn *transaction.KVTxn, err error) { opt(options) } - if options.TxnScope == "" { - options.TxnScope = oracle.GlobalTxnScope - } var ( startTS uint64 ) @@ -387,7 +384,7 @@ func (s *KVStore) Begin(opts ...TxnOption) (txn *transaction.KVTxn, err error) { startTS = *options.StartTS } else { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err = s.getTimestampWithRetry(bo, options.TxnScope) + startTS, err = s.getTimestampWithRetry(bo) if err != nil { return nil, err } @@ -454,10 +451,10 @@ func (s *KVStore) UUID() string { return s.uuid } -// CurrentTimestamp returns current timestamp with the given txnScope (local or global). -func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) { +// CurrentTimestamp returns current timestamp. +func (s *KVStore) CurrentTimestamp() (uint64, error) { bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err := s.getTimestampWithRetry(bo, txnScope) + startTS, err := s.getTimestampWithRetry(bo) if err != nil { return 0, err } @@ -475,11 +472,11 @@ func (s *KVStore) CurrentAllTSOKeyspaceGroupMinTs() (uint64, error) { } // GetTimestampWithRetry returns latest timestamp. -func (s *KVStore) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) { - return s.getTimestampWithRetry(bo, scope) +func (s *KVStore) GetTimestampWithRetry(bo *Backoffer) (uint64, error) { + return s.getTimestampWithRetry(bo) } -func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) { +func (s *KVStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("TiKVStore.getTimestampWithRetry", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -487,7 +484,7 @@ func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, } for { - startTS, err := s.oracle.GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: txnScope}) + startTS, err := s.oracle.GetTimestamp(bo.GetCtx(), &oracle.Option{}) // mockGetTSErrorInRetry should wait MockCommitErrorOnce first, then will run into retry() logic. // Then mockGetTSErrorInRetry will return retryable error when first retry. // Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout @@ -603,21 +600,18 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.clientMu.client } -// GetMinSafeTS return the minimal safeTS of the storage with given txnScope. -func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { - if val, ok := s.minSafeTS.Load(txnScope); ok { - return val.(uint64) - } - return 0 +// GetMinSafeTS return the minimal safeTS of the storage. +func (s *KVStore) GetMinSafeTS() uint64 { + return s.minSafeTS.Load() } -func (s *KVStore) setMinSafeTS(txnScope string, safeTS uint64) { +func (s *KVStore) setMinSafeTS(safeTS uint64) { // ensure safeTS is not set to max uint64 if safeTS == math.MaxUint64 { - logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.String("txnScope", txnScope), zap.Stack("stack")) + logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.Stack("stack")) return } - s.minSafeTS.Store(txnScope, safeTS) + s.minSafeTS.Store(safeTS) } // Ctx returns ctx. @@ -663,13 +657,13 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } -func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { +func (s *KVStore) updateMinSafeTS(storeIDs []uint64) { minSafeTS := uint64(math.MaxUint64) // when there is no store, return 0 in order to let minStartTS become startTS directly // actually storeIDs won't be empty since updateMinSafeTS is only called by updateSafeTS and updateSafeTS builds // txnScopeMap with non-empty values. here we check it to make the logic more robust. if len(storeIDs) < 1 { - s.setMinSafeTS(txnScope, 0) + s.setMinSafeTS(0) return } for _, store := range storeIDs { @@ -687,7 +681,7 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { if minSafeTS == math.MaxUint64 { minSafeTS = 0 } - s.setMinSafeTS(txnScope, minSafeTS) + s.setMinSafeTS(minSafeTS) } func (s *KVStore) safeTSUpdater() { @@ -712,7 +706,7 @@ func (s *KVStore) safeTSUpdater() { func (s *KVStore) updateSafeTS(ctx context.Context) { // Try to get the cluster-level minimum resolved timestamp from PD first. - if s.updateGlobalTxnScopeTSFromPD(ctx) { + if s.updateTSFromPD(ctx) { return } @@ -788,18 +782,12 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) }(ctx, wg, storeID, storeAddr) } - - txnScopeMap := make(map[string][]uint64) + storeIDs = make([]uint64, 0, len(stores)) for _, store := range stores { - txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID()) - - if label, ok := store.GetLabelValue(DCLabelKey); ok { - txnScopeMap[label] = append(txnScopeMap[label], store.StoreID()) - } - } - for txnScope, storeIDs := range txnScopeMap { - s.updateMinSafeTS(txnScope, storeIDs) + storeIDs = append(storeIDs, store.StoreID()) } + s.updateMinSafeTS(storeIDs) + wg.Wait() } @@ -837,9 +825,11 @@ var ( clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster") ) -// updateGlobalTxnScopeTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD -// to update min safe ts for global txn scope. -func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { +// updateTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD +// to update min safe ts +func (s *KVStore) updateTSFromPD(ctx context.Context) bool { + // TODO: confirm the logic here. Is the check condition txn_scope or "zone" label? + //isGlobal := true isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope // Try to get the minimum resolved timestamp of the cluster from PD. if s.pdHttpClient != nil && isGlobal { @@ -848,7 +838,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) } else if isValidSafeTS(clusterMinSafeTS) { // Update ts and metrics. - preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) + preClusterMinSafeTS := s.GetMinSafeTS() // preClusterMinSafeTS is guaranteed to be less than math.MaxUint64 (by this method and setMinSafeTS) // related to https://github.com/tikv/client-go/issues/991 if preClusterMinSafeTS > clusterMinSafeTS { @@ -856,7 +846,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds()) } else { - s.setMinSafeTS(oracle.GlobalTxnScope, clusterMinSafeTS) + s.setMinSafeTS(clusterMinSafeTS) successSafeTSUpdateCounter.Inc() safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds()) @@ -944,13 +934,6 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C // TxnOption configures Transaction type TxnOption func(*transaction.TxnOptions) -// WithTxnScope sets the TxnScope to txnScope -func WithTxnScope(txnScope string) TxnOption { - return func(st *transaction.TxnOptions) { - st.TxnScope = txnScope - } -} - // WithStartTS sets the StartTS to startTS func WithStartTS(startTS uint64) TxnOption { return func(st *transaction.TxnOptions) { diff --git a/tikv/kv_test.go b/tikv/kv_test.go index 1a9a4618fe..47f808a0d5 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" @@ -152,12 +151,12 @@ func (s *testKVSuite) TestMinSafeTsFromStores() { s.store.SetTiKVClient(mockClient) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == mockClient.tiflashSafeTs }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2)) - s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS()) ok, ts := s.store.getSafeTS(s.tikvStoreID) s.Require().True(ok) s.Require().Equal(mockClient.tikvSafeTs, ts) @@ -174,7 +173,7 @@ func (s *testKVSuite) TestMinSafeTsFromStoresWithAllZeros() { return atomic.LoadInt32(&mockClient.requestCount) >= 4 }, 15*time.Second, time.Second) - s.Require().Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(0), s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromStoresWithSomeZeros() { @@ -187,7 +186,7 @@ func (s *testKVSuite) TestMinSafeTsFromStoresWithSomeZeros() { return atomic.LoadInt32(&mockClient.requestCount) >= 4 }, 15*time.Second, time.Second) - s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromPD() { @@ -197,12 +196,12 @@ func (s *testKVSuite) TestMinSafeTsFromPD() { return 90, nil, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == 90 }, 15*time.Second, time.Second) s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - s.Require().Equal(uint64(90), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(90), s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromPDByStores() { @@ -216,12 +215,12 @@ func (s *testKVSuite) TestMinSafeTsFromPDByStores() { return math.MaxUint64, m, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS(oracle.GlobalTxnScope) + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) return ts == uint64(100)+s.tikvStoreID }, 15*time.Second, time.Second) s.Require().Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) - s.Require().Equal(uint64(100)+s.tikvStoreID, s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Equal(uint64(100)+s.tikvStoreID, s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromMixed1() { @@ -239,14 +238,12 @@ func (s *testKVSuite) TestMinSafeTsFromMixed1() { return math.MaxUint64, m, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS("z1") + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) - return ts == uint64(10) && s.store.GetMinSafeTS(oracle.GlobalTxnScope) == uint64(10) + return ts == uint64(10) && s.store.GetMinSafeTS() == uint64(10) }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z1")) - s.Require().Equal(mockClient.tiflashSafeTs, s.store.GetMinSafeTS("z2")) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS()) } func (s *testKVSuite) TestMinSafeTsFromMixed2() { @@ -264,12 +261,10 @@ func (s *testKVSuite) TestMinSafeTsFromMixed2() { return math.MaxUint64, m, nil }) s.Eventually(func() bool { - ts := s.store.GetMinSafeTS("z2") + ts := s.store.GetMinSafeTS() s.Require().False(math.MaxUint64 == ts) - return ts == uint64(10) && s.store.GetMinSafeTS(oracle.GlobalTxnScope) == uint64(10) + return ts == uint64(10) && s.store.GetMinSafeTS() == uint64(10) }, 15*time.Second, time.Second) s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - s.Require().Equal(mockClient.tikvSafeTs, s.store.GetMinSafeTS("z1")) - s.Require().Equal(uint64(10), s.store.GetMinSafeTS("z2")) + s.Require().Equal(uint64(10), s.store.GetMinSafeTS()) } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 95228aa08d..6a445abd98 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/oracle" ) // CmdType represents the concrete request type in Request or response type in Response. @@ -248,11 +247,9 @@ type Request struct { Req interface{} kvrpcpb.Context ReadReplicaScope string - // remove txnScope after tidb removed txnScope - TxnScope string - ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead` - ReplicaReadSeed *uint32 // pointer to follower read seed in snapshot/coprocessor - StoreTp EndpointType + ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead` + ReplicaReadSeed *uint32 // pointer to follower read seed in snapshot/coprocessor + StoreTp EndpointType // ForwardedHost is the address of a store which will handle the request. It's different from // the address the request sent to. // If it's not empty, the store which receive the request will forward it to @@ -315,14 +312,6 @@ func (req *Request) DisableStaleReadMeetLock() { req.ReplicaReadType = kv.ReplicaReadLeader } -// IsGlobalStaleRead checks if the request is a global stale read request. -func (req *Request) IsGlobalStaleRead() bool { - return req.ReadReplicaScope == oracle.GlobalTxnScope && - // remove txnScope after tidb remove it - req.TxnScope == oracle.GlobalTxnScope && - req.GetStaleRead() -} - // IsDebugReq check whether the req is debug req. func (req *Request) IsDebugReq() bool { switch req.Type { diff --git a/txnkv/client.go b/txnkv/client.go index aa37333598..d277780ac5 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" "github.com/tikv/client-go/v2/util" @@ -122,7 +121,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) { // GetTimestamp returns the current global timestamp. func (c *Client) GetTimestamp(ctx context.Context) (uint64, error) { bo := retry.NewBackofferWithVars(ctx, transaction.TsoMaxBackoff, nil) - startTS, err := c.GetTimestampWithRetry(bo, oracle.GlobalTxnScope) + startTS, err := c.GetTimestampWithRetry(bo) if err != nil { return 0, err } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 6a739b3616..3a689d2138 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -102,10 +102,10 @@ type kvstore interface { WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error // GetTimestampWithRetry returns latest timestamp. - GetTimestampWithRetry(bo *retry.Backoffer, scope string) (uint64, error) + GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) // GetOracle gets a timestamp oracle client. GetOracle() oracle.Oracle - CurrentTimestamp(txnScope string) (uint64, error) + CurrentTimestamp() (uint64, error) // SendReq sends a request to TiKV. SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) // GetTiKVClient gets the client instance. @@ -535,7 +535,7 @@ func (c *twoPhaseCommitter) checkAssertionByPessimisticLockResults(ctx context.C func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, assertionFailed *tikverr.ErrAssertionFailed) error { // If the schema has changed, it might be a false-positive. In this case we should return schema changed, which // is a usual case, instead of assertion failed. - ts, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + ts, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars)) if err != nil { return err } @@ -1279,7 +1279,7 @@ func keepAlive( return } bo := retry.NewBackofferWithVars(context.Background(), keepAliveMaxBackoff, c.txn.vars) - now, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + now, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", zap.Error(err)) @@ -1515,11 +1515,6 @@ func sendTxnHeartBeat( // checkAsyncCommit checks if async commit protocol is available for current transaction commit, true is returned if possible. func (c *twoPhaseCommitter) checkAsyncCommit() bool { - // Disable async commit in local transactions - if c.txn.GetScope() != oracle.GlobalTxnScope { - return false - } - // Don't use async commit when commitTSUpperBoundCheck is set. // For TiDB, this is used by cached table. if c.txn.commitTSUpperBoundCheck != nil { @@ -1546,10 +1541,6 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { // checkOnePC checks if 1PC protocol is available for current transaction. func (c *twoPhaseCommitter) checkOnePC() bool { - // Disable 1PC in local transactions - if c.txn.GetScope() != oracle.GlobalTxnScope { - return false - } // Disable 1PC for transaction when commitTSUpperBoundCheck is set. if c.txn.commitTSUpperBoundCheck != nil { return false @@ -1761,7 +1752,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if commitTSMayBeCalculated && c.needLinearizability() { util.EvalFailpoint("getMinCommitTSFromTSO") start := time.Now() - latestTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + latestTS, err := c.store.GetTimestampWithRetry(bo) // If we fail to get a timestamp from PD, we just propagate the failure // instead of falling back to the normal 2PC because a normal 2PC will // also be likely to fail due to the same timestamp issue. @@ -1896,7 +1887,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } else { start = time.Now() logutil.Event(ctx, "start get commit ts") - commitTS, err = c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope()) + commitTS, err = c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars)) if err != nil { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), @@ -1916,7 +1907,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } atomic.StoreUint64(&c.commitTS, commitTS) - if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { + if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{}) { err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d", c.sessionID, c.startTS, c.commitTS) return err diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 749b85a5b2..4dbcbfd5a3 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -179,7 +179,7 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac } // Update commit ts and retry. - commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + commitTS, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed", zap.Error(err), diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index d2fe35cf9e..76fc0c2bc7 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -311,7 +311,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))), zap.Uint64("startTS", c.startTS), ) - commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) + commitTS, err := c.store.GetTimestampWithRetry(bo) if err != nil { logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed", zap.Error(err), diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 49fd67493f..aea0418043 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -122,7 +122,6 @@ type PipelinedTxnOptions struct { // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string StartTS *uint64 PipelinedTxn PipelinedTxnOptions } @@ -183,7 +182,6 @@ type KVTxn struct { enableAsyncCommit bool enable1PC bool causalConsistency bool - scope string kvFilter KVFilter resourceGroupTag []byte resourceGroupTagger tikvrpc.ResourceGroupTagger // use this when resourceGroupTag is nil @@ -223,7 +221,6 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, startTime: time.Now(), valid: true, vars: tikv.DefaultVars, - scope: options.TxnScope, enableAsyncCommit: cfg.EnableAsyncCommit, enable1PC: cfg.Enable1PC, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, @@ -466,11 +463,6 @@ func (txn *KVTxn) SetCausalConsistency(b bool) { txn.causalConsistency = b } -// SetScope sets the geographical scope of the transaction. -func (txn *KVTxn) SetScope(scope string) { - txn.scope = scope -} - // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { txn.kvFilter = filter @@ -737,11 +729,6 @@ func (txn *KVTxn) IsCasualConsistency() bool { return txn.causalConsistency } -// GetScope returns the geographical scope of the transaction. -func (txn *KVTxn) GetScope() string { - return txn.scope -} - // Commit commits the transaction operations to KV store. func (txn *KVTxn) Commit(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -1007,7 +994,6 @@ func (txn *KVTxn) isInternal() bool { // TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing) type TxnInfo struct { - TxnScope string `json:"txn_scope"` StartTS uint64 `json:"start_ts"` CommitTS uint64 `json:"commit_ts"` TxnCommitMode string `json:"txn_commit_mode"` @@ -1031,7 +1017,6 @@ func (txn *KVTxn) onCommitted(err error) { } info := TxnInfo{ - TxnScope: txn.GetScope(), StartTS: txn.startTS, CommitTS: txn.commitTS, TxnCommitMode: commitMode, @@ -1055,7 +1040,7 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, forUpdateTs := txn.startTS if txn.IsPessimistic() { bo := retry.NewBackofferWithVars(context.Background(), TsoMaxBackoff, nil) - forUpdateTs, err = txn.store.GetTimestampWithRetry(bo, txn.scope) + forUpdateTs, err = txn.store.GetTimestampWithRetry(bo) if err != nil { return err } diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 3d1508e9f3..5a1b676159 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -554,7 +554,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio if !forRead { if status.ttl != 0 { metrics.LockResolverCountWithNotExpired.Inc() - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{}) msBeforeTxnExpired.update(msBeforeLockExpired) continue } @@ -572,7 +572,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio canAccess = append(canAccess, l.TxnID) } else { metrics.LockResolverCountWithNotExpired.Inc() - msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{}) msBeforeTxnExpired.update(msBeforeLockExpired) } } @@ -639,7 +639,7 @@ func (t *txnExpireTime) value() int64 { func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) { var status TxnStatus bo := retry.NewBackoffer(context.Background(), getTxnStatusMaxBackoff) - currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{}) if err != nil { return status, err } @@ -658,7 +658,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle // Set currentTS to max uint64 to make the lock expired. currentTS = math.MaxUint64 } else { - currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{}) if err != nil { return TxnStatus{}, err } @@ -683,7 +683,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle return TxnStatus{ttl: l.TTL, action: kvrpcpb.Action_NoAction}, nil } - if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 { + if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{}) <= 0 { logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired", zap.Uint64("CallerStartTs", callerStartTS), zap.Stringer("lock str", l), @@ -816,7 +816,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary status.primaryLock = cmdResp.LockInfo if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit { - if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { + if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{}) { status.ttl = cmdResp.LockTtl } } else if cmdResp.LockTtl != 0 { diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 57b91bd821..d903c51ff1 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -457,7 +457,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, matchStoreLabels := s.mu.matchStoreLabels replicaAdjuster := s.mu.replicaReadAdjuster s.mu.RUnlock() - req.TxnScope = scope req.ReadReplicaScope = scope if isStaleness { req.EnableStaleWithMixedReplicaRead() @@ -703,7 +702,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] scope := s.mu.readReplicaScope replicaAdjuster := s.mu.replicaReadAdjuster s.mu.RUnlock() - req.TxnScope = scope req.ReadReplicaScope = scope var ops []locate.StoreSelectorOption if isStaleness { @@ -897,13 +895,6 @@ func (s *KVSnapshot) SetRuntimeStats(stats *SnapshotRuntimeStats) { s.mu.stats = stats } -// SetTxnScope is same as SetReadReplicaScope, keep it in order to keep compatible for now. -func (s *KVSnapshot) SetTxnScope(scope string) { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.readReplicaScope = scope -} - // SetReadReplicaScope set read replica scope func (s *KVSnapshot) SetReadReplicaScope(scope string) { s.mu.Lock()