diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index 061efda48fa6..ee2e925a6f2b 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -24,9 +24,12 @@ go_library(
         "//pkg/ccl/utilccl",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
+        "//pkg/jobs/jobsprotectedts",
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/kv/bulk",
+        "//pkg/kv/kvserver/protectedts",
+        "//pkg/kv/kvserver/protectedts/ptpb",
         "//pkg/repstream",
         "//pkg/repstream/streampb",
         "//pkg/roachpb",
@@ -58,6 +61,7 @@ go_library(
        "//pkg/util/syncutil",
        "//pkg/util/timeutil",
        "//pkg/util/tracing",
+       "//pkg/util/uuid",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_cockroachdb_logtags//:logtags",
        "@com_github_cockroachdb_redact//:redact",
@@ -95,6 +99,7 @@ go_test(
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/protectedts",
         "//pkg/repstream/streampb",
         "//pkg/roachpb",
         "//pkg/security/securityassets",
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
index 2609c40d40b4..c71fb79373fd 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
 	f := sf.frontier
 	registry := sf.flowCtx.Cfg.JobRegistry
 	jobID := jobspb.JobID(sf.spec.JobID)
+	ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider
 
 	frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
 	f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
@@ -439,6 +440,26 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
 		if md.RunStats != nil && md.RunStats.NumRuns > 1 {
 			ju.UpdateRunStats(1, md.RunStats.LastRun)
 		}
+
+		// Update the protected timestamp record protecting the destination tenant's
+		// keyspan if the highWatermark has moved forward since the last time we
+		// recorded progress. This makes older revisions of replicated values with
+		// timestamps less than highWatermark - ReplicationTTLSeconds eligible for
+		// garbage collection.
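+		// For example, with the (currently hard-coded) default
+		// ReplicationTTLSeconds of 25 hours, a highWatermark of t makes
+		// revisions older than t - 25h eligible for GC.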
+		replicationDetails := md.Payload.GetStreamIngestion()
+		if replicationDetails.ProtectedTimestampRecordID == nil {
+			return errors.AssertionFailedf("expected replication job to have a protected timestamp " +
+				"record over the destination tenant's keyspan")
+		}
+		record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
+		if err != nil {
+			return err
+		}
+		newProtectAbove := highWatermark.Add(
+			-int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0)
+		if record.Timestamp.Less(newProtectAbove) {
+			return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove)
+		}
 		return nil
 	}); err != nil {
 		return err
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index a5351939be10..b3e1478b159a 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -17,7 +17,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -29,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 )
 
@@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
 		if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
 			log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
 		}
+
+		// Now that we have completed the cutover, we can release the protected
+		// timestamp record on the destination tenant's keyspace.
+		if details.ProtectedTimestampRecordID != nil {
+			if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID)
+			}); err != nil {
+				return err
+			}
+		}
+
 		return nil
 	}
 	return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
@@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError(
 
 // Resume is part of the jobs.Resumer interface. Ensure that any errors
 // produced here are returned as s.handleResumeError.
 func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
+	// Protect the destination tenant's keyspan from garbage collection.
+	err := s.protectDestinationTenant(resumeCtx, execCtx)
+	if err != nil {
+		return s.handleResumeError(resumeCtx, execCtx, err)
+	}
+
+	// Start ingesting KVs from the replication stream.
-	err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
+	err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
 	if err != nil {
 		return s.handleResumeError(resumeCtx, execCtx, err)
 	}
 	return nil
 }
 
+func releaseDestinationTenantProtectedTimestamp(
+	ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
+) error {
+	jobExecCtx := execCtx.(sql.JobExecContext)
+	ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
+	if err := ptp.Release(ctx, txn, ptsID); err != nil {
+		if errors.Is(err, protectedts.ErrNotExists) {
+			// If the record does not exist there is nothing to release, and no
+			// reason to return an error which might cause problems.
+			log.Warningf(ctx, "failed to release protected timestamp record which seems not to exist: %v", err)
+			err = nil
+		}
+		return err
+	}
+	return nil
+}
+
+// protectDestinationTenant writes a protected timestamp record protecting the
+// destination tenant's keyspace from garbage collection. This protected
+// timestamp record is updated every time the replication job records a new
+// frontier timestamp, and is released in OnFailOrCancel.
+//
+// The method persists the ID of the protected timestamp record in the
+// replication job's Payload.
+func (s *streamIngestionResumer) protectDestinationTenant(
+	ctx context.Context, execCtx interface{},
+) error {
+	details := s.job.Details().(jobspb.StreamIngestionDetails)
+
+	// If we have already protected the destination tenant keyspan in a previous
+	// resumption of the stream ingestion job, then there is nothing to do.
+	if details.ProtectedTimestampRecordID != nil {
+		return nil
+	}
+
+	execCfg := execCtx.(sql.JobExecContext).ExecCfg()
+	target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
+	ptsID := uuid.MakeV4()
+	now := execCfg.Clock.Now()
+	return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
+			nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
+		if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
+			return err
+		}
+		return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
+			if err := md.CheckRunningOrReverting(); err != nil {
+				return err
+			}
+			details.ProtectedTimestampRecordID = &ptsID
+			md.Payload.Details = jobspb.WrapPayloadDetails(details)
+			ju.UpdatePayload(md.Payload)
+			return nil
+		})
+	})
+}
+
 // revertToCutoverTimestamp attempts a cutover and errors out if one was not
 // executed.
 func revertToCutoverTimestamp(
@@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
 			return errors.Wrap(err, "update tenant record")
 		}
 
+		if details.ProtectedTimestampRecordID != nil {
+			if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
+				*details.ProtectedTimestampRecordID); err != nil {
+				return err
+			}
+		}
+
 		return nil
 	})
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index e49b976c6987..b6cf803b59bf 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -169,6 +169,11 @@ func ingestionPlanHook(
 		}
 		prefix := keys.MakeTenantPrefix(destinationTenantID)
 
+		// TODO(adityamaru): Wire this up to a user-configurable option.
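+		// Until then, default to 25 hours (25 * 60 * 60 = 90,000 seconds).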
+		replicationTTLSeconds := 25 * 60 * 60
+		if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
+			replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
+		}
 		streamIngestionDetails := jobspb.StreamIngestionDetails{
 			StreamAddress: string(streamAddress),
 			StreamID:      uint64(streamID),
@@ -176,6 +181,7 @@ func ingestionPlanHook(
 			DestinationTenantID:   destinationTenantID,
 			SourceTenantName:      roachpb.TenantName(sourceTenant),
 			DestinationTenantName: roachpb.TenantName(destinationTenant),
+			ReplicationTTLSeconds: int32(replicationTTLSeconds),
 		}
 
 		jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index fb128c123ed6..9438c3f2a77b 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -23,6 +23,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -768,6 +770,9 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
 	defer cleanup()
 	producerJobID, ingestionJobID := c.startStreamReplication()
 
+	c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
+	c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
+
 	jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
 	jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
 
@@ -1073,3 +1078,169 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	// Since the data was distributed across multiple nodes, multiple nodes should've been connected to
 	require.Greater(t, len(clientAddresses), 1)
 }
+
+// TestTenantReplicationProtectedTimestampManagement tests the active
+// management of protected timestamps on the destination tenant's keyspan.
+func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	args := defaultTenantStreamingClustersArgs
+	// Override the ReplicationTTLSeconds field in the replication job details
+	// with a small value so that every progress update results in a protected
+	// timestamp update.
+	//
+	// TODO(adityamaru): Once this is wired up to be user configurable via an
+	// option to `CREATE TENANT ... FROM REPLICATION` we should replace this
+	// testing knob with a create tenant option.
+	args.testingKnobs = &sql.StreamingTestingKnobs{
+		OverrideReplicationTTLSeconds: 1,
+	}
+
+	testProtectedTimestampManagement := func(t *testing.T, pauseBeforeTerminal bool, completeReplication bool) {
+		// waitForProducerProtection asserts that there is a PTS record protecting
+		// the source tenant. We ensure the PTS record is protecting a timestamp
+		// greater than or equal to the frontier we know we have replicated up
+		// until.
+		waitForProducerProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
+			testutils.SucceedsSoon(t, func() error {
+				stats := streamIngestionStats(t, c.destSysSQL, replicationJobID)
+				if stats.ProducerStatus == nil {
+					return errors.New("nil ProducerStatus")
+				}
+				if stats.ProducerStatus.ProtectedTimestamp == nil {
+					return errors.New("nil ProducerStatus.ProtectedTimestamp")
+				}
+				pts := *stats.ProducerStatus.ProtectedTimestamp
+				if pts.Less(frontier) {
+					return errors.Newf("protection is at %s, expected to be >= %s",
+						pts.String(), frontier.String())
+				}
+				return nil
+			})
+		}
+
+		// checkNoDestinationProtection asserts that there is no PTS record
+		// protecting the destination tenant.
+		checkNoDestinationProtection := func(c *tenantStreamingClusters, replicationJobID int) {
+			execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig)
+			require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
+				require.NoError(t, err)
+				payload := j.Payload()
+				replicationDetails := payload.GetStreamIngestion()
+				_, err = execCfg.ProtectedTimestampProvider.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
+				require.EqualError(t, err, protectedts.ErrNotExists.Error())
+				return nil
+			}))
+		}
+		checkDestinationProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
+			execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig)
+			ptp := execCfg.ProtectedTimestampProvider
+			require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
+				if err != nil {
+					return err
+				}
+				payload := j.Payload()
+				progress := j.Progress()
+				replicationDetails := payload.GetStreamIngestion()
+
+				require.NotNil(t, replicationDetails.ProtectedTimestampRecordID)
+				rec, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
+				if err != nil {
+					return err
+				}
+				require.True(t, frontier.LessEq(*progress.GetHighWater()))
+				frontierTime := progress.GetHighWater().GoTime().Round(time.Millisecond)
+				window := frontierTime.Sub(rec.Timestamp.GoTime().Round(time.Millisecond))
+				require.Equal(t, time.Second, window)
+				return nil
+			}))
+		}
+
+		c, cleanup := createTenantStreamingClusters(ctx, t, args)
+		defer cleanup()
+
+		c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
+		c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
+
+		producerJobID, replicationJobID := c.startStreamReplication()
+
+		jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+		jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
+
+		// Ensure that we wait at least a second so that the gap between the first
+		// time we write the protected timestamp (t1) during replication job
+		// startup and the first progress update (t2) is greater than 1s. This is
+		// important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we
+		// will not update the PTS record.
+		now := c.srcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0)
+		c.waitUntilHighWatermark(now, jobspb.JobID(replicationJobID))
+
+		// Check that the producer and replication job have written a protected
+		// timestamp.
+		waitForProducerProtection(c, now, replicationJobID)
+		checkDestinationProtection(c, now, replicationJobID)
+
+		now2 := now.Add(time.Second.Nanoseconds(), 0)
+		c.waitUntilHighWatermark(now2, jobspb.JobID(replicationJobID))
+		// Let the replication progress for a second before checking that the
+		// protected timestamp record has also been updated on the destination
+		// cluster. This update happens in the same txn in which we update the
+		// replication job's progress.
+		waitForProducerProtection(c, now2, replicationJobID)
+		checkDestinationProtection(c, now2, replicationJobID)
+
+		if pauseBeforeTerminal {
+			c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", replicationJobID))
+			jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
+		}
+
+		if completeReplication {
+			c.destSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", replicationJobID))
+			jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
+			var cutoverTime time.Time
+			c.destSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
+			c.cutover(producerJobID, replicationJobID, cutoverTime)
+			jobutils.WaitForJobToSucceed(t, c.destSysSQL, jobspb.JobID(replicationJobID))
+		}
+
+		// Set GC TTL low, so that the GC job completes quickly in the test.
+		c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
+		c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName))
+
+		if !completeReplication {
+			jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
+			jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+		}
+
+		// Check that the producer job has released its protected timestamp.
+		stats := streamIngestionStats(t, c.destSysSQL, replicationJobID)
+		require.NotNil(t, stats.ProducerStatus)
+		require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
+
+		// Check that the replication job has released its protected timestamp.
+		checkNoDestinationProtection(c, replicationJobID)
+
+		// Wait for the GC job to finish; this should happen once the protected
+		// timestamp has been released.
+		c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")
+
+		// Check that the dest tenant key range is cleaned up.
+		destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)
+		rows, err := c.destCluster.Server(0).DB().
+			Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+		require.NoError(t, err)
+		require.Empty(t, rows)
+
+		c.destSysSQL.CheckQueryResults(t,
+			fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+			[][]string{{"0"}})
+	}
+
+	testutils.RunTrueAndFalse(t, "pause-before-terminal", func(t *testing.T, pauseBeforeTerminal bool) {
+		testutils.RunTrueAndFalse(t, "complete-replication", func(t *testing.T, completeReplication bool) {
+			testProtectedTimestampManagement(t, pauseBeforeTerminal, completeReplication)
+		})
+	})
+}
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index efd8d3ce55bf..57eb23fd566d 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -97,6 +97,23 @@ message StreamIngestionDetails {
     (gogoproto.nullable) = false,
     (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];
 
+  // ID of the protected timestamp record that protects the destination tenant's
+  // keyspan from GC while it is being replicated into.
+  bytes protected_timestamp_record_id = 10 [
+    (gogoproto.customname) = "ProtectedTimestampRecordID",
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
+  ];
+
+  // ReplicationTTLSeconds specifies the maximum age of a value, relative to the
+  // replication job's frontier timestamp, before the value becomes eligible for
+  // garbage collection. Note that only older versions of values are eligible for
+  // GC. All values newer than this maximum age will be protected from GC by a
+  // protected timestamp record managed by the replication job.
+  //
+  // In other words, the `replication job's frontier timestamp - ReplicationTTLSeconds`
+  // is the earliest timestamp that the replication job can be cut over to.
+  int32 replication_ttl_seconds = 11 [(gogoproto.customname) = "ReplicationTTLSeconds"];
+
   reserved 5, 6;
 }
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 55f19d0320b3..cd7cb016ad14 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1626,6 +1626,10 @@ type StreamingTestingKnobs struct {
 	// BeforeIngestionStart allows blocking the stream ingestion job
 	// before a stream ingestion happens.
 	BeforeIngestionStart func(ctx context.Context) error
+
+	// OverrideReplicationTTLSeconds will override the default value of the
+	// `ReplicationTTLSeconds` field on the StreamIngestion job details.
+	OverrideReplicationTTLSeconds int
 }
 
 var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go
index 85190cc8056f..9d44d30ec1e5 100644
--- a/pkg/sql/gcjob/refresh_statuses.go
+++ b/pkg/sql/gcjob/refresh_statuses.go
@@ -17,7 +17,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
-	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
@@ -29,8 +28,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
-	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
-	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -461,19 +458,6 @@ func refreshTenant(
 	}
 	tenID := details.Tenant.ID
 
-	// TODO(ssd): Once
-	// https://github.com/cockroachdb/cockroach/issues/92093 is
-	// done, we should be able to simply rely on the protected
-	// timestamp for the replication job.
-	jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID)
-	if err != nil {
-		return false, time.Time{}, err
-	}
-	if jobActive {
-		log.Infof(ctx, "tenant %d has active tenant replication job, waiting for it to stop before running GC", tenID)
-		return false, timeutil.Now().Add(MaxSQLGCInterval), nil
-	}
-
 	// Read the tenant's GC TTL to check if the tenant's data has expired.
 	cfg := execCfg.SystemConfig.GetSystemConfig()
 	tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds
@@ -507,30 +491,3 @@ func refreshTenant(
 	}
 	return false, deadlineUnix, nil
 }
-
-func tenantHasActiveReplicationJob(
-	ctx context.Context, execCfg *sql.ExecutorConfig, tenID uint64,
-) (bool, error) {
-	info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MustMakeTenantID(tenID))
-	if err != nil {
-		if pgerror.GetPGCode(err) == pgcode.UndefinedObject {
-			log.Errorf(ctx, "tenant id %d not found while attempting to GC", tenID)
-			return false, nil
-		} else {
-			return false, errors.Wrapf(err, "fetching tenant %d", tenID)
-		}
-	}
-	if jobID := info.TenantReplicationJobID; jobID != 0 {
-		j, err := execCfg.JobRegistry.LoadJob(ctx, jobID)
-		if err != nil {
-			if errors.Is(err, &jobs.JobNotFoundError{}) {
-				log.Infof(ctx, "tenant replication job %d not found", jobID)
-				return false, nil
-			} else {
-				return false, err
-			}
-		}
-		return !j.Status().Terminal(), nil
-	}
-	return false, err
-}
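
To make the TTL arithmetic concrete, here is a minimal, self-contained sketch (illustration only, not part of the change) of the timestamp computation the frontier processor performs in maybeUpdatePartitionProgress; the 25-hour value mirrors the default wired up in stream_ingestion_planning.go, and only the hlc.Timestamp.Add semantics already used above are assumed:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// Suppose the replication frontier (the job's highWatermark) is "now" and
	// the job was planned with the default ReplicationTTLSeconds of 25 hours.
	highWatermark := hlc.Timestamp{WallTime: time.Now().UnixNano()}
	const replicationTTLSeconds = 25 * 60 * 60

	// Mirrors the frontier processor: the protected timestamp record is moved
	// up to highWatermark - ReplicationTTLSeconds, making only revisions older
	// than that eligible for GC. Equivalently, this is the earliest timestamp
	// the replication job can still be cut over to.
	newProtectAbove := highWatermark.Add(
		-int64(replicationTTLSeconds)*time.Second.Nanoseconds(), 0)

	fmt.Printf("frontier: %s\nearliest cutover / protect-above: %s\n",
		highWatermark, newProtectAbove)
}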