PIPE-631 Backoff exponentially when job acquisition fails #3153

Merged
merged 4 commits on Jan 9, 2025
Changes from all commits
11 changes: 11 additions & 0 deletions agent/agent_worker.go
@@ -550,6 +550,17 @@ func (a *AgentWorker) Ping(ctx context.Context) (*api.Job, error) {
// state. If the job is in an unassignable state, it will return an error immediately.
// Otherwise, it will retry every 3s for 30 s. The whole operation will timeout after 5 min.
func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error {
ctx, cancel := context.WithCancel(ctx)
go func() {
for {
time.Sleep(500 * time.Millisecond)
if a.stopping {
cancel()
return
}
}
}()

job, err := a.client.AcquireJob(ctx, jobId)
if err != nil {
return fmt.Errorf("failed to acquire job: %w", err)
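The goroutine added above bridges the worker's stopping flag and context cancellation: it polls every 500ms and cancels the context once the agent has been asked to stop, so the in-flight acquisition call can be abandoned. A minimal standalone sketch of the same pattern, assuming a hypothetical atomic flag and helper name rather than the agent's actual fields:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// watchStop mirrors the pattern added to AcquireAndRunJob: a polling goroutine
// turns a boolean "stopping" flag into context cancellation.
func watchStop(ctx context.Context, stop *atomic.Bool) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		for {
			time.Sleep(500 * time.Millisecond)
			if stop.Load() {
				cancel()
				return
			}
		}
	}()
	return ctx, cancel
}

func main() {
	var stop atomic.Bool
	ctx, cancel := watchStop(context.Background(), &stop)
	defer cancel()

	// Simulate the agent being told to stop after two seconds.
	go func() {
		time.Sleep(2 * time.Second)
		stop.Store(true)
	}()

	<-ctx.Done() // real work would select on ctx.Done() alongside its API calls
	fmt.Println("context cancelled:", ctx.Err())
}

Polling keeps the change small, at the cost of up to 500ms of latency between the stop request and cancellation.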
4 changes: 2 additions & 2 deletions agent/agent_worker_test.go
@@ -218,8 +218,8 @@ func TestAcquireAndRunJobWaiting(t *testing.T) {
}

// the last Retry-After is not recorded as the retries loop exits before using it
expectedSleeps := make([]time.Duration, 0, 9)
for d := 1; d <= 1<<8; d *= 2 {
expectedSleeps := make([]time.Duration, 0, 6)
for d := 1; d <= 1<<5; d *= 2 {
expectedSleeps = append(expectedSleeps, time.Duration(d)*time.Second)
}
assert.Equal(t, expectedSleeps, retrySleeps)
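The expected sleeps are now the exponential series 1, 2, 4, 8, 16, 32 seconds between the seven attempts. As a rough check of the timeout budget discussed in core/client.go, here is a small illustrative sketch; the jitter bounds are taken from the retrier configuration in that file, and the totals are illustrative arithmetic, not values asserted by the test:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Delays the test expects between the 7 acquisition attempts.
	var delays []time.Duration
	for d := 1; d <= 1<<5; d *= 2 {
		delays = append(delays, time.Duration(d)*time.Second)
	}

	// Jitter range configured on the retrier: -1s to +5s per sleep.
	minJitter, maxJitter := -1*time.Second, 5*time.Second

	var minTotal, maxTotal time.Duration
	for _, d := range delays {
		minTotal += d + minJitter
		maxTotal += d + maxJitter
	}

	fmt.Println("delays:", delays)                // [1s 2s 4s 8s 16s 32s]
	fmt.Println("fastest total sleep:", minTotal) // 57s
	fmt.Println("slowest total sleep:", maxTotal) // 93s, well inside the 330s context timeout
}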
84 changes: 61 additions & 23 deletions core/client.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/rand/v2"
"net/http"
"os"
"runtime"
@@ -48,27 +47,30 @@ type Client struct {

// AcquireJob acquires a specific job from Buildkite.
// It doesn't interpret or run the job - the caller is responsible for that.
// It contains a builtin timeout of 270 seconds and makes up to 10 attempts.
// It contains a builtin timeout of 330 seconds and makes up to 7 attempts, backing off exponentially.
func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error) {
c.Logger.Info("Attempting to acquire job %s...", jobID)

// Timeout the context to prevent the exponential backoff from growing too
// large if the job is in the waiting state.
//
// If there were no delays or jitter, the attempts would happen at t = 0, 1, 2, 4, ..., 128s
// after the initial one. Therefore, there are 9 attempts taking at least 255s. If the jitter
// always hit the max of 1s, then another 8s is added to that. This is still comfortably within
// the timeout of 270s, and the bound seems tight enough so that the agent is not wasting time
// after the initial one. Therefore, there are 7 attempts taking at least 255s. If the jitter
// always hit the max of 5s, then another 40s is added to that. This is still comfortably within
// the timeout of 330s, and the bound seems tight enough so that the agent is not wasting time
// waiting for a retry that will never happen.
timeoutCtx, cancel := context.WithTimeout(ctx, 270*time.Second)
timeoutCtx, cancel := context.WithTimeout(ctx, 330*time.Second)
defer cancel()

// Acquire the job using the ID we were provided. We'll retry as best we can on non 422 error.
// Except for 423 errors, in which we exponentially back off under the direction of the API
// setting the Retry-After header
// Acquire the job using the ID we were provided.
// We'll retry as best we can on non 5xx errors, as well as 423 Locked and 429 Too Many Requests.
// For retryable errors, if available, we'll consume the value of the server-defined `Retry-After` response header
// to determine our next retry interval.
// 4xx errors that are not 423 or 429 will not be retried.
r := roko.NewRetrier(
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.Constant(3*time.Second)),
roko.WithMaxAttempts(7),
Member

Drive-by question: do we want any runtime operational control of this number? i.e. maybe something that's returned in the agent registration endpoint?

I know we could build that gear — but the real question is: do we want it?

Contributor Author

side note: this comment made me realise that that comment is now a lie. i'll fix that.

to your actual question, for this particular value (the max retry count for acquire job), i don't really think so. acquire mode is generally intended for customers running their own custom schedulers, and if their acquire fails after 7 (in this case) attempts, they can probably call buildkite-agent start --acquire-job xxx again, so i think the utility of being able to change it serverside would be limited.

in a more general sense, being able to remote-customise certain agent behaviour would be super useful - doing stuff like dynamically slowing down ping intervals or setting an agent-wide rate limit ("you in particular are only allowed to make 1 request per second now", or something) would be pretty cool - i think we'd need a pretty useful initial thing to do that for though.

roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
roko.WithJitterRange(-1*time.Second, 5*time.Second),
roko.WithSleepFunc(c.RetrySleepFunc),
)

@@ -83,23 +85,40 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
c.Logger.Warn("%s (%s)", err, r)
return nil, err
}
switch resp.StatusCode {
case http.StatusUnprocessableEntity:

switch {
case resp.StatusCode == http.StatusLocked:
// If the API returns with a 423, the job is in the waiting state. Let's try again later.
warning := fmt.Sprintf("The job is waiting for a dependency: (%s)", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode == http.StatusTooManyRequests:
// We're being rate limited by the backend. Let's try again later.
warning := fmt.Sprintf("Rate limited by the backend: %s", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode >= 500:
// It's a 5xx. Probably worth retrying
warning := fmt.Sprintf("Server error: %s", err)
handleRetriableJobAcquisitionError(warning, resp, r, c.Logger)
return nil, err

case resp.StatusCode == http.StatusUnprocessableEntity:
// If the API returns with a 422, it usually means that the job is in a state where it can't be acquired -
// e.g. it's already running on another agent, or has been cancelled, or has already run
c.Logger.Warn("Buildkite rejected the call to acquire the job (%s)", err)
// e.g. it's already running on another agent, or has been cancelled, or has already run. Don't retry
c.Logger.Error("Buildkite rejected the call to acquire the job: %s", err)
r.Break()

return nil, fmt.Errorf("%w: %w", ErrJobAcquisitionRejected, err)

case http.StatusLocked:
// If the API returns with a 423, the job is in the waiting state
c.Logger.Warn("The job is waiting for a dependency (%s)", err)
duration, errParseDuration := time.ParseDuration(resp.Header.Get("Retry-After") + "s")
if errParseDuration != nil {
duration = time.Second + rand.N(time.Second)
}
r.SetNextInterval(duration)
case resp.StatusCode >= 400 && resp.StatusCode < 500:
// It's some other client error - not 429 or 423, which we retry, or 422, which we don't, but gets a special log message
// Don't retry it, the odds of success are low
c.Logger.Error("%s", err)
r.Break()

return nil, err

default:
@@ -112,6 +131,25 @@ func (c *Client) AcquireJob(ctx context.Context, jobID string) (*api.Job, error)
})
}

func handleRetriableJobAcquisitionError(warning string, resp *api.Response, r *roko.Retrier, logger logger.Logger) {
logger.Warn("%s (%s)", warning, r)
if resp != nil {
retryAfter := resp.Header.Get("Retry-After")

// Only customize the retry interval if the Retry-After header is present. Otherwise, keep using the default retrier settings
if retryAfter == "" {
return
}

duration, errParseDuration := time.ParseDuration(retryAfter + "s")
if errParseDuration != nil {
return // use the default retrier settings
}

r.SetNextInterval(duration)
}
}

// Connects the agent to the Buildkite Agent API, retrying up to 10 times with 5
// seconds delay if it fails.
func (c *Client) Connect(ctx context.Context) error {
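The new handleRetriableJobAcquisitionError helper only overrides the retrier's next interval when Retry-After parses as a bare number of seconds; a missing header, or one it cannot parse (for example an HTTP-date), leaves the default exponential schedule in place. A minimal sketch of that decision, using a hypothetical helper name rather than the agent's own types:

package main

import (
	"fmt"
	"net/http"
	"time"
)

// retryAfterInterval mirrors the header handling in handleRetriableJobAcquisitionError:
// "Retry-After: 30" becomes 30s, while a missing or non-numeric value (e.g. an
// HTTP-date like "Wed, 08 Jan 2025 23:59:59 GMT") reports false, leaving the
// retrier's default exponential backoff in charge.
func retryAfterInterval(h http.Header) (time.Duration, bool) {
	retryAfter := h.Get("Retry-After")
	if retryAfter == "" {
		return 0, false
	}
	duration, err := time.ParseDuration(retryAfter + "s")
	if err != nil {
		return 0, false
	}
	return duration, true
}

func main() {
	h := http.Header{}
	h.Set("Retry-After", "30")
	fmt.Println(retryAfterInterval(h)) // 30s true

	h.Set("Retry-After", "Wed, 08 Jan 2025 23:59:59 GMT")
	fmt.Println(retryAfterInterval(h)) // 0s false
}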
2 changes: 1 addition & 1 deletion go.mod
@@ -20,7 +20,7 @@ require (
github.com/buildkite/bintest/v3 v3.3.0
github.com/buildkite/go-pipeline v0.13.3
github.com/buildkite/interpolate v0.1.5
github.com/buildkite/roko v1.2.0
github.com/buildkite/roko v1.3.0
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000
github.com/creack/pty v1.1.19
github.com/denisbrodbeck/machineid v1.0.1
4 changes: 2 additions & 2 deletions go.sum
@@ -114,8 +114,8 @@ github.com/buildkite/go-pipeline v0.13.3 h1:llI7sAdZ7sqYE7r8ePlmDADRhJ1K0Kua2+gv
github.com/buildkite/go-pipeline v0.13.3/go.mod h1:1uC2XdHkTV1G5jYv9K8omERIwrsYbBruBrPx1Zu1uFw=
github.com/buildkite/interpolate v0.1.5 h1:v2Ji3voik69UZlbfoqzx+qfcsOKLA61nHdU79VV+tPU=
github.com/buildkite/interpolate v0.1.5/go.mod h1:dHnrwHew5O8VNOAgMDpwRlFnhL5VSN6M1bHVmRZ9Ccc=
github.com/buildkite/roko v1.2.0 h1:hbNURz//dQqNl6Eo9awjQOVOZwSDJ8VEbBDxSfT9rGQ=
github.com/buildkite/roko v1.2.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I=
github.com/buildkite/roko v1.3.0 h1:Lgv5XK0rr0uCCZQqssavdwjFs550j8ovyVmnnLMfS/E=
github.com/buildkite/roko v1.3.0/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I=
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000 h1:hiVSLk7s3yFKFOHF/huoShLqrj13RMguWX2yzfvy7es=
github.com/buildkite/shellwords v0.0.0-20180315084142-c3f497d1e000/go.mod h1:gv0DYOzHEsKgo31lTCDGauIg4DTTGn41Bzp+t3wSOlk=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=