Skip to content

Commit

Permalink
Merge pull request #8 from tonistiigi/tonistiigi/request-retry
Browse files Browse the repository at this point in the history
handle GitHub rate limiting with retries
  • Loading branch information
tonistiigi authored Oct 2, 2021
2 parents 91f1b0a + d6f6b7a commit 70b4bc0
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 24 deletions.
93 changes: 69 additions & 24 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func TryEnv(opt Opt) (*Cache, error) {
}

type Opt struct {
Client *http.Client
Client *http.Client
Timeout time.Duration
BackoffPool *BackoffPool
}

func New(token, url string, opt Opt) (*Cache, error) {
Expand Down Expand Up @@ -138,6 +140,13 @@ func New(token, url string, opt Opt) (*Cache, error) {
if opt.Client == nil {
opt.Client = http.DefaultClient
}
if opt.Timeout == 0 {
opt.Timeout = 5 * time.Minute
}

if opt.BackoffPool == nil {
opt.BackoffPool = defaultBackoffPool
}

return &Cache{
opt: opt,
Expand Down Expand Up @@ -199,15 +208,11 @@ func (c *Cache) Load(ctx context.Context, keys ...string) (*Entry, error) {
q.Set("keys", strings.Join(keys, ","))
q.Set("version", version(keys[0]))
req.URL.RawQuery = q.Encode()
req = req.WithContext(ctx)
Log("load cache %s", req.URL.String())
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return nil, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return nil, err
}
var ce Entry
dt, err := ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
Expand Down Expand Up @@ -238,15 +243,11 @@ func (c *Cache) reserve(ctx context.Context, key string) (int, error) {
c.auth(req)
c.accept(req)
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
Log("save cache req %s body=%s", req.URL.String(), dt)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return 0, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return 0, err
}

dt, err = ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
Expand Down Expand Up @@ -276,13 +277,10 @@ func (c *Cache) commit(ctx context.Context, id int, size int64) error {
c.accept(req)
req.Header.Set("Content-Type", "application/json")
Log("commit cache %s, size %d", req.URL.String(), size)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return errors.Wrapf(err, "error committing cache %d", id)
}
if err := checkResponse(resp); err != nil {
return err
}
dt, err = ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
return err
Expand Down Expand Up @@ -413,13 +411,10 @@ func (c *Cache) uploadChunk(ctx context.Context, id int, ra io.ReaderAt, off, n
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/*", off, off+n-1))

Log("upload cache chunk %s, range %d-%d", req.URL.String(), off, off+n-1)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return err
}
dt, err := ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
return errors.WithStack(err)
Expand All @@ -430,6 +425,38 @@ func (c *Cache) uploadChunk(ctx context.Context, id int, ra io.ReaderAt, off, n
return resp.Body.Close()
}

func (c *Cache) doWithRetries(ctx context.Context, req *http.Request) (*http.Response, error) {
req = req.WithContext(ctx)
var err error
max := time.Now().Add(c.opt.Timeout)
for {
if err1 := c.opt.BackoffPool.Wait(ctx, time.Until(max)); err1 != nil {
if err != nil {
return nil, errors.Wrapf(err, "%v", err1)
}
return nil, err1
}
var resp *http.Response
resp, err = c.opt.Client.Do(req)
if err != nil {
return nil, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
var he HTTPError
if errors.As(err, &he) {
if he.StatusCode == http.StatusTooManyRequests {
c.opt.BackoffPool.Delay()
continue
}
}
c.opt.BackoffPool.Reset()
return nil, err
}
c.opt.BackoffPool.Reset()
return resp, nil
}
}

func (c *Cache) auth(r *http.Request) {
r.Header.Add("Authorization", "Bearer "+c.Token.Raw)
}
Expand Down Expand Up @@ -536,6 +563,19 @@ func (e GithubAPIError) Is(err error) bool {
return false
}

type HTTPError struct {
StatusCode int
Err error
}

func (e HTTPError) Error() string {
return e.Err.Error()
}

func (e HTTPError) Unwrap() error {
return e.Err
}

func checkResponse(resp *http.Response) error {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
Expand All @@ -545,13 +585,18 @@ func checkResponse(resp *http.Response) error {
return errors.WithStack(err)
}
var gae GithubAPIError
if err := json.Unmarshal(dt, &gae); err != nil {
return errors.Wrapf(err, "failed to parse error response %d: %s", resp.StatusCode, dt)
if err1 := json.Unmarshal(dt, &gae); err1 != nil {
err = errors.Wrapf(err1, "failed to parse error response %d: %s", resp.StatusCode, dt)
} else if gae.Message != "" {
err = errors.WithStack(gae)
} else {
err = errors.Errorf("unknown error %s: %s", resp.Status, dt)
}
if gae.Message != "" {
return errors.WithStack(gae)

return HTTPError{
StatusCode: resp.StatusCode,
Err: err,
}
return errors.Errorf("unknown error %d: %s", resp.StatusCode, dt)
}

func decryptToken(enc, pass string) (string, string, error) {
Expand Down
4 changes: 4 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"testing"
"time"
Expand Down Expand Up @@ -96,6 +97,9 @@ func TestExistingKey(t *testing.T) {
require.True(t, errors.As(err, &gae), "error was %+v", err)
require.Equal(t, "ArtifactCacheItemAlreadyExistsException", gae.TypeKey)
require.True(t, errors.Is(err, os.ErrExist))
var he HTTPError
require.True(t, errors.As(err, &he), "error was %+v", err)
require.Equal(t, http.StatusConflict, he.StatusCode)
}

func TestChunkedSave(t *testing.T) {
Expand Down
108 changes: 108 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package actionscache

import (
"context"
"sync"
"time"

"github.com/pkg/errors"
)

const maxBackoff = time.Second * 90
const minBackoff = time.Second * 1

var defaultBackoffPool = &BackoffPool{}

type BackoffPool struct {
mu sync.Mutex
queue []chan struct{}
timer *time.Timer
backoff time.Duration
target time.Time
}

func (b *BackoffPool) Wait(ctx context.Context, timeout time.Duration) error {
b.mu.Lock()
if b.timer == nil {
b.mu.Unlock()
return nil
}

done := make(chan struct{})
b.queue = append(b.queue, done)

b.mu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
case <-time.After(timeout):
return errors.Errorf("maximum timeout reached")
}
}

func (b *BackoffPool) Reset() {
b.mu.Lock()
b.reset()
b.backoff = 0
b.mu.Unlock()
}
func (b *BackoffPool) reset() {
for _, done := range b.queue {
close(done)
}
b.queue = nil
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
}

func (b *BackoffPool) trigger(t *time.Timer) {
b.mu.Lock()
if b.timer != t {
// this timer is not the current one
b.mu.Unlock()
return
}

b.reset()
b.backoff = b.backoff * 2
if b.backoff > maxBackoff {
b.backoff = maxBackoff
}
b.mu.Unlock()
}

func (b *BackoffPool) Delay() {
b.mu.Lock()
if b.timer != nil {
minTime := time.Now().Add(minBackoff)
if b.target.Before(minTime) {
b.target = minTime
b.timer.Stop()
b.setupTimer()
}
b.mu.Unlock()
return
}

if b.backoff == 0 {
b.backoff = minBackoff
}

b.target = time.Now().Add(b.backoff)
b.setupTimer()

b.mu.Unlock()
}

func (b *BackoffPool) setupTimer() {
var t *time.Timer
b.timer = time.AfterFunc(time.Until(b.target), func() {
b.trigger(t)
})
t = b.timer
}

0 comments on commit 70b4bc0

Please sign in to comment.