Skip to content

Commit

Permalink
roachtest: remove old cdc poller tests
Browse files Browse the repository at this point in the history
Before 19.1, CDC could use either rangefeeds or a poller. The poller is
since gone. We used to run CDC tests on 19.1+ with rangefeeds, and on
2.1 with the poller. (It is unclear whether nightlies are still run for 2.1.)
This historical artifact was affecting the names of the tests, which had
a confusing "rangefeed=true" suffix. This patch makes the tests not run
on 2.1, and removes the code around poller vs feeds. The test names drop
the respective suffix.

Release note: None
  • Loading branch information
andreimatei committed Aug 6, 2020
1 parent 557f774 commit 8a81eac
Showing 1 changed file with 11 additions and 37 deletions.
48 changes: 11 additions & 37 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
"github.com/codahale/hdrhistogram"
)
Expand All @@ -43,7 +42,6 @@ type cdcTestArgs struct {
tpccWarehouseCount int
workloadDuration string
initialScan bool
rangefeed bool
kafkaChaos bool
crdbChaos bool
cloudStorageSink bool
Expand All @@ -55,12 +53,6 @@ type cdcTestArgs struct {
}

func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
// Skip the poller test on v19.2. After 19.2 is out, we should likely delete
// the test entirely.
if !args.rangefeed && t.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) > 0 {
t.Skip("no poller in >= v19.2.0", "")
}

crdbNodes := c.Range(1, c.spec.NodeCount-1)
workloadNode := c.Node(c.spec.NodeCount)
kafkaNode := c.Node(c.spec.NodeCount)
Expand All @@ -70,16 +62,10 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {

db := c.Conn(ctx, 1)
defer stopFeeds(db)
if _, err := db.Exec(
`SET CLUSTER SETTING kv.rangefeed.enabled = $1`, args.rangefeed,
); err != nil {
if _, err := db.Exec(`SET CLUSTER SETTING kv.rangefeed.enabled = true`); err != nil {
t.Fatal(err)
}
// The 2.1 branch doesn't have this cluster setting, so ignore the error if
// it's about an unknown cluster setting
if _, err := db.Exec(
`SET CLUSTER SETTING changefeed.push.enabled = $1`, args.rangefeed,
); err != nil && !strings.Contains(err.Error(), "unknown cluster setting") {
if _, err := db.Exec(`SET CLUSTER SETTING changefeed.push.enabled = true`); err != nil {
t.Fatal(err)
}
kafka := kafkaManager{
Expand Down Expand Up @@ -499,31 +485,25 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
}

func registerCDC(r *testRegistry) {
useRangeFeed := true
if r.buildVersion.Compare(version.MustParse(`v19.1.0-0`)) < 0 {
// RangeFeed is not production ready in 2.1, so run the tests with the
// poller.
useRangeFeed = false
}

r.Add(testSpec{
Name: fmt.Sprintf("cdc/tpcc-1000/rangefeed=%t", useRangeFeed),
Owner: OwnerCDC,
MinVersion: "v2.1.0",
Name: fmt.Sprintf("cdc/tpcc-1000"),
Owner: OwnerCDC,
// RangeFeed is not production ready in 2.1; we could run the tests with the
// old poller, but it's not worth it.
MinVersion: "v19.1.0",
Cluster: makeClusterSpec(4, cpu(16)),
Run: func(ctx context.Context, t *test, c *cluster) {
cdcBasicTest(ctx, t, c, cdcTestArgs{
workloadType: tpccWorkloadType,
tpccWarehouseCount: 1000,
workloadDuration: "120m",
rangefeed: useRangeFeed,
targetInitialScanLatency: 3 * time.Minute,
targetSteadyLatency: 10 * time.Minute,
})
},
})
r.Add(testSpec{
Name: fmt.Sprintf("cdc/initial-scan/rangefeed=%t", useRangeFeed),
Name: fmt.Sprintf("cdc/initial-scan"),
Owner: OwnerCDC,
MinVersion: "v2.1.0",
Cluster: makeClusterSpec(4, cpu(16)),
Expand All @@ -533,7 +513,6 @@ func registerCDC(r *testRegistry) {
tpccWarehouseCount: 100,
workloadDuration: "30m",
initialScan: true,
rangefeed: useRangeFeed,
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
})
Expand All @@ -552,14 +531,13 @@ func registerCDC(r *testRegistry) {
workloadType: tpccWorkloadType,
tpccWarehouseCount: 1000,
workloadDuration: "30m",
rangefeed: false,
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: 2 * time.Minute,
})
},
})
r.Add(testSpec{
Name: fmt.Sprintf("cdc/sink-chaos/rangefeed=%t", useRangeFeed),
Name: fmt.Sprintf("cdc/sink-chaos"),
Owner: `cdc`,
// TODO(dan): Re-enable this test on 2.1 if we decide to backport #36852.
MinVersion: "v19.1.0",
Expand All @@ -569,15 +547,14 @@ func registerCDC(r *testRegistry) {
workloadType: tpccWorkloadType,
tpccWarehouseCount: 100,
workloadDuration: "30m",
rangefeed: useRangeFeed,
kafkaChaos: true,
targetInitialScanLatency: 3 * time.Minute,
targetSteadyLatency: 5 * time.Minute,
})
},
})
r.Add(testSpec{
Name: fmt.Sprintf("cdc/crdb-chaos/rangefeed=%t", useRangeFeed),
Name: fmt.Sprintf("cdc/crdb-chaos"),
Owner: `cdc`,
Skip: "#37716",
// TODO(dan): Re-enable this test on 2.1 if we decide to backport #36852.
Expand All @@ -588,7 +565,6 @@ func registerCDC(r *testRegistry) {
workloadType: tpccWorkloadType,
tpccWarehouseCount: 100,
workloadDuration: "30m",
rangefeed: useRangeFeed,
crdbChaos: true,
targetInitialScanLatency: 3 * time.Minute,
// TODO(dan): It should be okay to drop this as low as 2 to 3 minutes,
Expand All @@ -600,7 +576,7 @@ func registerCDC(r *testRegistry) {
},
})
r.Add(testSpec{
Name: fmt.Sprintf("cdc/ledger/rangefeed=%t", useRangeFeed),
Name: fmt.Sprintf("cdc/ledger"),
Owner: `cdc`,
MinVersion: "v2.1.0",
// TODO(mrtracy): This workload is designed to be running on a 20CPU nodes,
Expand All @@ -612,7 +588,6 @@ func registerCDC(r *testRegistry) {
workloadType: ledgerWorkloadType,
workloadDuration: "30m",
initialScan: true,
rangefeed: useRangeFeed,
targetInitialScanLatency: 10 * time.Minute,
targetSteadyLatency: time.Minute,
targetTxnPerSecond: 575,
Expand All @@ -634,7 +609,6 @@ func registerCDC(r *testRegistry) {
tpccWarehouseCount: 50,
workloadDuration: "30m",
initialScan: true,
rangefeed: true,
cloudStorageSink: true,
fixturesImport: true,
targetInitialScanLatency: 30 * time.Minute,
Expand Down

0 comments on commit 8a81eac

Please sign in to comment.