From a0ef1822613c1f49586c524177bf6b62cc92186e Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 23 Nov 2021 14:14:48 -0500 Subject: [PATCH] rangefeed: surface unrecoverable errors and don't hopelessly retry One such error is when the rangefeed falls behind to a point where the frontier timestamp precedes the GC threshold, and thus will never work. A new callback lets callers find out about the error, possibly to start a new rangefeed with an initial scan. --- pkg/kv/kvclient/rangefeed/config.go | 30 ++++++--- pkg/kv/kvclient/rangefeed/rangefeed.go | 15 ++--- .../rangefeed/rangefeed_external_test.go | 62 ++++++++++++++++++- 3 files changed, 92 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 8c44eb382171..e86379844348 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -24,13 +24,14 @@ type Option interface { } type config struct { - retryOptions retry.Options - onInitialScanDone OnInitialScanDone - withInitialScan bool - withDiff bool - onInitialScanError OnInitialScanError - onCheckpoint OnCheckpoint - onFrontierAdvance OnFrontierAdvance + retryOptions retry.Options + onInitialScanDone OnInitialScanDone + withInitialScan bool + withDiff bool + onInitialScanError OnInitialScanError + onUnrecoverableError OnUnrecoverableError + onCheckpoint OnCheckpoint + onFrontierAdvance OnFrontierAdvance } type optionFunc func(*config) @@ -46,6 +47,13 @@ type OnInitialScanDone func(ctx context.Context) // endlessly. type OnInitialScanError func(ctx context.Context, err error) (shouldFail bool) +// OnUnrecoverableError is called when the rangefeed exits with an unrecoverable +// error (preventing internal retries). One example is when the rangefeed falls +// behind to a point where the frontier timestamp precedes the GC threshold, and +// thus will never work. 
The callback lets callers find out about such errors +// (possibly, in our example, to start a new rangefeed with an initial scan). +type OnUnrecoverableError func(ctx context.Context, err error) + // WithInitialScan enables an initial scan of the data in the span. The rows of // an initial scan will be passed to the value function used to construct the // RangeFeed. Upon completion of the initial scan, the passed function (if @@ -69,6 +77,14 @@ func WithOnInitialScanError(f OnInitialScanError) Option { }) } +// WithOnInternalError sets up a callback to report unrecoverable errors during +// operation. +func WithOnInternalError(f OnUnrecoverableError) Option { + return optionFunc(func(c *config) { + c.onUnrecoverableError = f + }) +} + // WithDiff makes an option to enable an initial scan which defaults to // false. func WithDiff() Option { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index ed2e74ac109e..7ef477f5f5e3 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -232,13 +232,6 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { errCh := make(chan error) for i := 0; r.Next(); i++ { - - // TODO(ajwerner): Figure out what to do if the rangefeed falls behind to - // a point where the frontier timestamp precedes the GC threshold and thus - // will never work. Perhaps an initial scan could be performed again for - // some users. The API currently doesn't make that easy. Perhaps a callback - // should be called in order to allow the client to kill the process or - // something like that. 
ts := frontier.Frontier() log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, f.span) start := timeutil.Now() @@ -255,6 +248,14 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { } err := f.processEvents(ctx, frontier, eventCh, errCh) + if errors.HasType(err, &roachpb.BatchTimestampBeforeGCError{}) { + if errCallback := f.onUnrecoverableError; errCallback != nil { + errCallback(ctx, err) + } + + log.VEventf(ctx, 1, "exiting rangefeed due to internal error: %v", err) + return + } if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { log.Warningf(ctx, "rangefeed failed %d times, restarting: %v", log.Safe(i), err) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 3d2d0703b065..3ab2fafdc26e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -12,7 +12,6 @@ package rangefeed_test import ( "context" - "errors" "sync" "testing" @@ -25,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -446,3 +446,63 @@ func TestRangefeedValueTimestamps(t *testing.T) { require.True(t, v.PrevValue.Timestamp.IsEmpty()) } } + +// TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced +// to callers. 
+func TestUnrecoverableErrors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) // single-node test cluster with default args
+	defer tc.Stopper().Stop(ctx)
+
+	db := tc.Server(0).DB()
+	scratchKey := tc.ScratchRange(t)
+	scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] // three-index slice: capacity == length, so an append must reallocate instead of reusing the backing array
+
+	sp := roachpb.Span{
+		Key:    scratchKey,
+		EndKey: scratchKey.PrefixEnd(),
+	}
+	{
+		// Enable rangefeeds; otherwise the rangefeed would keep retrying until they are enabled.
+		_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
+		require.NoError(t, err)
+	}
+	{
+		// Lower the closed timestamp target duration (to 100ms) to speed up the test.
+		_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
+		require.NoError(t, err)
+	}
+
+	f, err := rangefeed.NewFactory(tc.Stopper(), db, nil)
+	require.NoError(t, err)
+
+	preGCThresholdTS := hlc.Timestamp{WallTime: 1} // NOTE(review): meant to precede the GC threshold, but nothing visible here advances the threshold — confirm
+	mu := struct { // internalErr is written by the callback and read by the check below, both under this mutex
+		syncutil.Mutex
+		internalErr error
+	}{}
+	r, err := f.RangeFeed(ctx, "test", sp, preGCThresholdTS,
+		func(context.Context, *roachpb.RangeFeedValue) {}, // value handler is a no-op; only the error callback matters here
+		rangefeed.WithDiff(),
+		rangefeed.WithOnInternalError(func(ctx context.Context, err error) {
+			mu.Lock()
+			defer mu.Unlock()
+
+			mu.internalErr = err // record the surfaced error for the assertion below
+		}),
+	)
+	require.NoError(t, err)
+	defer r.Close()
+
+	testutils.SucceedsSoon(t, func() error {
+		mu.Lock()
+		defer mu.Unlock()
+
+		if !errors.HasType(mu.internalErr, &roachpb.BatchTimestampBeforeGCError{}) { // presumably also trips while internalErr is still nil — confirm HasType(nil, ...) is false
+			return errors.New("expected internal error")
+		}
+		return nil
+	})
+}