Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RequestRetrier into backoff and URI middleware #275

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions conjure-go-client/httpclient/backoff_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package httpclient

import (
"net/http"
"net/url"

"github.com/palantir/conjure-go-runtime/v2/conjure-go-client/httpclient/internal"
"github.com/palantir/pkg/retry"
)

type backoffMiddleware struct {
retrier retry.Retrier
attemptedURIs map[string]struct{}
backoffFunc func()
}

// NewBackoffMiddleware returns middleware that uses a supplied Retrier to backoff before making requests if the client
// has attempted to reach the URI before or has sent too many requests.
func NewBackoffMiddleware(retrier retry.Retrier) Middleware {
return &backoffMiddleware{
retrier: retrier,
attemptedURIs: map[string]struct{}{},
}
}

func (b *backoffMiddleware) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
b.backoffRequest(req)
resp, err := next.RoundTrip(req)
b.handleResponse(err)
return resp, err
}

func (b *backoffMiddleware) backoffRequest(req *http.Request) {
baseURI := getBaseURI(req.URL)
defer func() {
b.attemptedURIs[baseURI] = struct{}{}
}()
// Use backoffFunc if backoff behavior was determined by previous response e.g. throttle on 429
if b.backoffFunc != nil {
b.backoffFunc()
b.backoffFunc = nil
return
}
// Trigger retrier on first attempt so that future attempts have backoff
if len(b.attemptedURIs) == 0 {
b.retrier.Next()
}
// Trigger retrier for backoff if URI was attempted before
if _, performBackoff := b.attemptedURIs[baseURI]; performBackoff {
b.retrier.Next()
}
}

func (b *backoffMiddleware) handleResponse(err error) {
errCode, _ := StatusCodeFromError(err)
switch errCode {
case internal.StatusCodeRetryOther, internal.StatusCodeRetryTemporaryRedirect:
b.retrier.Reset()
case internal.StatusCodeThrottle:
b.backoffFunc = func() { b.retrier.Next() }
}
}

func getBaseURI(u *url.URL) string {
uCopy := url.URL{
Scheme: u.Scheme,
Opaque: u.Opaque,
User: u.User,
Host: u.Host,
}
return uCopy.String()
}
21 changes: 6 additions & 15 deletions conjure-go-client/httpclient/body_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ type bodyMiddleware struct {
requestInput interface{}
requestEncoder codecs.Encoder

// if rawOutput is true, the body of the response is not drained before returning -- it is the responsibility of the
// caller to read from and properly close the response body.
rawOutput bool
responseOutput interface{}
responseDecoder codecs.Decoder

Expand Down Expand Up @@ -62,13 +59,12 @@ func (b *bodyMiddleware) setRequestBody(req *http.Request) (func(), error) {
return cleanup, nil
}

// Special case: if the requestInput is an io.ReadCloser and the requestEncoder is nil,
// use the provided input directly as the request body.
if bodyReadCloser, ok := b.requestInput.(io.ReadCloser); ok && b.requestEncoder == nil {
req.Body = bodyReadCloser
// Use the same heuristic as http.NewRequest to generate the "GetBody" function.
if newReq, err := http.NewRequest("", "", bodyReadCloser); err == nil {
req.GetBody = newReq.GetBody
// Special case: if the requestInput is a getBody function and the requestEncoder is nil,
// use the provided function to directly as the request body.
if getBody, ok := b.requestInput.(func() io.ReadCloser); ok && b.requestEncoder == nil {
req.Body = getBody()
req.GetBody = func() (io.ReadCloser, error) {
return getBody(), nil
}
return cleanup, nil
}
Expand Down Expand Up @@ -101,11 +97,6 @@ func (b *bodyMiddleware) setRequestBody(req *http.Request) (func(), error) {
}

func (b *bodyMiddleware) readResponse(resp *http.Response, respErr error) error {
// If rawOutput is true, return response directly without draining or closing body
if b.rawOutput && respErr == nil {
return nil
}

if respErr != nil {
return respErr
}
Expand Down
117 changes: 51 additions & 66 deletions conjure-go-client/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (
"context"
"net/http"
"net/url"
"strings"

"github.com/palantir/conjure-go-runtime/v2/conjure-go-client/httpclient/internal"
"github.com/palantir/conjure-go-runtime/v2/conjure-go-client/httpclient/internal/refreshingclient"
"github.com/palantir/pkg/bytesbuffers"
"github.com/palantir/pkg/refreshable"
"github.com/palantir/pkg/retry"
werror "github.com/palantir/witchcraft-go-error"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
)

// A Client executes requests to a configured service.
Expand Down Expand Up @@ -87,47 +86,59 @@ func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Resp
return nil, werror.ErrorWithContextParams(ctx, "no base URIs are configured")
}

attempts := 2 * len(uris)
maxAttempts := 2 * len(uris)
if c.maxAttempts != nil {
if confMaxAttempts := c.maxAttempts.CurrentIntPtr(); confMaxAttempts != nil {
attempts = *confMaxAttempts
maxAttempts = *confMaxAttempts
}
}

var err error
b, err := applyRequestParams(c.bufferPool, params...)
if err != nil {
return nil, err
}
for _, c := range b.configureCtx {
ctx = c(ctx)
}
req, err := getRequest(ctx, b)
if err != nil {
return nil, err
}
cancelled := false
cancelFunc := func() { cancelled = true }
retrier := c.backoffOptions.CurrentRetryParams().Start(ctx)
clientCopy := c.getClientCopyWithMiddleware(b.errorDecoderMiddleware, b.bodyMiddleware, uris, retrier, cancelFunc)
attempts := 0
var resp *http.Response

retrier := internal.NewRequestRetrier(uris, c.backoffOptions.CurrentRetryParams().Start(ctx), attempts)
for {
uri, isRelocated := retrier.GetNextURI(resp, err)
if uri == "" {
break
for !cancelled && (maxAttempts == 0 || attempts < maxAttempts) {
reqCopy := req.Clone(ctx)
resp, err = clientCopy.Do(reqCopy)
err = unwrapURLError(ctx, err)
// unless this is exactly the scenario where the caller has opted into being responsible for draining and closing
// the response body, be sure to do so here.
if !b.rawOutput {
internal.DrainBody(resp)
}
if err != nil {
svc1log.FromContext(ctx).Debug("Retrying request", svc1log.Stacktrace(err))
attempts++
if resp != nil && isSuccessfulOrBadRequest(resp.StatusCode) {
break
}
resp, err = c.doOnce(ctx, uri, isRelocated, params...)
}
if err != nil {
return nil, err
}
return resp, nil
return resp, err
}

func (c *clientImpl) doOnce(
ctx context.Context,
baseURI string,
useBaseURIOnly bool,
params ...RequestParam,
) (*http.Response, error) {
func isSuccessfulOrBadRequest(statusCode int) bool {
return statusCode < 300 || (statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError)
}

// 1. create the request
func applyRequestParams(bufferPool bytesbuffers.Pool, params ...RequestParam) (*requestBuilder, error) {
b := &requestBuilder{
headers: make(http.Header),
query: make(url.Values),
bodyMiddleware: &bodyMiddleware{bufferPool: c.bufferPool},
bodyMiddleware: &bodyMiddleware{bufferPool: bufferPool},
}

for _, p := range params {
if p == nil {
continue
Expand All @@ -136,29 +147,25 @@ func (c *clientImpl) doOnce(
return nil, err
}
}
if useBaseURIOnly {
b.path = ""
}

for _, c := range b.configureCtx {
ctx = c(ctx)
}
return b, nil
}

func getRequest(ctx context.Context, b *requestBuilder) (*http.Request, error) {
if b.method == "" {
return nil, werror.ErrorWithContextParams(ctx, "httpclient: use WithRequestMethod() to specify HTTP method")
}
reqURI := joinURIAndPath(baseURI, b.path)
req, err := http.NewRequest(b.method, reqURI, nil)
req, err := http.NewRequestWithContext(ctx, b.method, b.path, nil)
if err != nil {
return nil, werror.WrapWithContextParams(ctx, err, "failed to build new HTTP request")
}
req = req.WithContext(ctx)
req.Header = b.headers
if q := b.query.Encode(); q != "" {
req.URL.RawQuery = q
}
return req, nil
}

// 2. create the transport and client
func (c *clientImpl) getClientCopyWithMiddleware(errorDecoderMiddleware Middleware, bodyMiddleware *bodyMiddleware, uris []string, backoffRetrier retry.Retrier, cancelFunc func()) http.Client {
// shallow copy so we can overwrite the Transport with a wrapped one.
clientCopy := *c.client.CurrentHTTPClient()
transport := clientCopy.Transport // start with the client's transport configured with default middleware
Expand All @@ -167,28 +174,22 @@ func (c *clientImpl) doOnce(
transport = wrapTransport(transport, c.uriScorer.CurrentURIScoringMiddleware())
// request decoder must precede the client decoder
// must precede the body middleware to read the response body
transport = wrapTransport(transport, b.errorDecoderMiddleware, c.errorDecoderMiddleware)
transport = wrapTransport(transport, errorDecoderMiddleware, c.errorDecoderMiddleware)
// must precede the body middleware to read the request body
transport = wrapTransport(transport, c.middlewares...)
// must wrap inner middlewares to mutate the return values
transport = wrapTransport(transport, b.bodyMiddleware)
transport = wrapTransport(transport, bodyMiddleware)
// must precede URI middleware to track attempted URIs
transport = wrapTransport(transport, NewBackoffMiddleware(backoffRetrier))
// must wrap inner middlewares to update request with resolved URL
transport = wrapTransport(transport, NewURIMiddleware(uris, cancelFunc))
// must be the outermost middleware to recover panics in the rest of the request flow
// there is a second, inner recoveryMiddleware in the client's default middlewares so that panics
// inside the inner-most RoundTrip benefit from traceIDs and loggers set on the context.
transport = wrapTransport(transport, c.recoveryMiddleware)

clientCopy.Transport = transport

// 3. execute the request using the client to get and handle the response
resp, respErr := clientCopy.Do(req)

// unless this is exactly the scenario where the caller has opted into being responsible for draining and closing
// the response body, be sure to do so here.
if !(respErr == nil && b.bodyMiddleware.rawOutput) {
internal.DrainBody(resp)
}

return resp, unwrapURLError(ctx, respErr)
return clientCopy
}

// unwrapURLError converts a *url.Error to a werror. We need this because all
Expand All @@ -205,21 +206,5 @@ func unwrapURLError(ctx context.Context, respErr error) error {
// We don't recognize this as a url.Error, just return the original.
return respErr
}
params := []werror.Param{werror.SafeParam("requestMethod", urlErr.Op)}

if parsedURL, _ := url.Parse(urlErr.URL); parsedURL != nil {
params = append(params,
werror.SafeParam("requestHost", parsedURL.Host),
werror.UnsafeParam("requestPath", parsedURL.Path))
}

return werror.WrapWithContextParams(ctx, urlErr.Err, "httpclient request failed", params...)
}

func joinURIAndPath(baseURI, reqPath string) string {
fullURI := strings.TrimRight(baseURI, "/")
if reqPath != "" {
fullURI += "/" + strings.TrimLeft(reqPath, "/")
}
return fullURI
return urlErr.Err
}
3 changes: 1 addition & 2 deletions conjure-go-client/httpclient/client_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,7 @@ func WithInitialBackoff(initialBackoff time.Duration) ClientParam {
})
}

// WithMaxRetries sets the maximum number of retries on transport errors for every request. Backoffs are
// also capped at this.
// WithMaxRetries sets the maximum number of retries on transport errors for every request.
// If unset, the client defaults to 2 * size of URIs
// TODO (#151): Rename to WithMaxAttempts and set maxAttempts directly using the argument provided to the function.
func WithMaxRetries(maxTransportRetries int) ClientParam {
Expand Down
Loading