Skip to content

Commit

Permalink
changefeedccl: migrate deprecated pts records to new scheme
Browse files Browse the repository at this point in the history
This change adds logic in changefeeds to "migrate" pts records which use the old style
(ie. protect spans instead of targets / desc ids). With this change, when a changefeed
manages its pts record (once every 10 minutes by default), it will check to see if the
record is deprecated. If so, it will create a new record with the non-deprecated
style and remove the old one.

This change helps accomplish #82888 by effectively removing changefeed jobs relying on
the old PTS subsystem, allowing the old subsystem to be removed.

Informs: #82888
Release note: None
Epic: CRDB-34798
  • Loading branch information
jayshrivastava committed Jan 22, 2024
1 parent 3ccf718 commit 5dd7755
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ go_test(
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down
33 changes: 28 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,20 +1628,43 @@ func (cf *changeFrontier) manageProtectedTimestamps(
highWater = cf.highWaterAtStart
}

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
if progress.ProtectedTimestampRecord == uuid.Nil {
ptr := createProtectedTimestampRecord(
ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater,
)
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
return pts.Protect(ctx, ptr)
}

log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)

rec, err := pts.GetRecord(ctx, progress.ProtectedTimestampRecord)
if err != nil {
return err
}

if rec.Target != nil {
return pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
}

// If this changefeed was created in 22.1 or earlier, it may be using a deprecated pts record in which
// the target field is nil. If so, we "migrate" it to use the new style of pts records and delete the old one.
preserveDeprecatedPts := cf.knobs.PreserveDeprecatedPts != nil && cf.knobs.PreserveDeprecatedPts()
if !preserveDeprecatedPts {
prevRecordId := progress.ProtectedTimestampRecord
ptr := createProtectedTimestampRecord(
ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater,
)
if err := pts.Protect(ctx, ptr); err != nil {
return err
}
} else {
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
if err := pts.UpdateTimestamp(ctx, recordID, highWater); err != nil {
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
if err := pts.Release(ctx, prevRecordId); err != nil {
return err
}

log.Eventf(ctx, "created new pts record %v to replace old pts record %v at %v",
progress.ProtectedTimestampRecord, prevRecordId, highWater)
}

return nil
Expand Down
142 changes: 142 additions & 0 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,40 @@ package changefeedccl
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -472,3 +479,138 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
_, err := fetchTableDescriptors(ctx, &execCfg, targets, asOf)
require.NoError(t, err)
}

// TestChangefeedUpdateProtectedTimestamp tests that changefeeds using the
// old style PTS records will migrate themselves to use the new style PTS
// records.
func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
ctx := context.Background()

useOldStylePts := atomic.Bool{}
useOldStylePts.Store(true)
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.PreserveDeprecatedPts = func() bool {
return useOldStylePts.Load()
}

ptsInterval := 50 * time.Millisecond
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t))

sysDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'")
sysDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`)
defer closeFeed(t, foo)

registry := s.Server.JobRegistry().(*jobs.Registry)
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
ptp := s.Server.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.SystemServer.DB(), s.Codec, "d", "foo")
fooID := fooDesc.GetID()
descID := descpb.ID(keys.DescriptorTableID)

jobFeed := foo.(cdctest.EnterpriseTestFeed)
loadProgressErr := func() (jobspb.Progress, error) {
job, err := registry.LoadJob(ctx, jobFeed.JobID())
if err != nil {
return jobspb.Progress{}, err
}
return job.Progress(), nil
}

getPTSRecordID := func() uuid.UUID {
var recordID uuid.UUID
testutils.SucceedsSoon(t, func() error {
progress, err := loadProgressErr()
if err != nil {
return err
}
uid := progress.GetChangefeed().ProtectedTimestampRecord
if uid == uuid.Nil {
return errors.Newf("no pts record")
}
recordID = uid
return nil
})
return recordID
}

readPTSRecord := func(recID uuid.UUID) (rec *ptpb.Record, err error) {
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
rec, err = ptp.WithTxn(txn).GetRecord(ctx, recID)
if err != nil {
return err
}
return nil
})
return
}
removePTSTarget := func(recordID uuid.UUID) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
if _, err := txn.ExecEx(ctx, "pts-test", txn.KV(), sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
"UPDATE system.protected_ts_records SET target = NULL WHERE id = '%s'",
recordID),
); err != nil {
return err
}
return nil
})
}

// Wipe out the targets from the changefeed PTS record, simulating an old-style PTS record.
oldRecordID := getPTSRecordID()
require.NoError(t, removePTSTarget(oldRecordID))
rec, err := readPTSRecord(oldRecordID)
require.NoError(t, err)
require.NotNil(t, rec)
require.Nil(t, rec.Target)

// Flip the knob so the changefeed migrates the old style PTS record to the new one.
useOldStylePts.Store(false)

getNewPTSRecord := func() *ptpb.Record {
var recID uuid.UUID
var record *ptpb.Record
testutils.SucceedsSoon(t, func() error {
recID = getPTSRecordID()
if recID.Equal(oldRecordID) {
return errors.New("waiting for new PTS record")
}

return nil
})
record, err = readPTSRecord(recID)
if err != nil {
t.Fatal(err)
}
return record
}

// Read the new PTS record.
newRec := getNewPTSRecord()
require.NotNil(t, newRec.Target)

// Assert the new PTS record has the right targets.
targetIDs := newRec.Target.GetSchemaObjects().IDs
require.Contains(t, targetIDs, fooID)
require.Contains(t, targetIDs, descID)

// Ensure the old pts record was deleted.
_, err = readPTSRecord(oldRecordID)
require.ErrorContains(t, err, "does not exist")
}

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
}
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type TestingKnobs struct {
// SpanPartitionsCallback is called with the span partition
// when the changefeed is planned.
SpanPartitionsCallback func([]sql.SpanPartition)

// PreserveDeprecatedPts is used to prevent a changefeed from upgrading
// its PTS record from the deprecated style to the new style.
PreserveDeprecatedPts func() bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 5dd7755

Please sign in to comment.