streamingest: write and manage PTS on the destination tenant
During C2C replication, as the destination tenant ingests KVs, we must
protect a certain window of MVCC revisions from garbage collection so
that the user can cut over to any timestamp that lies within this
window.

To this end we introduce a `ReplicationTTLSeconds` field in the
replication job payload that governs the size of this window relative
to the replication job's highwatermark (frontier timestamp). On the first
resumption of the replication job we write a protected timestamp (PTS) record
on the destination tenant's keyspan, protecting all revisions above `now()`.
As the replication job advances its highwatermark, the PTS record is pulled
up to protect above `highWatermark - ReplicationTTLSeconds`. This active
management of the PTS ensures that users can always cut over to any time in
`(highWatermark - ReplicationTTLSeconds, highWatermark]`, while older revisions
are gradually made eligible for GC as the frontier progresses.
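
As a rough sketch of the invariant this maintains (illustrative Go, not part
of this patch; hlc timestamps are simplified to time.Time):

package main

import (
	"fmt"
	"time"
)

// validCutover reports whether a candidate cutover time falls inside the
// window (highWatermark - ttl, highWatermark] that the PTS record keeps
// readable.
func validCutover(candidate, highWatermark time.Time, ttl time.Duration) bool {
	protectAbove := highWatermark.Add(-ttl)
	return candidate.After(protectAbove) && !candidate.After(highWatermark)
}

func main() {
	hw := time.Now()
	ttl := 25 * time.Hour // the default used by ingestionPlanHook below
	fmt.Println(validCutover(hw.Add(-time.Hour), hw, ttl))    // true: inside the window
	fmt.Println(validCutover(hw.Add(-26*time.Hour), hw, ttl)) // false: may already be GC'ed
}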

The PTS is released if the replication job fails or is cancelled.

Fixes: #92093

Release note: None
adityamaru committed Nov 22, 2022
1 parent d48216a commit f608bd5
Showing 8 changed files with 278 additions and 14 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -24,9 +24,12 @@ go_library(
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
@@ -58,6 +61,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
@@ -95,6 +99,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
@@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
f := sf.frontier
registry := sf.flowCtx.Cfg.JobRegistry
jobID := jobspb.JobID(sf.spec.JobID)
ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider

frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
@@ -439,6 +440,26 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

// Update the protected timestamp record protecting the destination tenant's
// keyspan if the highWatermark has moved forward since the last time we
// recorded progress. This makes older revisions of replicated values with a
// timestamp less than highWatermark - ReplicationTTLSeconds eligible for
// garbage collection.
replicationDetails := md.Payload.GetStreamIngestion()
if replicationDetails.ProtectedTimestampRecordID == nil {
return errors.AssertionFailedf("expected replication job to have a protected timestamp " +
"record over the destination tenant's keyspan")
}
record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
newProtectAbove := highWatermark.Add(
-int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0)
if record.Timestamp.Less(newProtectAbove) {
return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove)
}
return nil
}); err != nil {
return err
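
A minimal sketch of the guard in the hunk above, with hlc.Timestamp reduced to
plain nanoseconds (illustrative names, not the shipped API): the record is only
ever pulled forward, and is left untouched early in the job's life when the
frontier minus the TTL still trails the record written at job startup.

// maybeAdvancePTS mirrors the record.Timestamp.Less(newProtectAbove) check
// above. It returns the timestamp the record should protect above.
func maybeAdvancePTS(recordNanos, highWatermarkNanos, ttlNanos int64) int64 {
	newProtectAbove := highWatermarkNanos - ttlNanos
	if recordNanos < newProtectAbove {
		// The frontier has advanced far enough; pull the record up.
		return newProtectAbove
	}
	// Never regress the record; older revisions stay protected.
	return recordNanos
}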
67 changes: 66 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -17,7 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

@@ -366,14 +370,60 @@ func (s *streamIngestionResumer) handleResumeError(
// Resume is part of the jobs.Resumer interface. Ensure that any errors
// produced here are returned via s.handleResumeError.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
// Protect the destination tenant's keyspan from garbage collection.
err := s.protectDestinationTenant(resumeCtx, execCtx)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}

// Start ingesting KVs from the replication stream.
err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
return nil
}

// protectDestinationTenant writes a protected timestamp record protecting the
// destination tenant's keyspace from garbage collection. This protected
// timestamp record is updated every time the replication job records a new
// frontier timestamp, and is released in OnFailOrCancel.
//
// The method persists the ID of the protected timestamp record in the
// replication job's Payload.
func (s *streamIngestionResumer) protectDestinationTenant(
ctx context.Context, execCtx interface{},
) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)

// If we have already protected the destination tenant keyspan in a previous
// resumption of the stream ingestion job, then there is nothing to do.
if details.ProtectedTimestampRecordID != nil {
return nil
}

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
ptsID := uuid.MakeV4()
now := execCfg.Clock.Now()
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
return err
}
return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
details.ProtectedTimestampRecordID = &ptsID
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)
return nil
})
})
}

// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(
@@ -523,6 +573,21 @@ func (s *streamIngestionResumer) OnFailOrCancel(
return errors.Wrap(err, "update tenant record")
}

ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
if details.ProtectedTimestampRecordID != nil {
return jobExecCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := ptp.Release(ctx, txn, *details.ProtectedTimestampRecordID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// The record no longer seems to exist; there is nothing to release, so
// swallow the error rather than failing the cleanup.
log.Warningf(ctx, "failed to release protected timestamp record which seems not to exist: %v", err)
err = nil
}
return err
}
return nil
})
}
return nil
}
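
The release path above tolerates a missing record. As a hypothetical
standalone helper (names assumed to match the protectedts package; not part of
this patch), the pattern looks like:

// releaseIgnoringNotExists releases a PTS record but treats an
// already-missing record as success, since there is nothing left to clean up.
func releaseIgnoringNotExists(
	ctx context.Context, txn *kv.Txn, ptp protectedts.Storage, id uuid.UUID,
) error {
	err := ptp.Release(ctx, txn, id)
	if errors.Is(err, protectedts.ErrNotExists) {
		return nil
	}
	return err
}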

@@ -169,13 +169,19 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to a user-configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(streamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
159 changes: 159 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -768,6 +770,9 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()

c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

@@ -1073,3 +1078,157 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
// Since the data was distributed across multiple nodes, multiple nodes should have been connected to.
require.Greater(t, len(clientAddresses), 1)
}

func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
// Override ReplicationTTLSeconds in the replication job details to a small value
// so that every progress update results in a protected timestamp update.
//
// TODO(adityamaru): Once this is wired up to be user configurable via an
// option to `CREATE TENANT ... FROM REPLICATION` we should replace this
// testing knob with a create tenant option.
args.testingKnobs = &sql.StreamingTestingKnobs{
OverrideReplicationTTLSeconds: 1,
}

testProtectedTimestampManagement := func(t *testing.T, cancelAfterPaused bool) {
// waitForProducerProtection asserts that there is a PTS record protecting
// the source tenant. We ensure the PTS record is protecting a timestamp
// greater than or equal to the frontier we know we have replicated up to.
waitForProducerProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
testutils.SucceedsSoon(t, func() error {
stats := streamIngestionStats(t, c.destSysSQL, replicationJobID)
if stats.ProducerStatus == nil {
return errors.New("nil ProducerStatus")
}
if stats.ProducerStatus.ProtectedTimestamp == nil {
return errors.New("nil ProducerStatus.ProtectedTimestamp")
}
pts := *stats.ProducerStatus.ProtectedTimestamp
if pts.Less(frontier) {
return errors.Newf("protection is at %s, expected to be >= %s",
pts.String(), frontier.String())
}
return nil
})
}

// checkNoDestinationProtection asserts that there is no PTS record
// protecting the destination tenant.
checkNoDestinationProtection := func(c *tenantStreamingClusters, replicationJobID int) {
execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig)
require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
require.NoError(t, err)
payload := j.Payload()
replicationDetails := payload.GetStreamIngestion()
_, err = execCfg.ProtectedTimestampProvider.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
require.EqualError(t, err, protectedts.ErrNotExists.Error())
return nil
}))
}
checkDestinationProtection := func(c *tenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
execCfg := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig)
ptp := execCfg.ProtectedTimestampProvider
require.NoError(t, c.destCluster.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
if err != nil {
return err
}
payload := j.Payload()
progress := j.Progress()
replicationDetails := payload.GetStreamIngestion()

require.NotNil(t, replicationDetails.ProtectedTimestampRecordID)
rec, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
require.True(t, frontier.LessEq(*progress.GetHighWater()))
roundedHighWater := progress.GetHighWater().GoTime().Round(time.Millisecond)
window := roundedHighWater.Sub(rec.Timestamp.GoTime().Round(time.Millisecond))
require.Equal(t, time.Second, window)
return nil
}))
}

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
c.destSysSQL.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")

producerJobID, replicationJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))

// Ensure that we wait at least a second so that the gap between the first
// time we write the protected timestamp (t1) during replication job
// startup, and the first progress update (t2) is greater than 1s. This is
// important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we
// will not update the PTS record.
now := c.srcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0)
c.waitUntilHighWatermark(now, jobspb.JobID(replicationJobID))

// Check that the producer and replication job have written a protected
// timestamp.
waitForProducerProtection(c, now, replicationJobID)
checkDestinationProtection(c, now, replicationJobID)

// Let the replication progress for a second before checking that the
// protected timestamp record has also been updated on the destination
// cluster. This update happens in the same txn in which we update the
// replication job's progress.
now2 := now.Add(time.Second.Nanoseconds(), 0)
c.waitUntilHighWatermark(now2, jobspb.JobID(replicationJobID))
waitForProducerProtection(c, now2, replicationJobID)
checkDestinationProtection(c, now2, replicationJobID)

if cancelAfterPaused {
c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", replicationJobID))
jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
}

// Set GC TTL low, so that the GC job completes quickly in the test.
c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName))
jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(replicationJobID))
jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))

// Check that the producer job has released its protected timestamp.
stats := streamIngestionStats(t, c.destSysSQL, replicationJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)

// Check that the replication job has released its protected timestamp.
checkNoDestinationProtection(c, replicationJobID)

// Wait for the GC job to finish, this should happen once the protected
// timestamp has been released.
c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")

// Check if dest tenant key range is cleaned up.
destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)
rows, err := c.destCluster.Server(0).DB().
Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.Empty(t, rows)

c.destSysSQL.CheckQueryResults(t,
fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
[][]string{{"0"}})
}

t.Run("cancel-ingestion-after-paused", func(t *testing.T) {
testProtectedTimestampManagement(t, true /* cancelAfterPaused */)
})

t.Run("cancel-ingestion-while-running", func(t *testing.T) {
testProtectedTimestampManagement(t, false /* cancelAfterPaused */)
})
}
17 changes: 17 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -97,6 +97,23 @@ message StreamIngestionDetails {
(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];

// ID of the protected timestamp record that protects the destination tenant's
// keyspan from GC while it is being replicated into.
bytes protected_timestamp_record_id = 10 [
(gogoproto.customname) = "ProtectedTimestampRecordID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
];

// ReplicationTTLSeconds specifies the maximum age of a value relative to the
// replication job's frontier timestamp, before the value is made eligible for
// garbage collection. Note that only older versions of values are eligible for
// GC. All values newer than this maximum age will be protected from GC by a
// protected timestamp record managed by the replication job.
//
// In other words, the `replication job's frontier timestamp - ReplicationTTLSeconds`
// is the earliest timestamp that the replication job can be cut over to.
int32 replication_ttl_seconds = 11 [(gogoproto.customname) = "ReplicationTTLSeconds"];

reserved 5, 6;
}
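
To make the comment above concrete, a hypothetical computation (values
invented for illustration):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Suppose the frontier is at 17:00 UTC and ReplicationTTLSeconds is the
	// 25h default from ingestionPlanHook (25 * 60 * 60 = 90000).
	frontier := time.Date(2022, 11, 22, 17, 0, 0, 0, time.UTC)
	ttl := 90000 * time.Second
	fmt.Println(frontier.Add(-ttl)) // 2022-11-21 16:00:00 +0000 UTC, the earliest cutover time
}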

4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1625,6 +1625,10 @@ type StreamingTestingKnobs struct {
// BeforeIngestionStart allows blocking the stream ingestion job
// before a stream ingestion happens.
BeforeIngestionStart func(ctx context.Context) error

// OverrideReplicationTTLSeconds will override the default value of the
// `ReplicationTTLSeconds` field on the StreamIngestion job details.
OverrideReplicationTTLSeconds int
}

var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}