Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 fix: stop backoff when context is cancelled #883

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ kubectl delete -f osc-secret.yaml --namespace=kube-system
kubectl apply -f osc-secret.yaml --namespace=kube-system

## deploy the pod
git clone [email protected]:outscale-dev/osc-bsu-csi-driver.git -b v1.1.1
git clone [email protected]:outscale-dev/osc-bsu-csi-driver.git -b v1.4.1
cd osc-bsu-csi-driver
helm uninstall osc-bsu-csi-driver --namespace kube-system
helm install osc-bsu-csi-driver ./osc-bsu-csi-driver \
Expand Down
99 changes: 80 additions & 19 deletions pkg/cloud/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,106 @@ import (
"context"
"net/http"
"slices"
"strconv"
"time"

"github.com/outscale/osc-bsu-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

// RetryOnHTTPCodes defines the list of HTTP codes for which we backoff.
var RetryOnHTTPCodes = []int{429, 500, 502, 503, 504}

type BackoffPolicy func(ctx context.Context, resp *http.Response, err error) (bool, error)
type BackoffOpt func(*BackoffPolicy)

// NoRetryOnErrors is the default backoff policy: retry only on RetryOnHTTPCodes http statuses.
// No retry on errors.
func NoRetryOnErrors(ctx context.Context, resp *http.Response, err error) (bool, error) {
switch {
case resp != nil && slices.Contains(RetryOnHTTPCodes, resp.StatusCode):
klog.FromContext(ctx).V(5).Info("Retrying...")
return false, nil
case err != nil:
return false, err
default:
return true, nil
func RetryOnErrors() BackoffOpt {
return func(bp *BackoffPolicy) {
bp.retryOnErrors = true
}
}

func WithBackoff(bo wait.Backoff) BackoffOpt {
return func(bp *BackoffPolicy) {
bp.backoff = bo
}
}

type BackoffPolicyer interface {
ExponentialBackoff(ctx context.Context, fn func(ctx context.Context) (bool, error)) error
OAPIResponseBackoff(ctx context.Context, resp *http.Response, err error) (bool, error)
}

type BackoffPolicy struct {
retryOnErrors bool
backoff wait.Backoff
}

func NewBackoffPolicy(opts ...BackoffOpt) *BackoffPolicy {
bp := &BackoffPolicy{
backoff: EnvBackoff(),
}
for _, opt := range opts {
opt(bp)
}
return bp
}

// ExponentialBackoffWithContext repeats a condition check with exponential backoff.
// It stops if context is cancelled.
func (bp *BackoffPolicy) ExponentialBackoff(ctx context.Context, fn func(ctx context.Context) (bool, error)) error {
// bp.backoff is not a pointer, a copy is used each time, ensuring that backoff restarts at 0 each time.
return wait.ExponentialBackoffWithContext(ctx, bp.backoff, fn)
}

// NoRetryOnErrors is an alternate policy that retries on all errors.
func RetryOnErrors(ctx context.Context, resp *http.Response, err error) (bool, error) {
// OAPIResponseBackoff decides if an OAPI response requires a backoff. It retries only on RetryOnHTTPCodes http statuses.
// It retries on errors only if retryOnErrors is set.
func (bp *BackoffPolicy) OAPIResponseBackoff(ctx context.Context, resp *http.Response, err error) (bool, error) {
switch {
case resp != nil && slices.Contains(RetryOnHTTPCodes, resp.StatusCode):
klog.FromContext(ctx).V(5).Info("Retrying...")
return false, nil
case err != nil:
case err != nil && bp.retryOnErrors:
klog.FromContext(ctx).V(5).Error(err, "Retrying...")
return false, nil
case err != nil:
return false, err
default:
return true, nil
}
}

var _ BackoffPolicy = NoRetryOnErrors
var _ BackoffPolicy = RetryOnErrors
var _ BackoffPolicyer = (*BackoffPolicy)(nil)

// DefaultBackoffPolicy is the default BackoffPolicy (NoRetryOnErrors)
var DefaultBackoffPolicy = NoRetryOnErrors
func EnvBackoff() wait.Backoff {
// BACKOFF_DURATION duration The initial duration.
// Fallback as int/duration in seconds.
dur := util.GetEnv("BACKOFF_DURATION", "1s")
duration, err := time.ParseDuration(dur)
if err != nil {
d, derr := strconv.Atoi(dur)
duration = time.Duration(d) * time.Second
err = derr
}
if err != nil {
duration = time.Second
}

// BACKOFF_FACTOR float Duration is multiplied by factor each iteration
factor, err := strconv.ParseFloat(util.GetEnv("BACKOFF_FACTOR", "1.6"), 32)
if err != nil {
factor = 1.6
}

// BACKOFF_STEPS integer : The remaining number of iterations in which
// the duration parameter may change
steps, err := strconv.Atoi(util.GetEnv("BACKOFF_STEPS", "7"))
if err != nil {
steps = 7
}
return wait.Backoff{
Duration: duration,
Factor: factor,
Steps: steps,
}
}
67 changes: 67 additions & 0 deletions pkg/cloud/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cloud_test

import (
"context"
"strings"
"testing"
"time"

"github.com/outscale/osc-bsu-csi-driver/pkg/cloud"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
)

func TestEnvBackoff(t *testing.T) {
var tcs = []struct {
name string
env []string
backoff wait.Backoff
}{{
name: "default values",
backoff: wait.Backoff{
Duration: time.Second,
Factor: 1.6,
Steps: 7,
},
}, {
name: "compatibility with numeric durations",
env: []string{"BACKOFF_DURATION=2"},
backoff: wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.6,
Steps: 7,
},
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
for _, env := range tc.env {
kv := strings.Split(env, "=")
t.Setenv(kv[0], kv[1])
}
bo := cloud.EnvBackoff()
assert.Equal(t, tc.backoff.Duration, bo.Duration)
assert.InEpsilon(t, tc.backoff.Factor, bo.Factor, 0.01)
assert.Equal(t, tc.backoff.Steps, bo.Steps)
})
}
}

func TestBackoffPolicy_ExponentialBackoff(t *testing.T) {
var count int
fn := func(context.Context) (bool, error) {
count++
return false, nil
}
bo := cloud.NewBackoffPolicy(cloud.WithBackoff(wait.Backoff{
Duration: time.Millisecond,
Steps: 2,
}))
t.Run("When called multiple times, backoff is triggered again", func(t *testing.T) {
for i := 0; i < 3; i++ {
err := bo.ExponentialBackoff(context.TODO(), fn)
require.Error(t, err, "It should return a timeout error")
assert.Equal(t, (i+1)*2, count)
}
})
}
Loading
Loading