streamingest: write and manage PTS on the destination tenant #92336

Merged (1 commit) on Nov 23, 2022
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -24,9 +24,12 @@ go_library(
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
@@ -58,6 +61,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
@@ -95,6 +99,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
@@ -400,6 +400,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
f := sf.frontier
registry := sf.flowCtx.Cfg.JobRegistry
jobID := jobspb.JobID(sf.spec.JobID)
ptp := sf.flowCtx.Cfg.ProtectedTimestampProvider

frontierResolvedSpans := make([]jobspb.ResolvedSpan, 0)
f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
@@ -439,6 +440,26 @@
if md.RunStats != nil && md.RunStats.NumRuns > 1 {
ju.UpdateRunStats(1, md.RunStats.LastRun)
}

// Update the protected timestamp record protecting the destination tenant's
// keyspan if the highWatermark has moved forward since the last time we
// recorded progress. This makes older revisions of replicated values with a
// timestamp less than highWatermark - ReplicationTTLSeconds, eligible for
// garbage collection.
replicationDetails := md.Payload.GetStreamIngestion()
if replicationDetails.ProtectedTimestampRecordID == nil {
return errors.AssertionFailedf("expected replication job to have a protected timestamp " +
"record over the destination tenant's keyspan")
}
record, err := ptp.GetRecord(ctx, txn, *replicationDetails.ProtectedTimestampRecordID)
if err != nil {
return err
}
newProtectAbove := highWatermark.Add(
-int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0)
if record.Timestamp.Less(newProtectAbove) {
return ptp.UpdateTimestamp(ctx, txn, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove)
}
return nil
}); err != nil {
return err
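The update rule added above is monotonic: the record protects everything at or above highWatermark - ReplicationTTLSeconds, and it is rewritten only when that point moves past the record's current timestamp. Below is a minimal, self-contained sketch of the same arithmetic using plain UNIX nanoseconds instead of hlc.Timestamp; the names are illustrative and not from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// nextProtectAbove mirrors the rule in maybeUpdatePartitionProgress: the
// protected timestamp record should cover everything at or above
// highWatermark - replicationTTL, and it only ever moves forward.
// Timestamps are plain UNIX nanoseconds here for illustration.
func nextProtectAbove(recordTS, highWatermark int64, replicationTTL time.Duration) (int64, bool) {
	newProtectAbove := highWatermark - replicationTTL.Nanoseconds()
	if recordTS < newProtectAbove {
		return newProtectAbove, true // advance the record
	}
	return recordTS, false // frontier has not advanced far enough; leave the record alone
}

func main() {
	const ttl = 25 * time.Hour
	record := time.Now().Add(-48 * time.Hour).UnixNano() // stale protection point
	frontier := time.Now().UnixNano()                    // replication high-water mark

	if ts, advanced := nextProtectAbove(record, frontier, ttl); advanced {
		fmt.Printf("advance PTS record to %s\n", time.Unix(0, ts))
	} else {
		fmt.Println("no PTS update needed")
	}
}
```

Note that in the diff above the record is read and updated inside the same transaction that records the job's progress, so the protection point only advances when the frontier update commits.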
87 changes: 86 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -17,7 +17,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

@@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
}

// Now that we have completed the cutover we can release the protected
// timestamp record on the destination tenant's keyspace.
Contributor:
// Now that we have completed the cutover we can release the protected
// timestamp record on the destination tenant's keyspace.

we don't really need to discuss it here but are we sure we want that?

say we had a 25h PTS on the destination, and now we did a cutover, if we release the PTS we will start gc-ing a ton of bytes which may get the cluster in trouble.

I guess we cannot keep the PTS trailing at now-25h and let humans change the PTS when it makes sense?

Contributor Author:

This is a valid concern but I'm not sure it's one the replication job should be responsible for solving. Once the cutover is successful the replication job ceases to exist and we will go back to respecting the span config that applies to the tenant keyspace. In an ideal world we make the uncorking of GC not as chaos-inducing as it is today, but if the user does not wish to have a flurry of GC activity they can either:

  1. Configure the GC TTL on the tenant keyspan to be 25h so that even after cutover we do not suddenly have a large amount of data eligible for GC.

  2. Run the replication stream with a smaller retention window once we make this ReplicationTTLSeconds a user-configurable value.

I do agree that this is something we will need to document explicitly.

Contributor:

good points! thanks.

if details.ProtectedTimestampRecordID != nil {
if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID)
}); err != nil {
return err
}
}

return nil
}
return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
@@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError(
// Resume is part of the jobs.Resumer interface. Ensure that any errors
// produced here are returned as s.handleResumeError.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
// Protect the destination tenant's keyspan from garbage collection.
err := s.protectDestinationTenant(resumeCtx, execCtx)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
Collaborator (on lines +384 to +387):
🤔 Right now cutover-after-failure requires that the user be able to Resume and reach the cutover code. So we need to be careful about all of the code between Resume and the cutover code in ingest.

I believe that should be OK in this case since we early out in the function we are calling if we already have a pts record, which we should have.

Contributor Author (@adityamaru), Nov 23, 2022:

Oh I didn't realize this. I wonder if we need to refactor Resume to run a different code path if it's being resumed after a pause due to failure. This way we minimize what we do between resumption and checking for cutover. But, yes I don't think we've made things worse because of the reason you described above.


// Start ingesting KVs from the replication stream.
err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
if err != nil {
return s.handleResumeError(resumeCtx, execCtx, err)
}
return nil
}

func releaseDestinationTenantProtectedTimestamp(
ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
) error {
jobExecCtx := execCtx.(sql.JobExecContext)
ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
if err := ptp.Release(ctx, txn, ptsID); err != nil {
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
Contributor:

// No reason to return an error which might cause problems if it doesn't
// seem to exist.

nit: will anything break if we return the error? sometimes we might hide bugs when swallowing errors.. so I'd drop this (and return the error) unless this is really needed.

Contributor Author:

If we return an error here then that will bubble up to OnFailOrCancel which is a method the job registry does not allow to fail. In other words, we will retry OnFailOrCancel endlessly (with backoff) until it succeeds which it will never since it won't be able to find the record. A reason why we might not find the record is if OnFailOrCancel gets retried causing us to invoke this method more than once in separate txns. The expectation is that all operations in Resume and OnFailOrCancel should be idempotent and so we safeguard against multiple invocations.

Contributor:

oh this definitely makes sense in that case. thanks for explaining!

log.Warningf(ctx, "failed to release protected which seems not to exist: %v", err)
err = nil
}
return err
}
return nil
}

// protectDestinationTenant writes a protected timestamp record protecting the
// destination tenant's keyspace from garbage collection. This protected
// timestamp record is updated every time the replication job records a new
// frontier timestamp, and is released OnFailOrCancel.
//
// The method persists the ID of the protected timestamp record in the
// replication job's Payload.
func (s *streamIngestionResumer) protectDestinationTenant(
ctx context.Context, execCtx interface{},
) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)

// If we have already protected the destination tenant keyspan in a previous
// resumption of the stream ingestion job, then there is nothing to do.
if details.ProtectedTimestampRecordID != nil {
return nil
}

execCfg := execCtx.(sql.JobExecContext).ExecCfg()
target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
ptsID := uuid.MakeV4()
now := execCfg.Clock.Now()
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
return err
}
return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
details.ProtectedTimestampRecordID = &ptsID
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)
return nil
})
})
}

// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(
@@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
return errors.Wrap(err, "update tenant record")
}

if details.ProtectedTimestampRecordID != nil {
if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
*details.ProtectedTimestampRecordID); err != nil {
return err
}
}

return nil
})
}
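The review thread above about swallowing protectedts.ErrNotExists comes down to idempotency: OnFailOrCancel is retried until it succeeds, so a release that already happened in an earlier attempt must not be treated as a failure. Here is a minimal, self-contained sketch of that pattern; the releaser interface, errRecordNotExists, and fakeReleaser are hypothetical stand-ins, not the real protectedts API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// errRecordNotExists stands in for protectedts.ErrNotExists in this sketch.
var errRecordNotExists = errors.New("protected timestamp record does not exist")

// releaser is a hypothetical stand-in for the protected timestamp provider.
type releaser interface {
	Release(ctx context.Context, id string) error
}

// releaseIdempotently treats "record not found" as success so that a cleanup
// path that is retried until it succeeds (like a job's OnFailOrCancel) does
// not spin forever once the record has already been released.
func releaseIdempotently(ctx context.Context, r releaser, id string) error {
	if err := r.Release(ctx, id); err != nil {
		if errors.Is(err, errRecordNotExists) {
			log.Printf("protected timestamp record %s already released: %v", id, err)
			return nil
		}
		return err
	}
	return nil
}

// fakeReleaser releases a record exactly once; later attempts report not-exists.
type fakeReleaser struct{ released map[string]bool }

func (f *fakeReleaser) Release(_ context.Context, id string) error {
	if f.released[id] {
		return errRecordNotExists
	}
	f.released[id] = true
	return nil
}

func main() {
	ctx := context.Background()
	r := &fakeReleaser{released: map[string]bool{}}
	for i := 0; i < 2; i++ { // simulate a retried cleanup path
		fmt.Println(releaseIdempotently(ctx, r, "pts-record"))
	}
}
```

The protect side is made idempotent the same way: protectDestinationTenant above returns early when the job payload already carries a ProtectedTimestampRecordID, so a re-resumed job never writes a second record.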
@@ -169,13 +169,19 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to the user configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(streamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)