Skip to content

Commit

Permalink
cmdccl: test utility for running a /dev/null c2c stream
Browse files Browse the repository at this point in the history
A small utility for reading a tenant replication stream and doing
nothing with it. Useful for manual testing.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
lidorcarmel and stevendanna committed Jun 14, 2023
1 parent 2f6d708 commit 2cdae9f
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@
/pkg/ccl/buildccl/ @cockroachdb/dev-inf
#!/pkg/ccl/cliccl/ @cockroachdb/unowned
/pkg/ccl/cmdccl/stub-schema-registry/ @cockroachdb/cdc-prs
/pkg/ccl/cmdccl/clusterrepl/ @cockroachdb/disaster-recovery
#!/pkg/ccl/gssapiccl/ @cockroachdb/unowned
/pkg/ccl/jwtauthccl/ @cockroachdb/cloud-identity
#!/pkg/ccl/kvccl/ @cockroachdb/kv-noreview
Expand Down
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ GO_TARGETS = [
"//pkg/ccl/cloudccl/cloudprivilege:cloudprivilege_test",
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
"//pkg/ccl/cloudccl/gcp:gcp_test",
"//pkg/ccl/cmdccl/clusterrepl:clusterrepl",
"//pkg/ccl/cmdccl/clusterrepl:clusterrepl_lib",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib",
"//pkg/ccl/gssapiccl:gssapiccl",
Expand Down Expand Up @@ -2491,6 +2493,7 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/cloudccl/cloudprivilege:get_x_data",
"//pkg/ccl/cloudccl/externalconn:get_x_data",
"//pkg/ccl/cloudccl/gcp:get_x_data",
"//pkg/ccl/cmdccl/clusterrepl:get_x_data",
"//pkg/ccl/cmdccl/stub-schema-registry:get_x_data",
"//pkg/ccl/gssapiccl:get_x_data",
"//pkg/ccl/importerccl:get_x_data",
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/cmdccl/clusterrepl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "clusterrepl_lib",
srcs = ["main.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/cmdccl/clusterrepl",
visibility = ["//visibility:private"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/cli/exit",
"//pkg/keys",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v4//:pgx",
],
)

go_binary(
name = "clusterrepl",
embed = [":clusterrepl_lib"],
visibility = ["//visibility:public"],
)

get_x_data(name = "get_x_data")
291 changes: 291 additions & 0 deletions pkg/ccl/cmdccl/clusterrepl/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
// Copyright 2023 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package main

import (
"context"
"flag"
"fmt"
"net/url"
"os"
"os/signal"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
)

var (
uri = flag.String("uri", "", "sql uri")
tenant = flag.String("tenant", "", "tenant name")

parallelScan = flag.Int("scans", 16, "parallel scan count")
batchSize = flag.Int("batch-size", 1<<20, "batch size")
checkpointInterval = flag.Duration("checkpoint-iterval", 10*time.Second, "checkpoint interval")
statsInterval = flag.Duration("stats-interval", 5*time.Second, "period over which to measure throughput")

noParse = flag.Bool("no-parse", false, "avoid parsing data")
)

func errorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, "error: %s\n", fmt.Sprintf(msg, args...))
}

func fatalf(msg string, args ...interface{}) {
errorf(msg, args...)
exit.WithCode(exit.UnspecifiedError())
}

func cancelOnShutdown(ctx context.Context) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("received interrupt, shutting down; send another interrupt to force kill")
cancel()
<-c
fatalf("received second interrupt, shutting down now\n")
}()
return ctx
}

func main() {
flag.Parse()
if *uri == "" {
fatalf("URI required")
}
if *tenant == "" {
fatalf("tenant name required")
}

streamAddr, err := url.Parse(*uri)
if err != nil {
fatalf("parse: %s", err)
}

ctx := cancelOnShutdown(context.Background())
if err := streamPartition(ctx, streamAddr); err != nil {
if errors.Is(err, context.Canceled) {
exit.WithCode(exit.Interrupted())
} else {
fatalf(err.Error())
}
}
}

func streamPartition(ctx context.Context, streamAddr *url.URL) error {
fmt.Println("creating producer stream")
client, err := streamclient.NewPartitionedStreamClient(ctx, streamAddr)
if err != nil {
return err
}

replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant))
if err != nil {
return err
}

defer func() {
// Attempt to clean up the replication job on shutdown. But don't wait forever.
if err := timeutil.RunWithTimeout(ctx, "complete-stream", 30*time.Second, func(ctx context.Context) error {
return client.Complete(ctx, replicationProducerSpec.StreamID, true)
}); err != nil {
errorf("warning: could not clean up producer job: %s", err.Error())
}
}()

// We ignore most of this plan. But, it gives us the tenant ID.
plan, err := client.Plan(ctx, replicationProducerSpec.StreamID)
if err != nil {
return err
}

prefix := keys.MakeTenantPrefix(plan.SourceTenantID)
tenantSpan := roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}

var sps streampb.StreamPartitionSpec
sps.Config.MinCheckpointFrequency = *checkpointInterval
sps.Config.BatchByteSize = int64(*batchSize)
sps.Config.InitialScanParallelism = int32(*parallelScan)
sps.Spans = append(sps.Spans, tenantSpan)
sps.InitialScanTimestamp = replicationProducerSpec.ReplicationStartTime
spsBytes, err := protoutil.Marshal(&sps)
if err != nil {
return err
}
frontier, err := span.MakeFrontier(sps.Spans...)
if err != nil {
return err
}

fmt.Printf("streaming %s (%s) as of %s\n", *tenant, tenantSpan, sps.InitialScanTimestamp)
if *noParse {
return rawStream(ctx, streamAddr, replicationProducerSpec.StreamID, spsBytes)
}

sub, err := client.Subscribe(ctx, replicationProducerSpec.StreamID,
spsBytes,
sps.InitialScanTimestamp,
hlc.Timestamp{})
if err != nil {
return err
}

g := ctxgroup.WithContext(ctx)
g.GoCtx(sub.Subscribe)
g.GoCtx(subscriptionConsumer(sub, frontier))
return g.Wait()
}

func rawStream(
ctx context.Context,
uri *url.URL,
streamID streampb.StreamID,
spec streamclient.SubscriptionToken,
) error {
config, err := pgx.ParseConfig(uri.String())
if err != nil {
return err
}

conn, err := pgx.ConnectConfig(ctx, config)
if err != nil {
return err
}

_, err = conn.Exec(ctx, `SET avoid_buffering = true`)
if err != nil {
return err
}
rows, err := conn.Query(ctx, `SELECT * FROM crdb_internal.stream_partition($1, $2)`,
streamID, spec)
if err != nil {
return err
}
defer rows.Close()

var data []byte
var (
totalBytes int
intervalBytes int

totalRowCount int
intervalRowCount int
)
intervalStart := timeutil.Now()
for {
if !rows.Next() {
if err := rows.Err(); err != nil {
return err
}
return nil
}
data = data[:0]
if err := rows.Scan(&data); err != nil {
return err
}
totalBytes += len(data)
intervalBytes += len(data)
totalRowCount++
intervalRowCount++

if timeutil.Since(intervalStart) > *statsInterval {
var (
elapsedSeconds = timeutil.Since(intervalStart).Seconds()
bps = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
total = humanizeutil.IBytes(int64(totalBytes))
averageRowSize = humanizeutil.IBytes(int64(intervalBytes / intervalRowCount))
)
fmt.Printf("%s throughput: %s/s; total transfered: %s; average row size: %s\n",
timeutil.Now().Format(time.RFC3339),
bps, total, averageRowSize)
intervalStart = timeutil.Now()
intervalBytes = 0
intervalRowCount = 0
}
}
}

func subscriptionConsumer(
sub streamclient.Subscription, frontier *span.Frontier,
) func(ctx context.Context) error {
return func(ctx context.Context) error {
var (
totalBytes int
intervalBytes int

totalEventCount int
intervalEventCount int
)
intervalStart := timeutil.Now()
for {
var sz int
select {
case event, ok := <-sub.Events():
if !ok {
return sub.Err()
}
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
sz = kv.Size()
case streamingccl.SSTableEvent:
ssTab := event.GetSSTable()
sz = ssTab.Size()
case streamingccl.DeleteRangeEvent:
case streamingccl.CheckpointEvent:
fmt.Printf("%s checkpoint\n", timeutil.Now().Format(time.RFC3339))
resolved := event.GetResolvedSpans()
for _, r := range resolved {
_, err := frontier.Forward(r.Span, r.Timestamp)
if err != nil {
return err
}
}
default:
return errors.Newf("unknown streaming event type %v", event.Type())
}
case <-ctx.Done():
return nil
}
totalBytes += sz
intervalBytes += sz
totalEventCount++
intervalEventCount++

if timeutil.Since(intervalStart) > *statsInterval {
var (
elapsedSeconds = timeutil.Since(intervalStart).Seconds()
bps = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
total = humanizeutil.IBytes(int64(totalBytes))
averageEventSize = humanizeutil.IBytes(int64(intervalBytes / intervalEventCount))
)
fmt.Printf("%s throughput: %s/s; total transfered: %s; average event size: %s; frontier: %s\n",
timeutil.Now().Format(time.RFC3339),
bps, total, averageEventSize, frontier.Frontier())
intervalStart = timeutil.Now()
intervalBytes = 0
intervalEventCount = 0
}
}
}
}

0 comments on commit 2cdae9f

Please sign in to comment.