diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index d343335215f1..dae9e76954cc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed( for { stuckWatcher.stop() // if timer is running from previous iteration, stop it now - if catchupRes == nil { - // Already finished catch-up scan (in an earlier iteration of this loop), - // so start timer early, not on first event received. - stuckWatcher.ping() - } if transport.IsExhausted() { return args.Timestamp, newSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) @@ -639,19 +634,21 @@ func (ds *DistSender) singleRangeFeed( } } + var event *roachpb.RangeFeedEvent for { - event, err := stream.Recv() - if err == io.EOF { - return args.Timestamp, nil - } - if err != nil { + if err := stuckWatcher.do(func() (err error) { + event, err = stream.Recv() + return err + }); err != nil { + if err == io.EOF { + return args.Timestamp, nil + } if stuckWatcher.stuck() { afterCatchUpScan := catchupRes == nil return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold()) } return args.Timestamp, err } - stuckWatcher.ping() // starts timer on first event only msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go index d42b8b188c11..891d88fb551d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go @@ -49,14 +49,35 @@ type stuckRangeFeedCanceler struct { resetTimerAfter time.Time activeThreshold time.Duration - _stuck int32 // atomic + // _state manages canceler state transitions. 
+ // do(): + // inactive <----- + // | | + // ----active---- | + // | | | + // timeout ok | + // | |----- + // stuck + // If timeout occurs outside do(), it is ignored. + _state int32 // atomic + + // A testing knob to notify when timer triggers. + afterTimerTrigger func() } +type state int32 + +const ( + inactive state = iota + active + stuck +) + // stuck returns true if the stuck detection got triggered. // If this returns true, the cancel function will be invoked // shortly, if it hasn't already. func (w *stuckRangeFeedCanceler) stuck() bool { - return atomic.LoadInt32(&w._stuck) != 0 + return atomic.LoadInt32(&w._state) == int32(stuck) } // stop releases the active timer, if any. It should be invoked @@ -69,13 +90,14 @@ func (w *stuckRangeFeedCanceler) stop() { } } -// ping notifies the canceler that the rangefeed has received an -// event, i.e. is making progress. -func (w *stuckRangeFeedCanceler) ping() { +// do invokes callback cb, arranging for cancellation to happen if the callback +// takes too long to complete. Returns errRestartStuckRange if cb took excessive +// amount of time. +func (w *stuckRangeFeedCanceler) do(cb func() error) error { threshold := w.threshold() if threshold == 0 { w.stop() - return + return cb() } mkTimer := func() { @@ -86,21 +108,31 @@ func (w *stuckRangeFeedCanceler) ping() { // ping() event arrives at 29.999s, the timer should only fire // at 90s, not 60s. w.t = time.AfterFunc(3*threshold/2, func() { - // NB: important to store _stuck before canceling, since we - // want the caller to be able to detect stuck() after ctx - // cancels. - atomic.StoreInt32(&w._stuck, 1) - w.cancel() + if w.afterTimerTrigger != nil { + defer w.afterTimerTrigger() + } + + // NB: trigger cancellation only if currently active. 
+ if atomic.CompareAndSwapInt32(&w._state, int32(active), int32(stuck)) { + w.cancel() + } }) w.resetTimerAfter = timeutil.Now().Add(threshold / 2) } + if !atomic.CompareAndSwapInt32(&w._state, int32(inactive), int32(active)) { + return errRestartStuckRange + } + defer atomic.CompareAndSwapInt32(&w._state, int32(active), int32(inactive)) + if w.t == nil { mkTimer() } else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold { w.stop() mkTimer() } + + return cb() } // newStuckRangeFeedCanceler sets up a canceler with the provided diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go index 26e0b6cdf9f8..187879dbf0c0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go @@ -12,46 +12,46 @@ package kvcoord import ( + "context" + "sync" "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -type cancelRec int32 - -func (c *cancelRec) cancel() { - atomic.StoreInt32((*int32)(c), 1) -} - -func (c *cancelRec) canceled() bool { - return atomic.LoadInt32((*int32)(c)) != 0 -} - func TestStuckRangeFeedCanceler(t *testing.T) { defer leaktest.AfterTest(t)() _dur := int64(24 * time.Hour) // atomic - var cr cancelRec - c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doNothing := func() error { return nil } + blockUntilCanceled := func() error { + <-ctx.Done() + return ctx.Err() + } + + c := newStuckRangeFeedCanceler(cancel, func() time.Duration { return time.Duration(atomic.LoadInt64(&_dur)) }) - require.Nil(t, c.t) // not running upon creation + require.Nil(t, c.t) // not running upon creation. 
+ for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) require.False(t, c.stuck()) - c.ping() + require.NoError(t, c.do(doNothing)) require.NotNil(t, c.t) // first call to ping sets off timer } - atomic.StoreInt64(&_dur, int64(time.Nanosecond)) + atomic.StoreInt64(&_dur, int64(10*time.Millisecond)) // Nothing has reset the timer yet, so we won't be stuck here. // This isn't great but it is true, so documenting it. require.False(t, c.stuck()) // Ping will update the timer, so it will fire very soon. - c.ping() - require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */) + require.True(t, errors.Is(c.do(blockUntilCanceled), context.Canceled)) require.True(t, c.stuck()) atomic.StoreInt64(&_dur, int64(24*time.Hour)) @@ -60,6 +60,43 @@ for i := 0; i < 10; i++ { time.Sleep(time.Nanosecond) require.True(t, c.stuck()) - c.ping() + require.True(t, errors.Is(c.do(blockUntilCanceled), errRestartStuckRange)) + } +} + +// Ensure that the canceler monitors only the duration of the do() +// function, and not anything happening outside. +func TestStuckRangeFeedCancelerScope(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doNothing := func() error { return nil } + + triggered := struct { + sync.Once + ch chan struct{} + }{ch: make(chan struct{})} + + const duration = time.Second + c := newStuckRangeFeedCanceler(cancel, func() time.Duration { + return duration + }) + c.afterTimerTrigger = func() { + triggered.Do(func() { + close(triggered.ch) + }) } + + require.Nil(t, c.t) // not running upon creation. + require.False(t, c.stuck()) + require.Nil(t, c.do(doNothing)) + + // Now, start waiting until timer triggers. + // Even though timer triggered, the watcher is not canceled since + // time expired outside do(). + <-triggered.ch + require.Nil(t, ctx.Err()) + require.False(t, c.stuck()) + require.Nil(t, c.do(doNothing)) }