Skip to content

Commit

Permalink
Merge #96478
Browse files Browse the repository at this point in the history
96478: streamingest: use replication start time for the initial protected timestamp r=lidorcarmel a=lidorcarmel

Fixes: #96477

Release note: None

Co-authored-by: Lidor Carmel <[email protected]>
  • Loading branch information
craig[bot] and lidorcarmel committed Feb 4, 2023
2 parents 581560b + ea315e1 commit 5fbcd8a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
26 changes: 26 additions & 0 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,29 @@ func TestTenantReplicationStatus(t *testing.T) {
require.ErrorContains(t, err, "is not a stream ingestion job")
require.Equal(t, "replication error", status)
}

// TestAlterTenantHandleFutureProtectedTimestamp verifies that cutting over "TO
// LATEST" doesn't fail if the destination cluster clock is ahead of the source
// cluster clock. See issue #96477.
func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()

// Push the clock on the destination cluster forward.
destNow := c.DestCluster.Server(0).Clock().NowAsClockTimestamp()
destNow.WallTime += (200 * time.Millisecond).Nanoseconds()
c.DestCluster.Server(0).Clock().Update(destNow)

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName)
}
9 changes: 7 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,15 @@ func (s *streamIngestionResumer) protectDestinationTenant(
execCfg := execCtx.(sql.JobExecContext).ExecCfg()
target := ptpb.MakeTenantsTarget([]roachpb.TenantID{oldDetails.DestinationTenantID})
ptsID := uuid.MakeV4()
now := execCfg.Clock.Now()

// Note that the protected timestamps are in the context of the source cluster
// clock, not the destination. This is because the data timestamps are also
// decided on the source cluster. Replication start time is picked on the
// producer job on the source cluster.
replicationStartTime := oldDetails.ReplicationStartTime
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
ptp := execCfg.ProtectedTimestampProvider.WithTxn(txn)
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), replicationStartTime,
nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
if err := ptp.Protect(ctx, pts); err != nil {
return err
Expand Down

0 comments on commit 5fbcd8a

Please sign in to comment.