Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvnemesis: toggle global_reads attribute in zone configs #63747

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"applier.go",
"doc.go",
"engine.go",
"env.go",
"generator.go",
"kvnemesis.go",
"operations.go",
Expand All @@ -18,6 +19,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis",
visibility = ["//visibility:public"],
deps = [
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
Expand All @@ -30,15 +32,18 @@ go_library(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@org_golang_google_protobuf//proto",
],
)

Expand All @@ -57,7 +62,9 @@ go_test(
embed = [":kvnemesis"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -76,6 +83,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//proto",
],
)

Expand Down
29 changes: 26 additions & 3 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"google.golang.org/protobuf/proto"
)

// Applier executes Steps.
type Applier struct {
env *Env
dbs []*kv.DB
mu struct {
dbIdx int
Expand All @@ -34,8 +37,9 @@ type Applier struct {
}

// MakeApplier constructs an Applier that executes against the given DBs.
func MakeApplier(dbs ...*kv.DB) *Applier {
func MakeApplier(env *Env, dbs ...*kv.DB) *Applier {
a := &Applier{
env: env,
dbs: dbs,
}
a.mu.txns = make(map[string]*kv.Txn)
Expand All @@ -56,7 +60,7 @@ func (a *Applier) Apply(ctx context.Context, step *Step) (retErr error) {
retErr = errors.Errorf(`panic applying step %s: %v`, step, p)
}
}()
applyOp(ctx, db, &step.Op)
applyOp(ctx, a.env, db, &step.Op)
return nil
}

Expand All @@ -68,7 +72,7 @@ func (a *Applier) getNextDBRoundRobin() (*kv.DB, int32) {
return a.dbs[dbIdx], int32(dbIdx)
}

func applyOp(ctx context.Context, db *kv.DB, op *Operation) {
func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
switch o := op.GetValue().(type) {
case *GetOperation, *PutOperation, *ScanOperation, *BatchOperation:
applyClientOp(ctx, db, op)
Expand All @@ -86,6 +90,9 @@ func applyOp(ctx context.Context, db *kv.DB, op *Operation) {
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
o.Result = resultError(ctx, err)
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultError(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -294,3 +301,19 @@ func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
return targets
}
}

func updateZoneConfig(zone *zonepb.ZoneConfig, change ChangeZoneType) {
switch change {
case ChangeZoneType_ToggleGlobalReads:
cur := zone.GlobalReads != nil && *zone.GlobalReads
zone.GlobalReads = proto.Bool(!cur)
default:
panic(errors.AssertionFailedf(`unknown ChangeZoneType: %v`, change))
}
}

func updateZoneConfigInEnv(ctx context.Context, env *Env, change ChangeZoneType) error {
return env.UpdateZoneConfig(ctx, GeneratorDataTableID, func(zone *zonepb.ZoneConfig) {
updateZoneConfig(zone, change)
})
}
48 changes: 47 additions & 1 deletion pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ package kvnemesis

import (
"context"
gosql "database/sql"
"regexp"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func TestApplier(t *testing.T) {
Expand All @@ -32,8 +35,10 @@ func TestApplier(t *testing.T) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
db := tc.Server(0).DB()
sqlDB := tc.ServerConn(0)
env := &Env{sqlDBs: []*gosql.DB{sqlDB}}

a := MakeApplier(db, db)
a := MakeApplier(env, db, db)
check := func(t *testing.T, s Step, expected string) {
t.Helper()
require.NoError(t, a.Apply(ctx, &s))
Expand Down Expand Up @@ -146,4 +151,45 @@ db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
`db1.TransferLeaseOperation(ctx, "foo", 1) // nil`)
checkErr(t, step(transferLease(`foo`, 1)),
`db0.TransferLeaseOperation(ctx, "foo", 1) // context canceled`)

// Zone config changes
check(t, step(changeZone(ChangeZoneType_ToggleGlobalReads)),
`env.UpdateZoneConfig(ctx, ToggleGlobalReads) // nil`)
checkErr(t, step(changeZone(ChangeZoneType_ToggleGlobalReads)),
`env.UpdateZoneConfig(ctx, ToggleGlobalReads) // context canceled`)
}

func TestUpdateZoneConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
before zonepb.ZoneConfig
change ChangeZoneType
expAfter zonepb.ZoneConfig
}{
{
before: zonepb.ZoneConfig{NumReplicas: proto.Int32(3)},
change: ChangeZoneType_ToggleGlobalReads,
expAfter: zonepb.ZoneConfig{NumReplicas: proto.Int32(3), GlobalReads: proto.Bool(true)},
},
{
before: zonepb.ZoneConfig{NumReplicas: proto.Int32(3), GlobalReads: proto.Bool(false)},
change: ChangeZoneType_ToggleGlobalReads,
expAfter: zonepb.ZoneConfig{NumReplicas: proto.Int32(3), GlobalReads: proto.Bool(true)},
},
{
before: zonepb.ZoneConfig{NumReplicas: proto.Int32(3), GlobalReads: proto.Bool(true)},
change: ChangeZoneType_ToggleGlobalReads,
expAfter: zonepb.ZoneConfig{NumReplicas: proto.Int32(3), GlobalReads: proto.Bool(false)},
},
}

for _, test := range tests {
t.Run("", func(t *testing.T) {
zone := test.before
updateZoneConfig(&zone, test.change)
require.Equal(t, test.expAfter, zone)
})
}
}
87 changes: 87 additions & 0 deletions pkg/kv/kvnemesis/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvnemesis

import (
"context"
gosql "database/sql"
"fmt"
"time"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

// Env manipulates the environment (cluster settings, zone configurations) that
// the Applier operates in.
type Env struct {
sqlDBs []*gosql.DB
}

func (e *Env) anyNode() *gosql.DB {
// NOTE: There is currently no need to round-robin through the sql gateways,
// so we always just return the first DB.
return e.sqlDBs[0]
}

// SetClosedTimestampInterval sets the kv.closed_timestamp.target_duration
// cluster setting to the provided duration.
func (e *Env) SetClosedTimestampInterval(ctx context.Context, d time.Duration) error {
q := fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'`, d)
_, err := e.anyNode().ExecContext(ctx, q)
return err
}

// ResetClosedTimestampInterval resets the kv.closed_timestamp.target_duration
// cluster setting to its default value.
func (e *Env) ResetClosedTimestampInterval(ctx context.Context) error {
const q = `SET CLUSTER SETTING kv.closed_timestamp.target_duration TO DEFAULT`
_, err := e.anyNode().ExecContext(ctx, q)
return err
}

// UpdateZoneConfig updates the zone configuration with the provided ID using
// the provided function. If no such zone exists, a new one is created.
func (e *Env) UpdateZoneConfig(
ctx context.Context, zoneID int, updateZone func(*zonepb.ZoneConfig),
) error {
return crdb.ExecuteTx(ctx, e.anyNode(), nil, func(tx *gosql.Tx) error {
// Read existing zone configuration.
var zone zonepb.ZoneConfig
var zoneRaw []byte
const q1 = `SELECT config FROM system.zones WHERE id = $1`
if err := tx.QueryRowContext(ctx, q1, zoneID).Scan(&zoneRaw); err != nil {
if !errors.Is(err, gosql.ErrNoRows) {
return err
}
// Zone does not exist. Create it.
zone = zonepb.DefaultZoneConfig()
} else {
if err := protoutil.Unmarshal(zoneRaw, &zone); err != nil {
return errors.Wrap(err, "unmarshaling existing zone")
}
}

// Update zone configuration proto.
updateZone(&zone)
zoneRaw, err := protoutil.Marshal(&zone)
if err != nil {
return errors.Wrap(err, "marshaling new zone")
}

// Rewrite updated zone configuration.
const q2 = `UPSERT INTO system.zones (id, config) VALUES ($1, $2)`
_, err = tx.ExecContext(ctx, q2, zoneID, zoneRaw)
return err
})
}
32 changes: 28 additions & 4 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type OperationConfig struct {
Merge MergeConfig
ChangeReplicas ChangeReplicasConfig
ChangeLease ChangeLeaseConfig
ChangeZone ChangeZoneConfig
}

// ClosureTxnConfig configures the relative probability of running some
Expand Down Expand Up @@ -143,6 +144,13 @@ type ChangeLeaseConfig struct {
TransferLease int
}

// ChangeZoneConfig configures the relative probability of generating a zone
// configuration change operation.
type ChangeZoneConfig struct {
// ToggleGlobalReads sets global_reads to a new value.
ToggleGlobalReads int
}

// newAllOperationsConfig returns a GeneratorConfig that exercises *all*
// options. You probably want NewDefaultConfig. Most of the time, these will be
// the same, but having both allows us to merge code for operations that do not
Expand Down Expand Up @@ -174,12 +182,12 @@ func newAllOperationsConfig() GeneratorConfig {
CommitBatchOps: clientOpConfig,
},
Split: SplitConfig{
SplitNew: 1,
SplitNew: 10,
SplitAgain: 1,
},
Merge: MergeConfig{
MergeNotSplit: 1,
MergeIsSplit: 1,
MergeIsSplit: 10,
},
ChangeReplicas: ChangeReplicasConfig{
AddReplica: 1,
Expand All @@ -189,6 +197,9 @@ func newAllOperationsConfig() GeneratorConfig {
ChangeLease: ChangeLeaseConfig{
TransferLease: 1,
},
ChangeZone: ChangeZoneConfig{
ToggleGlobalReads: 1,
},
}}
}

Expand All @@ -215,12 +226,15 @@ func NewDefaultConfig() GeneratorConfig {
return config
}

// GeneratorDataTableID is the table ID that corresponds to GeneratorDataSpan.
const GeneratorDataTableID = 50

// GeneratorDataSpan returns a span that contains all of the operations created
// by this Generator.
func GeneratorDataSpan() roachpb.Span {
return roachpb.Span{
Key: keys.SystemSQLCodec.TablePrefix(50),
EndKey: keys.SystemSQLCodec.TablePrefix(51),
Key: keys.SystemSQLCodec.TablePrefix(GeneratorDataTableID),
EndKey: keys.SystemSQLCodec.TablePrefix(GeneratorDataTableID + 1),
}
}

Expand Down Expand Up @@ -346,6 +360,8 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
transferLeaseFn := makeTransferLeaseFn(key, current)
addOpGen(&allowed, transferLeaseFn, g.Config.Ops.ChangeLease.TransferLease)

addOpGen(&allowed, toggleGlobalReads, g.Config.Ops.ChangeZone.ToggleGlobalReads)

return step(g.selectOp(rng, allowed))
}

Expand Down Expand Up @@ -516,6 +532,10 @@ func makeTransferLeaseFn(key string, current []roachpb.ReplicationTarget) opGenF
}
}

func toggleGlobalReads(_ *generator, _ *rand.Rand) Operation {
return changeZone(ChangeZoneType_ToggleGlobalReads)
}

func makeRandBatch(c *ClientOperationConfig) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
var allowed []opGen
Expand Down Expand Up @@ -651,3 +671,7 @@ func changeReplicas(key string, changes ...roachpb.ReplicationChange) Operation
func transferLease(key string, target roachpb.StoreID) Operation {
return Operation{TransferLease: &TransferLeaseOperation{Key: []byte(key), Target: target}}
}

func changeZone(changeType ChangeZoneType) Operation {
return Operation{ChangeZone: &ChangeZoneOperation{Type: changeType}}
}
Loading