diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index ea9bdd996449..697c99f4d94b 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -45,7 +45,6 @@ type cdcTestArgs struct { kafkaChaos bool crdbChaos bool cloudStorageSink bool - fixturesImport bool targetInitialScanLatency time.Duration targetSteadyLatency time.Duration @@ -101,7 +100,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { // value" errors #34025. tpcc.tolerateErrors = true - tpcc.install(ctx, c, args.fixturesImport) + tpcc.install(ctx, c) // TODO(dan,ajwerner): sleeping momentarily before running the workload // mitigates errors like "error in newOrder: missing stock row" from tpcc. time.Sleep(2 * time.Second) @@ -593,7 +592,6 @@ func registerCDC(r *testRegistry) { workloadDuration: "30m", initialScan: true, cloudStorageSink: true, - fixturesImport: true, targetInitialScanLatency: 30 * time.Minute, targetSteadyLatency: time.Minute, }) @@ -733,16 +731,11 @@ type tpccWorkload struct { tolerateErrors bool } -func (tw *tpccWorkload) install(ctx context.Context, c *cluster, fixturesImport bool) { - command := `./workload fixtures load` - if fixturesImport { - // For fixtures import, use the version built into the cockroach binary so - // the tpcc workload-versions match on release branches. - command = `./cockroach workload fixtures import` - } +func (tw *tpccWorkload) install(ctx context.Context, c *cluster) { + // For fixtures import, use the version built into the cockroach binary so + // the tpcc workload-versions match on release branches. c.Run(ctx, tw.workloadNodes, fmt.Sprintf( - `%s tpcc --warehouses=%d --checks=false {pgurl%s}`, - command, + `./cockroach workload fixtures import tpcc --warehouses=%d --checks=false {pgurl%s}`, tw.tpccWarehouseCount, tw.sqlNodes.randNode(), )) diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go index 910329d4595b..6aca4c009422 100644 --- a/pkg/cmd/roachtest/import.go +++ b/pkg/cmd/roachtest/import.go @@ -17,7 +17,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/version" "github.com/cockroachdb/errors" ) @@ -36,11 +35,7 @@ func registerImportTPCC(r *testRegistry) { hc := NewHealthChecker(c, c.All()) m.Go(hc.Runner) - workloadStr := `./workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'` - if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { - workloadStr += " --deprecated-fk-indexes" - } - + workloadStr := `./cockroach workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'` m.Go(func(ctx context.Context) error { defer dul.Done() defer hc.Done() diff --git a/pkg/cmd/roachtest/overload_tpcc_olap.go b/pkg/cmd/roachtest/overload_tpcc_olap.go index 64f94acd2780..5dd1074351e1 100644 --- a/pkg/cmd/roachtest/overload_tpcc_olap.go +++ b/pkg/cmd/roachtest/overload_tpcc_olap.go @@ -45,7 +45,7 @@ type tpccOLAPSpec struct { func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) { crdbNodes, workloadNode := setupTPCC( ctx, t, c, tpccOptions{ - Warehouses: s.Warehouses, SetupType: usingFixture, + Warehouses: s.Warehouses, SetupType: usingImport, }) const queryFileName = "queries.sql" // querybench expects the entire query to be on a single line. diff --git a/pkg/cmd/roachtest/schemachange.go b/pkg/cmd/roachtest/schemachange.go index a3729e17fe20..612173ebef00 100644 --- a/pkg/cmd/roachtest/schemachange.go +++ b/pkg/cmd/roachtest/schemachange.go @@ -312,7 +312,7 @@ func makeIndexAddTpccTest(spec clusterSpec, warehouses int, length time.Duration }) }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", @@ -457,7 +457,7 @@ func makeSchemaChangeDuringTPCC(spec clusterSpec, warehouses int, length time.Du }) }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", diff --git a/pkg/cmd/roachtest/scrub.go b/pkg/cmd/roachtest/scrub.go index c7349646ba9e..38e126ff14fb 100644 --- a/pkg/cmd/roachtest/scrub.go +++ b/pkg/cmd/roachtest/scrub.go @@ -82,7 +82,7 @@ func makeScrubTPCCTest( return nil }, Duration: length, - SetupType: usingFixture, + SetupType: usingImport, }) }, MinVersion: "v19.1.0", diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index 1e60aa1e89e9..56976bc26752 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -38,7 +38,7 @@ import ( type tpccSetupType int const ( - usingFixture tpccSetupType = iota + usingImport tpccSetupType = iota usingInit ) @@ -63,10 +63,12 @@ type tpccOptions struct { Versions []string } -// tpccFixturesCmd generates the command string to load tpcc data for the +// tpccImportCmd generates the command string to load tpcc data for the // specified warehouse count into a cluster. -func tpccFixturesCmd(t *test, warehouses int, extraArgs string) string { - return fmt.Sprintf("./workload fixtures import tpcc --warehouses=%d %s {pgurl:1}", +func tpccImportCmd(t *test, warehouses int, extraArgs string) string { + // Use `cockroach workload` instead of `workload` so the tpcc + // workload-versions match on release branches. + return fmt.Sprintf("./cockroach workload fixtures import tpcc --warehouses=%d %s {pgurl:1}", warehouses, extraArgs) } @@ -111,21 +113,21 @@ func setupTPCC( c.Put(ctx, cockroach, "./cockroach", workloadNode) c.Put(ctx, workload, "./workload", workloadNode) - extraArgs := opts.ExtraSetupArgs - if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { - extraArgs += " --deprecated-fk-indexes" - } func() { db := c.Conn(ctx, 1) defer db.Close() c.Start(ctx, t, crdbNodes, startArgsDontEncrypt) waitForFullReplication(t, c.Conn(ctx, crdbNodes[0])) switch opts.SetupType { - case usingFixture: + case usingImport: t.Status("loading fixture") - c.Run(ctx, workloadNode, tpccFixturesCmd(t, opts.Warehouses, extraArgs)) + c.Run(ctx, workloadNode, tpccImportCmd(t, opts.Warehouses, opts.ExtraSetupArgs)) case usingInit: t.Status("initializing tables") + extraArgs := opts.ExtraSetupArgs + if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) { + extraArgs += " --deprecated-fk-indexes" + } cmd := fmt.Sprintf( "./workload init tpcc --warehouses=%d %s {pgurl:1}", opts.Warehouses, extraArgs, @@ -234,7 +236,7 @@ func registerTPCC(r *testRegistry) { runTPCC(ctx, t, c, tpccOptions{ Warehouses: headroomWarehouses, Duration: 120 * time.Minute, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -268,7 +270,7 @@ func registerTPCC(r *testRegistry) { Warehouses: headroomWarehouses, Duration: 120 * time.Minute, Versions: []string{oldV, "", oldV, ""}, - SetupType: usingFixture, + SetupType: usingImport, }) // TODO(tbg): run another TPCC with the final binaries here and // teach TPCC to re-use the dataset (seems easy enough) to at least @@ -285,7 +287,7 @@ func registerTPCC(r *testRegistry) { Warehouses: 1, Duration: 10 * time.Minute, ExtraRunArgs: "--wait=false", - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -301,7 +303,7 @@ func registerTPCC(r *testRegistry) { runTPCC(ctx, t, c, tpccOptions{ Warehouses: warehouses, Duration: 6 * 24 * time.Hour, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -330,7 +332,7 @@ func registerTPCC(r *testRegistry) { DrainAndQuit: false, } }, - SetupType: usingFixture, + SetupType: usingImport, }) }, }) @@ -662,7 +664,7 @@ func loadTPCCBench( // Load the corresponding fixture. t.l.Printf("restoring tpcc fixture\n") waitForFullReplication(t, db) - cmd := tpccFixturesCmd(t, b.LoadWarehouses, loadArgs) + cmd := tpccImportCmd(t, b.LoadWarehouses, loadArgs) if err := c.RunE(ctx, loadNode, cmd); err != nil { return err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 4e9dbe2e4cd8..2e525abf2a36 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) var ( @@ -1375,25 +1376,24 @@ func (ds *DistSender) sendPartialBatchAsync( } func slowRangeRPCWarningStr( + s *redact.StringBuilder, ba roachpb.BatchRequest, dur time.Duration, attempts int64, desc *roachpb.RangeDescriptor, err error, br *roachpb.BatchResponse, -) string { - var resp string - if err != nil { - resp = err.Error() - } else { - resp = br.String() +) { + resp := interface{}(err) + if resp == nil { + resp = br } - return fmt.Sprintf("have been waiting %.2fs (%d attempts) for RPC %s to %s; resp: %s", + s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to %s; resp: %s", dur.Seconds(), attempts, ba, desc, resp) } -func slowRangeRPCReturnWarningStr(dur time.Duration, attempts int64) string { - return fmt.Sprintf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts) +func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, attempts int64) { + s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts) } // sendPartialBatch sends the supplied batch to the range specified by @@ -1496,16 +1496,20 @@ func (ds *DistSender) sendPartialBatch( const slowDistSenderThreshold = time.Minute if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() { - log.Warningf(ctx, "slow range RPC: %v", - slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply)) + { + var s redact.StringBuilder + slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply) + log.Warningf(ctx, "slow range RPC: %v", s) + } // If the RPC wasn't successful, defer the logging of a message once the // RPC is not retried any more. if err != nil || reply.Error != nil { ds.metrics.SlowRPCs.Inc(1) defer func(tBegin time.Time, attempts int64) { ds.metrics.SlowRPCs.Dec(1) - log.Warningf(ctx, "slow RPC response: %v", - slowRangeRPCReturnWarningStr(timeutil.Since(tBegin), attempts)) + var s redact.StringBuilder + slowRangeRPCReturnWarningStr(&s, timeutil.Since(tBegin), attempts) + log.Warningf(ctx, "slow RPC response: %v", s) }(tBegin, attempts) } tBegin = time.Time{} // prevent reentering branch for this RPC diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 47e99e4672b6..4dd6c2a20dca 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -3956,18 +3957,22 @@ func TestDistSenderSlowLogMessage(t *testing.T) { ba.Add(get) br := &roachpb.BatchResponse{} br.Error = roachpb.NewError(errors.New("boom")) - desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x")} + desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")} { - exp := `have been waiting 8.16s (120 attempts) for RPC Get ["a",/Min) to` + - ` r9:{-} [, next=0, gen=0]; resp: (err: boom)` - act := slowRangeRPCWarningStr(ba, dur, attempts, desc, nil /* err */, br) - require.Equal(t, exp, act) + exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,‹/Min›) to` + + ` r9:‹{x-z}› [, next=0, gen=0]; resp: ‹(err: boom)›` + var s redact.StringBuilder + slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br) + act := s.RedactableString() + require.EqualValues(t, exp, act) } { exp := `slow RPC finished after 8.16s (120 attempts)` - act := slowRangeRPCReturnWarningStr(dur, attempts) - require.Equal(t, exp, act) + var s redact.StringBuilder + slowRangeRPCReturnWarningStr(&s, dur, attempts) + act := s.RedactableString() + require.EqualValues(t, exp, act) } }