Skip to content

Commit

Permalink
Merge #92336
Browse files Browse the repository at this point in the history
92336: streamingest: write and manage PTS on the destination tenant r=stevendanna a=adityamaru

During C2C replication as the destination tenant is ingesting KVs we must protect a certain window of MVCC revisions from garbage collection so that the user can cutover to any of the timestamps that lie within this window.

To this effect we introduce a `ReplicationTTLSeconds` field to the replication job payload that governs the size of this window relative to the replication job's highwatermark (frontier timestamp). On the first resumption of the replication job we write a protected timestamp record on the destination tenant's keyspace protecting all revisions above `now()`. As the replication job updates its highwatermark, the PTS record is pulled up to protect above `highWatermark - ReplicationTTLSeconds`. This active management of the PTS always ensures that users can cutover to any time in (highWatermark-ReplicationTTLSeconds, highWatermark] and older revisions are gradually made eligible for GC as the frontier progresses.

The PTS is released if the replication job fails or is cancelled.

Fixes: #92093

Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Nov 23, 2022
2 parents 5173d51 + 721cc76 commit b04c3b0
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 44 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ go_library(
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
Expand Down Expand Up @@ -58,6 +61,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down Expand Up @@ -95,6 +99,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
f := sf.frontier
registry := sf.flowCtx.Cfg.JobRegistry
jobID := jobspb.JobID(sf.spec.JobID)
ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider

frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
Expand Down Expand Up @@ -439,6 +440,26 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

// Update the protected timestamp record protecting the destination tenant's
// keyspan if the highWatermark has moved forward since the last time we
// recorded progress. This makes older revisions of replicated values with a
// timestamp less than highWatermark - ReplicationTTLSeconds, eligible for
// garbage collection.
replicationDetails := md.Payload.GetStreamIngestion()
if replicationDetails.ProtectedTimestampRecordID == nil {
return errors.AssertionFailedf("expected replication job to have a protected timestamp " +
"record over the destination tenant's keyspan")
}
record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
newProtectAbove := highWatermark.Add(
-int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0)
if record.Timestamp.Less(newProtectAbove) {
return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove)
}
return nil
}); err != nil {
return err
Expand Down
87 changes: 86 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}

// Now that we have completed the cutover we can release the protected
// timestamp record on the destination tenant's keyspace.
if details.ProtectedTimestampRecordID != nil {
if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID)
}); err != nil {
return err
}
}

return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
Expand Down Expand Up @@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError(
// Resume is part of the jobs.Resumer interface. Ensure that any errors
// produced here are returned as s.handleResumeError.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
// Protect the destination tenant's keyspan from garbage collection.
err := s.protectDestinationTenant(resumeCtx, execCtx)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}

// Start ingesting KVs from the replication stream.
err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
return nil
}

func releaseDestinationTenantProtectedTimestamp(
ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
) error {
jobExecCtx := execCtx.(sql.JobExecContext)
ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
if err := ptp.Release(ctx, txn, ptsID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
log.Warningf(ctx, "failed to release protected which seems not to exist: %v", err)
err = nil
}
return err
}
return nil
}

// protectDestinationTenant writes a protected timestamp record protecting the
// destination tenant's keyspace from garbage collection. This protected
// timestamp record is updated everytime the replication job records a new
// frontier timestamp, and is released OnFailOrCancel.
//
// The method persists the ID of the protected timestamp record in the
// replication job's Payload.
func (s *streamIngestionResumer) protectDestinationTenant(
ctx context.Context, execCtx interface{},
) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)

// If we have already protected the destination tenant keyspan in a previous
// resumption of the stream ingestion job, then there is nothing to do.
if details.ProtectedTimestampRecordID != nil {
return nil
}

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
ptsID := uuid.MakeV4()
now := execCfg.Clock.Now()
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
return err
}
return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
details.ProtectedTimestampRecordID = &ptsID
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)
return nil
})
})
}

// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(
Expand Down Expand Up @@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
return errors.Wrap(err, "update tenant record")
}

if details.ProtectedTimestampRecordID != nil {
if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
*details.ProtectedTimestampRecordID); err != nil {
return err
}
}

return nil
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,19 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to the user configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(streamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
Expand Down
Loading

0 comments on commit b04c3b0

Please sign in to comment.