Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingest: c2c UX fixes #93755

Merged
merged 4 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/base",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand All @@ -68,6 +69,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
43 changes: 38 additions & 5 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@ import (
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -144,16 +148,19 @@ func (c *TenantStreamingClusters) Cutover(
producerJobID, ingestionJobID int, cutoverTime time.Time,
) {
// Cut over the ingestion job and the job will stop eventually.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime)
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
}

// StartStreamReplication producer job ID and ingestion job ID.
func (c *TenantStreamingClusters) StartStreamReplication() (int, int) {
var ingestionJobID, streamProducerJobID int
c.DestSysSQL.QueryRow(c.T, c.BuildCreateTenantQuery()).Scan(&ingestionJobID, &streamProducerJobID)
func (c *TenantStreamingClusters) StartStreamReplication(ctx context.Context) (int, int) {
c.DestSysSQL.Exec(c.T, c.BuildCreateTenantQuery())
streamProducerJobID, ingestionJobID := GetStreamJobIds(c.T, ctx, c.DestSysSQL, c.Args.DestTenantName)
return streamProducerJobID, ingestionJobID
}

Expand Down Expand Up @@ -352,3 +359,29 @@ func RunningStatus(t *testing.T, sqlRunner *sqlutils.SQLRunner, ingestionJobID i
p := jobutils.GetJobProgress(t, sqlRunner, jobspb.JobID(ingestionJobID))
return p.RunningStatus
}

func DecimalTimeToHLC(t *testing.T, s string) hlc.Timestamp {
t.Helper()
d, _, err := apd.NewFromString(s)
require.NoError(t, err)
ts, err := hlc.DecimalToHLC(d)
require.NoError(t, err)
return ts
}

// GetStreamJobIds returns the jod ids of the producer and ingestion jobs.
func GetStreamJobIds(
t *testing.T,
ctx context.Context,
sqlRunner *sqlutils.SQLRunner,
destTenantName roachpb.TenantName,
) (producer int, consumer int) {
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
sqlRunner.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1",
destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))

stats := replicationutils.TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID))
return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID)
}
17 changes: 17 additions & 0 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,23 @@ func GetStreamIngestionStats(
return stats, client.Close(ctx)
}

func TestingGetStreamIngestionStatsNoHeartbeatFromReplicationJob(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
var payloadBytes []byte
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
sqlRunner.QueryRow(t, "SELECT payload, progress FROM system.jobs WHERE id = $1",
ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
stats, err := GetStreamIngestionStatsNoHeartbeat(ctx, *details, progress)
require.NoError(t, err)
return stats
}

func TestingGetStreamIngestionStatsFromReplicationJob(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func NewStreamClient(
case "postgres", "postgresql":
// The canonical PostgreSQL URL scheme is "postgresql", however our
// own client commands also accept "postgres".
return newPartitionedStreamClient(ctx, streamURL)
return NewPartitionedStreamClient(ctx, streamURL)
case RandomGenScheme:
streamClient, err = newRandomStreamClient(streamURL)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type partitionedStreamClient struct {
}
}

func newPartitionedStreamClient(
func NewPartitionedStreamClient(
ctx context.Context, remote *url.URL,
) (*partitionedStreamClient, error) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient
package streamclient_test

import (
"context"
Expand All @@ -21,6 +21,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -42,7 +43,7 @@ import (
)

type subscriptionFeedSource struct {
sub Subscription
sub streamclient.Subscription
}

var _ replicationtestutils.FeedSource = (*subscriptionFeedSource)(nil)
Expand Down Expand Up @@ -117,7 +118,7 @@ INSERT INTO d.t2 VALUES (2);
}

maybeInlineURL := maybeGenerateInlineURL(&h.PGUrl)
client, err := newPartitionedStreamClient(ctx, maybeInlineURL)
client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL)
defer func() {
require.NoError(t, client.Close(ctx))
}()
Expand Down Expand Up @@ -197,7 +198,7 @@ INSERT INTO d.t2 VALUES (2);
url, err := streamingccl.StreamAddress(top.Partitions[0].SrcAddr).URL()
require.NoError(t, err)
// Create a new stream client with the given partition address.
subClient, err := newPartitionedStreamClient(ctx, url)
subClient, err := streamclient.NewPartitionedStreamClient(ctx, url)
defer func() {
require.NoError(t, subClient.Close(ctx))
}()
Expand Down
14 changes: 9 additions & 5 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (

const alterReplicationJobOp = "ALTER TENANT REPLICATION"

var alterReplicationJobHeader = colinfo.ResultColumns{
{Name: "replication_job_id", Typ: types.Int},
var alterReplicationCutoverHeader = colinfo.ResultColumns{
{Name: "cutover_time", Typ: types.Decimal},
}

// ResolvedTenantReplicationOptions represents options from an
Expand Down Expand Up @@ -96,9 +96,10 @@ func alterReplicationJobTypeCheck(
return false, nil, err
}
}
return true, alterReplicationCutoverHeader, nil
}

return true, alterReplicationJobHeader, nil
return true, nil, nil
}

func alterReplicationJobHook(
Expand Down Expand Up @@ -168,6 +169,7 @@ func alterReplicationJobHook(
p.ExecCfg().ProtectedTimestampProvider, alterTenantStmt, tenInfo, cutoverTime); err != nil {
return err
}
resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(cutoverTime)}
} else if !alterTenantStmt.Options.IsDefault() {
if err := alterTenantOptions(ctx, p.Txn(), jobRegistry, options, tenInfo); err != nil {
return err
Expand All @@ -187,10 +189,12 @@ func alterReplicationJobHook(
return errors.New("unsupported job command in ALTER TENANT REPLICATION")
}
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))}
return nil
}
return fn, alterReplicationJobHeader, nil, false, nil
if alterTenantStmt.Cutover != nil {
return fn, alterReplicationCutoverHeader, nil, false, nil
}
return fn, nil, nil, false, nil
}

func alterTenantJobCutover(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAlterTenantPauseResume(t *testing.T) {

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.StartStreamReplication()
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
Expand All @@ -49,7 +49,11 @@ func TestAlterTenantPauseResume(t *testing.T) {
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime)
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.CreateDestTenantSQL(ctx)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestDataDriven(t *testing.T) {
})

case "start-replication-stream":
ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication()
ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication(ctx)

case "wait-until-high-watermark":
var highWaterMark string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -242,8 +243,9 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
_, err = conn.Exec(`SET enable_experimental_stream_replication = true`)
require.NoError(t, err)

var ingestionJobID, producerJobID int
require.NoError(t, conn.QueryRow(query).Scan(&ingestionJobID, &producerJobID))
_, err = conn.Exec(query)
require.NoError(t, err)
_, ingestionJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlDB, "30")

// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
allowResponse <- struct{}{}
Expand Down
Loading