From 7c20c70346ab3612c9b9eb5b14e920744fd9cb32 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 24 Dec 2024 16:31:36 +0900 Subject: [PATCH 1/5] store last commit transaction's info Signed-off-by: you06 fix test Signed-off-by: you06 update tidb Signed-off-by: you06 check commit ts in oracle Signed-off-by: you06 update Signed-off-by: you06 --- examples/gcworker/go.mod | 24 +++---- examples/rawkv/go.mod | 24 +++---- examples/txnkv/1pc_txn/go.mod | 24 +++---- examples/txnkv/async_commit/go.mod | 24 +++---- examples/txnkv/delete_range/go.mod | 24 +++---- examples/txnkv/go.mod | 24 +++---- examples/txnkv/pessimistic_txn/go.mod | 24 +++---- examples/txnkv/unsafedestoryrange/go.mod | 24 +++---- integration_tests/1pc_test.go | 5 ++ integration_tests/2pc_test.go | 4 ++ integration_tests/async_commit_test.go | 5 ++ integration_tests/go.mod | 2 +- integration_tests/store_test.go | 36 ++++++++++ oracle/oracles/pd.go | 31 +++++++-- tikv/kv.go | 16 +++++ txnkv/transaction/2pc.go | 34 +++++++++- txnkv/transaction/test_probe.go | 6 ++ util/commit_ts_verify.go | 86 ++++++++++++++++++++++++ 18 files changed, 315 insertions(+), 102 deletions(-) create mode 100644 util/commit_ts_verify.go diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index 711e70930f..7af1d6b76e 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index 39e643a43f..b779427b3c 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index 179bce58d2..4926479a1b 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index c63516097f..066120e1ec 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 33036f2a78..2f9d244b9b 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index b70e0325d9..5a23a1978e 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index ccefa42c02..016964ad93 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 9a562abe29..7f5c8d11ea 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -6,7 +6,7 @@ require github.com/tikv/client-go/v2 v2.0.0 require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudfoundry/gosigar v1.3.6 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -21,16 +21,16 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 // indirect + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.46.0 // indirect - github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 // indirect + github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect github.com/twmb/murmur3 v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.10 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect @@ -39,14 +39,14 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect google.golang.org/grpc v1.64.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index c69bbdbcfe..d4fd03222d 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -306,3 +306,8 @@ func (s *testOnePCSuite) TestTxnCommitCounter() { s.Equal(diff.AsyncCommit, int64(1)) s.Equal(diff.OnePC, int64(1)) } + +func (s *testOnePCSuite) Test1PCUpdateLatestCommitInf() { + store := tikv.StoreProbe{KVStore: s.store} + testUpdateLatestCommitInfo(s.Require(), store, "1pc") +} diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index fdad3b2969..82511c66dd 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -2769,3 +2769,7 @@ func (s *testCommitterSuite) Test2PCCleanupLifecycleHooks() { wg.Wait() s.Equal(reachedPost.Load(), true) } + +func (s *testCommitterSuite) Test2PCUpdateLatestCommitInf() { + testUpdateLatestCommitInfo(s.Require(), s.store, "2pc") +} diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 4c478f45a9..8b1eb90b45 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -663,3 +663,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLifecycleHooks() { wg.Wait() s.Equal(reachedPost.Load(), true) } + +func (s *testAsyncCommitSuite) TestAsyncCommitUpdateLatestCommitInf() { + store := tikv.StoreProbe{KVStore: s.store} + testUpdateLatestCommitInfo(s.Require(), store, "async") +} diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 55cd65c916..0fa988b133 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -7,7 +7,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 - github.com/pingcap/tidb v1.1.0-beta.0.20241219182103-27a9a7d39567 + github.com/pingcap/tidb v1.1.0-beta.0.20241223103612-14a469ab9d4e github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.14.1 diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index 8f61b42896..ce70fc33df 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -42,12 +42,14 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/oracle/oracles" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv" + "github.com/tikv/client-go/v2/util" ) func TestStore(t *testing.T) { @@ -192,3 +194,37 @@ func (s *testStoreSuite) TestFailBusyServerKV() { s.Nil(err) s.Equal(val, []byte("value")) } + +func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) { + doTxn := func() *util.CommitInfo { + txn, err := store.Begin() + require.Nil(err) + switch mode { + case "async": + txn.SetEnableAsyncCommit(true) + case "1pc": + txn.SetEnable1PC(true) + case "2pc": + // do nothing + default: + require.FailNow("unknown mode:" + mode) + } + err = txn.Set([]byte("key"), []byte("value")) + require.Nil(err) + err = txn.Commit(context.Background()) + require.Nil(err) + return txn.GetCommitter().GetCommitInfo() + } + + commitInfo1 := doTxn() + require.Equal(commitInfo1, store.GetLastCommitInfo("")) + require.Equal(commitInfo1.MutationLen, 1) + require.Equal(commitInfo1.TxnSize, 8) + require.Equal(commitInfo1.TxnType, mode) + commitInfo2 := doTxn() + lastInfo := store.GetLastCommitInfo("") + require.NotEqual(commitInfo1, lastInfo) + require.Equal(commitInfo2, lastInfo) + require.Greater(lastInfo.CommitTS, commitInfo1.CommitTS) + require.GreaterOrEqual(lastInfo.StartTS, commitInfo1.CommitTS) +} diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 60f0ee7cb3..068d4e1e80 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -47,6 +47,7 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/tso" "go.uber.org/zap" @@ -153,6 +154,9 @@ type pdOracle struct { // we don't require the ts for validation to be strictly the latest one. // Note that the result can't be reused for different txnScopes. The txnScope is used as the key. tsForValidation singleflight.Group + + // tsVerifier is used to verify the correctness of the partial order between the received timestamp and committed transactions. + tsVerifier *util.TSVerifier } // lastTSO stores the last timestamp oracle gets from PD server and the local time when the TSO is fetched. @@ -166,6 +170,8 @@ type PDOracleOptions struct { UpdateInterval time.Duration // Disable the background periodic update of the last ts. This is for test purposes only. NoUpdateTS bool + // TSVerifier is used to verify the correctness of the timestamp. + TSVerifier *util.TSVerifier } // NewPdOracle create an Oracle that uses a pd client source. @@ -182,6 +188,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e c: pdClient, quit: make(chan struct{}), lastTSUpdateInterval: atomic.Int64{}, + tsVerifier: options.TSVerifier, } o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1) o.lastTSUpdateInterval.Store(int64(options.UpdateInterval)) @@ -228,8 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err type tsFuture struct { tso.TSFuture - o *pdOracle - txnScope string + o *pdOracle + txnScope string + commitInfo *util.CommitInfo } // Wait implements the oracle.Future interface. @@ -241,16 +249,27 @@ func (f *tsFuture) Wait() (uint64, error) { return 0, errors.WithStack(err) } ts := oracle.ComposeTS(physical, logical) + if f.commitInfo != nil { + f.commitInfo.Verify(ts) + } f.o.setLastTS(ts, f.txnScope) return ts, nil } func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope} + var commitInfo *util.CommitInfo + if o.tsVerifier != nil { + commitInfo = o.tsVerifier.GetLastCommitInfo(opt.TxnScope) + } + return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope, commitInfo} } func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, error) { now := time.Now() + var commitInfo *util.CommitInfo + if o.tsVerifier != nil { + commitInfo = o.tsVerifier.GetLastCommitInfo(txnScope) + } physical, logical, err := o.c.GetTS(ctx) if err != nil { return 0, errors.WithStack(err) @@ -260,7 +279,11 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e logutil.Logger(ctx).Warn("get timestamp too slow", zap.Duration("cost time", dist)) } - return oracle.ComposeTS(physical, logical), nil + ts := oracle.ComposeTS(physical, logical) + if commitInfo != nil { + commitInfo.Verify(ts) + } + return ts, nil } func (o *pdOracle) getMinTimestampInAllTSOGroup(ctx context.Context) (uint64, error) { diff --git a/tikv/kv.go b/tikv/kv.go index 8973143504..2a8144bc79 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -143,6 +143,8 @@ type KVStore struct { wg sync.WaitGroup close atomicutil.Bool gP Pool + + tsVerifier *util.TSVerifier } var _ Storage = (*KVStore)(nil) @@ -261,8 +263,10 @@ func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvCli // NewKVStore creates a new TiKV store instance. func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) { + tsVerifier := util.NewTSVerifier() o, err := oracles.NewPdOracle(pdClient, &oracles.PDOracleOptions{ UpdateInterval: defaultOracleUpdateInterval, + TSVerifier: tsVerifier, }) if err != nil { return nil, err @@ -284,7 +288,9 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl ctx: ctx, cancel: cancel, gP: NewSpool(128, 10*time.Second), + tsVerifier: tsVerifier, } + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.clientMu.client.SetEventListener(regionCache.GetClientEventListener()) @@ -858,6 +864,16 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { return false } +// SetLastCommitInfo sets the last committed transaction's information. +func (s *KVStore) SetLastCommitInfo(txnScope string, ci *util.CommitInfo) { + s.tsVerifier.StoreCommitInfo(txnScope, ci) +} + +// GetLastCommitInfo get the last committed transaction's information. +func (s *KVStore) GetLastCommitInfo(txnScope string) *util.CommitInfo { + return s.tsVerifier.GetLastCommitInfo(txnScope) +} + func isValidSafeTS(ts uint64) bool { return ts != 0 && ts != math.MaxUint64 } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 3298b10de9..492c9b1f65 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -120,6 +120,7 @@ type kvstore interface { IsClose() bool // Go run the function in a separate goroutine. Go(f func()) error + storeCommitInfo } // twoPhaseCommitter executes a two-phase commit protocol. @@ -343,7 +344,7 @@ type PlainMutations struct { flags []CommitterMutationFlags } -// NewPlainMutations creates a PlainMutations object with sizeHint reserved. +// NewPlainMutations creates a PlainMutations object with sizeHint reserved.G func NewPlainMutations(sizeHint int) PlainMutations { return PlainMutations{ ops: make([]kvrpcpb.Op, 0, sizeHint), @@ -1868,6 +1869,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { logutil.Logger(ctx).Debug("1PC protocol is used to commit this txn", zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Uint64("session", c.sessionID)) + c.updateStoreCommitInfo() return nil } @@ -1903,6 +1905,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } } atomic.StoreUint64(&c.commitTS, commitTS) + c.updateStoreCommitInfo() if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d", @@ -2372,3 +2375,32 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations { } return &res } + +func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { + var txnType string + if c.isAsyncCommit() { + txnType = "async" + } else if c.isOnePC() { + txnType = "1pc" + } else { + txnType = "2pc" + } + return &util.CommitInfo{ + TxnType: txnType, + StartTS: c.startTS, + CommitTS: atomic.LoadUint64(&c.commitTS), + MutationLen: c.mutations.Len(), + TxnSize: c.txnSize, + } +} + +// updateStoreCommitInfo sets the commit info for the store. +func (c *twoPhaseCommitter) updateStoreCommitInfo() { + c.store.SetLastCommitInfo(c.txn.scope, c.getCommitInfo()) +} + +type storeCommitInfo interface { + SetLastCommitInfo(string, *util.CommitInfo) + // GetLastCommitInfo get the last committed transaction's information. + GetLastCommitInfo(string) *util.CommitInfo +} diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index d3b049f9cf..8fe8f6fef1 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" + "github.com/tikv/client-go/v2/util" ) // TxnProbe wraps a txn and exports internal states for testing purpose. @@ -379,6 +380,11 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by c.resolveFlushedLocks(bo, start, end, commit) } +// GetCommitInfo expose CommitInfo of committer for testing purpose. +func (c CommitterProbe) GetCommitInfo() *util.CommitInfo { + return c.getCommitInfo() +} + // SendTxnHeartBeat renews a txn's ttl. func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { return sendTxnHeartBeat(bo, store, primary, startTS, ttl, 0) diff --git a/util/commit_ts_verify.go b/util/commit_ts_verify.go new file mode 100644 index 0000000000..cb9e731fa2 --- /dev/null +++ b/util/commit_ts_verify.go @@ -0,0 +1,86 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "fmt" + "sync" +) + +const GlobalTxnScope = "global" + +// CommitInfo stores the information of a COMMITTED transaction. +type CommitInfo struct { + TxnType string + StartTS uint64 + CommitTS uint64 + MutationLen int + TxnSize int +} + +// String returns the string representation of CommitInfo. +func (c *CommitInfo) String() string { + return fmt.Sprintf("TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d", + c.TxnType, c.StartTS, c.CommitTS, c.MutationLen, c.TxnSize) +} + +// Verify checks validation of this commit information from the given ts. +func (c *CommitInfo) Verify(ts uint64) { + if ts < c.CommitTS || ts <= c.StartTS { + panic(fmt.Sprintf("ts: %d, lastCommit: %s", ts, c.String())) + } +} + +// TSVerifier is used to verify the commit ts. +type TSVerifier struct { + scope2commitInfo sync.Map +} + +// NewTSVerifier creates a new TSVerifier. +func NewTSVerifier() *TSVerifier { + return &TSVerifier{} +} + +// StoreCommitInfo stores the commit information of a transaction. +func (t *TSVerifier) StoreCommitInfo(txnScope string, commitInfo *CommitInfo) { + if txnScope == "" { + txnScope = GlobalTxnScope + } + for { + old, loaded := t.scope2commitInfo.LoadOrStore(txnScope, commitInfo) + if !loaded { + return + } + oldCommitInfo := old.(*CommitInfo) + if oldCommitInfo.CommitTS >= commitInfo.CommitTS { + return + } + if t.scope2commitInfo.CompareAndSwap(txnScope, oldCommitInfo, commitInfo) { + return + } + } +} + +// GetLastCommitInfo gets the last commit information of a transaction. +func (t *TSVerifier) GetLastCommitInfo(txnScope string) *CommitInfo { + if txnScope == "" { + txnScope = GlobalTxnScope + } + commitInfo, ok := t.scope2commitInfo.Load(txnScope) + if !ok { + return nil + } + return commitInfo.(*CommitInfo) +} From fdaf4f20f85f36f902505b783516d79330e62737 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 14 Jan 2025 14:16:16 +0900 Subject: [PATCH 2/5] remove txn scope Signed-off-by: you06 --- oracle/oracles/pd.go | 4 ++-- tikv/kv.go | 8 ++++---- txnkv/transaction/2pc.go | 17 +++++++++-------- util/commit_ts_verify.go | 41 +++++++++++++--------------------------- 4 files changed, 28 insertions(+), 42 deletions(-) diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 068d4e1e80..4d5798af79 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -259,7 +259,7 @@ func (f *tsFuture) Wait() (uint64, error) { func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { var commitInfo *util.CommitInfo if o.tsVerifier != nil { - commitInfo = o.tsVerifier.GetLastCommitInfo(opt.TxnScope) + commitInfo = o.tsVerifier.GetLastCommitInfo() } return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope, commitInfo} } @@ -268,7 +268,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e now := time.Now() var commitInfo *util.CommitInfo if o.tsVerifier != nil { - commitInfo = o.tsVerifier.GetLastCommitInfo(txnScope) + commitInfo = o.tsVerifier.GetLastCommitInfo() } physical, logical, err := o.c.GetTS(ctx) if err != nil { diff --git a/tikv/kv.go b/tikv/kv.go index 2a8144bc79..6b234d92ac 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -865,13 +865,13 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { } // SetLastCommitInfo sets the last committed transaction's information. -func (s *KVStore) SetLastCommitInfo(txnScope string, ci *util.CommitInfo) { - s.tsVerifier.StoreCommitInfo(txnScope, ci) +func (s *KVStore) SetLastCommitInfo(ci *util.CommitInfo) { + s.tsVerifier.SetLastCommitInfo(ci) } // GetLastCommitInfo get the last committed transaction's information. -func (s *KVStore) GetLastCommitInfo(txnScope string) *util.CommitInfo { - return s.tsVerifier.GetLastCommitInfo(txnScope) +func (s *KVStore) GetLastCommitInfo() *util.CommitInfo { + return s.tsVerifier.GetLastCommitInfo() } func isValidSafeTS(ts uint64) bool { diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 492c9b1f65..4cb9f8ea37 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -344,7 +344,7 @@ type PlainMutations struct { flags []CommitterMutationFlags } -// NewPlainMutations creates a PlainMutations object with sizeHint reserved.G +// NewPlainMutations creates a PlainMutations object with sizeHint reserved. func NewPlainMutations(sizeHint int) PlainMutations { return PlainMutations{ ops: make([]kvrpcpb.Op, 0, sizeHint), @@ -2376,6 +2376,11 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations { return &res } +// updateStoreCommitInfo sets the commit info for the store. +func (c *twoPhaseCommitter) updateStoreCommitInfo() { + c.store.SetLastCommitInfo(c.getCommitInfo()) +} + func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { var txnType string if c.isAsyncCommit() { @@ -2391,16 +2396,12 @@ func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { CommitTS: atomic.LoadUint64(&c.commitTS), MutationLen: c.mutations.Len(), TxnSize: c.txnSize, + Primary: c.primary(), } } -// updateStoreCommitInfo sets the commit info for the store. -func (c *twoPhaseCommitter) updateStoreCommitInfo() { - c.store.SetLastCommitInfo(c.txn.scope, c.getCommitInfo()) -} - type storeCommitInfo interface { - SetLastCommitInfo(string, *util.CommitInfo) + SetLastCommitInfo(*util.CommitInfo) // GetLastCommitInfo get the last committed transaction's information. - GetLastCommitInfo(string) *util.CommitInfo + GetLastCommitInfo() *util.CommitInfo } diff --git a/util/commit_ts_verify.go b/util/commit_ts_verify.go index cb9e731fa2..5c5337f3d9 100644 --- a/util/commit_ts_verify.go +++ b/util/commit_ts_verify.go @@ -1,4 +1,4 @@ -// Copyright 2024 TiKV Authors +// Copyright 2025 TiKV Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,11 +16,9 @@ package util import ( "fmt" - "sync" + "sync/atomic" ) -const GlobalTxnScope = "global" - // CommitInfo stores the information of a COMMITTED transaction. type CommitInfo struct { TxnType string @@ -28,12 +26,13 @@ type CommitInfo struct { CommitTS uint64 MutationLen int TxnSize int + Primary []byte } // String returns the string representation of CommitInfo. func (c *CommitInfo) String() string { - return fmt.Sprintf("TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d", - c.TxnType, c.StartTS, c.CommitTS, c.MutationLen, c.TxnSize) + return fmt.Sprintf("TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: %v", + c.TxnType, c.StartTS, c.CommitTS, c.MutationLen, c.TxnSize, c.Primary) } // Verify checks validation of this commit information from the given ts. @@ -45,7 +44,7 @@ func (c *CommitInfo) Verify(ts uint64) { // TSVerifier is used to verify the commit ts. type TSVerifier struct { - scope2commitInfo sync.Map + lastCommitInfo atomic.Pointer[CommitInfo] } // NewTSVerifier creates a new TSVerifier. @@ -53,34 +52,20 @@ func NewTSVerifier() *TSVerifier { return &TSVerifier{} } -// StoreCommitInfo stores the commit information of a transaction. -func (t *TSVerifier) StoreCommitInfo(txnScope string, commitInfo *CommitInfo) { - if txnScope == "" { - txnScope = GlobalTxnScope - } +// SetLastCommitInfo stores the commit information of a transaction. +func (t *TSVerifier) SetLastCommitInfo(commitInfo *CommitInfo) { for { - old, loaded := t.scope2commitInfo.LoadOrStore(txnScope, commitInfo) - if !loaded { - return - } - oldCommitInfo := old.(*CommitInfo) - if oldCommitInfo.CommitTS >= commitInfo.CommitTS { + last := t.lastCommitInfo.Load() + if last != nil && commitInfo.CommitTS <= last.CommitTS { return } - if t.scope2commitInfo.CompareAndSwap(txnScope, oldCommitInfo, commitInfo) { + if t.lastCommitInfo.CompareAndSwap(last, commitInfo) { return } } } // GetLastCommitInfo gets the last commit information of a transaction. -func (t *TSVerifier) GetLastCommitInfo(txnScope string) *CommitInfo { - if txnScope == "" { - txnScope = GlobalTxnScope - } - commitInfo, ok := t.scope2commitInfo.Load(txnScope) - if !ok { - return nil - } - return commitInfo.(*CommitInfo) +func (t *TSVerifier) GetLastCommitInfo() *CommitInfo { + return t.lastCommitInfo.Load() } From c713e316698652b6a33947a7a61ec551cfb913b9 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 14 Jan 2025 14:49:06 +0900 Subject: [PATCH 3/5] test panic message Signed-off-by: you06 --- integration_tests/store_test.go | 16 ++++++++++++++-- util/commit_ts_verify.go | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index ce70fc33df..f46d2f8961 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -36,6 +36,7 @@ package tikv_test import ( "context" + "fmt" "sync" "testing" "time" @@ -217,14 +218,25 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro } commitInfo1 := doTxn() - require.Equal(commitInfo1, store.GetLastCommitInfo("")) + require.Equal(commitInfo1, store.GetLastCommitInfo()) require.Equal(commitInfo1.MutationLen, 1) require.Equal(commitInfo1.TxnSize, 8) require.Equal(commitInfo1.TxnType, mode) commitInfo2 := doTxn() - lastInfo := store.GetLastCommitInfo("") + lastInfo := store.GetLastCommitInfo() require.NotEqual(commitInfo1, lastInfo) require.Equal(commitInfo2, lastInfo) require.Greater(lastInfo.CommitTS, commitInfo1.CommitTS) require.GreaterOrEqual(lastInfo.StartTS, commitInfo1.CommitTS) + + errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]", + lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS) + require.PanicsWithValue(errMsg, func() { + lastInfo.Verify(lastInfo.StartTS) + }) + errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]", + lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS) + require.PanicsWithValue(errMsg, func() { + lastInfo.Verify(lastInfo.CommitTS - 1) + }) } diff --git a/util/commit_ts_verify.go b/util/commit_ts_verify.go index 5c5337f3d9..78a6089ccd 100644 --- a/util/commit_ts_verify.go +++ b/util/commit_ts_verify.go @@ -38,7 +38,7 @@ func (c *CommitInfo) String() string { // Verify checks validation of this commit information from the given ts. func (c *CommitInfo) Verify(ts uint64) { if ts < c.CommitTS || ts <= c.StartTS { - panic(fmt.Sprintf("ts: %d, lastCommit: %s", ts, c.String())) + panic(fmt.Sprintf("Verified ts: %d, LastCommit: %s", ts, c.String())) } } From bc4949d7d8c5f8f12e0d808faa97b7ab4b07902d Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 14 Jan 2025 15:02:29 +0900 Subject: [PATCH 4/5] fix test that commit empty txn Signed-off-by: you06 --- txnkv/transaction/2pc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 4cb9f8ea37..cacc377b81 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -2396,7 +2396,7 @@ func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { CommitTS: atomic.LoadUint64(&c.commitTS), MutationLen: c.mutations.Len(), TxnSize: c.txnSize, - Primary: c.primary(), + Primary: c.primaryKey, } } From 974a320b424d49d6f1cbf179199714d4ae69a11c Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 28 Jan 2025 16:19:38 +0900 Subject: [PATCH 5/5] address comment & add test for pipelined txn Signed-off-by: you06 --- integration_tests/2pc_test.go | 1 + integration_tests/store_test.go | 26 ++++++++++++------ internal/mockstore/mocktikv/rpc.go | 41 ++++++++++++++++++++++++++++ oracle/oracles/pd.go | 10 +++---- txnkv/transaction/2pc.go | 15 ++++++++-- txnkv/transaction/pipelined_flush.go | 1 + 6 files changed, 78 insertions(+), 16 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 82511c66dd..5617607218 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -2772,4 +2772,5 @@ func (s *testCommitterSuite) Test2PCCleanupLifecycleHooks() { func (s *testCommitterSuite) Test2PCUpdateLatestCommitInf() { testUpdateLatestCommitInfo(s.Require(), s.store, "2pc") + testUpdateLatestCommitInfo(s.Require(), s.store, "pipelined") } diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index f46d2f8961..a2628f0fcd 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -198,14 +198,18 @@ func (s *testStoreSuite) TestFailBusyServerKV() { func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) { doTxn := func() *util.CommitInfo { - txn, err := store.Begin() + var ops []tikv.TxnOption + if mode == "pipelined" { + ops = append(ops, tikv.WithPipelinedMemDB()) + } + txn, err := store.Begin(ops...) require.Nil(err) switch mode { case "async": txn.SetEnableAsyncCommit(true) case "1pc": txn.SetEnable1PC(true) - case "2pc": + case "2pc", "pipelined": // do nothing default: require.FailNow("unknown mode:" + mode) @@ -217,10 +221,16 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro return txn.GetCommitter().GetCommitInfo() } + txnSize := 8 + mutationLen := 1 + if mode == "pipelined" { + txnSize = 0 + mutationLen = 0 + } commitInfo1 := doTxn() require.Equal(commitInfo1, store.GetLastCommitInfo()) - require.Equal(commitInfo1.MutationLen, 1) - require.Equal(commitInfo1.TxnSize, 8) + require.Equal(commitInfo1.MutationLen, mutationLen) + require.Equal(commitInfo1.TxnSize, txnSize) require.Equal(commitInfo1.TxnType, mode) commitInfo2 := doTxn() lastInfo := store.GetLastCommitInfo() @@ -229,13 +239,13 @@ func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StorePro require.Greater(lastInfo.CommitTS, commitInfo1.CommitTS) require.GreaterOrEqual(lastInfo.StartTS, commitInfo1.CommitTS) - errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]", - lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS) + errMsg := fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]", + lastInfo.StartTS, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize) require.PanicsWithValue(errMsg, func() { lastInfo.Verify(lastInfo.StartTS) }) - errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: 1, TxnSize: 8, Primary: [107 101 121]", - lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS) + errMsg = fmt.Sprintf("Verified ts: %d, LastCommit: TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d, Primary: [107 101 121]", + lastInfo.CommitTS-1, mode, lastInfo.StartTS, lastInfo.CommitTS, mutationLen, txnSize) require.PanicsWithValue(errMsg, func() { lastInfo.Verify(lastInfo.CommitTS - 1) }) diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index 1e1ae53419..dbcda642b2 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -661,6 +661,40 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S return resp } +func (h kvHandler) handleKvFlush(req *kvrpcpb.FlushRequest) *kvrpcpb.FlushResponse { + regionID := req.Context.RegionId + prewriteReq := &kvrpcpb.PrewriteRequest{ + Context: req.Context, + Mutations: req.Mutations, + PrimaryLock: req.PrimaryKey, + StartVersion: req.StartTs, + MinCommitTs: req.MinCommitTs, + LockTtl: req.LockTtl, + AssertionLevel: req.AssertionLevel, + } + + h.cluster.handleDelay(prewriteReq.StartVersion, regionID) + + for _, m := range req.Mutations { + if !h.checkKeyInRegion(m.Key) { + panic("KvPrewrite: key not in region") + } + } + errs := h.mvccStore.Prewrite(prewriteReq) + for i, e := range errs { + if e != nil { + if _, isLocked := errors.Cause(e).(*ErrLocked); !isLocked { + // Keep only one error if it's not a KeyIsLocked error. + errs = errs[i : i+1] + break + } + } + } + return &kvrpcpb.FlushResponse{ + Errors: convertToKeyErrors(errs), + } +} + // Client is a client that sends RPC. // This is same with tikv.Client, define again for avoid circle import. type Client interface { @@ -1070,6 +1104,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R Name: "mvcc.num_rows", Value: strconv.Itoa(len(scanResp.Pairs)), }}} + case tikvrpc.CmdFlush: + r := req.Flush() + if err := session.checkRequest(reqCtx, r.Size()); err != nil { + resp.Resp = &kvrpcpb.PrewriteResponse{RegionError: err} + return resp, nil + } + resp.Resp = kvHandler{session}.handleKvFlush(r) default: return nil, errors.Errorf("unsupported this request type %v", req.Type) } diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 4d5798af79..a9597e1d1c 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -235,9 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err type tsFuture struct { tso.TSFuture - o *pdOracle - txnScope string - commitInfo *util.CommitInfo + o *pdOracle + txnScope string + lastCommitInfo *util.CommitInfo } // Wait implements the oracle.Future interface. @@ -249,8 +249,8 @@ func (f *tsFuture) Wait() (uint64, error) { return 0, errors.WithStack(err) } ts := oracle.ComposeTS(physical, logical) - if f.commitInfo != nil { - f.commitInfo.Verify(ts) + if f.lastCommitInfo != nil { + f.lastCommitInfo.Verify(ts) } f.o.setLastTS(ts, f.txnScope) return ts, nil diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index cacc377b81..4fbcefd5e4 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1013,6 +1013,9 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action // Already spawned a goroutine for async commit transaction. if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() { + if !c.txn.IsPipelined() { + c.updateStoreCommitInfo() + } secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars) if c.store.IsClose() { logutil.Logger(bo.GetCtx()).Warn("the store is closed", @@ -1905,7 +1908,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } } atomic.StoreUint64(&c.commitTS, commitTS) - c.updateStoreCommitInfo() if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) { err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d", @@ -1958,6 +1960,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.isAsyncCommit() { // For async commit protocol, the commit is considered success here. c.txn.commitTS = c.commitTS + c.updateStoreCommitInfo() logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn", zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Uint64("sessionID", c.sessionID)) @@ -2383,18 +2386,24 @@ func (c *twoPhaseCommitter) updateStoreCommitInfo() { func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { var txnType string - if c.isAsyncCommit() { + if c.txn.isPipelined { + txnType = "pipelined" + } else if c.isAsyncCommit() { txnType = "async" } else if c.isOnePC() { txnType = "1pc" } else { txnType = "2pc" } + var mutationLen int + if !c.txn.isPipelined { + mutationLen = c.mutations.Len() + } return &util.CommitInfo{ TxnType: txnType, StartTS: c.startTS, CommitTS: atomic.LoadUint64(&c.commitTS), - MutationLen: c.mutations.Len(), + MutationLen: mutationLen, TxnSize: c.txnSize, Primary: c.primaryKey, } diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index c745e538a5..bad13de021 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -338,6 +338,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", commitTS), ) + c.updateStoreCommitInfo() broadcastToAllStores( c.txn, c.store,