diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index c7032d18fc46..83142597e79a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed( for { stuckWatcher.stop() // if timer is running from previous iteration, stop it now - if catchupRes == nil { - // Already finished catch-up scan (in an earlier iteration of this loop), - // so start timer early, not on first event received. - stuckWatcher.ping() - } if transport.IsExhausted() { return args.Timestamp, newSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) @@ -639,19 +634,21 @@ func (ds *DistSender) singleRangeFeed( } } + var event *roachpb.RangeFeedEvent for { - event, err := stream.Recv() - if err == io.EOF { - return args.Timestamp, nil - } - if err != nil { + if err := stuckWatcher.do(func() (err error) { + event, err = stream.Recv() + return err + }); err != nil { + if err == io.EOF { + return args.Timestamp, nil + } if stuckWatcher.stuck() { afterCatchUpScan := catchupRes == nil return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold()) } return args.Timestamp, err } - stuckWatcher.ping() // starts timer on first event only msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go index d42b8b188c11..206eb73a03cf 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go @@ -49,7 +49,8 @@ type stuckRangeFeedCanceler struct { resetTimerAfter time.Time activeThreshold time.Duration - _stuck int32 // atomic + _stuck int32 // atomic + _active int32 // atomic } // stuck returns true if the stuck detection got triggered. @@ -71,11 +72,11 @@ func (w *stuckRangeFeedCanceler) stop() { // ping notifies the canceler that the rangefeed has received an // event, i.e. is making progress. -func (w *stuckRangeFeedCanceler) ping() { +func (w *stuckRangeFeedCanceler) do(fn func() error) error { threshold := w.threshold() if threshold == 0 { w.stop() - return + return fn() } mkTimer := func() { @@ -86,21 +87,28 @@ func (w *stuckRangeFeedCanceler) ping() { // ping() event arrives at 29.999s, the timer should only fire // at 90s, not 60s. w.t = time.AfterFunc(3*threshold/2, func() { - // NB: important to store _stuck before canceling, since we - // want the caller to be able to detect stuck() after ctx - // cancels. - atomic.StoreInt32(&w._stuck, 1) - w.cancel() + // NB: trigger cancellation only if currently active. + if atomic.LoadInt32(&w._active) != 0 { + // NB: important to store _stuck before canceling, since we + // want the caller to be able to detect stuck() after ctx + // cancels. + atomic.StoreInt32(&w._stuck, 1) + w.cancel() + } }) w.resetTimerAfter = timeutil.Now().Add(threshold / 2) } + atomic.StoreInt32(&w._active, 1) + defer atomic.StoreInt32(&w._active, 0) + if w.t == nil { mkTimer() } else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold { w.stop() mkTimer() } + return fn() } // newStuckRangeFeedCanceler sets up a canceler with the provided diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go index 26e0b6cdf9f8..9b032f4aea28 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go @@ -12,6 +12,7 @@ package kvcoord import ( + "context" "sync/atomic" "testing" "time" @@ -20,38 +21,35 @@ import ( "github.com/stretchr/testify/require" ) -type cancelRec int32 - -func (c *cancelRec) cancel() { - atomic.StoreInt32((*int32)(c), 1) -} - -func (c *cancelRec) canceled() bool { - return atomic.LoadInt32((*int32)(c)) != 0 -} - func TestStuckRangeFeedCanceler(t *testing.T) { defer leaktest.AfterTest(t)() _dur := int64(24 * time.Hour) // atomic - var cr cancelRec - c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doNothing := func() error { return nil } + blockUntilCanceled := func() error { + <-ctx.Done() + return ctx.Err() + } + + c := newStuckRangeFeedCanceler(cancel, func() time.Duration { return time.Duration(atomic.LoadInt64(&_dur)) }) require.Nil(t, c.t) // not running upon creation + for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) require.False(t, c.stuck()) - c.ping() + require.NoError(t, c.do(doNothing)) require.NotNil(t, c.t) // first call to ping sets off timer } - atomic.StoreInt64(&_dur, int64(time.Nanosecond)) + atomic.StoreInt64(&_dur, int64(10*time.Millisecond)) // Nothing has reset the timer yet, so we won't be stuck here. // This isn't great but it is true, so documenting it. require.False(t, c.stuck()) // Ping will update the timer, so it will fire very soon. - c.ping() - require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */) + require.Equal(t, context.Canceled, c.do(blockUntilCanceled)) require.True(t, c.stuck()) atomic.StoreInt64(&_dur, int64(24*time.Hour)) @@ -60,6 +58,6 @@ func TestStuckRangeFeedCanceler(t *testing.T) { for i := 0; i < 10; i++ { time.Sleep(time.Nanosecond) require.True(t, c.stuck()) - c.ping() + require.Equal(t, context.Canceled, c.do(blockUntilCanceled)) } }