Merge pull request #15569 from dmage/maxconnections
Automatic merge from submit-queue

Add limit for number of concurrent connections to registry

The registry can consume excessive resources under heavy load. To avoid this, we limit the number of concurrent requests. Requests beyond the MaxRunning limit are enqueued, and a request is rejected if there are already MaxInQueue requests waiting in the queue. A request may stay in the queue for no more than MaxWaitInQueue.

See also #15448.
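
For illustration, a minimal sketch of how a limiting middleware is expected to use the Limiter interface added in this PR. The actual wrapper lives in the maxconnections package and is not shown in the excerpt below; the handler name, the package name, and the 429 status code are assumptions, not the PR's implementation.

package sketch

import (
	"net/http"

	"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
)

// limitHandler is an illustrative wrapper: it admits a request only if the
// limiter lets it start, and releases the slot when the request finishes.
func limitHandler(l maxconnections.Limiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Start may block while the request waits in the queue; false means
		// the queue is full or the wait timed out.
		if !l.Start(r.Context()) {
			http.Error(w, "too many requests", http.StatusTooManyRequests)
			return
		}
		defer l.Done()
		next.ServeHTTP(w, r)
	})
}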
openshift-merge-robot authored Aug 9, 2017
2 parents 01e5a1b + 949a64e commit 423e664
Showing 10 changed files with 573 additions and 3 deletions.
15 changes: 15 additions & 0 deletions images/dockerregistry/config.yml
@@ -43,3 +43,18 @@ openshift:
# Attention! A weak secret can lead to the leakage of private data.
#
# secret: TopSecretLongToken
requests:
# GET and HEAD requests
read:
# maxrunning is a limit for the number of in-flight requests. A zero value means there is no limit.
maxrunning: 0
# maxinqueue sets the maximum number of requests that can be queued if the limit for the number of in-flight requests is reached.
maxinqueue: 0
# maxwaitinqueue is how long a request can wait in the queue. A zero value means it can wait forever.
maxwaitinqueue: 0
# PUT, PATCH, POST, DELETE requests and internal mirroring requests
write:
# See description of openshift.requests.read.
maxrunning: 0
maxinqueue: 0
maxwaitinqueue: 0
37 changes: 36 additions & 1 deletion pkg/cmd/dockerregistry/dockerregistry.go
@@ -48,6 +48,7 @@ import (
"github.com/openshift/origin/pkg/dockerregistry/server/api"
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
"github.com/openshift/origin/pkg/dockerregistry/server/prune"
"github.com/openshift/origin/pkg/version"
)
@@ -155,6 +156,10 @@ func Execute(configFile io.Reader) {
registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())
ctx = server.WithRegistryClient(ctx, registryClient)

readLimiter := newLimiter(extraConfig.Requests.Read)
writeLimiter := newLimiter(extraConfig.Requests.Write)
ctx = server.WithWriteLimiter(ctx, writeLimiter)

log.WithFields(versionFields()).Info("start registry")
// inject a logger into the uuid library. warns us if there is a problem
// with uuid generation under low entropy.
@@ -229,7 +234,9 @@ func Execute(configFile io.Reader) {
app.Config.HTTP.Headers.Set("X-Registry-Supports-Signatures", "1")

app.RegisterHealthChecks()
handler := alive("/", app)
handler := http.Handler(app)
handler = limit(readLimiter, writeLimiter, handler)
handler = alive("/", handler)
// TODO: temporarily keep for backwards compatibility; remove in the future
handler = alive("/healthz", handler)
handler = health.Handler(handler)
@@ -368,6 +375,34 @@ func logLevel(level configuration.Loglevel) log.Level {
return l
}

func newLimiter(c registryconfig.RequestsLimits) maxconnections.Limiter {
if c.MaxRunning <= 0 {
return nil
}
return maxconnections.NewLimiter(c.MaxRunning, c.MaxInQueue, c.MaxWaitInQueue)
}

func limit(readLimiter, writeLimiter maxconnections.Limiter, handler http.Handler) http.Handler {
readHandler := handler
if readLimiter != nil {
readHandler = maxconnections.New(readLimiter, readHandler)
}

writeHandler := handler
if writeLimiter != nil {
writeHandler = maxconnections.New(writeLimiter, writeHandler)
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.ToUpper(r.Method) {
case "GET", "HEAD", "OPTIONS":
readHandler.ServeHTTP(w, r)
default:
writeHandler.ServeHTTP(w, r)
}
})
}

// alive simply wraps the handler with a route that always returns an http 200
// response when the path is matched. If the path is not matched, the request
// is passed to the provided handler. There is no guarantee of anything but
17 changes: 15 additions & 2 deletions pkg/dockerregistry/server/configuration/configuration.go
@@ -8,6 +8,7 @@ import (
"os"
"reflect"
"strings"
"time"

"gopkg.in/yaml.v2"

@@ -26,15 +27,27 @@ type openshiftConfig struct {
}

type Configuration struct {
Version configuration.Version `yaml:"version"`
Metrics Metrics `yaml:"metrics"`
Version configuration.Version `yaml:"version"`
Metrics Metrics `yaml:"metrics"`
Requests Requests `yaml:"requests"`
}

type Metrics struct {
Enabled bool `yaml:"enabled"`
Secret string `yaml:"secret"`
}

type Requests struct {
Read RequestsLimits
Write RequestsLimits
}

type RequestsLimits struct {
MaxRunning int
MaxInQueue int
MaxWaitInQueue time.Duration
}

type versionInfo struct {
Openshift struct {
Version *configuration.Version
15 changes: 15 additions & 0 deletions pkg/dockerregistry/server/context.go
@@ -5,6 +5,7 @@ import (

"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/dockerregistry/server/configuration"
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
)

type contextKey string
@@ -20,6 +21,9 @@ const (
// registryClientKey is the key for RegistryClient values in Contexts.
registryClientKey contextKey = "registryClient"

// writeLimiterKey is the key for write limiters in Contexts.
writeLimiterKey contextKey = "writeLimiter"

// userClientKey is the key for a origin's client with the current user's
// credentials in Contexts.
userClientKey contextKey = "userClient"
@@ -71,6 +75,17 @@ func RegistryClientFrom(ctx context.Context) RegistryClient {
return ctx.Value(registryClientKey).(RegistryClient)
}

// WithWriteLimiter returns a new Context with a write limiter.
func WithWriteLimiter(ctx context.Context, writeLimiter maxconnections.Limiter) context.Context {
return context.WithValue(ctx, writeLimiterKey, writeLimiter)
}

// WriteLimiterFrom returns the write limiter if one is stored in ctx, or nil otherwise.
func WriteLimiterFrom(ctx context.Context) maxconnections.Limiter {
writeLimiter, _ := ctx.Value(writeLimiterKey).(maxconnections.Limiter)
return writeLimiter
}

// withUserClient returns a new Context with the origin's client.
// This client should have the current user's credentials
func withUserClient(parent context.Context, userClient client.Interface) context.Context {
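
As a hedged usage sketch (the PR's actual call sites, e.g. for internal mirroring writes, are in files not shown in this excerpt), server code can retrieve the limiter via WriteLimiterFrom and guard a write as follows. The function name, the error text, and the use of the standard library context and errors packages are assumptions.

package server // illustrative placement alongside WriteLimiterFrom

import (
	"context"
	"errors"
)

// doLimitedWrite is a sketch, not part of the PR: it guards a write operation
// with the context's write limiter, if one is configured.
func doLimitedWrite(ctx context.Context, write func() error) error {
	if wl := WriteLimiterFrom(ctx); wl != nil {
		if !wl.Start(ctx) {
			// Queue full or wait timed out; the caller decides how to report this.
			return errors.New("too many concurrent write requests")
		}
		defer wl.Done()
	}
	return write()
}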
83 changes: 83 additions & 0 deletions pkg/dockerregistry/server/maxconnections/counter_test.go
@@ -0,0 +1,83 @@
package maxconnections

import (
"reflect"
"sync"
"testing"
)

type countM map[interface{}]int

type counter struct {
mu sync.Mutex
m countM
}

func newCounter() *counter {
return &counter{
m: make(countM),
}
}

func (c *counter) Add(key interface{}, delta int) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key] += delta
}

func (c *counter) Values() countM {
c.mu.Lock()
defer c.mu.Unlock()
m := make(map[interface{}]int)
for k, v := range c.m {
m[k] = v
}
return m
}

func (c *counter) Equal(m countM) bool {
c.mu.Lock()
defer c.mu.Unlock()
for k, v := range m {
if c.m[k] != v {
return false
}
}
for k, v := range c.m {
if _, ok := m[k]; !ok && v != 0 {
return false
}
}
return true
}

func TestCounter(t *testing.T) {
c := newCounter()
c.Add(100, 1)
c.Add(200, 2)
c.Add(300, 3)
if expected := (countM{100: 1, 200: 2, 300: 3}); !reflect.DeepEqual(c.m, expected) {
t.Fatalf("c.m = %v, want %v", c.m, expected)
}
if expected := (countM{100: 1, 200: 2, 300: 3}); !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}

c.Add(200, -2)
if expected := (countM{100: 1, 200: 0, 300: 3}); !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}
if expected := (countM{100: 1, 300: 3}); !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}
if expected := (countM{100: 1, 300: 3, 400: 0}); !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}

if expected := (countM{100: 1}); c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
}
if expected := (countM{100: 1, 300: 3, 400: 4}); c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
}
}
88 changes: 88 additions & 0 deletions pkg/dockerregistry/server/maxconnections/limiter.go
@@ -0,0 +1,88 @@
package maxconnections

import (
"context"
"time"
)

// A Limiter controls starting of jobs.
type Limiter interface {
// Start decides whether a new job can be started. The decision may be
// returned after a delay if the limiter wants to throttle jobs.
Start(context.Context) bool

// Done must be called when a job is finished.
Done()
}

// limiter ensures that there are no more than maxRunning jobs at the same
// time. It can enqueue up to maxInQueue jobs awaiting to be run; for other
// jobs, Start returns false immediately.
type limiter struct {
// running is a buffered channel. Before starting a job, an empty struct is
// sent to the channel. When the job is finished, one element is received
// back from the channel. If the channel's buffer is full, the job is
// enqueued.
running chan struct{}

// queue is a buffered channel. An empty struct is placed into the channel
// while a job is waiting for a spot in the running channel's buffer.
// If the queue channel's buffer is full, the job is declined.
queue chan struct{}

// maxWaitInQueue is a maximum wait time in the queue, zero means forever.
maxWaitInQueue time.Duration

// newTimer allows to override the function time.NewTimer for tests.
newTimer func(d time.Duration) *time.Timer
}

// NewLimiter returns a limiter that allows no more than maxRunning jobs at the
// same time. It can enqueue up to maxInQueue jobs awaiting to be run, and a
// job may wait in the queue no more than maxWaitInQueue.
func NewLimiter(maxRunning, maxInQueue int, maxWaitInQueue time.Duration) Limiter {
return &limiter{
running: make(chan struct{}, maxRunning),
queue: make(chan struct{}, maxInQueue),
maxWaitInQueue: maxWaitInQueue,
newTimer: time.NewTimer,
}
}

func (l *limiter) Start(ctx context.Context) bool {
select {
case l.running <- struct{}{}:
return true
default:
}

// Slow-path.
select {
case l.queue <- struct{}{}:
defer func() {
<-l.queue
}()
default:
return false
}

var timeout <-chan time.Time
// if l.maxWaitInQueue is 0, timeout will stay nil which practically means wait forever.
if l.maxWaitInQueue > 0 {
timer := l.newTimer(l.maxWaitInQueue)
defer timer.Stop()
timeout = timer.C
}

select {
case l.running <- struct{}{}:
return true
case <-timeout:
case <-ctx.Done():
}
return false
}

func (l *limiter) Done() {
<-l.running
}
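
A short, hedged usage sketch of the limiter in isolation; the function is illustrative and assumed to live in the maxconnections package, and the limit values are examples, not defaults.

// exampleUsage is illustrative only: at most 2 jobs run concurrently, up to
// 10 more wait in the queue, and a queued job gives up after 5 seconds.
func exampleUsage(ctx context.Context) {
	l := NewLimiter(2, 10, 5*time.Second)
	if !l.Start(ctx) {
		return // queue full, wait timed out, or context cancelled
	}
	defer l.Done()
	// ... do the work ...
}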