From 2cdae9fd7ce87123dd116ecbede789345f15ce40 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Thu, 20 Apr 2023 13:08:57 -0400 Subject: [PATCH] cmdccl: test utility for running a /dev/null c2c stream A small utility for reading a tenant replication stream and doing nothing with it. Useful for manual testing. Epic: none Release note: None Co-authored-by: Steven Danna --- .github/CODEOWNERS | 1 + pkg/BUILD.bazel | 3 + pkg/ccl/cmdccl/clusterrepl/BUILD.bazel | 33 +++ pkg/ccl/cmdccl/clusterrepl/main.go | 291 +++++++++++++++++++++++++ 4 files changed, 328 insertions(+) create mode 100644 pkg/ccl/cmdccl/clusterrepl/BUILD.bazel create mode 100644 pkg/ccl/cmdccl/clusterrepl/main.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 074f99d24e08..4db42f721dd4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -359,6 +359,7 @@ /pkg/ccl/buildccl/ @cockroachdb/dev-inf #!/pkg/ccl/cliccl/ @cockroachdb/unowned /pkg/ccl/cmdccl/stub-schema-registry/ @cockroachdb/cdc-prs +/pkg/ccl/cmdccl/clusterrepl/ @cockroachdb/disaster-recovery #!/pkg/ccl/gssapiccl/ @cockroachdb/unowned /pkg/ccl/jwtauthccl/ @cockroachdb/cloud-identity #!/pkg/ccl/kvccl/ @cockroachdb/kv-noreview diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index db15f4ce3d43..b90e6ab8cd24 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -796,6 +796,8 @@ GO_TARGETS = [ "//pkg/ccl/cloudccl/cloudprivilege:cloudprivilege_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", + "//pkg/ccl/cmdccl/clusterrepl:clusterrepl", + "//pkg/ccl/cmdccl/clusterrepl:clusterrepl_lib", "//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry", "//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib", "//pkg/ccl/gssapiccl:gssapiccl", @@ -2491,6 +2493,7 @@ GET_X_DATA_TARGETS = [ "//pkg/ccl/cloudccl/cloudprivilege:get_x_data", "//pkg/ccl/cloudccl/externalconn:get_x_data", "//pkg/ccl/cloudccl/gcp:get_x_data", + "//pkg/ccl/cmdccl/clusterrepl:get_x_data", 
"//pkg/ccl/cmdccl/stub-schema-registry:get_x_data", "//pkg/ccl/gssapiccl:get_x_data", "//pkg/ccl/importerccl:get_x_data", diff --git a/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel b/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel new file mode 100644 index 000000000000..56d3b5b38b17 --- /dev/null +++ b/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel @@ -0,0 +1,33 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "clusterrepl_lib", + srcs = ["main.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/cmdccl/clusterrepl", + visibility = ["//visibility:private"], + deps = [ + "//pkg/ccl/streamingccl", + "//pkg/ccl/streamingccl/streamclient", + "//pkg/cli/exit", + "//pkg/keys", + "//pkg/repstream/streampb", + "//pkg/roachpb", + "//pkg/util/ctxgroup", + "//pkg/util/hlc", + "//pkg/util/humanizeutil", + "//pkg/util/protoutil", + "//pkg/util/span", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_jackc_pgx_v4//:pgx", + ], +) + +go_binary( + name = "clusterrepl", + embed = [":clusterrepl_lib"], + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/ccl/cmdccl/clusterrepl/main.go b/pkg/ccl/cmdccl/clusterrepl/main.go new file mode 100644 index 000000000000..d4dfcd1cee0d --- /dev/null +++ b/pkg/ccl/cmdccl/clusterrepl/main.go @@ -0,0 +1,291 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+// Command clusterrepl reads a tenant replication stream and discards the
+// data, printing throughput statistics as it goes. It is intended for manual
+// testing.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"net/url"
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
+	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
+	"github.com/cockroachdb/cockroach/pkg/cli/exit"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/span"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+	"github.com/jackc/pgx/v4"
+)
+
+var (
+	uri    = flag.String("uri", "", "sql uri")
+	tenant = flag.String("tenant", "", "tenant name")
+
+	parallelScan = flag.Int("scans", 16, "parallel scan count")
+	batchSize    = flag.Int("batch-size", 1<<20, "batch size")
+	// checkpointInterval is registered under the correctly spelled
+	// -checkpoint-interval. main additionally registers the original,
+	// misspelled -checkpoint-iterval as an alias bound to the same variable so
+	// that existing invocations keep working.
+	checkpointInterval = flag.Duration("checkpoint-interval", 10*time.Second, "checkpoint interval")
+	statsInterval      = flag.Duration("stats-interval", 5*time.Second, "period over which to measure throughput")
+
+	noParse = flag.Bool("no-parse", false, "avoid parsing data")
+)
+
+// errorf formats msg with args and writes it to stderr, prefixed with
+// "error: " and followed by a newline.
+func errorf(msg string, args ...interface{}) {
+	fmt.Fprintf(os.Stderr, "error: %s\n", fmt.Sprintf(msg, args...))
+}
+
+// fatalf reports the formatted message via errorf and then exits with the
+// unspecified-error exit code.
+func fatalf(msg string, args ...interface{}) {
+	errorf(msg, args...)
+	exit.WithCode(exit.UnspecifiedError())
+}
+
+// cancelOnShutdown returns a context derived from ctx that is cancelled when
+// the process receives its first interrupt signal. A second interrupt
+// terminates the process immediately via fatalf.
+func cancelOnShutdown(ctx context.Context) context.Context {
+	ctx, cancel := context.WithCancel(ctx)
+	go func() {
+		c := make(chan os.Signal, 1)
+		signal.Notify(c, os.Interrupt)
+		<-c
+		fmt.Println("received interrupt, shutting down; send another interrupt to force kill")
+		cancel()
+		<-c
+		fatalf("received second interrupt, shutting down now\n")
+	}()
+	return ctx
+}
+
+// main parses the command-line flags, validates that -uri and -tenant were
+// provided, and streams the tenant until an error occurs or the process is
+// interrupted.
+func main() {
+	// Keep accepting the original, misspelled flag name. Both names write to
+	// the same variable, so whichever one the user passes takes effect.
+	flag.DurationVar(checkpointInterval, "checkpoint-iterval", *checkpointInterval,
+		"deprecated alias for -checkpoint-interval")
+	flag.Parse()
+	if *uri == "" {
+		fatalf("URI required")
+	}
+	if *tenant == "" {
+		fatalf("tenant name required")
+	}
+
+	streamAddr, err := url.Parse(*uri)
+	if err != nil {
+		fatalf("parse: %s", err)
+	}
+
+	ctx := cancelOnShutdown(context.Background())
+	if err := streamPartition(ctx, streamAddr); err != nil {
+		if errors.Is(err, context.Canceled) {
+			exit.WithCode(exit.Interrupted())
+		} else {
+			// Pass the error as an argument rather than as the format string so
+			// that any '%' in its text is printed verbatim.
+			fatalf("%s", err)
+		}
+	}
+}
+
+// streamPartition creates a replication producer stream for the tenant named
+// by the -tenant flag on the cluster at streamAddr and consumes a single
+// partition covering the tenant's entire key span. With -no-parse the stream
+// is read as raw rows by rawStream; otherwise it is read through a stream
+// client subscription and handed to subscriptionConsumer. On return it makes
+// a best-effort attempt to complete the producer stream.
+func streamPartition(ctx context.Context, streamAddr *url.URL) error {
+	fmt.Println("creating producer stream")
+	client, err := streamclient.NewPartitionedStreamClient(ctx, streamAddr)
+	if err != nil {
+		return err
+	}
+
+	replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(*tenant))
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		// Attempt to clean up the replication job on shutdown. But don't wait
+		// forever. The cleanup deliberately does not inherit ctx: on the
+		// interrupt path ctx is already cancelled, which would make the call
+		// fail before it could do anything. A second interrupt still force-kills
+		// the process (see cancelOnShutdown).
+		if err := timeutil.RunWithTimeout(context.Background(), "complete-stream", 30*time.Second, func(ctx context.Context) error {
+			return client.Complete(ctx, replicationProducerSpec.StreamID, true)
+		}); err != nil {
+			errorf("warning: could not clean up producer job: %s", err.Error())
+		}
+	}()
+
+	// We ignore most of this plan. But, it gives us the tenant ID.
+	plan, err := client.Plan(ctx, replicationProducerSpec.StreamID)
+	if err != nil {
+		return err
+	}
+
+	prefix := keys.MakeTenantPrefix(plan.SourceTenantID)
+	tenantSpan := roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
+
+	// Build the partition spec from the command-line flags.
+	var sps streampb.StreamPartitionSpec
+	sps.Config.MinCheckpointFrequency = *checkpointInterval
+	sps.Config.BatchByteSize = int64(*batchSize)
+	sps.Config.InitialScanParallelism = int32(*parallelScan)
+	sps.Spans = append(sps.Spans, tenantSpan)
+	sps.InitialScanTimestamp = replicationProducerSpec.ReplicationStartTime
+	spsBytes, err := protoutil.Marshal(&sps)
+	if err != nil {
+		return err
+	}
+	frontier, err := span.MakeFrontier(sps.Spans...)
+	if err != nil {
+		return err
+	}
+
+	fmt.Printf("streaming %s (%s) as of %s\n", *tenant, tenantSpan, sps.InitialScanTimestamp)
+	if *noParse {
+		return rawStream(ctx, streamAddr, replicationProducerSpec.StreamID, spsBytes)
+	}
+
+	sub, err := client.Subscribe(ctx, replicationProducerSpec.StreamID,
+		spsBytes,
+		sps.InitialScanTimestamp,
+		hlc.Timestamp{})
+	if err != nil {
+		return err
+	}
+
+	// Run the subscription and its consumer side by side; Wait returns once
+	// both have finished.
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(sub.Subscribe)
+	g.GoCtx(subscriptionConsumer(sub, frontier))
+	return g.Wait()
+}
+
+// rawStream opens its own SQL connection to uri and reads the rows returned
+// by crdb_internal.stream_partition for the given stream and partition spec
+// without decoding them. Roughly once per -stats-interval it prints
+// throughput, total bytes read, and average row size. It returns nil when the
+// row stream ends, or the first connection, query, or scan error.
+func rawStream(
+	ctx context.Context,
+	uri *url.URL,
+	streamID streampb.StreamID,
+	spec streamclient.SubscriptionToken,
+) error {
+	config, err := pgx.ParseConfig(uri.String())
+	if err != nil {
+		return err
+	}
+
+	conn, err := pgx.ConnectConfig(ctx, config)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Best-effort close so the connection is not leaked. The error is
+		// ignored on purpose: the caller is shutting down and there is nothing
+		// useful left to do with it.
+		_ = conn.Close(ctx)
+	}()
+
+	_, err = conn.Exec(ctx, `SET avoid_buffering = true`)
+	if err != nil {
+		return err
+	}
+	rows, err := conn.Query(ctx, `SELECT * FROM crdb_internal.stream_partition($1, $2)`,
+		streamID, spec)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	var data []byte
+	var (
+		totalBytes    int
+		intervalBytes int
+
+		totalRowCount    int
+		intervalRowCount int
+	)
+	intervalStart := timeutil.Now()
+	for {
+		if !rows.Next() {
+			if err := rows.Err(); err != nil {
+				return err
+			}
+			return nil
+		}
+		data = data[:0]
+		if err := rows.Scan(&data); err != nil {
+			return err
+		}
+		totalBytes += len(data)
+		intervalBytes += len(data)
+		totalRowCount++
+		intervalRowCount++
+
+		// intervalRowCount was incremented just above, so it is at least 1 and
+		// the average below cannot divide by zero.
+		if timeutil.Since(intervalStart) > *statsInterval {
+			var (
+				elapsedSeconds = timeutil.Since(intervalStart).Seconds()
+				bps            = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
+				total          = humanizeutil.IBytes(int64(totalBytes))
+				averageRowSize = humanizeutil.IBytes(int64(intervalBytes / intervalRowCount))
+			)
+			fmt.Printf("%s throughput: %s/s; total transfered: %s; average row size: %s\n",
+				timeutil.Now().Format(time.RFC3339),
+				bps, total, averageRowSize)
+			intervalStart = timeutil.Now()
+			intervalBytes = 0
+			intervalRowCount = 0
+		}
+	}
+}
+
+// subscriptionConsumer returns a function that drains events from sub until
+// the event channel is closed or ctx is done. KV and SSTable events
+// contribute their size to the byte counters; checkpoint events are logged
+// and forward frontier. Roughly once per -stats-interval it prints
+// throughput, total bytes, average event size, and the current frontier.
+//
+// The returned function returns sub.Err() when the event channel is closed,
+// ctx.Err() when ctx is done, and an error for an unknown event type or a
+// failed frontier update.
+func subscriptionConsumer(
+	sub streamclient.Subscription, frontier *span.Frontier,
+) func(ctx context.Context) error {
+	return func(ctx context.Context) error {
+		var (
+			totalBytes    int
+			intervalBytes int
+
+			totalEventCount    int
+			intervalEventCount int
+		)
+		intervalStart := timeutil.Now()
+		for {
+			var sz int
+			select {
+			case event, ok := <-sub.Events():
+				if !ok {
+					return sub.Err()
+				}
+				switch event.Type() {
+				case streamingccl.KVEvent:
+					kv := event.GetKV()
+					sz = kv.Size()
+				case streamingccl.SSTableEvent:
+					ssTab := event.GetSSTable()
+					sz = ssTab.Size()
+				case streamingccl.DeleteRangeEvent:
+					// Counted as an event, but no bytes are attributed to it.
+				case streamingccl.CheckpointEvent:
+					fmt.Printf("%s checkpoint\n", timeutil.Now().Format(time.RFC3339))
+					resolved := event.GetResolvedSpans()
+					for _, r := range resolved {
+						_, err := frontier.Forward(r.Span, r.Timestamp)
+						if err != nil {
+							return err
+						}
+					}
+				default:
+					return errors.Newf("unknown streaming event type %v", event.Type())
+				}
+			case <-ctx.Done():
+				// Surface the cancellation so the caller can tell an interrupt
+				// apart from a clean end of stream.
+				return ctx.Err()
+			}
+			totalBytes += sz
+			intervalBytes += sz
+			totalEventCount++
+			intervalEventCount++
+
+			// intervalEventCount was incremented just above, so it is at least 1
+			// and the average below cannot divide by zero.
+			if timeutil.Since(intervalStart) > *statsInterval {
+				var (
+					elapsedSeconds   = timeutil.Since(intervalStart).Seconds()
+					bps              = humanizeutil.IBytes(int64(float64(intervalBytes) / elapsedSeconds))
+					total            = humanizeutil.IBytes(int64(totalBytes))
+					averageEventSize = humanizeutil.IBytes(int64(intervalBytes / intervalEventCount))
+				)
+				fmt.Printf("%s throughput: %s/s; total transfered: %s; average event size: %s; frontier: %s\n",
+					timeutil.Now().Format(time.RFC3339),
+					bps, total, averageEventSize, frontier.Frontier())
+				intervalStart = timeutil.Now()
+				intervalBytes = 0
+				intervalEventCount = 0
+			}
+		}
+	}
+}