Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle GitHub rate limiting with retries #8

Merged
merged 1 commit into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func TryEnv(opt Opt) (*Cache, error) {
}

type Opt struct {
Client *http.Client
Client *http.Client
Timeout time.Duration
BackoffPool *BackoffPool
}

func New(token, url string, opt Opt) (*Cache, error) {
Expand Down Expand Up @@ -138,6 +140,13 @@ func New(token, url string, opt Opt) (*Cache, error) {
if opt.Client == nil {
opt.Client = http.DefaultClient
}
if opt.Timeout == 0 {
opt.Timeout = 5 * time.Minute
}

if opt.BackoffPool == nil {
opt.BackoffPool = defaultBackoffPool
}

return &Cache{
opt: opt,
Expand Down Expand Up @@ -199,15 +208,11 @@ func (c *Cache) Load(ctx context.Context, keys ...string) (*Entry, error) {
q.Set("keys", strings.Join(keys, ","))
q.Set("version", version(keys[0]))
req.URL.RawQuery = q.Encode()
req = req.WithContext(ctx)
Log("load cache %s", req.URL.String())
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return nil, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return nil, err
}
var ce Entry
dt, err := ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
Expand Down Expand Up @@ -238,15 +243,11 @@ func (c *Cache) reserve(ctx context.Context, key string) (int, error) {
c.auth(req)
c.accept(req)
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
Log("save cache req %s body=%s", req.URL.String(), dt)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return 0, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return 0, err
}

dt, err = ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
Expand Down Expand Up @@ -276,13 +277,10 @@ func (c *Cache) commit(ctx context.Context, id int, size int64) error {
c.accept(req)
req.Header.Set("Content-Type", "application/json")
Log("commit cache %s, size %d", req.URL.String(), size)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return errors.Wrapf(err, "error committing cache %d", id)
}
if err := checkResponse(resp); err != nil {
return err
}
dt, err = ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
return err
Expand Down Expand Up @@ -413,13 +411,10 @@ func (c *Cache) uploadChunk(ctx context.Context, id int, ra io.ReaderAt, off, n
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/*", off, off+n-1))

Log("upload cache chunk %s, range %d-%d", req.URL.String(), off, off+n-1)
resp, err := c.opt.Client.Do(req)
resp, err := c.doWithRetries(ctx, req)
if err != nil {
return errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
return err
}
dt, err := ioutil.ReadAll(io.LimitReader(resp.Body, 32*1024))
if err != nil {
return errors.WithStack(err)
Expand All @@ -430,6 +425,38 @@ func (c *Cache) uploadChunk(ctx context.Context, id int, ra io.ReaderAt, off, n
return resp.Body.Close()
}

func (c *Cache) doWithRetries(ctx context.Context, req *http.Request) (*http.Response, error) {
req = req.WithContext(ctx)
var err error
max := time.Now().Add(c.opt.Timeout)
for {
if err1 := c.opt.BackoffPool.Wait(ctx, time.Until(max)); err1 != nil {
if err != nil {
return nil, errors.Wrapf(err, "%v", err1)
}
return nil, err1
}
var resp *http.Response
resp, err = c.opt.Client.Do(req)
if err != nil {
return nil, errors.WithStack(err)
}
if err := checkResponse(resp); err != nil {
var he HTTPError
if errors.As(err, &he) {
if he.StatusCode == http.StatusTooManyRequests {
c.opt.BackoffPool.Delay()
continue
}
}
c.opt.BackoffPool.Reset()
return nil, err
}
c.opt.BackoffPool.Reset()
return resp, nil
}
}

func (c *Cache) auth(r *http.Request) {
r.Header.Add("Authorization", "Bearer "+c.Token.Raw)
}
Expand Down Expand Up @@ -536,6 +563,19 @@ func (e GithubAPIError) Is(err error) bool {
return false
}

type HTTPError struct {
StatusCode int
Err error
}

func (e HTTPError) Error() string {
return e.Err.Error()
}

func (e HTTPError) Unwrap() error {
return e.Err
}

func checkResponse(resp *http.Response) error {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
Expand All @@ -545,13 +585,18 @@ func checkResponse(resp *http.Response) error {
return errors.WithStack(err)
}
var gae GithubAPIError
if err := json.Unmarshal(dt, &gae); err != nil {
return errors.Wrapf(err, "failed to parse error response %d: %s", resp.StatusCode, dt)
if err1 := json.Unmarshal(dt, &gae); err1 != nil {
err = errors.Wrapf(err1, "failed to parse error response %d: %s", resp.StatusCode, dt)
} else if gae.Message != "" {
err = errors.WithStack(gae)
} else {
err = errors.Errorf("unknown error %s: %s", resp.Status, dt)
}
if gae.Message != "" {
return errors.WithStack(gae)

return HTTPError{
StatusCode: resp.StatusCode,
Err: err,
}
return errors.Errorf("unknown error %d: %s", resp.StatusCode, dt)
}

func decryptToken(enc, pass string) (string, string, error) {
Expand Down
4 changes: 4 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"testing"
"time"
Expand Down Expand Up @@ -96,6 +97,9 @@ func TestExistingKey(t *testing.T) {
require.True(t, errors.As(err, &gae), "error was %+v", err)
require.Equal(t, "ArtifactCacheItemAlreadyExistsException", gae.TypeKey)
require.True(t, errors.Is(err, os.ErrExist))
var he HTTPError
require.True(t, errors.As(err, &he), "error was %+v", err)
require.Equal(t, http.StatusConflict, he.StatusCode)
}

func TestChunkedSave(t *testing.T) {
Expand Down
108 changes: 108 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package actionscache

import (
"context"
"sync"
"time"

"github.com/pkg/errors"
)

const maxBackoff = time.Second * 90
const minBackoff = time.Second * 1

var defaultBackoffPool = &BackoffPool{}

type BackoffPool struct {
mu sync.Mutex
queue []chan struct{}
timer *time.Timer
backoff time.Duration
target time.Time
}

func (b *BackoffPool) Wait(ctx context.Context, timeout time.Duration) error {
b.mu.Lock()
if b.timer == nil {
b.mu.Unlock()
return nil
}

done := make(chan struct{})
b.queue = append(b.queue, done)

b.mu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
case <-time.After(timeout):
return errors.Errorf("maximum timeout reached")
}
}

func (b *BackoffPool) Reset() {
b.mu.Lock()
b.reset()
b.backoff = 0
b.mu.Unlock()
}
func (b *BackoffPool) reset() {
for _, done := range b.queue {
close(done)
}
b.queue = nil
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
}

func (b *BackoffPool) trigger(t *time.Timer) {
b.mu.Lock()
if b.timer != t {
// this timer is not the current one
b.mu.Unlock()
return
}

b.reset()
b.backoff = b.backoff * 2
if b.backoff > maxBackoff {
b.backoff = maxBackoff
}
b.mu.Unlock()
}

func (b *BackoffPool) Delay() {
b.mu.Lock()
if b.timer != nil {
minTime := time.Now().Add(minBackoff)
if b.target.Before(minTime) {
b.target = minTime
b.timer.Stop()
b.setupTimer()
}
b.mu.Unlock()
return
}

if b.backoff == 0 {
b.backoff = minBackoff
}

b.target = time.Now().Add(b.backoff)
b.setupTimer()

b.mu.Unlock()
}

func (b *BackoffPool) setupTimer() {
var t *time.Timer
b.timer = time.AfterFunc(time.Until(b.target), func() {
b.trigger(t)
})
t = b.timer
}