admission_control_elastic_cdc.go
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
)

// This test sets up a 3-node CRDB cluster on 8vCPU machines running
// 1000-warehouse TPC-C, and kicks off a few changefeed backfills concurrently.
// We've observed latency spikes during backfills because of their CPU-/scan-heavy
// nature -- they can elevate CPU scheduling latencies, which in turn translates
// into an increase in foreground latency.
func registerElasticControlForCDC(r registry.Registry) {
	r.Add(registry.TestSpec{
		Name:  "admission-control/elastic-cdc",
		Owner: registry.OwnerAdmissionControl,
		// TODO(irfansharif): After two weeks of nightly baking time, reduce
		// this to a weekly cadence. This is a long-running test and serves only
		// as a coarse-grained benchmark.
		// Tags: []string{`weekly`},
		Cluster:         r.MakeClusterSpec(4, spec.CPU(8)),
		RequiresLicense: true,
		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
			if c.Spec().NodeCount < 4 {
				t.Fatalf("expected at least 4 nodes, found %d", c.Spec().NodeCount)
			}
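
			// The last node in the cluster is reserved for the workload runner
			// (and the Prometheus instance configured below); the remaining
			// nodes run CockroachDB.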
			crdbNodes := c.Spec().NodeCount - 1
			workloadNode := crdbNodes + 1

			numWarehouses, workloadDuration, estimatedSetupTime := 1000, 60*time.Minute, 10*time.Minute
			if c.IsLocal() {
				numWarehouses, workloadDuration, estimatedSetupTime = 1, time.Minute, 2*time.Minute
			}
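
			// Scrape the CRDB nodes and the workload node, and link the
			// changefeed admission control Grafana dashboard.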
			promCfg := &prometheus.Config{}
			promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
				WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
				WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
				WithGrafanaDashboard("http://go.crdb.dev/p/changefeed-admission-control-grafana").
				WithScrapeConfigs(
					prometheus.MakeWorkloadScrapeConfig("workload", "/",
						makeWorkloadScrapeNodes(
							c.Node(workloadNode).InstallNodes()[0],
							[]workloadInstance{{nodes: c.Node(workloadNode)}},
						),
					),
				)

			if t.SkipInit() {
				t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
			} else {
				t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute))
			}
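
			// padDuration spaces out each changefeed round long enough for its
			// effects to show up in metrics; stopFeedsDuration is buffer time
			// for changefeed cancellations to take effect.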
			padDuration, err := time.ParseDuration(ifLocal(c, "5s", "5m"))
			if err != nil {
				t.Fatal(err)
			}
			stopFeedsDuration, err := time.ParseDuration(ifLocal(c, "5s", "1m"))
			if err != nil {
				t.Fatal(err)
			}
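
			// Run TPC-C as the foreground workload; the During hook below
			// drives the changefeed backfill rounds concurrently.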
			runTPCC(ctx, t, c, tpccOptions{
				Warehouses:         numWarehouses,
				Duration:           workloadDuration,
				SetupType:          usingImport,
				EstimatedSetupTime: estimatedSetupTime,
				SkipPostRunCheck:   true,
				ExtraSetupArgs:     "--checks=false",
				PrometheusConfig:   promCfg,
				During: func(ctx context.Context) error {
					db := c.Conn(ctx, t.L(), crdbNodes)
					defer db.Close()

					t.Status(fmt.Sprintf("configuring cluster (<%s)", 30*time.Second))
					{
						setAdmissionControl(ctx, t, c, true)

						// Changefeeds depend on rangefeeds being enabled.
						if _, err := db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
							return err
						}
					}

					stopFeeds(db) // stop stray feeds (from repeated runs against the same cluster for ex.)
					defer stopFeeds(db)
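
					// Kick off the changefeed rounds in a monitored goroutine
					// while TPC-C continues running in the foreground.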
					m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
					m.Go(func(ctx context.Context) error {
						const iters, changefeeds = 5, 10
						for i := 0; i < iters; i++ {
							if i == 0 {
								t.Status(fmt.Sprintf("setting performance baseline (<%s)", padDuration))
							}
							time.Sleep(padDuration) // each iteration lasts long enough to observe effects in metrics

							t.Status(fmt.Sprintf("during: round %d: stopping extant changefeeds (<%s)", i, stopFeedsDuration))
							stopFeeds(db)
							time.Sleep(stopFeedsDuration) // buffer for cancellations to take effect/show up in metrics

							t.Status(fmt.Sprintf("during: round %d: creating %d changefeeds (<%s)", i, changefeeds, time.Minute))
							for j := 0; j < changefeeds; j++ {
								stmtWithCursor := fmt.Sprintf(`
									CREATE CHANGEFEED FOR tpcc.order_line, tpcc.stock, tpcc.customer
									INTO 'null://' WITH cursor = '-%ds'
								`, int64(float64(i+1)*padDuration.Seconds())) // scanning as far back as possible (~ when the workload started)
								if _, err := db.ExecContext(ctx, stmtWithCursor); err != nil {
									return err
								}
							}

							// TODO(irfansharif): Add a version of this test
							// with initial_scan = 'only' to demonstrate the
							// need+efficacy of using elastic CPU control in
							// changefeed workers. That too has a severe effect
							// on scheduling latencies.
						}
						return nil
					})

					t.Status(fmt.Sprintf("waiting for workload to finish (<%s)", workloadDuration))
					m.Wait()
					return nil
				},
			})
		},
	})
}