diff --git a/cmd/client/main.go b/cmd/client/main.go index 74d715b..143f340 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -21,7 +21,6 @@ import ( "crypto/x509" "fmt" "io/ioutil" - "math/rand" "net" "net/http" "net/url" @@ -32,14 +31,15 @@ import ( kingpin "gopkg.in/alecthomas/kingpin.v2" "github.com/ShowMax/go-fqdn" + "github.com/cenkalti/backoff/v4" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus-community/pushprox/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" - "github.com/prometheus-community/pushprox/util" ) var ( @@ -49,6 +49,9 @@ var ( tlsCert = kingpin.Flag("tls.cert", " Client certificate file").String() tlsKey = kingpin.Flag("tls.key", " Private key file").String() metricsAddr = kingpin.Flag("metrics-addr", "Serve Prometheus metrics at this address").Default(":9369").String() + + retryInitialWait = kingpin.Flag("proxy.retry.initial-wait", "Amount of time to wait after proxy failure").Default("1s").Duration() + retryMaxWait = kingpin.Flag("proxy.retry.max-wait", "Maximum amount of time to wait between proxy poll retries").Default("5s").Duration() ) var ( @@ -76,6 +79,15 @@ func init() { prometheus.MustRegister(pushErrorCounter, pollErrorCounter, scrapeErrorCounter) } +func newBackOffFromFlags() backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.InitialInterval = *retryInitialWait + b.Multiplier = 1.5 + b.MaxInterval = *retryMaxWait + b.MaxElapsedTime = time.Duration(0) + return b +} + // Coordinator for scrape requests and responses type Coordinator struct { logger log.Logger @@ -168,7 +180,7 @@ func (c *Coordinator) doPush(resp *http.Response, origRequest *http.Request, cli return nil } -func loop(c Coordinator, client *http.Client) error { +func (c *Coordinator) doPoll(client *http.Client) error { base, err := url.Parse(*proxyURL) if err != nil { level.Error(c.logger).Log("msg", "Error parsing url:", "err", err) @@ -201,35 +213,18 @@ func loop(c Coordinator, client *http.Client) error { return nil } -// decorrelated Jitter increases the maximum jitter based on the last random value. -type decorrelatedJitter struct { - duration time.Duration // sleep time - min time.Duration // min sleep time - cap time.Duration // max sleep time -} - -func newJitter() decorrelatedJitter { - rand.Seed(time.Now().UnixNano()) - return decorrelatedJitter{ - min: 50 * time.Millisecond, - cap: 5 * time.Second, +func (c *Coordinator) loop(bo backoff.BackOff, client *http.Client) { + op := func() error { + return c.doPoll(client) } -} -func (d *decorrelatedJitter) calc() time.Duration { - change := rand.Float64() * float64(d.duration*time.Duration(3)-d.min) - d.duration = d.min + time.Duration(change) - if d.duration > d.cap { - d.duration = d.cap - } - if d.duration < d.min { - d.duration = d.min + for { + if err := backoff.RetryNotify(op, bo, func(err error, _ time.Duration) { + pollErrorCounter.Inc() + }); err != nil { + level.Error(c.logger).Log("err", err) + } } - return d.duration -} - -func (d *decorrelatedJitter) sleep() { - time.Sleep(d.calc()) } func main() { @@ -299,14 +294,7 @@ func main() { TLSClientConfig: tlsConfig, } - jitter := newJitter() client := &http.Client{Transport: transport} - for { - err := loop(coordinator, client) - if err != nil { - pollErrorCounter.Inc() - jitter.sleep() - continue - } - } + + coordinator.loop(newBackOffFromFlags(), client) } diff --git a/cmd/client/main_test.go b/cmd/client/main_test.go index e304317..0adeec2 100644 --- a/cmd/client/main_test.go +++ b/cmd/client/main_test.go @@ -22,16 +22,6 @@ import ( "github.com/pkg/errors" ) -func TestJitter(t *testing.T) { - jitter := newJitter() - for i := 0; i < 100000; i++ { - duration := jitter.calc() - if !(jitter.min <= duration || duration <= jitter.cap) { - t.Fatal("invalid jitter value: ", duration) - } - } -} - type TestLogger struct{} func (tl *TestLogger) Log(vars ...interface{}) error { @@ -76,7 +66,7 @@ func TestHandleErr(t *testing.T) { func TestLoop(t *testing.T) { ts, c := prepareTest() defer ts.Close() - if err := loop(c, ts.Client()); err != nil { + if err := c.doPoll(ts.Client()); err != nil { t.Fatal(err) } } diff --git a/go.mod b/go.mod index da2dca7..3307327 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/ShowMax/go-fqdn v0.0.0-20180501083314-6f60894d629f + github.com/cenkalti/backoff/v4 v4.1.0 github.com/go-kit/kit v0.10.0 github.com/google/uuid v1.1.1 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 7af7da0..6a7395e 100644 --- a/go.sum +++ b/go.sum @@ -34,7 +34,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= +github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=