streamingest: c2c UX fixes
CREATE TENANT FROM REPLICATION: currently returns output (the job
IDs of the producer and consumer jobs). This PR removes that output.

ALTER TENANT PAUSE/RESUME: currently prints the job ID; this PR
removes that output.

ALTER TENANT COMPLETE: currently prints the job ID. This PR changes
the output to the cutover timestamp. The rationale is that the user
may not know the cutover time when LATEST, NOW(), etc. is used.
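
As a rough illustration of the new UX, a sketch based on the updated tests in this commit (destSQL, pgURL, and cutoverTime come from the existing test harness and are assumed here):

  // CREATE TENANT FROM REPLICATION no longer returns rows, so tests run it
  // with Exec instead of QueryRow.
  destSQL.Exec(t,
      `CREATE TENANT "destination-tenant" FROM REPLICATION OF "source-tenant" ON $1`,
      pgURL.String())

  // ALTER TENANT ... COMPLETE REPLICATION now returns the resolved cutover
  // timestamp, which the caller can scan back out.
  var cutoverOutput time.Time
  destSQL.QueryRow(t,
      `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`,
      hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverOutput)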

Epic: CRDB-18752

Release note: None
lidorcarmel committed Dec 18, 2022
1 parent 5d792e6 commit 45a2ea9
Showing 11 changed files with 128 additions and 58 deletions.
17 changes: 12 additions & 5 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -25,13 +26,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

const alterReplicationJobOp = "ALTER TENANT REPLICATION"

var alterReplicationJobHeader = colinfo.ResultColumns{
{Name: "replication_job_id", Typ: types.Int},
var alterReplicationCutoverHeader = colinfo.ResultColumns{
{Name: "cutover_time", Typ: types.TimestampTZ},
}

func alterReplicationJobTypeCheck(
@@ -56,8 +58,9 @@ func alterReplicationJobTypeCheck(
return false, nil, err
}
}
return true, alterReplicationCutoverHeader, nil
}
return true, alterReplicationJobHeader, nil
return true, nil, nil
}

func alterReplicationJobHook(
@@ -159,6 +162,8 @@ func alterReplicationJobHook(
if err := jobRegistry.Unpause(ctx, p.Txn(), tenInfo.TenantReplicationJobID); err != nil {
return err
}
cutoverTimestamp, _ := tree.MakeDTimestampTZ(timeutil.Unix(0, cutoverTime.WallTime), time.Nanosecond)
resultsCh <- tree.Datums{cutoverTimestamp}
} else {
switch alterTenantStmt.Command {
case tree.ResumeJob:
@@ -174,10 +179,12 @@
return errors.New("unsupported job command in ALTER TENANT REPLICATION")
}
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))}
return nil
}
return fn, alterReplicationJobHeader, nil, false, nil
if alterTenantStmt.Cutover != nil {
return fn, alterReplicationCutoverHeader, nil, false, nil
}
return fn, nil, nil, false, nil
}

func typeCheckCutoverTime(
@@ -29,7 +29,7 @@ func TestAlterTenantPauseResume(t *testing.T) {

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
@@ -48,7 +48,10 @@ func TestAlterTenantPauseResume(t *testing.T) {
var cutoverTime time.Time
c.destSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.destSysSQL.Exec(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.destTenantName, cutoverTime)
var cutoverOutput time.Time
c.destSysSQL.QueryRow(c.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.destTenantName, cutoverTime).Scan(&cutoverOutput)
require.Equal(t, cutoverTime, cutoverOutput)
jobutils.WaitForJobToSucceed(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.createDestTenantSQL(ctx)
defer func() {
15 changes: 9 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -117,14 +117,14 @@ SET enable_experimental_stream_replication = true;
pgURL, cleanupSink := sqlutils.PGUrl(t, source.ServingSQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanupSink()

var ingestionJobID, streamProducerJobID int64
var startTime string
sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime)

destSQL.QueryRow(t,
destSQL.Exec(t,
`CREATE TENANT "destination-tenant" FROM REPLICATION OF "source-tenant" ON $1 `,
pgURL.String(),
).Scan(&ingestionJobID, &streamProducerJobID)
)
streamProducerJobID, ingestionJobID := getStreamJobIds(t, ctx, destSQL, "destination-tenant")

sourceSQL.Exec(t, `
CREATE DATABASE d;
@@ -135,12 +135,15 @@ INSERT INTO d.t2 VALUES (2);
`)

waitUntilStartTimeReached(t, destSQL, jobspb.JobID(ingestionJobID))
var cutoverOutput time.Time
cutoverTime := timeutil.Now().Round(time.Microsecond)
destSQL.Exec(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime())
destSQL.QueryRow(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`,
hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverOutput)
require.Equal(t, cutoverTime, cutoverOutput)
jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))

stats := streamIngestionStats(t, ctx, destSQL, int(ingestionJobID))
stats := streamIngestionStats(t, ctx, destSQL, ingestionJobID)
require.Equal(t, cutoverTime, stats.IngestionProgress.CutoverTime.GoTime())
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, stats.ProducerStatus.StreamStatus)

@@ -308,7 +311,7 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
defer close(planned)
defer close(canContinue)

producerJobID, replicationJobID := c.startStreamReplication()
producerJobID, replicationJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))

19 changes: 5 additions & 14 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
@@ -38,14 +37,9 @@ func streamIngestionJobDescription(
return tree.AsStringWithFQNames(streamIngestion, ann), nil
}

var resultColumns = colinfo.ResultColumns{
{Name: "ingestion_job_id", Typ: types.Int},
{Name: "producer_job_id", Typ: types.Int},
}

func ingestionTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (matched bool, header colinfo.ResultColumns, _ error) {
) (matched bool, _ colinfo.ResultColumns, _ error) {
ingestionStmt, ok := stmt.(*tree.CreateTenantFromReplication)
if !ok {
return false, nil, nil
@@ -54,7 +48,7 @@
exprutil.Strings{ingestionStmt.ReplicationSourceAddress}); err != nil {
return false, nil, err
}
return true, resultColumns, nil
return true, nil, nil
}

func ingestionPlanHook(
@@ -91,7 +85,7 @@ func ingestionPlanHook(
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

@@ -198,18 +192,15 @@
Details: streamIngestionDetails,
}

sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
jobID, p.Txn())
_, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobID, p.Txn())
if err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())),
tree.NewDInt(tree.DInt(replicationProducerSpec.StreamID))}
return nil
}

return fn, resultColumns, nil, false, nil
return fn, nil, nil, false, nil
}

func init() {
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -190,8 +190,9 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
_, err = conn.Exec(`SET enable_experimental_stream_replication = true`)
require.NoError(t, err)

var ingestionJobID, producerJobID int
require.NoError(t, conn.QueryRow(query).Scan(&ingestionJobID, &producerJobID))
_, err = conn.Exec(query)
require.NoError(t, err)
ingestionJobID := getStreamIngestionJobId(t, sqlDB, "30")

// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
allowResponse <- struct{}{}
46 changes: 46 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -91,3 +92,48 @@ func streamIngestionStats(
require.NoError(t, err)
return stats
}

func streamIngestionStatsNoHeartbeat(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
var payloadBytes []byte
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
sqlRunner.QueryRow(t, "SELECT payload, progress FROM system.jobs WHERE id = $1",
ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
stats, err := getStreamIngestionStatsNoHeartbeat(ctx, *details, progress)
require.NoError(t, err)
return stats
}

// getStreamJobIds returns the job IDs of the producer and ingestion jobs.
func getStreamJobIds(
t *testing.T,
ctx context.Context,
sqlRunner *sqlutils.SQLRunner,
destTenantName roachpb.TenantName,
) (producer int, consumer int) {
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
sqlRunner.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1",
destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))

stats := streamIngestionStatsNoHeartbeat(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID)) //TODO avoid the int..
return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID)
}

// getStreamIngestionJobId returns the ingestion job ID running on the destination tenant.
func getStreamIngestionJobId(
t *testing.T, sqlRunner *sqlutils.SQLRunner, destTenantName roachpb.TenantName,
) int {
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
sqlRunner.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1", destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))
return int(tenantInfo.TenantReplicationJobID)
}
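
Since the statements above no longer report job IDs, tests recover them through these helpers instead; a minimal usage sketch, assuming the tenant-streaming test harness (c, args, and ctx as set up by createTenantStreamingClusters):

  // Read the producer and ingestion job IDs from the destination tenant record
  // rather than from the statement output.
  producerJobID, ingestionJobID := getStreamJobIds(t, ctx, c.destSysSQL, args.destTenantName)
  jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
  jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))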