Skip to content

Commit

Permalink
changefeedccl: allow retries for core changefeeds
Browse files Browse the repository at this point in the history
Previously, core changefeeds would stop entirely
due to transient errors or certain schema changes.
This change adds a retry loop to the core changefeed
distributed SQL workflow.

This change updates related tests which omitted sinkless
feeds since they could not handle schema changes.

Fixes #85008
Release note (general change): Changefeeds without
a specified sink will not longer terminate when
schema changes occur.
  • Loading branch information
jayshrivastava committed Aug 9, 2022
1 parent b1de533 commit 3b40478
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableError(err)
err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
}
}

Expand Down
49 changes: 39 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ import (
"github.com/cockroachdb/errors"
)

var changefeedRetryOptions = retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}

// featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature.
var featureChangefeedEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
Expand Down Expand Up @@ -196,10 +202,38 @@ func changefeedPlanHook(

telemetry.Count(`changefeed.create.core`)
logChangefeedCreateTelemetry(ctx, jr)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
if err != nil {
telemetry.Count(`changefeed.core.error`)

var err error
for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); {
if err = distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh); err == nil {
return nil
}

if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
if knobs != nil && knobs.HandleDistChangefeedError != nil {
err = knobs.HandleDistChangefeedError(err)
}
}

if !changefeedbase.IsRetryableError(err) {
log.Warningf(ctx, `CHANGEFEED returning with error: %+v`, err)
return err
}

// Check for a schemachange boundary timestamp returned via a
// retryable error. Retrying without updating the changefeed progress
// will result in the changefeed performing the schema change again,
// causing an infinite loop.
if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok {
progress = jobspb.Progress{
Progress: &jobspb.Progress_HighWater{HighWater: &ts},
Details: &jobspb.Progress_Changefeed{
Changefeed: &jobspb.ChangefeedProgress{},
},
}
}
}
telemetry.Count(`changefeed.core.error`)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}

Expand Down Expand Up @@ -934,15 +968,10 @@ func (b *changefeedResumer) resumeWithRetries(
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
var err error
var lastRunStatusUpdate time.Time

for r := retry.StartWithCtx(ctx, opts); r.Next(); {
for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); {
// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
Expand All @@ -959,7 +988,7 @@ func (b *changefeedResumer) resumeWithRetries(
}
}

// Retry changefeed is error is retryable. In addition, we want to handle
// Retry changefeed if error is retryable. In addition, we want to handle
// context cancellation as retryable, but only if the resumer context has not been cancelled.
// (resumer context is canceled by the jobs framework -- so we should respect it).
isRetryableErr := changefeedbase.IsRetryableError(err) ||
Expand Down
89 changes: 34 additions & 55 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
}
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

// If we drop columns which are not targeted by the changefeed, it should not backfill.
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
Expand Down Expand Up @@ -1105,7 +1105,7 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedExternalIODisabled(t *testing.T) {
Expand Down Expand Up @@ -5294,36 +5294,6 @@ func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) {
foo := feed(t, f, baseStmt)
defer closeFeed(t, foo)

// maybeHandleRestart deals with the fact that sinkless changefeeds don't
// gracefully handle primary index changes but rather force the client to
// deal with restarting the changefeed as of the last resolved timestamp.
//
// This ends up being pretty sane; sinkless changefeeds already require this
// behavior in the face of other transient failures so clients already need
// to implement this logic.
maybeHandleRestart := func(t *testing.T) (cleanup func()) {
return func() {}
}
if strings.HasSuffix(t.Name(), "sinkless") {
maybeHandleRestart = func(t *testing.T) func() {
var resolved hlc.Timestamp
for {
m, err := foo.Next()
if err != nil {
assert.Contains(t, err.Error(),
fmt.Sprintf("schema change occurred at %s", resolved.Next().AsOfSystemTime()))
break
}
resolved = extractResolvedTimestamp(t, m)
}
const restartStmt = baseStmt + ", cursor = $1"
foo = feed(t, f, restartStmt, resolved.AsOfSystemTime())
return func() {
closeFeed(t, foo)
}
}
}

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
assertPayloads(t, foo, []string{
Expand All @@ -5337,7 +5307,6 @@ func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) {
})

sqlDB.Exec(t, `ALTER TABLE foo ALTER PRIMARY KEY USING COLUMNS (b)`)
defer maybeHandleRestart(t)()
sqlDB.Exec(t, `INSERT INTO foo VALUES (3, 'c'), (4, 'd')`)
assertPayloads(t, foo, []string{
`foo: ["c"]->{"after": {"a": 3, "b": "c"}}`,
Expand All @@ -5360,7 +5329,6 @@ INSERT INTO foo VALUES (1, 'f');
`foo: ["a"]->{"after": {"a": 6, "b": "a"}}`,
`foo: ["e"]->{"after": {"a": 5, "b": "e"}}`,
})
defer maybeHandleRestart(t)()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "f"}}`,
})
Expand Down Expand Up @@ -5406,24 +5374,6 @@ func TestChangefeedPrimaryKeyChangeWorksWithMultipleTables(t *testing.T) {
maybeHandleRestart := func(t *testing.T) (cleanup func()) {
return func() {}
}
if strings.HasSuffix(t.Name(), "sinkless") {
maybeHandleRestart = func(t *testing.T) func() {
var resolvedTS hlc.Timestamp
for {
m, err := cf.Next()
if err != nil {
assert.Contains(t, err.Error(), fmt.Sprintf("schema change occurred at %s", resolvedTS.Next().AsOfSystemTime()))
break
}
resolvedTS = extractResolvedTimestamp(t, m)
}
const restartStmt = baseStmt + ", cursor = $1"
cf = feed(t, f, restartStmt, resolvedTS.AsOfSystemTime())
return func() {
closeFeed(t, cf)
}
}
}

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
Expand Down Expand Up @@ -7110,3 +7060,32 @@ func TestChangefeedTestTimesOut(t *testing.T) {

cdcTest(t, testFn)
}

// Regression for #85008.
func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE mytable (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO mytable VALUES (0)`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE mytable`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`mytable: [0]->{"after": {"id": 0}}`,
})

sqlDB.Exec(t, `ALTER TABLE mytable ADD COLUMN val INT DEFAULT 0`)
assertPayloads(t, cf, []string{
`mytable: [0]->{"after": {"id": 0, "val": 0}}`,
})
sqlDB.Exec(t, `INSERT INTO mytable VALUES (1,1)`)
assertPayloads(t, cf, []string{
`mytable: [1]->{"after": {"id": 1, "val": 1}}`,
})
}

cdcTest(t, testFn, feedTestForceSink("sinkless"))
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
27 changes: 26 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -77,7 +78,14 @@ func (e *taggedError) Unwrap() error { return e.wrapped }
const retryableErrorString = "retryable changefeed error"

type retryableError struct {
wrapped error
// A schema change may result in a changefeed returning retryableError,
// which can signal the changefeed to restart.
// boundaryTimestamp can be returned inside this error so
// the changefeed knows where to restart from. Note this is
// only useful for sinkless/core changefeeds because they do not have
// the ability to read/write their state to jobs tables during restarts.
boundaryTimestamp hlc.Timestamp
wrapped error
}

// MarkRetryableError wraps the given error, marking it as retryable to
Expand All @@ -86,6 +94,12 @@ func MarkRetryableError(e error) error {
return &retryableError{wrapped: e}
}

// MarkRetryableErrorWithTimestamp wraps the given error, marks it as
// retryable, and attaches a timestamp to the error.
func MarkRetryableErrorWithTimestamp(e error, ts hlc.Timestamp) error {
return &retryableError{boundaryTimestamp: ts, wrapped: e}
}

// Error implements the error interface.
func (e *retryableError) Error() string {
return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error())
Expand Down Expand Up @@ -125,6 +139,17 @@ func IsRetryableError(err error) bool {
errors.Is(err, sql.ErrPlanChanged))
}

// MaybeGetRetryableErrorTimestamp will get the timestamp of an error if
// the error is a retryableError and the timestamp field is populated.
func MaybeGetRetryableErrorTimestamp(err error) (timestamp hlc.Timestamp, ok bool) {
if retryableErr := (*retryableError)(nil); errors.As(err, &retryableErr) {
if !retryableErr.boundaryTimestamp.IsEmpty() {
return retryableErr.boundaryTimestamp, true
}
}
return hlc.Timestamp{}, false
}

// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
// RetryableError marker out. This won't do anything if the RetryableError
// itself has been wrapped, but that's okay, we'll just have an uglier string.
Expand Down

0 comments on commit 3b40478

Please sign in to comment.