diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index d6ae8e91e07f..bdbe4898a526 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -10,6 +10,7 @@ package streamingest import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -30,8 +31,8 @@ import ( const alterReplicationJobOp = "ALTER TENANT REPLICATION" -var alterReplicationJobHeader = colinfo.ResultColumns{ - {Name: "replication_job_id", Typ: types.Int}, +var alterReplicationCutoverHeader = colinfo.ResultColumns{ + {Name: "cutover_time", Typ: types.TimestampTZ}, } func alterReplicationJobTypeCheck( @@ -56,8 +57,9 @@ func alterReplicationJobTypeCheck( return false, nil, err } } + return true, alterReplicationCutoverHeader, nil } - return true, alterReplicationJobHeader, nil + return true, nil, nil } func alterReplicationJobHook( @@ -159,6 +161,8 @@ func alterReplicationJobHook( if err := jobRegistry.Unpause(ctx, p.Txn(), tenInfo.TenantReplicationJobID); err != nil { return err } + cutoverTimestamp, _ := tree.MakeDTimestampTZ(cutoverTime.GoTime(), time.Nanosecond) + resultsCh <- tree.Datums{cutoverTimestamp} } else { switch alterTenantStmt.Command { case tree.ResumeJob: @@ -174,10 +178,12 @@ func alterReplicationJobHook( return errors.New("unsupported job command in ALTER TENANT REPLICATION") } } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))} return nil } - return fn, alterReplicationJobHeader, nil, false, nil + if alterTenantStmt.Cutover != nil { + return fn, alterReplicationCutoverHeader, nil, false, nil + } + return fn, nil, nil, false, nil } func typeCheckCutoverTime( diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go index 9f70c0ea83dd..e2bf0e62a992 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go @@ -48,7 +48,10 @@ func TestAlterTenantPauseResume(t *testing.T) { var cutoverTime time.Time c.destSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) - c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.destTenantName, cutoverTime) + var cutoverOutput time.Time + c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + args.destTenantName, cutoverTime).Scan(&cutoverOutput) + require.Equal(t, cutoverTime, cutoverOutput) jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) cleanupTenant := c.createDestTenantSQL(ctx) defer func() { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 51c07636ec60..858778bdaa59 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -135,8 +135,11 @@ INSERT INTO d.t2 VALUES (2); `) waitUntilStartTimeReached(t, destSQL, jobspb.JobID(ingestionJobID)) + var cutoverOutput time.Time cutoverTime := timeutil.Now().Round(time.Microsecond) - destSQL.Exec(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()) + destSQL.QueryRow(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, + hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverOutput) + require.Equal(t, cutoverTime, cutoverOutput) jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID)) jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID)) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index cf93cd1efb9e..8bac512b8cb4 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -187,7 +187,10 @@ func (c *tenantStreamingClusters) cutover( producerJobID, ingestionJobID int, cutoverTime time.Time, ) { // Cut over the ingestion job and the job will stop eventually. - c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime) + var cutoverOutput time.Time + c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.args.destTenantName, cutoverTime).Scan(&cutoverOutput) + require.Equal(c.t, cutoverTime.UnixMicro(), cutoverOutput.UnixMicro()) jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) jobutils.WaitForJobToSucceed(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) } @@ -901,7 +904,10 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { var cutoverTime time.Time alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) - c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime) + var cutoverOutput time.Time + c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.args.destTenantName, cutoverTime).Scan(&cutoverOutput) + require.Equal(c.t, cutoverTime, cutoverOutput) jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) // The destroyed address should have been removed from the topology @@ -948,7 +954,10 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { // Destroy the source cluster c.srcCleanup() - c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime.AsOfSystemTime()) + var cutoverOutput time.Time + c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.args.destTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverOutput) + require.Equal(c.t, cutoverTime.GoTime().UnixMicro(), cutoverOutput.UnixMicro()) // Resume ingestion. c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID))