Skip to content

Commit

Permalink
Add limit for number of concurrent connections to registry
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Bulatov committed Jul 31, 2017
1 parent 3a936e7 commit aa13f25
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 3 deletions.
4 changes: 4 additions & 0 deletions images/dockerregistry/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ openshift:
# Attention! A weak secret can lead to the leakage of private data.
#
# secret: TopSecretLongToken
requests:
maxrunning: 0
maxinqueue: 0
maxwaitinqueue: 0s
11 changes: 10 additions & 1 deletion pkg/cmd/dockerregistry/dockerregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/openshift/origin/pkg/dockerregistry/server/api"
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
"github.com/openshift/origin/pkg/dockerregistry/server/maxconnections"
)

// Execute runs the Docker registry.
Expand Down Expand Up @@ -137,7 +138,15 @@ func Execute(configFile io.Reader) {
app.Config.HTTP.Headers.Set("X-Registry-Supports-Signatures", "1")

app.RegisterHealthChecks()
handler := alive("/", app)
handler := http.Handler(app)
if extraConfig.Requests.MaxRunning > 0 {
maxconnectionsHandler := maxconnections.New(extraConfig.Requests.MaxRunning, extraConfig.Requests.MaxInQueue, handler)
if extraConfig.Requests.MaxWaitInQueue > 0 {
maxconnectionsHandler.MaxWaitInQueue = extraConfig.Requests.MaxWaitInQueue
}
handler = maxconnectionsHandler
}
handler = alive("/", handler)
// TODO: temporarily keep for backwards compatibility; remove in the future
handler = alive("/healthz", handler)
handler = health.Handler(handler)
Expand Down
12 changes: 10 additions & 2 deletions pkg/dockerregistry/server/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"reflect"
"strings"
"time"

"gopkg.in/yaml.v2"

Expand All @@ -26,15 +27,22 @@ type openshiftConfig struct {
}

type Configuration struct {
Version configuration.Version `yaml:"version"`
Metrics Metrics `yaml:"metrics"`
Version configuration.Version `yaml:"version"`
Metrics Metrics `yaml:"metrics"`
Requests Requests `yaml:"requests"`
}

type Metrics struct {
Enabled bool `yaml:"enabled"`
Secret string `yaml:"secret"`
}

type Requests struct {
MaxRunning int
MaxInQueue int
MaxWaitInQueue time.Duration
}

type versionInfo struct {
Openshift struct {
Version *configuration.Version
Expand Down
102 changes: 102 additions & 0 deletions pkg/dockerregistry/server/maxconnections/maxconnections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package maxconnections

import (
"context"
"net/http"
"time"
)

func defaultOverloadHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "503 service is overloaded, please try again later", http.StatusServiceUnavailable)
}

// DefaultOverloadHandler is a default OverloadHandler that used by New.
var DefaultOverloadHandler http.Handler = http.HandlerFunc(defaultOverloadHandler)

// Handler implements the http.Handler interface.
type Handler struct {
// running is a buffered channel. Before invoking the handler, an empty
// struct is sent to the channel. When the handler is finished, one element
// is received back from the channel. If the channel's buffer is full, the
// request is enqueued.
running chan struct{}

// queue is a buffered channel. An empty struct is placed into the channel
// while a request is waiting for a spot in the running channel's buffer.
// If the queue channel's buffer is full, the request is proccess by
// OverloadHandler.
queue chan struct{}

// handler to invoke.
handler http.Handler

// MaxWaitInQueue is a maximum wait time in the queue.
MaxWaitInQueue time.Duration

// OverloadHandler is called if there is no space in running and queue
// channels.
OverloadHandler http.Handler

// newTimer allows to override the function newTimer for tests.
newTimer func(d time.Duration) *time.Timer
}

// New returns an http.Handler that runs no more than maxRunning h at the same
// time. It can enqueue up to maxInQueue requests awaiting to be run, for other
// requests OverloadHandler will be invoked.
func New(maxRunning, maxInQueue int, h http.Handler) *Handler {
return &Handler{
running: make(chan struct{}, maxRunning),
queue: make(chan struct{}, maxInQueue),
handler: h,

OverloadHandler: DefaultOverloadHandler,
newTimer: time.NewTimer,
}
}

func (h *Handler) enqueueRunning(ctx context.Context) bool {
select {
case h.running <- struct{}{}:
return true
default:
}

// Slow-path.
select {
case h.queue <- struct{}{}:
defer func() {
<-h.queue
}()
default:
return false
}

var timer *time.Timer
var timeout <-chan time.Time
if h.MaxWaitInQueue > 0 {
timer = h.newTimer(h.MaxWaitInQueue)
defer timer.Stop()
timeout = timer.C
}

select {
case h.running <- struct{}{}:
return true
case <-timeout:
case <-ctx.Done():
}
return false
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.enqueueRunning(r.Context()) {
defer func() {
<-h.running
}()
h.handler.ServeHTTP(w, r)
return
}

h.OverloadHandler.ServeHTTP(w, r)
}
166 changes: 166 additions & 0 deletions pkg/dockerregistry/server/maxconnections/maxconnections_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package maxconnections

import (
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"
)

type counter struct {
mu sync.Mutex
m map[int]int
}

func newCounter() *counter {
return &counter{
m: make(map[int]int),
}
}

func (c *counter) Add(key int, delta int) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key] += delta
}

func (c *counter) Values() map[int]int {
c.mu.Lock()
defer c.mu.Unlock()
m := make(map[int]int)
for k, v := range c.m {
m[k] = v
}
return m
}

func (c *counter) Equal(m map[int]int) bool {
c.mu.Lock()
defer c.mu.Unlock()
for k, v := range m {
if c.m[k] != v {
return false
}
}
for k, v := range c.m {
if _, ok := m[k]; !ok && v != 0 {
return false
}
}
return true
}

func TestCoutner(t *testing.T) {
c := newCounter()
c.Add(100, 1)
c.Add(200, 2)
c.Add(300, 3)
if expected := map[int]int{100: 1, 200: 2, 300: 3}; !reflect.DeepEqual(c.m, expected) {
t.Fatalf("c.m = %v, want %v", c.m, expected)
}
if expected := map[int]int{100: 1, 200: 2, 300: 3}; !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}

c.Add(200, -2)
if expected := map[int]int{100: 1, 200: 0, 300: 3}; !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}
if expected := map[int]int{100: 1, 300: 3}; !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}
if expected := map[int]int{100: 1, 300: 3, 400: 0}; !c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is false, want true", c.m, expected)
}

if expected := map[int]int{100: 1}; c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
}
if expected := map[int]int{100: 1, 300: 3, 400: 4}; c.Equal(expected) {
t.Fatalf("counter(%v).Equal(%v) is true, want false", c.m, expected)
}
}

func TestMaxConnections(t *testing.T) {
const timeout = 1 * time.Second

maxRunning := 2
maxInQueue := 3
handlerBarrier := make(chan struct{}, maxRunning+maxInQueue+1)
h := New(maxRunning, maxInQueue, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-handlerBarrier
http.Error(w, "OK", http.StatusOK)
}))

deadline := make(chan time.Time)
h.newTimer = func(d time.Duration) *time.Timer {
t := time.NewTimer(d)
t.C = deadline
return t
}
h.MaxWaitInQueue = 1 // all clients in the queue will be rejected when the channel deadline is closed.

ts := httptest.NewServer(h)
defer ts.Close()
defer func() {
// Finish all pending requests in case of an error.
// This prevents ts.Close() from being stuck.
close(handlerBarrier)
}()

c := newCounter()
done := make(chan struct{})
wait := func(reason string) {
select {
case <-done:
case <-time.After(timeout):
t.Fatal(reason)
}
}
for i := 0; i < maxRunning+maxInQueue+1; i++ {
go func() {
res, err := http.Get(ts.URL)
if err != nil {
t.Errorf("failed to get %s: %s", ts.URL, err)
}
c.Add(res.StatusCode, 1)
done <- struct{}{}
}()
}

wait("timeout while waiting one failed client")

// expected state: 2 running, 3 in queue, 1 failed
if expected := map[int]int{503: 1}; !c.Equal(expected) {
t.Errorf("c = %v, want %v", c.Values(), expected)
}

handlerBarrier <- struct{}{}
wait("timeout while waiting one succeed client")

// expected state: 2 running, 2 in queue, 1 failed, 1 succeed
if expected := map[int]int{200: 1, 503: 1}; !c.Equal(expected) {
t.Errorf("c = %v, want %v", c.Values(), expected)
}

close(deadline)
wait("timeout while waiting the first failed client from the queue")
wait("timeout while waiting the second failed client from the queue")

// expected state: 2 running, 0 in queue, 3 failed, 1 succeed
if expected := map[int]int{200: 1, 503: 3}; !c.Equal(expected) {
t.Errorf("c = %v, want %v", c.Values(), expected)
}

handlerBarrier <- struct{}{}
handlerBarrier <- struct{}{}
wait("timeout while waiting the first succeed client")
wait("timeout while waiting the second succeed client")

// expected state: 0 running, 0 in queue, 3 failed, 3 succeed
if expected := map[int]int{200: 3, 503: 3}; !c.Equal(expected) {
t.Errorf("c = %v, want %v", c.Values(), expected)
}
}

0 comments on commit aa13f25

Please sign in to comment.