102821: backupccl: introduce backup fixture generator framework r=rhu713,renatolabs a=msbutler

Previously, we created backup fixtures either by manually creating and running workloads on roachprod or by using unwieldy bash scripts. This patch introduces a framework that makes it easy to generate a backup fixture via the roachtest API. Once the fixture writer specifies a foreground workload (e.g. tpce) and a scheduled backup specification, a single `roachprod run` invocation creates the fixture in a cloud bucket that restore roachtests can easily fetch.

The fixture creator can initialize the foreground workload using a `workload init` command or by restoring from an old fixture.

Note that the vast majority of the test specifications are "skipped" so that they do not run in the nightly roachtest suite: creating large fixtures is expensive, and they only need to be recreated once per major release. This patch creates 5 new "roachtests":
- backupFixture/tpce/15GB/aws [disaster-recovery]
- backupFixture/tpce/32TB/aws [disaster-recovery] (skipped)
- backupFixture/tpce/400GB/aws [disaster-recovery] (skipped)
- backupFixture/tpce/400GB/gce [disaster-recovery] (skipped)
- backupFixture/tpce/8TB/aws [disaster-recovery] (skipped)
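
As a rough, self-contained illustration of the kind of specification this framework implies (the `backup_fixtures.go` diff itself is not shown on this page), the sketch below derives test names like the ones listed above from a fixture spec. The type and field names (`backupFixtureSpecs`, `scheduledBackupSpecs`, `testName`, etc.) are illustrative assumptions, not the exact API introduced by this patch.

```go
package main

import "fmt"

// Hypothetical sketch: these names are assumptions for illustration only.
type scheduledBackupSpecs struct {
	crontab string // schedule on which incremental backups are taken
	cloud   string // destination cloud for the fixture bucket, e.g. "aws" or "gce"
}

type backupFixtureSpecs struct {
	workload string // foreground workload, e.g. "tpce"
	scale    string // e.g. "15GB", "400GB", "8TB", "32TB"
	backup   scheduledBackupSpecs
	skipped  bool // large fixtures are only regenerated around a major release
}

func (bf backupFixtureSpecs) testName() string {
	return fmt.Sprintf("backupFixture/%s/%s/%s", bf.workload, bf.scale, bf.backup.cloud)
}

func main() {
	specs := []backupFixtureSpecs{
		{workload: "tpce", scale: "15GB", backup: scheduledBackupSpecs{cloud: "aws"}},
		{workload: "tpce", scale: "400GB", backup: scheduledBackupSpecs{cloud: "aws"}, skipped: true},
	}
	for _, bf := range specs {
		fmt.Printf("%s (skipped=%t)\n", bf.testName(), bf.skipped)
	}
}
```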

In the future, this framework should be extended to make it easier to write backup-restore roundtrip tests as well.

Fixes #99787

Release note: None

104318: cmdccl: test utility for running a /dev/null c2c stream r=lidorcarmel a=stevendanna

A small utility for reading a tenant replication stream and doing nothing with it. Useful for manual testing.
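
For manual testing, an invocation along the lines of `clusterrepl --uri '<source-cluster-sql-uri>' --tenant <tenant-name>` (flag names taken from the new main.go below; the connection string is a placeholder) subscribes to the replication stream and prints periodic throughput stats; adding `--no-parse` streams the raw partition rows without decoding them into events.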

Epic: none

Release note: None

104877: multiregionccl: reenable regional_by_table test r=chengxiong-ruan a=chengxiong-ruan

Informs: #98020

I wasn't able to reproduce the error mentioned in #98020 for any of the 3 tests after stressing 2k runs. Assuming that something has changed and we're fine now, but re-enabling only one test at a time just in case it fails in CI for some reason.

Release note: None

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
4 people committed Jun 14, 2023
4 parents 0caad15 + 37463f5 + 2cdae9f + 4852007 commit 244ef1c
Showing 9 changed files with 753 additions and 30 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -359,6 +359,7 @@
/pkg/ccl/buildccl/ @cockroachdb/dev-inf
#!/pkg/ccl/cliccl/ @cockroachdb/unowned
/pkg/ccl/cmdccl/stub-schema-registry/ @cockroachdb/cdc-prs
/pkg/ccl/cmdccl/clusterrepl/ @cockroachdb/disaster-recovery
#!/pkg/ccl/gssapiccl/ @cockroachdb/unowned
/pkg/ccl/jwtauthccl/ @cockroachdb/cloud-identity
#!/pkg/ccl/kvccl/ @cockroachdb/kv-noreview
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -797,6 +797,8 @@ GO_TARGETS = [
"//pkg/ccl/cloudccl/cloudprivilege:cloudprivilege_test",
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
"//pkg/ccl/cloudccl/gcp:gcp_test",
"//pkg/ccl/cmdccl/clusterrepl:clusterrepl",
"//pkg/ccl/cmdccl/clusterrepl:clusterrepl_lib",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib",
"//pkg/ccl/gssapiccl:gssapiccl",
@@ -2493,6 +2495,7 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/cloudccl/cloudprivilege:get_x_data",
"//pkg/ccl/cloudccl/externalconn:get_x_data",
"//pkg/ccl/cloudccl/gcp:get_x_data",
"//pkg/ccl/cmdccl/clusterrepl:get_x_data",
"//pkg/ccl/cmdccl/stub-schema-registry:get_x_data",
"//pkg/ccl/gssapiccl:get_x_data",
"//pkg/ccl/importerccl:get_x_data",
33 changes: 33 additions & 0 deletions pkg/ccl/cmdccl/clusterrepl/BUILD.bazel
@@ -0,0 +1,33 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "clusterrepl_lib",
srcs = ["main.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/cmdccl/clusterrepl",
visibility = ["//visibility:private"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/cli/exit",
"//pkg/keys",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//:pgx",
],
)

go_binary(
name = "clusterrepl",
embed = [":clusterrepl_lib"],
visibility = ["//visibility:public"],
)

get_x_data(name = "get_x_data")
291 changes: 291 additions & 0 deletions pkg/ccl/cmdccl/clusterrepl/main.go
@@ -0,0 +1,291 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package main

import (
"context"
"flag"
"fmt"
"net/url"
"os"
"os/signal"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
)

var (
uri = flag.String("uri", "", "sql uri")
tenant = flag.String("tenant", "", "tenant name")

parallelScan = flag.Int("scans", 16, "parallel scan count")
batchSize = flag.Int("batch-size", 1<<20, "batch size")
checkpointInterval = flag.Duration("checkpoint-interval", 10*time.Second, "checkpoint interval")
statsInterval = flag.Duration("stats-interval", 5*time.Second, "period over which to measure throughput")

noParse = flag.Bool("no-parse", false, "avoid parsing data")
)

func errorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, "error: %s\n", fmt.Sprintf(msg, args...))
}

func fatalf(msg string, args ...interface{}) {
errorf(msg, args...)
exit.WithCode(exit.UnspecifiedError())
}

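// cancelOnShutdown returns a copy of ctx that is cancelled on the first
// interrupt (e.g. ctrl-c); a second interrupt exits the process immediately.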
func cancelOnShutdown(ctx context.Context) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("received interrupt, shutting down; send another interrupt to force kill")
cancel()
<-c
fatalf("received second interrupt, shutting down now\n")
}()
return ctx
}

func main() {
flag.Parse()
if *uri == "" {
fatalf("URI required")
}
if *tenant == "" {
fatalf("tenant name required")
}

streamAddr, err := url.Parse(*uri)
if err != nil {
fatalf("parse: %s", err)
}

ctx := cancelOnShutdown(context.Background())
if err := streamPartition(ctx, streamAddr); err != nil {
if errors.Is(err, context.Canceled) {
exit.WithCode(exit.Interrupted())
} else {
fatalf("%s", err.Error())
}
}
}

func streamPartition(ctx context.Context, streamAddr *url.URL) error {
fmt.Println("creating producer stream")
client, err := streamclient.NewPartitionedStreamClient(ctx, streamAddr)
if err != nil {
return err
}

replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant))
if err != nil {
return err
}

defer func() {
// Attempt to clean up the replication job on shutdown. But don't wait forever.
if err := timeutil.RunWithTimeout(ctx, "complete-stream", 30*time.Second, func(ctx context.Context) error {
return client.Complete(ctx, replicationProducerSpec.StreamID, true)
}); err != nil {
errorf("warning: could not clean up producer job: %s", err.Error())
}
}()

// We ignore most of this plan, but it gives us the tenant ID.
plan, err := client.Plan(ctx, replicationProducerSpec.StreamID)
if err != nil {
return err
}

prefix := keys.MakeTenantPrefix(plan.SourceTenantID)
tenantSpan := roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}

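// Build a stream partition spec that covers the entire tenant keyspan,
// batching and checkpointing according to the command-line flags, with the
// initial scan starting at the producer job's replication start time.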
var sps streampb.StreamPartitionSpec
sps.Config.MinCheckpointFrequency = *checkpointInterval
sps.Config.BatchByteSize = int64(*batchSize)
sps.Config.InitialScanParallelism = int32(*parallelScan)
sps.Spans = append(sps.Spans, tenantSpan)
sps.InitialScanTimestamp = replicationProducerSpec.ReplicationStartTime
spsBytes, err := protoutil.Marshal(&sps)
if err != nil {
return err
}
frontier, err := span.MakeFrontier(sps.Spans...)
if err != nil {
return err
}

fmt.Printf("streaming %s (%s) as of %s\n", *tenant, tenantSpan, sps.InitialScanTimestamp)
if *noParse {
return rawStream(ctx, streamAddr, replicationProducerSpec.StreamID, spsBytes)
}

sub, err := client.Subscribe(ctx, replicationProducerSpec.StreamID,
spsBytes,
sps.InitialScanTimestamp,
hlc.Timestamp{})
if err != nil {
return err
}

g := ctxgroup.WithContext(ctx)
g.GoCtx(sub.Subscribe)
g.GoCtx(subscriptionConsumer(sub, frontier))
return g.Wait()
}

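// rawStream bypasses the stream client and event parsing entirely: it opens a
// pgx connection, queries crdb_internal.stream_partition directly, and only
// counts the raw bytes returned, reporting throughput every stats interval.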
func rawStream(
ctx context.Context,
uri *url.URL,
streamID streampb.StreamID,
spec streamclient.SubscriptionToken,
) error {
config, err := pgx.ParseConfig(uri.String())
if err != nil {
return err
}

conn, err := pgx.ConnectConfig(ctx, config)
if err != nil {
return err
}

_, err = conn.Exec(ctx, `SET avoid_buffering = true`)
if err != nil {
return err
}
rows, err := conn.Query(ctx, `SELECT * FROM crdb_internal.stream_partition($1, $2)`,
streamID, spec)
if err != nil {
return err
}
defer rows.Close()

var data []byte
var (
totalBytes int
intervalBytes int

totalRowCount int
intervalRowCount int
)
intervalStart := timeutil.Now()
for {
if !rows.Next() {
if err := rows.Err(); err != nil {
return err
}
return nil
}
data = data[:0]
if err := rows.Scan(&data); err != nil {
return err
}
totalBytes += len(data)
intervalBytes += len(data)
totalRowCount++
intervalRowCount++

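// Once a stats interval has elapsed, print the interval throughput and
// average row size, then reset the interval counters.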
if timeutil.Since(intervalStart) > *statsInterval {
var (
elapsedSeconds = timeutil.Since(intervalStart).Seconds()
bps = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
total = humanizeutil.IBytes(int64(totalBytes))
averageRowSize = humanizeutil.IBytes(int64(intervalBytes / intervalRowCount))
)
fmt.Printf("%s throughput: %s/s; total transfered: %s; average row size: %s\n",
timeutil.Now().Format(time.RFC3339),
bps, total, averageRowSize)
intervalStart = timeutil.Now()
intervalBytes = 0
intervalRowCount = 0
}
}
}

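// subscriptionConsumer returns a function, run under the ctxgroup, that drains
// events from the subscription: it sizes KV and SSTable events, forwards
// checkpoint resolved spans into the frontier, and periodically prints
// throughput along with the current frontier.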
func subscriptionConsumer(
sub streamclient.Subscription, frontier *span.Frontier,
) func(ctx context.Context) error {
return func(ctx context.Context) error {
var (
totalBytes int
intervalBytes int

totalEventCount int
intervalEventCount int
)
intervalStart := timeutil.Now()
for {
var sz int
select {
case event, ok := <-sub.Events():
if !ok {
return sub.Err()
}
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
sz = kv.Size()
case streamingccl.SSTableEvent:
ssTab := event.GetSSTable()
sz = ssTab.Size()
case streamingccl.DeleteRangeEvent:
case streamingccl.CheckpointEvent:
fmt.Printf("%s checkpoint\n", timeutil.Now().Format(time.RFC3339))
resolved := event.GetResolvedSpans()
for _, r := range resolved {
_, err := frontier.Forward(r.Span, r.Timestamp)
if err != nil {
return err
}
}
default:
return errors.Newf("unknown streaming event type %v", event.Type())
}
case <-ctx.Done():
return nil
}
totalBytes += sz
intervalBytes += sz
totalEventCount++
intervalEventCount++

if timeutil.Since(intervalStart) > *statsInterval {
var (
elapsedSeconds = timeutil.Since(intervalStart).Seconds()
bps = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
total = humanizeutil.IBytes(int64(totalBytes))
averageEventSize = humanizeutil.IBytes(int64(intervalBytes / intervalEventCount))
)
fmt.Printf("%s throughput: %s/s; total transfered: %s; average event size: %s; frontier: %s\n",
timeutil.Now().Format(time.RFC3339),
bps, total, averageEventSize, frontier.Frontier())
intervalStart = timeutil.Now()
intervalBytes = 0
intervalEventCount = 0
}
}
}
}
3 changes: 0 additions & 3 deletions pkg/ccl/multiregionccl/testdata/regional_by_table
@@ -1,6 +1,3 @@
skip issue-num=98020
----

new-cluster localities=us-east-1,us-east-1,us-east-1,us-west-1,us-west-1,us-west-1,us-central-1,eu-central-1,eu-west-1
----

1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"autoupgrade.go",
"awsdms.go",
"backup.go",
"backup_fixtures.go",
"build_info.go",
"canary.go",
"cancel.go",