Skip to content

Commit

Permalink
Merge #86039 #86200
Browse files Browse the repository at this point in the history
86039: kvserver: record paused replica message drops to a metric r=tbg a=pavelkalinnikov

Part of #83917

Release justification: The metric may help investigating issues with paused replicas
Release note: None

86200: sql/schemachanger: Initial support for generating a corpus r=fqazi a=fqazi

These changes introduce the following:

1. Support for gathering declarative schema changer states (corpus) from logictests to help backwards/mixed version compatibility testing
2. Debug command support for replaying corpuses
3. Optional test for testing a generated corpus when a parameter is specified 

Release justification: Mainly test enhancements that are disabled by default, which will allow us to have an automated workload to test declarative schema changer state between versions.

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
3 people committed Aug 17, 2022
3 parents 45a49ef + 0bd35fb + a7098d2 commit 5f45e42
Show file tree
Hide file tree
Showing 29 changed files with 839 additions and 84 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,7 @@ GO_TARGETS = [
"//pkg/sql/scheduledlogging:scheduledlogging_test",
"//pkg/sql/schemachange:schemachange",
"//pkg/sql/schemachange:schemachange_test",
"//pkg/sql/schemachanger/corpus:corpus",
"//pkg/sql/schemachanger/rel/internal/comparetest:comparetest",
"//pkg/sql/schemachanger/rel/internal/cyclegraphtest:cyclegraphtest",
"//pkg/sql/schemachanger/rel/internal/entitynodetest:entitynodetest",
Expand Down Expand Up @@ -2647,6 +2648,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/scheduledlogging:get_x_data",
"//pkg/sql/schemachange:get_x_data",
"//pkg/sql/schemachanger:get_x_data",
"//pkg/sql/schemachanger/corpus:get_x_data",
"//pkg/sql/schemachanger/rel:get_x_data",
"//pkg/sql/schemachanger/rel/internal/comparetest:get_x_data",
"//pkg/sql/schemachanger/rel/internal/cyclegraphtest:get_x_data",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# cluster-opt: disable-corpus-generation
statement ok
SET use_declarative_schema_changer = 'unsafe'

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/schemachangerccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
"//pkg/sql/schemachanger/sctest",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/schemachangerccl/schemachanger_ccl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package schemachangerccl

import (
gosql "database/sql"
"flag"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -19,10 +20,18 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/sctest"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Used for saving corpus information in TestGenerateCorpus
var corpusPath string

func init() {
flag.StringVar(&corpusPath, "declarative-corpus", "", "Path to the corpus file")
}

func newCluster(t *testing.T, knobs *scexec.TestingKnobs) (*gosql.DB, func()) {
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{
Expand Down Expand Up @@ -81,6 +90,16 @@ func TestPause(t *testing.T) {
sctest.Pause(t, endToEndPath(t), newCluster)
}

// TestGenerateCorpus generates a corpus based on the end to end test files.
func TestGenerateCorpus(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
if corpusPath == "" {
skip.IgnoreLintf(t, "requires declarative-corpus path parameter")
}
sctest.GenerateSchemaChangeCorpus(t, endToEndPath(t), corpusPath, newCluster)
}

func TestDecomposeToElements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"debug_reset_quorum.go",
"debug_send_kv_batch.go",
"debug_synctest.go",
"declarative_corpus.go",
"decode.go",
"demo.go",
"demo_telemetry.go",
Expand Down Expand Up @@ -168,6 +169,9 @@ go_library(
"//pkg/sql/protoreflect",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/schemachanger/corpus",
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scplan",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/eval",
Expand Down Expand Up @@ -310,6 +314,7 @@ go_test(
"debug_recover_loss_of_quorum_test.go",
"debug_send_kv_batch_test.go",
"debug_test.go",
"declarative_corpus_test.go",
"decode_test.go",
"demo_locality_test.go",
"demo_test.go",
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,8 @@ func init() {
debugDoctorCmd.AddCommand(doctorExamineCmd, doctorRecreateCmd, doctorExamineFallbackClusterCmd, doctorExamineFallbackZipDirCmd)
DebugCmd.AddCommand(debugDoctorCmd)

DebugCmd.AddCommand(declarativeValidateCorpus)

debugStatementBundleCmd.AddCommand(statementBundleRecreateCmd)
DebugCmd.AddCommand(debugStatementBundleCmd)

Expand Down
62 changes: 62 additions & 0 deletions pkg/cli/declarative_corpus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package cli

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/corpus"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
"github.com/spf13/cobra"
)

var declarativeValidateCorpus = &cobra.Command{
Use: "declarative-corpus-validate <filename>",
Short: "validates a corpus file for the declarative schema changer",
Long: `
Validates very single declarative schema changer state can be planned against in
a given corpus file.
`,
Args: cobra.ExactArgs(1),
RunE: clierrorplus.MaybeDecorateError(
func(cmd *cobra.Command, args []string) (resErr error) {
cr, err := corpus.NewCorpusReaderWithPath(args[0])
if err != nil {
panic(err)
}
err = cr.ReadCorpus()
if err != nil {
panic(err)
}
for idx := 0; idx < cr.GetNumEntries(); idx++ {
name, state := cr.GetCorpus(idx)
jobID := jobspb.JobID(0)
params := scplan.Params{
InRollback: state.InRollback,
ExecutionPhase: scop.LatestPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID {
jobID++
return jobID
},
}
_, err := scplan.MakePlan(*state, params)
if err != nil {
fmt.Printf("failed to validate %s with error %v\n", name, err)
} else {
fmt.Printf("validated %s\n", name)
}
}
return nil
}),
}
38 changes: 38 additions & 0 deletions pkg/cli/declarative_corpus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package cli

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/datadriven"
)

// This test doctoring a secure cluster.
func TestDeclarativeCorpus(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCLITest(TestCLIParams{T: t, NoServer: true})
defer c.Cleanup()

t.Run("declarative corpus validation standalone command", func(t *testing.T) {
out, err := c.RunWithCapture("debug declarative-corpus-validate testdata/declarative-corpus/corpus")
if err != nil {
t.Fatal(err)
}

// Using datadriven allows TESTFLAGS=-rewrite.
datadriven.RunTest(t, testutils.TestDataPath(t, "declarative-corpus", "corpus_expected"), func(t *testing.T, td *datadriven.TestData) string {
return out
})
})
}
Binary file added pkg/cli/testdata/declarative-corpus/corpus
Binary file not shown.
6 changes: 6 additions & 0 deletions pkg/cli/testdata/declarative-corpus/corpus_expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
debug declarative-corpus-validate testdata/declarative-corpus/corpus
----
debug declarative-corpus-validate testdata/declarative-corpus/corpus
validated EndToEndCorpus_TestGenerateCorpus/drop_multiregion/DROP_DATABASE_multi_region_test_db_CASCADE/starting_DROP DATABASE multi_region_test_db CASCADE_0
validated EndToEndCorpus_TestGenerateCorpus/drop_multiregion/DROP_TABLE_multi_region_test_db.public.table_regional_by_row/starting_DROP TABLE multi_region_test_db.public.table_regional_by_row_0
validated EndToEndCorpus_TestGenerateCorpus/drop_multiregion/DROP_TABLE_multi_region_test_db.public.table_regional_by_table_CASCADE/starting_DROP TABLE multi_region_test_db.public.table_regional_by_table CASCADE_0
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/client_replica_raft_overload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func TestReplicaRaftOverload(t *testing.T) {
if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 {
return errors.New("no paused followers")
}
if n := s1.Metrics().RaftPausedFollowerDroppedMsgs.Count(); n == 0 {
return errors.New("no dropped messages to paused followers")
}
return nil
})

Expand Down
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,11 +1004,18 @@ Such Replicas will be ignored for the purposes of proposal quota, and will not
receive replication traffic. They are essentially treated as offline for the
purpose of replication. This serves as a crude form of admission control.
The count is emitted by the leaseholder of each range.
.`,
The count is emitted by the leaseholder of each range.`,
Measurement: "Followers",
Unit: metric.Unit_COUNT,
}
metaRaftPausedFollowerDroppedMsgs = metric.Metadata{
Name: "admission.raft.paused_replicas_dropped_msgs",
Help: `Number of messages dropped instead of being sent to paused replicas.
The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}

// Replica queue metrics.
metaMVCCGCQueueSuccesses = metric.Metadata{
Expand Down Expand Up @@ -1754,7 +1761,8 @@ type StoreMetrics struct {
RaftLogFollowerBehindCount *metric.Gauge
RaftLogTruncated *metric.Counter

RaftPausedFollowerCount *metric.Gauge
RaftPausedFollowerCount *metric.Gauge
RaftPausedFollowerDroppedMsgs *metric.Counter

RaftCoalescedHeartbeatsPending *metric.Gauge

Expand Down Expand Up @@ -2266,7 +2274,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount),
RaftLogTruncated: metric.NewCounter(metaRaftLogTruncated),

RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused),
RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused),
RaftPausedFollowerDroppedMsgs: metric.NewCounter(metaRaftPausedFollowerDroppedMsgs),

// This Gauge measures the number of heartbeats queued up just before
// the queue is cleared, to avoid flapping wildly.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,9 @@ func (r *Replica) sendRaftMessagesRaftMuLocked(
var lastAppResp raftpb.Message
for _, message := range messages {
_, drop := blocked[roachpb.ReplicaID(message.To)]
if drop {
r.store.Metrics().RaftPausedFollowerDroppedMsgs.Inc(1)
}
switch message.Type {
case raftpb.MsgApp:
if util.RaceEnabled {
Expand Down Expand Up @@ -1531,9 +1534,6 @@ func (r *Replica) sendRaftMessagesRaftMuLocked(
}
}

// TODO(tbg): record this to metrics.
//
// See: https://github.com/cockroachdb/cockroach/issues/83917
if !drop {
r.sendRaftMessageRaftMuLocked(ctx, message)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
"//pkg/sql/schemachanger/corpus",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scplan",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
Expand Down
Loading

0 comments on commit 5f45e42

Please sign in to comment.