Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add closedts side-transport consumer #61305

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestClosedTimestampWorksWhenRequestsAreSentToNonLeaseHolders(t *testing.T)
const closeInterval = 10 * time.Millisecond
sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '"+
closeInterval.String()+"'")
sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '"+
closeInterval.String()+"'")

// To make node3 have a large epoch, synthesize a liveness record for with
// epoch 1000 before starting the node.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
closedTimestampDuration.String())
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1",
closedTimestampFraction)
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = $1",
closedTimestampDuration.String())

// Let's get to a point where we know that we have an expiration based lease
// with a start time more than some time ago and then we have a max closed
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/closedts/closedts.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// IssueTrackingRemovalOfOldClosedTimestampsCode is the Github issue tracking
// the deletion of the "old" closed timestamps code (i.e. everything around
// here) in 21.2, now that 21.1 has a new Raft-based closed-timestamps
// mechanism. The old mechanism is disabled when the cluster version is
// sufficiently high, and all the tests failing because of it are skipped with
// this issue.
const IssueTrackingRemovalOfOldClosedTimestampsCode = 61299

// ReleaseFunc is a closure returned from Track which is used to record the
// LeaseAppliedIndex (LAI) given to a tracked proposal. The supplied epoch must
// match that of the lease under which the proposal was proposed.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/closedts/container/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_test(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/stop",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/closedts/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -129,6 +130,7 @@ func setupTwoNodeTest() (_ *TestContainer, _ *TestContainer, shutdown func()) {

func TestTwoNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, closedts.IssueTrackingRemovalOfOldClosedTimestampsCode)

ctx := context.Background()

Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/closedts/ctpb/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/kv/kvserver/closedts/ctpb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ message Update {
}
repeated GroupUpdate closed_timestamps = 4 [(gogoproto.nullable) = false];

// removed contains the set of ranges that are no longer registered on the
// stream and who future updates are no longer applicable to.
// removed contains the set of ranges that are no longer tracked on this
// stream. The closed timestamps in this message and future messages no longer
// apply to these removed ranges.
//
// The field will be empty if snapshot is true, as a snapshot message implies
// that all ranges not present in the snapshot's added_or_updated list are no
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/closedts/provider/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/provider",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/roachpb",
Expand All @@ -31,6 +32,7 @@ go_test(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/closedts/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -129,6 +130,12 @@ func (p *Provider) runCloser(ctx context.Context) {
t := timeutil.NewTimer()
defer t.Stop()
for {
// If the "new" closed timestamps mechanism is enabled, we inhibit this old one.
if p.cfg.Settings.Version.IsActive(ctx, clusterversion.ClosedTimestampsRaftTransport) {
log.Infof(ctx, "disabling legacy closed-timestamp mechanism; the new one is enabled")
break
}

closeFraction := closedts.CloseFraction.Get(&p.cfg.Settings.SV)
targetDuration := float64(closedts.TargetDuration.Get(&p.cfg.Settings.SV))
if targetDuration > 0 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/closedts/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,6 +38,7 @@ import (

func TestProviderSubscribeNotify(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, closedts.IssueTrackingRemovalOfOldClosedTimestampsCode)

ctx := context.Background()
stopper := stop.NewStopper()
Expand Down Expand Up @@ -293,6 +295,7 @@ func TestProviderSubscribeConcurrent(t *testing.T) {
// value re-enables it.
func TestProviderTargetDurationSetting(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, closedts.IssueTrackingRemovalOfOldClosedTimestampsCode)

st := cluster.MakeTestingClusterSettings()
closedts.TargetDuration.Override(&st.SV, time.Millisecond)
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sidetransport",
srcs = ["sender.go"],
srcs = [
"receiver.go",
"sender.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
Expand All @@ -19,24 +23,31 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_google_grpc//:go_default_library",
],
)

go_test(
name = "sidetransport_test",
srcs = ["sender_test.go"],
srcs = [
"receiver_test.go",
"sender_test.go",
],
embed = [":sidetransport"],
deps = [
"//pkg/base",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/settings/cluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
Expand Down
Loading