streamingest: write and manage PTS on the destination tenant #92336
@@ -17,7 +17,10 @@ import (
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"

@@ -29,6 +32,7 @@ import (
	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/cockroachdb/errors"
)
@@ -302,6 +306,17 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
		if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil {
			log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID)
		}

		// Now that we have completed the cutover we can release the protected
		// timestamp record on the destination tenant's keyspace.
		if details.ProtectedTimestampRecordID != nil {
			if err := execCtx.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
				return releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn, *details.ProtectedTimestampRecordID)
			}); err != nil {
				return err
			}
		}

		return nil
	}
	return errors.CombineErrors(ingestWithClient(), client.Close(ctx))
@@ -365,14 +380,77 @@ func (s *streamIngestionResumer) handleResumeError(
// Resume is part of the jobs.Resumer interface. Ensure that any errors
// produced here are returned as s.handleResumeError.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
	// Protect the destination tenant's keyspan from garbage collection.
	err := s.protectDestinationTenant(resumeCtx, execCtx)
	if err != nil {
		return s.handleResumeError(resumeCtx, execCtx, err)
	}
Comment on lines +384 to +387

🤔 Right now cutover-after-failure requires that the user be able to Resume and reach the cutover code. So we need to be careful about all of the code between Resume and the cutover code. I believe that should be OK in this case since we early out in the function we are calling if we already have a pts record, which we should have.

Oh I didn't realize this. I wonder if we need to refactor

	// Start ingesting KVs from the replication stream.
	err := ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
	err = ingestWithRetries(resumeCtx, execCtx.(sql.JobExecContext), s.job)
	if err != nil {
		return s.handleResumeError(resumeCtx, execCtx, err)
	}
	return nil
}
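To make the concern in the thread above concrete: the cutover logic is only reachable through the ingest path that Resume ends with, so any step added ahead of it re-runs on every resumption and must not fail a job that merely needs to cut over. Below is a compressed sketch of that control flow; it paraphrases the diff rather than copying it, resumeSketch is a hypothetical name, and it assumes the identifiers of the surrounding streamingest package.

```go
// resumeSketch is a paraphrase, not the real Resume: it exists to show why the
// protection step has to be idempotent. A job that failed after ingesting data
// is resumed again to honor a cutover request, so the PTS step must detect the
// record written on an earlier attempt and fall through to the ingest path.
func (s *streamIngestionResumer) resumeSketch(ctx context.Context, execCtx sql.JobExecContext) error {
	details := s.job.Details().(jobspb.StreamIngestionDetails)
	if details.ProtectedTimestampRecordID == nil {
		// First resumption only: write the protected timestamp record.
		if err := s.protectDestinationTenant(ctx, execCtx); err != nil {
			return err
		}
	}
	// Later resumptions reach this point untouched; the cutover code lives
	// inside the ingest path below.
	return ingestWithRetries(ctx, execCtx, s.job)
}
```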
func releaseDestinationTenantProtectedTimestamp(
	ctx context.Context, execCtx interface{}, txn *kv.Txn, ptsID uuid.UUID,
) error {
	jobExecCtx := execCtx.(sql.JobExecContext)
	ptp := jobExecCtx.ExecCfg().ProtectedTimestampProvider
	if err := ptp.Release(ctx, txn, ptsID); err != nil {
		if errors.Is(err, protectedts.ErrNotExists) {
			// No reason to return an error which might cause problems if it doesn't
			// seem to exist.

nit: will anything break if we return the error? sometimes we might hide bugs when swallowing errors.. so I'd drop this (and return the error) unless this is really needed.

If we return an error here then that will bubble up to

oh this definitely makes sense in that case. thanks for explaining!
			log.Warningf(ctx, "failed to release protected which seems not to exist: %v", err)
			err = nil
		}
		return err
	}
	return nil
}

// protectDestinationTenant writes a protected timestamp record protecting the
// destination tenant's keyspace from garbage collection. This protected
// timestamp record is updated every time the replication job records a new
// frontier timestamp, and is released OnFailOrCancel.
//
// The method persists the ID of the protected timestamp record in the
// replication job's Payload.
func (s *streamIngestionResumer) protectDestinationTenant(
	ctx context.Context, execCtx interface{},
) error {
	details := s.job.Details().(jobspb.StreamIngestionDetails)

	// If we have already protected the destination tenant keyspan in a previous
	// resumption of the stream ingestion job, then there is nothing to do.
	if details.ProtectedTimestampRecordID != nil {
		return nil
	}

	execCfg := execCtx.(sql.JobExecContext).ExecCfg()
	target := ptpb.MakeTenantsTarget([]roachpb.TenantID{details.DestinationTenantID})
	ptsID := uuid.MakeV4()
	now := execCfg.Clock.Now()
	return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		pts := jobsprotectedts.MakeRecord(ptsID, int64(s.job.ID()), now,
			nil /* deprecatedSpans */, jobsprotectedts.Jobs, target)
		if err := execCfg.ProtectedTimestampProvider.Protect(ctx, txn, pts); err != nil {
			return err
		}
		return s.job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
			if err := md.CheckRunningOrReverting(); err != nil {
				return err
			}
			details.ProtectedTimestampRecordID = &ptsID
			md.Payload.Details = jobspb.WrapPayloadDetails(details)
			ju.UpdatePayload(md.Payload)
			return nil
		})
	})
}
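The doc comment above says the record is updated every time the replication job records a new frontier timestamp; that update path is not part of this hunk. A minimal sketch of what such an update could look like, assuming the protected timestamp provider exposes an UpdateTimestamp method; the function name and file are illustrative, not code from this PR.

```go
package streamingest

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// updatePTSOnFrontierAdvance is illustrative only: it moves the protected
// timestamp record forward to the latest replicated frontier so that GC can
// reclaim MVCC history below the frontier while revisions at or above it
// remain protected for the ingestion.
func updatePTSOnFrontierAdvance(
	ctx context.Context, execCfg *sql.ExecutorConfig, ptsID uuid.UUID, frontier hlc.Timestamp,
) error {
	return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		return execCfg.ProtectedTimestampProvider.UpdateTimestamp(ctx, txn, ptsID, frontier)
	})
}
```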

// revertToCutoverTimestamp attempts a cutover and errors out if one was not
// executed.
func revertToCutoverTimestamp(

@@ -523,6 +601,13 @@ func (s *streamIngestionResumer) OnFailOrCancel(
		return errors.Wrap(err, "update tenant record")
	}

	if details.ProtectedTimestampRecordID != nil {
		if err := releaseDestinationTenantProtectedTimestamp(ctx, execCtx, txn,
			*details.ProtectedTimestampRecordID); err != nil {
			return err
		}
	}

	return nil
	})
}

we don't really need to discuss it here but are we sure we want that?

say we had a 25h PTS on the destination, and now we did a cutover, if we release the PTS we will start gc-ing a ton of bytes which may get the cluster in trouble.

I guess we cannot keep the PTS trailing at now-25h and let humans change the PTS when it makes sense?
This is a valid concern but I'm not sure it's one the replication job should be responsible for solving. Once the cutover is successful the replication job ceases to exist and we will go back to respecting the span config that applies to the tenant keyspace. In an ideal world we make the uncorking of GC not as chaos-inducing as it is today, but if the user does not wish to have a flurry of GC activity they can either:

1. Configure the GC TTL on the tenant keyspan to be 25h so that even after cutover we do not suddenly have a large amount of data eligible for GC.
2. Run the replication stream with a smaller retention window once we make ReplicationTTLSeconds a user-configurable value.

I do agree that this is something we will need to document explicitly.
good points! thanks.
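For the first mitigation listed above, here is a rough sketch of raising the destination tenant's GC TTL to 25h ahead of cutover. It assumes a SQL connection to the destination tenant and that the tenant's default zone config is the right place to set the TTL; the connection string and overall setup are hypothetical and may differ per deployment.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	// Hypothetical connection string for the destination tenant.
	db, err := sql.Open("postgres", "postgresql://root@destination-host:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 25h = 90000s. Raising gc.ttlseconds before the PTS is released means the
	// cutover does not suddenly make a large amount of MVCC history eligible
	// for garbage collection.
	if _, err := db.Exec(`ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 90000`); err != nil {
		log.Fatal(err)
	}
}
```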