Skip to content

Commit

Permalink
streamingest: fix ALTER TENANT REPLICATION output
Browse files Browse the repository at this point in the history
ALTER TENANT PAUSE/RESUME: currently prints the job id, and this PR removes
the output.

ALTER TENANT COMPLETE: currently prints the job id. This PR changes the output
to be the cutover timestamp. The rational is that the user may not know the
cutover time because LATEST or NOW() etc. was used.

Epic: CRDB-18752

Release note: None
  • Loading branch information
lidorcarmel committed Dec 20, 2022
1 parent fdaa9f5 commit 70edfac
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
16 changes: 11 additions & 5 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -30,8 +31,8 @@ import (

const alterReplicationJobOp = "ALTER TENANT REPLICATION"

var alterReplicationJobHeader = colinfo.ResultColumns{
{Name: "replication_job_id", Typ: types.Int},
var alterReplicationCutoverHeader = colinfo.ResultColumns{
{Name: "cutover_time", Typ: types.TimestampTZ},
}

func alterReplicationJobTypeCheck(
Expand All @@ -56,8 +57,9 @@ func alterReplicationJobTypeCheck(
return false, nil, err
}
}
return true, alterReplicationCutoverHeader, nil
}
return true, alterReplicationJobHeader, nil
return true, nil, nil
}

func alterReplicationJobHook(
Expand Down Expand Up @@ -159,6 +161,8 @@ func alterReplicationJobHook(
if err := jobRegistry.Unpause(ctx, p.Txn(), tenInfo.TenantReplicationJobID); err != nil {
return err
}
cutoverTimestamp, _ := tree.MakeDTimestampTZ(cutoverTime.GoTime(), time.Nanosecond)
resultsCh <- tree.Datums{cutoverTimestamp}
} else {
switch alterTenantStmt.Command {
case tree.ResumeJob:
Expand All @@ -174,10 +178,12 @@ func alterReplicationJobHook(
return errors.New("unsupported job command in ALTER TENANT REPLICATION")
}
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))}
return nil
}
return fn, alterReplicationJobHeader, nil, false, nil
if alterTenantStmt.Cutover != nil {
return fn, alterReplicationCutoverHeader, nil, false, nil
}
return fn, nil, nil, false, nil
}

func typeCheckCutoverTime(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func TestAlterTenantPauseResume(t *testing.T) {
var cutoverTime time.Time
c.destSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.destTenantName, cutoverTime)
var cutoverOutput time.Time
c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.destTenantName, cutoverTime).Scan(&cutoverOutput)
require.Equal(t, cutoverTime, cutoverOutput)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ INSERT INTO d.t2 VALUES (2);
`)

waitUntilStartTimeReached(t, destSQL, jobspb.JobID(ingestionJobID))
var cutoverOutput time.Time
cutoverTime := timeutil.Now().Round(time.Microsecond)
destSQL.Exec(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime())
destSQL.QueryRow(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`,
hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverOutput)
require.Equal(t, cutoverTime, cutoverOutput)
jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))

Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ func (c *tenantStreamingClusters) cutover(
producerJobID, ingestionJobID int, cutoverTime time.Time,
) {
// Cut over the ingestion job and the job will stop eventually.
c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime)
var cutoverOutput time.Time
c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.args.destTenantName, cutoverTime).Scan(&cutoverOutput)
require.Equal(c.t, cutoverTime.UnixMicro(), cutoverOutput.UnixMicro())
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
}
Expand Down Expand Up @@ -901,7 +904,10 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
var cutoverTime time.Time
alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime)
var cutoverOutput time.Time
c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.args.destTenantName, cutoverTime).Scan(&cutoverOutput)
require.Equal(c.t, cutoverTime, cutoverOutput)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

// The destroyed address should have been removed from the topology
Expand Down Expand Up @@ -948,7 +954,10 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
// Destroy the source cluster
c.srcCleanup()

c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.args.destTenantName, cutoverTime.AsOfSystemTime())
var cutoverOutput time.Time
c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.args.destTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverOutput)
require.Equal(c.t, cutoverTime.GoTime().UnixMicro(), cutoverOutput.UnixMicro())

// Resume ingestion.
c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
Expand Down

0 comments on commit 70edfac

Please sign in to comment.