Skip to content

Commit

Permalink
Merge #54743 #55050
Browse files Browse the repository at this point in the history
54743: kvcoord: properly redact 'have been waiting' message in DistSender r=knz a=tbg

Release note: None

55050: roachtest: use crdb's workload for fixtures import tpcc r=nvanbenschoten a=nvanbenschoten

Fixes #55042.
Fixes #55041.
Fixes #55039.
Fixes #55038.
Fixes #55037.
Fixes #55036.
Fixes #55035.
Fixes #55033.
Fixes #55029.
Fixes #55024.
Fixes #55022.
Fixes #55020.
Fixes #55019.
Fixes #55018.
Fixes #55017.
Fixes #55016.
Fixes #55013.
Fixes #55010.
Fixes #55009.
Fixes #55008.
Fixes #55003.
Fixes #55002.
Fixes #54998.
Fixes #54995.
Fixes #54822.
Fixes #52693.
Fixes #54802.

We were already using the workload built into the cockroach binary (`./cockroach workload`) for `fixtures import tpcc` in some places, but needed it in others.

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Sep 30, 2020
3 parents 1ac75dd + 09d2222 + 20fc710 commit b2c6c13
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 58 deletions.
17 changes: 5 additions & 12 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type cdcTestArgs struct {
kafkaChaos bool
crdbChaos bool
cloudStorageSink bool
fixturesImport bool

targetInitialScanLatency time.Duration
targetSteadyLatency time.Duration
Expand Down Expand Up @@ -101,7 +100,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
// value" errors #34025.
tpcc.tolerateErrors = true

tpcc.install(ctx, c, args.fixturesImport)
tpcc.install(ctx, c)
// TODO(dan,ajwerner): sleeping momentarily before running the workload
// mitigates errors like "error in newOrder: missing stock row" from tpcc.
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -593,7 +592,6 @@ func registerCDC(r *testRegistry) {
workloadDuration: "30m",
initialScan: true,
cloudStorageSink: true,
fixturesImport: true,
targetInitialScanLatency: 30 * time.Minute,
targetSteadyLatency: time.Minute,
})
Expand Down Expand Up @@ -733,16 +731,11 @@ type tpccWorkload struct {
tolerateErrors bool
}

func (tw *tpccWorkload) install(ctx context.Context, c *cluster, fixturesImport bool) {
command := `./workload fixtures load`
if fixturesImport {
// For fixtures import, use the version built into the cockroach binary so
// the tpcc workload-versions match on release branches.
command = `./cockroach workload fixtures import`
}
func (tw *tpccWorkload) install(ctx context.Context, c *cluster) {
// For fixtures import, use the version built into the cockroach binary so
// the tpcc workload-versions match on release branches.
c.Run(ctx, tw.workloadNodes, fmt.Sprintf(
`%s tpcc --warehouses=%d --checks=false {pgurl%s}`,
command,
`./cockroach workload fixtures import tpcc --warehouses=%d --checks=false {pgurl%s}`,
tw.tpccWarehouseCount,
tw.sqlNodes.randNode(),
))
Expand Down
7 changes: 1 addition & 6 deletions pkg/cmd/roachtest/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

Expand All @@ -36,11 +35,7 @@ func registerImportTPCC(r *testRegistry) {
hc := NewHealthChecker(c, c.All())
m.Go(hc.Runner)

workloadStr := `./workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'`
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
workloadStr += " --deprecated-fk-indexes"
}

workloadStr := `./cockroach workload fixtures import tpcc --warehouses=%d --csv-server='http://localhost:8081'`
m.Go(func(ctx context.Context) error {
defer dul.Done()
defer hc.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/overload_tpcc_olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type tpccOLAPSpec struct {
func (s tpccOLAPSpec) run(ctx context.Context, t *test, c *cluster) {
crdbNodes, workloadNode := setupTPCC(
ctx, t, c, tpccOptions{
Warehouses: s.Warehouses, SetupType: usingFixture,
Warehouses: s.Warehouses, SetupType: usingImport,
})
const queryFileName = "queries.sql"
// querybench expects the entire query to be on a single line.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func makeIndexAddTpccTest(spec clusterSpec, warehouses int, length time.Duration
})
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down Expand Up @@ -457,7 +457,7 @@ func makeSchemaChangeDuringTPCC(spec clusterSpec, warehouses int, length time.Du
})
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func makeScrubTPCCTest(
return nil
},
Duration: length,
SetupType: usingFixture,
SetupType: usingImport,
})
},
MinVersion: "v19.1.0",
Expand Down
34 changes: 18 additions & 16 deletions pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
type tpccSetupType int

const (
usingFixture tpccSetupType = iota
usingImport tpccSetupType = iota
usingInit
)

Expand All @@ -63,10 +63,12 @@ type tpccOptions struct {
Versions []string
}

// tpccFixturesCmd generates the command string to load tpcc data for the
// tpccImportCmd generates the command string to load tpcc data for the
// specified warehouse count into a cluster.
func tpccFixturesCmd(t *test, warehouses int, extraArgs string) string {
return fmt.Sprintf("./workload fixtures import tpcc --warehouses=%d %s {pgurl:1}",
func tpccImportCmd(t *test, warehouses int, extraArgs string) string {
// Use `cockroach workload` instead of `workload` so the tpcc
// workload-versions match on release branches.
return fmt.Sprintf("./cockroach workload fixtures import tpcc --warehouses=%d %s {pgurl:1}",
warehouses, extraArgs)
}

Expand Down Expand Up @@ -111,21 +113,21 @@ func setupTPCC(
c.Put(ctx, cockroach, "./cockroach", workloadNode)
c.Put(ctx, workload, "./workload", workloadNode)

extraArgs := opts.ExtraSetupArgs
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
extraArgs += " --deprecated-fk-indexes"
}
func() {
db := c.Conn(ctx, 1)
defer db.Close()
c.Start(ctx, t, crdbNodes, startArgsDontEncrypt)
waitForFullReplication(t, c.Conn(ctx, crdbNodes[0]))
switch opts.SetupType {
case usingFixture:
case usingImport:
t.Status("loading fixture")
c.Run(ctx, workloadNode, tpccFixturesCmd(t, opts.Warehouses, extraArgs))
c.Run(ctx, workloadNode, tpccImportCmd(t, opts.Warehouses, opts.ExtraSetupArgs))
case usingInit:
t.Status("initializing tables")
extraArgs := opts.ExtraSetupArgs
if !t.buildVersion.AtLeast(version.MustParse("v20.2.0")) {
extraArgs += " --deprecated-fk-indexes"
}
cmd := fmt.Sprintf(
"./workload init tpcc --warehouses=%d %s {pgurl:1}",
opts.Warehouses, extraArgs,
Expand Down Expand Up @@ -234,7 +236,7 @@ func registerTPCC(r *testRegistry) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: headroomWarehouses,
Duration: 120 * time.Minute,
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -268,7 +270,7 @@ func registerTPCC(r *testRegistry) {
Warehouses: headroomWarehouses,
Duration: 120 * time.Minute,
Versions: []string{oldV, "", oldV, ""},
SetupType: usingFixture,
SetupType: usingImport,
})
// TODO(tbg): run another TPCC with the final binaries here and
// teach TPCC to re-use the dataset (seems easy enough) to at least
Expand All @@ -285,7 +287,7 @@ func registerTPCC(r *testRegistry) {
Warehouses: 1,
Duration: 10 * time.Minute,
ExtraRunArgs: "--wait=false",
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand All @@ -301,7 +303,7 @@ func registerTPCC(r *testRegistry) {
runTPCC(ctx, t, c, tpccOptions{
Warehouses: warehouses,
Duration: 6 * 24 * time.Hour,
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -330,7 +332,7 @@ func registerTPCC(r *testRegistry) {
DrainAndQuit: false,
}
},
SetupType: usingFixture,
SetupType: usingImport,
})
},
})
Expand Down Expand Up @@ -662,7 +664,7 @@ func loadTPCCBench(
// Load the corresponding fixture.
t.l.Printf("restoring tpcc fixture\n")
waitForFullReplication(t, db)
cmd := tpccFixturesCmd(t, b.LoadWarehouses, loadArgs)
cmd := tpccImportCmd(t, b.LoadWarehouses, loadArgs)
if err := c.RunE(ctx, loadNode, cmd); err != nil {
return err
}
Expand Down
30 changes: 17 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

var (
Expand Down Expand Up @@ -1375,25 +1376,24 @@ func (ds *DistSender) sendPartialBatchAsync(
}

func slowRangeRPCWarningStr(
s *redact.StringBuilder,
ba roachpb.BatchRequest,
dur time.Duration,
attempts int64,
desc *roachpb.RangeDescriptor,
err error,
br *roachpb.BatchResponse,
) string {
var resp string
if err != nil {
resp = err.Error()
} else {
resp = br.String()
) {
resp := interface{}(err)
if resp == nil {
resp = br
}
return fmt.Sprintf("have been waiting %.2fs (%d attempts) for RPC %s to %s; resp: %s",
s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to %s; resp: %s",
dur.Seconds(), attempts, ba, desc, resp)
}

func slowRangeRPCReturnWarningStr(dur time.Duration, attempts int64) string {
return fmt.Sprintf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, attempts int64) {
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
}

// sendPartialBatch sends the supplied batch to the range specified by
Expand Down Expand Up @@ -1496,16 +1496,20 @@ func (ds *DistSender) sendPartialBatch(

const slowDistSenderThreshold = time.Minute
if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
log.Warningf(ctx, "slow range RPC: %v",
slowRangeRPCWarningStr(ba, dur, attempts, routingTok.Desc(), err, reply))
{
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply)
log.Warningf(ctx, "slow range RPC: %v", s)
}
// If the RPC wasn't successful, defer logging a message until the RPC is
// no longer being retried.
if err != nil || reply.Error != nil {
ds.metrics.SlowRPCs.Inc(1)
defer func(tBegin time.Time, attempts int64) {
ds.metrics.SlowRPCs.Dec(1)
log.Warningf(ctx, "slow RPC response: %v",
slowRangeRPCReturnWarningStr(timeutil.Since(tBegin), attempts))
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, timeutil.Since(tBegin), attempts)
log.Warningf(ctx, "slow RPC response: %v", s)
}(tBegin, attempts)
}
tBegin = time.Time{} // prevent reentering branch for this RPC
Expand Down
19 changes: 12 additions & 7 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -3956,18 +3957,22 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
ba.Add(get)
br := &roachpb.BatchResponse{}
br.Error = roachpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x")}
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
{
exp := `have been waiting 8.16s (120 attempts) for RPC Get ["a",/Min) to` +
` r9:{-} [<no replicas>, next=0, gen=0]; resp: (err: boom)`
act := slowRangeRPCWarningStr(ba, dur, attempts, desc, nil /* err */, br)
require.Equal(t, exp, act)
exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,‹/Min›) to` +
` r9:‹{x-z}› [<no replicas>, next=0, gen=0]; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br)
act := s.RedactableString()
require.EqualValues(t, exp, act)
}

{
exp := `slow RPC finished after 8.16s (120 attempts)`
act := slowRangeRPCReturnWarningStr(dur, attempts)
require.Equal(t, exp, act)
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, dur, attempts)
act := s.RedactableString()
require.EqualValues(t, exp, act)
}
}

Expand Down

0 comments on commit b2c6c13

Please sign in to comment.