Skip to content

Commit

Permalink
rangefeed: surface unrecoverable errors and don't hopelessly retry
Browse files Browse the repository at this point in the history
One such error is when the rangefeed falls behind to a point where the
frontier timestamp precedes the GC threshold, and thus will never work.
A new callback lets callers find out about the error, possibly to start
a new rangefeed with an initial scan.
  • Loading branch information
irfansharif committed Nov 24, 2021
1 parent b46e17e commit a0ef182
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 15 deletions.
30 changes: 23 additions & 7 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type Option interface {
}

type config struct {
retryOptions retry.Options
onInitialScanDone OnInitialScanDone
withInitialScan bool
withDiff bool
onInitialScanError OnInitialScanError
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
retryOptions retry.Options
onInitialScanDone OnInitialScanDone
withInitialScan bool
withDiff bool
onInitialScanError OnInitialScanError
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
}

type optionFunc func(*config)
Expand All @@ -46,6 +47,13 @@ type OnInitialScanDone func(ctx context.Context)
// endlessly.
type OnInitialScanError func(ctx context.Context, err error) (shouldFail bool)

// OnUnrecoverableError is called when the rangefeed exits with an unrecoverable
// error (preventing internal retries). One example is when the rangefeed falls
// behind to a point where the frontier timestamp precedes the GC threshold, and
// thus will never work. The callback lets callers find out about such errors
// (possibly, in our example, to start a new rangefeed with an initial scan).
type OnUnrecoverableError func(ctx context.Context, err error)

// WithInitialScan enables an initial scan of the data in the span. The rows of
// an initial scan will be passed to the value function used to construct the
// RangeFeed. Upon completion of the initial scan, the passed function (if
Expand All @@ -69,6 +77,14 @@ func WithOnInitialScanError(f OnInitialScanError) Option {
})
}

// WithOnInternalError sets up a callback to report unrecoverable errors during
// operation.
func WithOnInternalError(f OnUnrecoverableError) Option {
return optionFunc(func(c *config) {
c.onUnrecoverableError = f
})
}

// WithDiff makes an option to enable an initial scan which defaults to
// false.
func WithDiff() Option {
Expand Down
15 changes: 8 additions & 7 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,6 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
errCh := make(chan error)

for i := 0; r.Next(); i++ {

// TODO(ajwerner): Figure out what to do if the rangefeed falls behind to
// a point where the frontier timestamp precedes the GC threshold and thus
// will never work. Perhaps an initial scan could be performed again for
// some users. The API currently doesn't make that easy. Perhaps a callback
// should be called in order to allow the client to kill the process or
// something like that.
ts := frontier.Frontier()
log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, f.span)
start := timeutil.Now()
Expand All @@ -255,6 +248,14 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
}

err := f.processEvents(ctx, frontier, eventCh, errCh)
if errors.HasType(err, &roachpb.BatchTimestampBeforeGCError{}) {
if errCallback := f.onUnrecoverableError; errCallback != nil {
errCallback(ctx, err)
}

log.VEventf(ctx, 1, "exiting rangefeed due to internal error: %v", err)
return
}
if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() {
log.Warningf(ctx, "rangefeed failed %d times, restarting: %v",
log.Safe(i), err)
Expand Down
62 changes: 61 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package rangefeed_test

import (
"context"
"errors"
"sync"
"testing"

Expand All @@ -25,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -446,3 +446,63 @@ func TestRangefeedValueTimestamps(t *testing.T) {
require.True(t, v.PrevValue.Timestamp.IsEmpty())
}
}

// TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced
// to callers.
func TestUnrecoverableErrors(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
scratchKey := tc.ScratchRange(t)
scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]

sp := roachpb.Span{
Key: scratchKey,
EndKey: scratchKey.PrefixEnd(),
}
{
// Enable rangefeeds, otherwise the thing will retry until they are enabled.
_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)
}
{
// Lower the closed timestamp target duration to speed up the test.
_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
require.NoError(t, err)
}

f, err := rangefeed.NewFactory(tc.Stopper(), db, nil)
require.NoError(t, err)

preGCThresholdTS := hlc.Timestamp{WallTime: 1}
mu := struct {
syncutil.Mutex
internalErr error
}{}
r, err := f.RangeFeed(ctx, "test", sp, preGCThresholdTS,
func(context.Context, *roachpb.RangeFeedValue) {},
rangefeed.WithDiff(),
rangefeed.WithOnInternalError(func(ctx context.Context, err error) {
mu.Lock()
defer mu.Unlock()

mu.internalErr = err
}),
)
require.NoError(t, err)
defer r.Close()

testutils.SucceedsSoon(t, func() error {
mu.Lock()
defer mu.Unlock()

if !errors.HasType(mu.internalErr, &roachpb.BatchTimestampBeforeGCError{}) {
return errors.New("expected internal error")
}
return nil
})
}

0 comments on commit a0ef182

Please sign in to comment.