Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/retry: always run at least one iteration #51310

Merged
merged 1 commit into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (ds *DistSender) partialRangeFeed(
}
}
}
return nil
return ctx.Err()
}

// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestApplier(t *testing.T) {
check(t, step(get(`b`)), `db0.Get(ctx, "b") // ("2", nil)`)
check(t, step(scan(`a`, `c`)), `db1.Scan(ctx, "a", "c", 0) // (["a":"1", "b":"2"], nil)`)

checkErr(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // aborted in distSender: context canceled`)
checkErr(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (nil, aborted in distSender: context canceled)`)
checkErr(t, step(get(`a`)), `db0.Get(ctx, "a") // (nil, aborted during DistSender.Send: context canceled)`)
checkErr(t, step(put(`a`, `1`)), `db1.Put(ctx, "a", 1) // aborted during DistSender.Send: context canceled`)
checkErr(t, step(scanForUpdate(`a`, `c`)), `db0.ScanForUpdate(ctx, "a", "c", 0) // (nil, aborted during DistSender.Send: context canceled)`)

// Batch
check(t, step(batch(put(`b`, `2`), get(`a`), scan(`a`, `c`))), `
Expand All @@ -78,10 +78,10 @@ func TestApplier(t *testing.T) {
checkErr(t, step(batch(put(`b`, `2`), get(`a`), scanForUpdate(`a`, `c`))), `
{
b := &Batch{}
b.Put(ctx, "b", 2) // aborted in distSender: context canceled
b.Get(ctx, "a") // (nil, aborted in distSender: context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, aborted in distSender: context canceled)
db0.Run(ctx, b) // aborted in distSender: context canceled
b.Put(ctx, "b", 2) // aborted during DistSender.Send: context canceled
b.Get(ctx, "a") // (nil, aborted during DistSender.Send: context canceled)
b.ScanForUpdate(ctx, "a", "c") // (nil, aborted during DistSender.Send: context canceled)
db0.Run(ctx, b) // aborted during DistSender.Send: context canceled
}
`)

Expand Down Expand Up @@ -130,7 +130,7 @@ db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
check(t, step(split(`foo`)), `db1.AdminSplit(ctx, "foo") // nil`)
check(t, step(merge(`foo`)), `db0.AdminMerge(ctx, "foo") // nil`)
checkErr(t, step(split(`foo`)),
`db1.AdminSplit(ctx, "foo") // aborted in distSender: context canceled`)
`db1.AdminSplit(ctx, "foo") // aborted during DistSender.Send: context canceled`)
checkErr(t, step(merge(`foo`)),
`db0.AdminMerge(ctx, "foo") // aborted in distSender: context canceled`)
`db0.AdminMerge(ctx, "foo") // aborted during DistSender.Send: context canceled`)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (r *Replica) executeAdminCommandWithDescriptor(
// in a retry loop. Note that this is speculative; there wasn't an incident
// that suggested this.
retryOpts.RandomizationFactor = 0.5
lastErr := ctx.Err()
var lastErr error
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop.
// We need to explicitly check this condition. Having a valid lease, as we
Expand Down
16 changes: 5 additions & 11 deletions pkg/storage/cloudimpl/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,25 +188,19 @@ func checkHTTPContentRangeHeader(h string, pos int64) error {
func (r *resumingHTTPReader) sendRequest(
reqHeaders map[string]string,
) (resp *http.Response, err error) {
// Initialize err to the context.Canceled: if our context is canceled, we will
// never enter the loop below; in this case we want to return "nil, canceled"
err = context.Canceled
for attempt, retries := 0,
retry.StartWithCtx(r.ctx, HTTPRetryOptions); retries.Next(); attempt++ {
resp, err = r.client.req(r.ctx, "GET", r.url, nil, reqHeaders)

for attempt, retries := 0, retry.StartWithCtx(r.ctx, HTTPRetryOptions); retries.Next(); attempt++ {
resp, err := r.client.req(r.ctx, "GET", r.url, nil, reqHeaders)
if err == nil {
return
return resp, nil
}

log.Errorf(r.ctx, "HTTP:Req error: err=%s (attempt %d)", err, attempt)

if !errors.HasType(err, (*retryableHTTPError)(nil)) {
return
return nil, err
}
}

return
return nil, r.ctx.Err()
}

// requestNextRanges issues additional http request
Expand Down
43 changes: 26 additions & 17 deletions pkg/util/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
Expand All @@ -27,7 +28,7 @@ type Options struct {
Multiplier float64 // Default backoff constant
MaxRetries int // Maximum number of attempts (0 for infinite)
RandomizationFactor float64 // Randomize the backoff interval by constant
Closer <-chan struct{} // Optionally end retry loop channel close.
Closer <-chan struct{} // Optionally end retry loop channel close
}

// Retry implements the public methods necessary to control an exponential-
Expand All @@ -47,7 +48,8 @@ func Start(opts Options) Retry {

// StartWithCtx returns a new Retry initialized to some default values. The
// Retry can then be used in an exponential-backoff retry loop. If the provided
// context is canceled (see Context.Done), the retry loop ends early.
// context is canceled (see Context.Done), the retry loop ends early, but will
// always run at least once.
func StartWithCtx(ctx context.Context, opts Options) Retry {
if opts.InitialBackoff == 0 {
opts.InitialBackoff = 50 * time.Millisecond
Expand All @@ -62,26 +64,31 @@ func StartWithCtx(ctx context.Context, opts Options) Retry {
opts.Multiplier = 2
}

r := Retry{opts: opts}
var r Retry
r.opts = opts
r.ctxDoneChan = ctx.Done()
r.Reset()
r.mustReset()
return r
}

// Reset resets the Retry to its initial state, meaning that the next call to
// Next will return true immediately and subsequent calls will behave as if
// they had followed the very first attempt (i.e. their backoffs will be
// short).
// Next will return true immediately and subsequent calls will behave as if they
// had followed the very first attempt (i.e. their backoffs will be short). The
// exception to this is if the provided Closer has fired or context has been
// canceled, in which case subsequent calls to Next will still return false
// immediately.
func (r *Retry) Reset() {
select {
case <-r.opts.Closer:
// When the closer has fired, you can't keep going.
return
case <-r.ctxDoneChan:
// When the context was canceled, you can't keep going.
return
default:
r.mustReset()
}
}

func (r *Retry) mustReset() {
r.currentAttempt = 0
r.isReset = true
}
Expand All @@ -100,8 +107,13 @@ func (r Retry) retryIn() time.Duration {
}

// Next returns whether the retry loop should continue, and blocks for the
// appropriate length of time before yielding back to the caller. If a stopper
// is present, Next will eagerly return false when the stopper is stopped.
// appropriate length of time before yielding back to the caller.
//
// Next is guaranteed to return true on its first call. As such, a retry loop
// can be thought of as implementing do-while semantics (i.e. always running at
// least once). Otherwise, if a context and/or closer is present, Next will
// return false if the context is canceled and/or the closer fires while the
// method is waiting.
func (r *Retry) Next() bool {
if r.isReset {
r.isReset = false
Expand Down Expand Up @@ -147,7 +159,8 @@ func (r *Retry) NextCh() <-chan time.Time {
}

// WithMaxAttempts is a helper that runs fn N times and collects the last err.
// It guarantees fn will run at least once. Otherwise, an error will be returned.
// The function will terminate early if the provided context is canceled, but it
// guarantees that fn will run at least once.
func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error) error {
if n <= 0 {
return errors.Errorf("max attempts should not be 0 or below, got: %d", n)
Expand All @@ -162,11 +175,7 @@ func WithMaxAttempts(ctx context.Context, opts Options, n int, fn func() error)
}
}
if err == nil {
if ctx.Err() != nil {
err = errors.Wrap(ctx.Err(), "did not run function due to context completion")
} else {
err = errors.New("did not run function due to closed opts.Closer")
}
log.Fatal(ctx, "never ran function in WithMaxAttempts")
}
return err
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/util/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func TestRetryNextCh(t *testing.T) {
func TestRetryWithMaxAttempts(t *testing.T) {
expectedErr := errors.New("placeholder")
attempts := 0
noErrFunc := func() error {
attempts++
return nil
}
errWithAttemptsCounterFunc := func() error {
attempts++
return expectedErr
Expand Down Expand Up @@ -183,11 +187,11 @@ func TestRetryWithMaxAttempts(t *testing.T) {
Multiplier: 2,
MaxRetries: 1,
},
retryFunc: func() error { return nil },
retryFunc: noErrFunc,
maxAttempts: 3,

minNumAttempts: 0,
maxNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 1,
},
{
desc: "succeeds after one faked error",
Expand Down Expand Up @@ -235,7 +239,7 @@ func TestRetryWithMaxAttempts(t *testing.T) {
cancelCtxFunc()
},

minNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 3,
expectedErrText: "did not run function due to context completion: context canceled",
},
Expand All @@ -255,7 +259,7 @@ func TestRetryWithMaxAttempts(t *testing.T) {
close(closeCh)
},

minNumAttempts: 0,
minNumAttempts: 1,
maxNumAttempts: 3,
expectedErrText: "did not run function due to closed opts.Closer",
},
Expand Down