Skip to content

Commit

Permalink
Tracking request counts in queue proxy (#11783)
Browse files Browse the repository at this point in the history
* tracking request counts in queue proxy

* info not infof

* adding benchmark for handler

* copyright

* unit test, params for pause/resume functions

* hacky fix for race condition in test

* drop context in enable check

* reviewer fixes
  • Loading branch information
psschwei authored Sep 1, 2021
1 parent 0e6dbde commit fd04ef8
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,16 @@ func buildServer(ctx context.Context, env config, healthState *health.State, rp
breaker := buildBreaker(logger, env)
metricsSupported := supportsMetrics(ctx, logger, env)
tracingEnabled := env.TracingConfigBackend != tracingconfig.None
concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
timeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second

// Create queue handler chain.
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first.
var composedHandler http.Handler = httpProxy
if concurrencyStateEnabled {
logger.Info("Concurrency state endpoint set, tracking request counts")
composedHandler = queue.ConcurrencyStateHandler(logger, composedHandler, nil, nil)
}
if metricsSupported {
composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, env)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/queue/concurrency_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"net/http"

"go.uber.org/zap"
)

// ConcurrencyStateHandler tracks the in flight requests for the pod. When the requests
// drop to zero, it runs the `pause` function, and when requests scale up from zero, it
// runs the `resume` function. If either of `pause` or `resume` are not passed, it runs
// the respective local function(s). The local functions are the expected behavior; the
// function parameters are enabled primarily for testing purposes.
func ConcurrencyStateHandler(logger *zap.SugaredLogger, h http.Handler, pause, resume func()) http.HandlerFunc {
logger.Info("Concurrency state tracking enabled")

if pause == nil {
pause = func() {}
}

if resume == nil {
resume = func() {}
}

type req struct {
w http.ResponseWriter
r *http.Request

done chan struct{}
}

reqCh := make(chan req)
doneCh := make(chan struct{})
go func() {
inFlight := 0

// This loop is entirely synchronous, so there's no cleverness needed in
// ensuring open and close dont run at the same time etc. Only the
// delegated ServeHTTP is done in a goroutine.
for {
select {
case <-doneCh:
inFlight--
if inFlight == 0 {
logger.Info("Requests dropped to zero ...")
pause()
}

case r := <-reqCh:
inFlight++
if inFlight == 1 {
logger.Info("Requests increased from zero ...")
resume()
}

go func(r req) {
h.ServeHTTP(r.w, r.r)
close(r.done) // Return from ServeHTTP
doneCh <- struct{}{}
}(r)
}
}
}()

return func(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
reqCh <- req{w, r, done}
// Block till we've processed the request
<-done
}
}
197 changes: 197 additions & 0 deletions pkg/queue/concurrency_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"go.uber.org/atomic"
pkglogging "knative.dev/pkg/logging"

network "knative.dev/networking/pkg"
)

func TestConcurrencyStateHandler(t *testing.T) {
tests := []struct {
name string
pauses, resumes int64
events map[time.Duration]time.Duration // start time => req length
}{{
name: "single request",
pauses: 1,
resumes: 1,
events: map[time.Duration]time.Duration{
1 * time.Second: 2 * time.Second,
},
}, {
name: "overlapping requests",
pauses: 1,
resumes: 1,
events: map[time.Duration]time.Duration{
25 * time.Millisecond: 100 * time.Millisecond,
75 * time.Millisecond: 200 * time.Millisecond,
},
}, {
name: "subsumbed request",
pauses: 1,
resumes: 1,
events: map[time.Duration]time.Duration{
25 * time.Millisecond: 300 * time.Millisecond,
75 * time.Millisecond: 200 * time.Millisecond,
},
}, {
name: "start stop start",
pauses: 2,
resumes: 2,
events: map[time.Duration]time.Duration{
25 * time.Millisecond: 300 * time.Millisecond,
75 * time.Millisecond: 200 * time.Millisecond,
850 * time.Millisecond: 300 * time.Millisecond,
900 * time.Millisecond: 400 * time.Millisecond,
},
}}

logger, _ := pkglogging.NewLogger("", "error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
paused := atomic.NewInt64(0)
pause := func() {
paused.Inc()
}

resumed := atomic.NewInt64(0)
resume := func() {
resumed.Inc()
}

delegated := atomic.NewInt64(0)
delegate := func(w http.ResponseWriter, r *http.Request) {
wait, err := strconv.Atoi(r.Header.Get("wait"))
if err != nil {
panic(err)
}

time.Sleep(time.Duration(wait))
delegated.Inc()
}

h := ConcurrencyStateHandler(logger, http.HandlerFunc(delegate), pause, resume)

var wg sync.WaitGroup
wg.Add(len(tt.events))
for delay, length := range tt.events {
length := length
time.AfterFunc(delay, func() {
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "http://target", nil)
r.Header.Set("wait", strconv.FormatInt(int64(length), 10))
h.ServeHTTP(w, r)
wg.Done()
})
}

wg.Wait()
// Allow last update to finish (otherwise values are off, though this doesn't show
// as a race condition when running `go test -race `
// TODO Less hacky fix for this
time.Sleep(100 * time.Microsecond)

if got, want := paused.Load(), tt.pauses; got != want {
t.Errorf("expected to be paused %d times, but was paused %d times", want, got)
}

if got, want := delegated.Load(), int64(len(tt.events)); got != want {
t.Errorf("expected to be delegated %d times, but delegated %d times", want, got)
}

if got, want := resumed.Load(), tt.resumes; got != want {
t.Errorf("expected to be resumed %d times, but was resumed %d times", want, got)
}
})
}
}

func BenchmarkConcurrencyStateProxyHandler(b *testing.B) {
logger, _ := pkglogging.NewLogger("", "error")
baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
stats := network.NewRequestStats(time.Now())

promStatReporter, err := NewPrometheusStatsReporter(
"ns", "testksvc", "testksvc",
"pod", reportingPeriod)
if err != nil {
b.Fatal("Failed to create stats reporter:", err)
}

req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
req.Header.Set(network.OriginalHostHeader, wantHost)

tests := []struct {
label string
breaker *Breaker
reportPeriod time.Duration
}{{
label: "breaker-10-no-reports",
breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}),
reportPeriod: time.Hour,
}, {
label: "breaker-infinite-no-reports",
breaker: nil,
reportPeriod: time.Hour,
}, {
label: "breaker-10-many-reports",
breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}),
reportPeriod: time.Microsecond,
}, {
label: "breaker-infinite-many-reports",
breaker: nil,
reportPeriod: time.Microsecond,
}}

for _, tc := range tests {
reportTicker := time.NewTicker(tc.reportPeriod)

go func() {
for now := range reportTicker.C {
promStatReporter.Report(stats.Report(now))
}
}()

h := ConcurrencyStateHandler(logger, ProxyHandler(tc.breaker, stats, true /*tracingEnabled*/, baseHandler), nil, nil)
b.Run("sequential-"+tc.label, func(b *testing.B) {
resp := httptest.NewRecorder()
for j := 0; j < b.N; j++ {
h(resp, req)
}
})
b.Run("parallel-"+tc.label, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
resp := httptest.NewRecorder()
for pb.Next() {
h(resp, req)
}
})
})

reportTicker.Stop()
}
}

0 comments on commit fd04ef8

Please sign in to comment.