Respect Retry-After headers for 429, 423 and all 5xx status codes
moskyb committed Jan 9, 2025
1 parent 55f6542 commit d52a6d0
Showing 1 changed file with 55 additions and 18 deletions.
core/client.go: 73 changes (55 additions & 18 deletions)
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math/rand/v2"
 	"net/http"
 	"os"
 	"runtime"
@@ -48,7 +47,7 @@ type Client struct {
 
 // AcquireJob acquires a specific job from Buildkite.
 // It doesn't interpret or run the job - the caller is responsible for that.
-// It contains a builtin timeout of 270 seconds and makes up to 10 attempts.
+// It contains a builtin timeout of 330 seconds and makes up to 7 attempts, backing off exponentially.
 func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) {
 	c.Logger.Info("Attempting to acquire job %s...", jobID)
 
@@ -63,13 +62,15 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
 	timeoutCtx, cancel := context.WithTimeout(ctx, 330*time.Second)
 	defer cancel()
 
-	// Acquire the job using the ID we were provided. We'll retry as best we can on non 422 error.
-	// Except for 423 errors, in which we exponentially back off under the direction of the API
-	// setting the Retry-After header
+	// Acquire the job using the ID we were provided.
+	// We'll retry as best we can on 5xx errors, as well as 423 Locked and 429 Too Many Requests.
+	// For retryable errors, if available, we'll consume the value of the server-defined `Retry-After` response header
+	// to determine our next retry interval.
+	// 4xx errors that are not 423 or 429 will not be retried.
 	r := roko.NewRetrier(
 		roko.WithMaxAttempts(7),
 		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
-		roko.WithJitterRange(-time.Second, 5*time.Second),
+		roko.WithJitterRange(-1*time.Second, 5*time.Second),
 		roko.WithSleepFunc(c.RetrySleepFunc),
 	)
 
@@ -84,23 +85,40 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
 				c.Logger.Warn("%s (%s)", err, r)
 				return nil, err
 			}
-			switch resp.StatusCode {
-			case http.StatusUnprocessableEntity:
+
+			switch {
+			case resp.StatusCode == http.StatusLocked:
+				// If the API returns with a 423, the job is in the waiting state. Let's try again later.
+				warning := fmt.Sprintf("The job is waiting for a dependency: (%s)", err)
+				handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
+				return nil, err
+
+			case resp.StatusCode == http.StatusTooManyRequests:
+				// We're being rate limited by the backend. Let's try again later.
+				warning := fmt.Sprintf("Rate limited by the backend: %s", err)
+				handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
+				return nil, err
+
+			case resp.StatusCode >= 500:
+				// It's a 5xx. Probably worth retrying
+				warning := fmt.Sprintf("Server error: %s", err)
+				handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
+				return nil, err
+
+			case resp.StatusCode == http.StatusUnprocessableEntity:
 				// If the API returns with a 422, it usually means that the job is in a state where it can't be acquired -
-				// e.g. it's already running on another agent, or has been cancelled, or has already run
-				c.Logger.Warn("Buildkite rejected the call to acquire the job (%s)", err)
+				// e.g. it's already running on another agent, or has been cancelled, or has already run. Don't retry
+				c.Logger.Error("Buildkite rejected the call to acquire the job: %s", err)
 				r.Break()
 
 				return nil, fmt.Errorf("%w: %w", ErrJobAcquisitionRejected, err)
 
-			case http.StatusLocked:
-				// If the API returns with a 423, the job is in the waiting state
-				c.Logger.Warn("The job is waiting for a dependency (%s)", err)
-				duration, errParseDuration := time.ParseDuration(resp.Header.Get("Retry-After") + "s")
-				if errParseDuration != nil {
-					duration = time.Second + rand.N(time.Second)
-				}
-				r.SetNextInterval(duration)
+			case resp.StatusCode >= 400 && resp.StatusCode < 500:
+				// It's some other client error - not 429 or 423, which we retry, or 422, which we don't, but gets a special log message
+				// Don't retry it, the odds of success are low
+				c.Logger.Error("%s", err)
+				r.Break()
+
 				return nil, err
 
 			default:
@@ -113,6 +131,25 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
 	})
 }
 
+func handleRetriableJobAcquisitionError(warning string, resp *api.Response, r *roko.Retrier, logger logger.Logger) {
+	logger.Warn("%s (%s)", warning, r)
+	if resp != nil {
+		retryAfter := resp.Header.Get("Retry-After")
+
+		// Only customize the retry interval if the Retry-After header is present. Otherwise, keep using the default retrier settings
+		if retryAfter == "" {
+			return
+		}
+
+		duration, errParseDuration := time.ParseDuration(retryAfter + "s")
+		if errParseDuration != nil {
+			return // use the default retrier settings
+		}
+
+		r.SetNextInterval(duration)
+	}
+}
+
 // Connects the agent to the Buildkite Agent API, retrying up to 10 times with 5
 // seconds delay if it fails.
 func (c *Client) Connect(ctx context.Context) error {
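
To make the behaviour of the new switch and helper easier to see at a glance, here is a small self-contained sketch of the policy this commit implements: 423, 429 and all 5xx responses are retryable, the server's Retry-After header (delta-seconds only, not the HTTP-date form) can suggest the next wait, and 422 plus every other 4xx is terminal. The helper name and shape below are illustrative, not part of the commit.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// acquireRetryPolicy reports whether a failed acquisition should be retried,
// and, when the server sent a parseable Retry-After value in delta-seconds,
// the wait it asked for. wait is 0 when the caller should keep its default
// exponential backoff.
func acquireRetryPolicy(status int, header http.Header) (retry bool, wait time.Duration) {
	switch {
	case status == http.StatusLocked, // 423: the job is waiting on a dependency
		status == http.StatusTooManyRequests, // 429: rate limited by the backend
		status >= 500: // any server error
		// "30" becomes "30s", which time.ParseDuration accepts.
		if d, err := time.ParseDuration(header.Get("Retry-After") + "s"); err == nil && d > 0 {
			wait = d
		}
		return true, wait
	default:
		// 422 (job not acquirable) and every other 4xx: don't retry.
		return false, 0
	}
}

func main() {
	h := http.Header{}
	h.Set("Retry-After", "30")
	retry, wait := acquireRetryPolicy(http.StatusTooManyRequests, h)
	fmt.Println(retry, wait) // true 30s

	retry, wait = acquireRetryPolicy(http.StatusUnprocessableEntity, nil)
	fmt.Println(retry, wait) // false 0s
}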

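Finally, a hedged sketch of how these pieces combine inside a roko retry loop like the one in AcquireJob: the exponential strategy and jitter drive the default schedule, SetNextInterval lets a Retry-After value override the next sleep, and Break stops retrying on terminal statuses. It assumes roko's Retrier.DoWithContext callback form and uses a made-up fetch function and fakeResponse type; it is an illustration of the pattern, not the commit's code.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/buildkite/roko"
)

// fakeResponse stands in for an API response; only the pieces the retry
// policy looks at are included.
type fakeResponse struct {
	StatusCode int
	Header     http.Header
}

// fetch is a stand-in for the real API call. Here it always reports a 429
// with a Retry-After of 3 seconds so the override path is exercised.
func fetch() (*fakeResponse, error) {
	h := http.Header{}
	h.Set("Retry-After", "3")
	return &fakeResponse{StatusCode: http.StatusTooManyRequests, Header: h}, fmt.Errorf("rate limited")
}

func main() {
	r := roko.NewRetrier(
		roko.WithMaxAttempts(3),
		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
		roko.WithJitterRange(-1*time.Second, 5*time.Second),
		// Log instead of sleeping so the example finishes instantly.
		roko.WithSleepFunc(func(d time.Duration) { fmt.Printf("would sleep %v\n", d) }),
	)

	_ = r.DoWithContext(context.Background(), func(r *roko.Retrier) error {
		resp, err := fetch()
		if err == nil {
			return nil
		}
		switch {
		case resp.StatusCode == http.StatusLocked,
			resp.StatusCode == http.StatusTooManyRequests,
			resp.StatusCode >= 500:
			// Retryable: let a server-provided Retry-After override the
			// exponential schedule for the next attempt.
			if d, parseErr := time.ParseDuration(resp.Header.Get("Retry-After") + "s"); parseErr == nil {
				r.SetNextInterval(d)
			}
			return err
		default:
			// Terminal: stop the retrier and surface the error.
			r.Break()
			return err
		}
	})
}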