Skip to content

Commit

Permalink
tenantrate: fix test flake
Browse files Browse the repository at this point in the history
The way the test interacts with the configuration (through cluster
settings) is flaky - when we modify multiple settings and the head of
the queue gets notified, it might or might not observe the
intermediary state.

This commit cleans things up: the test now directly updates the config
rather than going through the cluster settings.

Fixes #63621.

Release note: None
  • Loading branch information
RaduBerinde committed May 24, 2021
1 parent 402c192 commit f6b620d
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 86 deletions.
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/tenantrate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,11 @@ go_library(
go_test(
name = "tenantrate_test",
size = "small",
srcs = [
"helpers_test.go",
"limiter_test.go",
],
srcs = ["limiter_test.go"],
data = glob(["testdata/**"]),
embed = [":tenantrate"],
deps = [
":tenantrate",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/leaktest",
Expand Down
24 changes: 14 additions & 10 deletions pkg/kv/kvserver/tenantrate/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ type TestingKnobs struct {

// LimiterFactory constructs and manages per-tenant Limiters.
type LimiterFactory struct {
settings *cluster.Settings
knobs TestingKnobs
metrics Metrics
systemLimiter systemLimiter
mu struct {
syncutil.RWMutex
limits Config
config Config
tenants map[roachpb.TenantID]*refCountedLimiter
}
}
Expand All @@ -46,19 +45,21 @@ type refCountedLimiter struct {
// NewLimiterFactory constructs a new LimiterFactory.
func NewLimiterFactory(st *cluster.Settings, knobs *TestingKnobs) *LimiterFactory {
rl := &LimiterFactory{
settings: st,
metrics: makeMetrics(),
metrics: makeMetrics(),
}
if knobs != nil {
rl.knobs = *knobs
}
rl.mu.tenants = make(map[roachpb.TenantID]*refCountedLimiter)
rl.mu.limits = ConfigFromSettings(st)
rl.mu.config = ConfigFromSettings(st)
rl.systemLimiter = systemLimiter{
tenantMetrics: rl.metrics.tenantMetrics(roachpb.SystemTenantID),
}
for _, setting := range configSettings {
setting.SetOnChange(&st.SV, rl.updateConfig)
setting.SetOnChange(&st.SV, func() {
config := ConfigFromSettings(st)
rl.UpdateConfig(config)
})
}
return rl
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func (rl *LimiterFactory) GetTenant(tenantID roachpb.TenantID, closer <-chan str
options = append(options, quotapool.WithCloser(closer))
}
rcLim = new(refCountedLimiter)
rcLim.lim.init(rl, tenantID, rl.mu.limits, rl.metrics.tenantMetrics(tenantID), options...)
rcLim.lim.init(rl, tenantID, rl.mu.config, rl.metrics.tenantMetrics(tenantID), options...)
rl.mu.tenants[tenantID] = rcLim
}
rcLim.refCount++
Expand Down Expand Up @@ -114,12 +115,15 @@ func (rl *LimiterFactory) Release(lim Limiter) {
}
}

func (rl *LimiterFactory) updateConfig() {
// UpdateConfig changes the config of all limiters (existing and future).
// It is called automatically when a cluster setting is changed. It is also
// called by tests.
func (rl *LimiterFactory) UpdateConfig(config Config) {
rl.mu.Lock()
defer rl.mu.Unlock()
rl.mu.limits = ConfigFromSettings(rl.settings)
rl.mu.config = config
for _, rcLim := range rl.mu.tenants {
rcLim.lim.updateConfig(rl.mu.limits)
rcLim.lim.updateConfig(rl.mu.config)
}
}

Expand Down
60 changes: 0 additions & 60 deletions pkg/kv/kvserver/tenantrate/helpers_test.go

This file was deleted.

46 changes: 39 additions & 7 deletions pkg/kv/kvserver/tenantrate/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type testState struct {
m *metric.Registry
clock *timeutil.ManualTime
settings *cluster.Settings
config tenantrate.Config
}

type launchState struct {
Expand Down Expand Up @@ -144,11 +145,14 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string {
ts.tenants = make(map[roachpb.TenantID][]tenantrate.Limiter)
ts.clock = timeutil.NewManualTime(t0)
ts.settings = cluster.MakeTestingClusterSettings()
settings := parseSettings(t, d)
tenantrate.OverrideSettings(&ts.settings.SV, settings)
ts.config = tenantrate.DefaultConfig()

parseSettings(t, d, &ts.config)

ts.rl = tenantrate.NewLimiterFactory(ts.settings, &tenantrate.TestingKnobs{
TimeSource: ts.clock,
})
ts.rl.UpdateConfig(ts.config)
ts.m = metric.NewRegistry()
ts.m.AddMetricStruct(ts.rl.Metrics())
return ts.clock.Now().Format(timeFormat)
Expand All @@ -158,8 +162,8 @@ func (ts *testState) init(t *testing.T, d *datadriven.TestData) string {
// yaml object representing the limits and updates accordingly. It returns
// the current time. See init for more details as the semantics are the same.
func (ts *testState) updateSettings(t *testing.T, d *datadriven.TestData) string {
settings := parseSettings(t, d)
tenantrate.OverrideSettings(&ts.settings.SV, settings)
parseSettings(t, d, &ts.config)
ts.rl.UpdateConfig(ts.config)
return ts.formatTime()
}

Expand Down Expand Up @@ -579,12 +583,40 @@ func parseTenantIDs(t *testing.T, d *datadriven.TestData) []uint64 {
return tenantIDs
}

func parseSettings(t *testing.T, d *datadriven.TestData) tenantrate.SettingValues {
var vals tenantrate.SettingValues
// SettingValues is a struct that can be populated from test files, via YAML.
type SettingValues struct {
Rate float64
Burst float64

Read Factors
Write Factors
}

// Factors for reads and writes.
type Factors struct {
Base float64
PerByte float64
}

// parseSettings parses a SettingValues yaml and updates the given config.
// Missing (zero) values are ignored.
func parseSettings(t *testing.T, d *datadriven.TestData, config *tenantrate.Config) {
var vals SettingValues
if err := yaml.UnmarshalStrict([]byte(d.Input), &vals); err != nil {
d.Fatalf(t, "failed to unmarshal limits: %v", err)
}
return vals

override := func(dest *float64, val float64) {
if val != 0 {
*dest = val
}
}
override(&config.Rate, vals.Rate)
override(&config.Burst, vals.Burst)
override(&config.ReadRequestUnits, vals.Read.Base)
override(&config.ReadUnitsPerByte, vals.Read.PerByte)
override(&config.WriteRequestUnits, vals.Write.Base)
override(&config.WriteUnitsPerByte, vals.Write.PerByte)
}

func parseStrings(t *testing.T, d *datadriven.TestData) []string {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/tenantrate/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,16 @@ func ConfigFromSettings(st *cluster.Settings) Config {
c.WriteUnitsPerByte = writeCostPerMB.Get(&st.SV) * perMBToPerByte
return c
}

// DefaultConfig returns the configuration that corresponds to the default
// setting values.
func DefaultConfig() Config {
return Config{
Rate: kvcuRateLimit.Default(),
Burst: kvcuRateLimit.Default() * kvcuBurstLimitSeconds.Default(),
ReadRequestUnits: readRequestCost.Default(),
ReadUnitsPerByte: readCostPerMB.Default() / (1024 * 1024),
WriteRequestUnits: writeRequestCost.Default(),
WriteUnitsPerByte: writeCostPerMB.Default() / (1024 * 1024),
}
}
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/tenantrate/testdata/update
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ timers
----
00:00:03.000

# Advance time by a second, at this point the limiter should have zero
# writebytes available.
# Advance time by a second, at this point the debt should be paid and the
# limiter should have zero units available.

advance
1s
----
00:00:01.000

# Update the settings to double the writebytes rate.
# Update the settings to double the rate but reduce the burst.

update_settings
rate: 4
Expand Down

0 comments on commit f6b620d

Please sign in to comment.