Skip to content

Commit

Permalink
streamingest: remove job ids from CREATE TENANT FROM REPLICATION
Browse files Browse the repository at this point in the history
Currently we print the producer and ingestion job ids. Instead,
users should not see job ids unless something is really broken.

Epic: CRDB-18752

Release note: None
  • Loading branch information
lidorcarmel committed Dec 20, 2022
1 parent f6865f9 commit fdaa9f5
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestAlterTenantPauseResume(t *testing.T) {

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ SET enable_experimental_stream_replication = true;
pgURL, cleanupSink := sqlutils.PGUrl(t, source.ServingSQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanupSink()

var ingestionJobID, streamProducerJobID int64
var startTime string
sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime)

destSQL.QueryRow(t,
destSQL.Exec(t,
`CREATE TENANT "destination-tenant" FROM REPLICATION OF "source-tenant" ON $1 `,
pgURL.String(),
).Scan(&ingestionJobID, &streamProducerJobID)
)
streamProducerJobID, ingestionJobID := getStreamJobIds(t, ctx, destSQL, "destination-tenant")

sourceSQL.Exec(t, `
CREATE DATABASE d;
Expand All @@ -140,7 +140,7 @@ INSERT INTO d.t2 VALUES (2);
jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))

stats := streamIngestionStats(t, ctx, destSQL, int(ingestionJobID))
stats := streamIngestionStats(t, ctx, destSQL, ingestionJobID)
require.Equal(t, cutoverTime, stats.IngestionProgress.CutoverTime.GoTime())
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, stats.ProducerStatus.StreamStatus)

Expand Down Expand Up @@ -308,7 +308,7 @@ func TestReplicationJobResumptionStartTime(t *testing.T) {
defer close(planned)
defer close(canContinue)

producerJobID, replicationJobID := c.startStreamReplication()
producerJobID, replicationJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))

Expand Down
19 changes: 5 additions & 14 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
Expand All @@ -38,14 +37,9 @@ func streamIngestionJobDescription(
return tree.AsStringWithFQNames(streamIngestion, ann), nil
}

var resultColumns = colinfo.ResultColumns{
{Name: "ingestion_job_id", Typ: types.Int},
{Name: "producer_job_id", Typ: types.Int},
}

func ingestionTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (matched bool, header colinfo.ResultColumns, _ error) {
) (matched bool, _ colinfo.ResultColumns, _ error) {
ingestionStmt, ok := stmt.(*tree.CreateTenantFromReplication)
if !ok {
return false, nil, nil
Expand All @@ -54,7 +48,7 @@ func ingestionTypeCheck(
exprutil.Strings{ingestionStmt.ReplicationSourceAddress}); err != nil {
return false, nil, err
}
return true, resultColumns, nil
return true, nil, nil
}

func ingestionPlanHook(
Expand Down Expand Up @@ -91,7 +85,7 @@ func ingestionPlanHook(
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
fn := func(ctx context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()

Expand Down Expand Up @@ -198,18 +192,15 @@ func ingestionPlanHook(
Details: streamIngestionDetails,
}

sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
jobID, p.Txn())
_, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobID, p.Txn())
if err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())),
tree.NewDInt(tree.DInt(replicationProducerSpec.StreamID))}
return nil
}

return fn, resultColumns, nil, false, nil
return fn, nil, nil, false, nil
}

func init() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
_, err = conn.Exec(`SET enable_experimental_stream_replication = true`)
require.NoError(t, err)

var ingestionJobID, producerJobID int
require.NoError(t, conn.QueryRow(query).Scan(&ingestionJobID, &producerJobID))
_, err = conn.Exec(query)
require.NoError(t, err)
ingestionJobID := getStreamIngestionJobId(t, sqlDB, "30")

// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
allowResponse <- struct{}{}
Expand Down
46 changes: 46 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -91,3 +92,48 @@ func streamIngestionStats(
require.NoError(t, err)
return stats
}

func streamIngestionStatsNoHeartbeat(
t *testing.T, ctx context.Context, sqlRunner *sqlutils.SQLRunner, ingestionJobID int,
) *streampb.StreamIngestionStats {
var payloadBytes []byte
var progressBytes []byte
var payload jobspb.Payload
var progress jobspb.Progress
sqlRunner.QueryRow(t, "SELECT payload, progress FROM system.jobs WHERE id = $1",
ingestionJobID).Scan(&payloadBytes, &progressBytes)
require.NoError(t, protoutil.Unmarshal(payloadBytes, &payload))
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))
details := payload.GetStreamIngestion()
stats, err := getStreamIngestionStatsNoHeartbeat(ctx, *details, progress)
require.NoError(t, err)
return stats
}

// getStreamJobIds returns the jod ids of the producer and ingestion jobs.
func getStreamJobIds(
t *testing.T,
ctx context.Context,
sqlRunner *sqlutils.SQLRunner,
destTenantName roachpb.TenantName,
) (producer int, consumer int) {
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
sqlRunner.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1",
destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))

stats := streamIngestionStatsNoHeartbeat(t, ctx, sqlRunner, int(tenantInfo.TenantReplicationJobID)) //TODO avoid the int..
return int(stats.IngestionDetails.StreamID), int(tenantInfo.TenantReplicationJobID)
}

// getStreamIngestionJon returns the ingestion job id running on the destination tenant.
func getStreamIngestionJobId(
t *testing.T, sqlRunner *sqlutils.SQLRunner, destTenantName roachpb.TenantName,
) int {
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
sqlRunner.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1", destTenantName).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))
return int(tenantInfo.TenantReplicationJobID)
}
37 changes: 19 additions & 18 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ func (c *tenantStreamingClusters) cutover(
}

// Returns producer job ID and ingestion job ID.
func (c *tenantStreamingClusters) startStreamReplication() (int, int) {
var ingestionJobID, streamProducerJobID int
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'", c.args.destTenantName, c.args.srcTenantName, c.srcURL.String())
c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID)
return streamProducerJobID, ingestionJobID
func (c *tenantStreamingClusters) startStreamReplication(ctx context.Context) (int, int) {
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'",
c.args.destTenantName, c.args.srcTenantName, c.srcURL.String())
c.destSysSQL.Exec(c.t, streamReplStmt)

return getStreamJobIds(c.t, ctx, c.destSysSQL, c.args.destTenantName)
}

func waitForTenantPodsActive(
Expand Down Expand Up @@ -371,7 +372,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, defaultTenantStreamingClustersArgs)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

c.srcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
Expand Down Expand Up @@ -426,7 +427,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down Expand Up @@ -559,7 +560,7 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {
ingestErrCh <- errors.Newf("ingestion error from test")
close(ingestErrCh)

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

Expand Down Expand Up @@ -609,7 +610,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

// Helper to read job progress
jobRegistry := c.destSysServer.JobRegistry().(*jobs.Registry)
Expand Down Expand Up @@ -705,7 +706,7 @@ func TestTenantStreamingCancelIngestion(t *testing.T) {
testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down Expand Up @@ -781,7 +782,7 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
Expand Down Expand Up @@ -848,7 +849,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
createScatteredTable(t, c, 3)
srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

Expand Down Expand Up @@ -928,7 +929,7 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down Expand Up @@ -968,7 +969,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
c, cleanup := createTenantStreamingClusters(ctx, t, defaultTenantStreamingClustersArgs)
defer cleanup()

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

Expand Down Expand Up @@ -1052,7 +1053,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {

createScatteredTable(t, c, 3)

producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)
jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

Expand Down Expand Up @@ -1177,7 +1178,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")

producerJobID, replicationJobID := c.startStreamReplication()
producerJobID, replicationJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
Expand Down Expand Up @@ -1268,7 +1269,7 @@ func TestTenantStreamingShowTenant(t *testing.T) {

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()
producerJobID, ingestionJobID := c.startStreamReplication(ctx)

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ go_library(
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage/enginepb",
Expand Down
12 changes: 10 additions & 2 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm/local"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -167,8 +168,15 @@ func registerClusterToCluster(r registry.Registry) {
t.Status("starting replication stream")
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
setup.dst.name, setup.src.name, setup.src.pgURL)
var ingestionJobID, streamProducerJobID int
setup.dst.sql.QueryRow(t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID)
setup.dst.sql.Exec(t, streamReplStmt)

// Get the ingestion job id.
var tenantInfoBytes []byte
var tenantInfo descpb.TenantInfo
setup.dst.sql.QueryRow(t, "SELECT info FROM system.tenants WHERE name=$1",
setup.dst.name).Scan(&tenantInfoBytes)
require.NoError(t, protoutil.Unmarshal(tenantInfoBytes, &tenantInfo))
ingestionJobID := int(tenantInfo.TenantReplicationJobID)

// TODO(ssd): The job doesn't record the initial
// statement time, so we can't correctly measure the
Expand Down

0 comments on commit fdaa9f5

Please sign in to comment.