Skip to content

Commit

Permalink
kvcoord: Correctly handle stuck rangefeeds
Browse files Browse the repository at this point in the history
Fixes #92570

A watcher responsible for restarting stuck range feeds
may incorrectly cancel rangefeed if the downstream event
consumer is slow.

Release note (bug fix): Fix incorrect cancellation logic
when attempting to detect stuck range feeds.
  • Loading branch information
Yevgeniy Miretskiy committed Nov 28, 2022
1 parent b5be006 commit 8a77bee
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
19 changes: 8 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed(

for {
stuckWatcher.stop() // if timer is running from previous iteration, stop it now
if catchupRes == nil {
// Already finished catch-up scan (in an earlier iteration of this loop),
// so start timer early, not on first event received.
stuckWatcher.ping()
}
if transport.IsExhausted() {
return args.Timestamp, newSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
Expand Down Expand Up @@ -639,19 +634,21 @@ func (ds *DistSender) singleRangeFeed(
}
}

var event *roachpb.RangeFeedEvent
for {
event, err := stream.Recv()
if err == io.EOF {
return args.Timestamp, nil
}
if err != nil {
if err := stuckWatcher.do(func() (err error) {
event, err = stream.Recv()
return err
}); err != nil {
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, err
}
stuckWatcher.ping() // starts timer on first event only

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
Expand Down
24 changes: 16 additions & 8 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type stuckRangeFeedCanceler struct {
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
_stuck int32 // atomic
_active int32 // atomic
}

// stuck returns true if the stuck detection got triggered.
Expand All @@ -71,11 +72,11 @@ func (w *stuckRangeFeedCanceler) stop() {

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
func (w *stuckRangeFeedCanceler) do(fn func() error) error {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
return fn()
}

mkTimer := func() {
Expand All @@ -86,21 +87,28 @@ func (w *stuckRangeFeedCanceler) ping() {
// ping() event arrives at 29.999s, the timer should only fire
// at 90s, not 60s.
w.t = time.AfterFunc(3*threshold/2, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
// NB: trigger cancellation only if currently active.
if atomic.LoadInt32(&w._active) != 0 {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
}
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

atomic.StoreInt32(&w._active, 1)
defer atomic.StoreInt32(&w._active, 0)

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}
return fn()
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
Expand Down
32 changes: 15 additions & 17 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package kvcoord

import (
"context"
"sync/atomic"
"testing"
"time"
Expand All @@ -20,38 +21,35 @@ import (
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

_dur := int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }
blockUntilCanceled := func() error {
<-ctx.Done()
return ctx.Err()
}

c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&_dur))
})
require.Nil(t, c.t) // not running upon creation

for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
require.NoError(t, c.do(doNothing))
require.NotNil(t, c.t) // first call to ping sets off timer
}
atomic.StoreInt64(&_dur, int64(time.Nanosecond))
atomic.StoreInt64(&_dur, int64(10*time.Millisecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */)
require.Equal(t, context.Canceled, c.do(blockUntilCanceled))
require.True(t, c.stuck())

atomic.StoreInt64(&_dur, int64(24*time.Hour))
Expand All @@ -60,6 +58,6 @@ func TestStuckRangeFeedCanceler(t *testing.T) {
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
require.Equal(t, context.Canceled, c.do(blockUntilCanceled))
}
}

0 comments on commit 8a77bee

Please sign in to comment.