From 7f452a1e7eed6049bd237e66eb65d3be74c7bcba Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 26 Jan 2023 14:03:10 +0200 Subject: [PATCH 1/7] add health checks --- injection/sharedmain/main.go | 17 +++++++ network/health_check.go | 90 ++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 network/health_check.go diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 8af9eb5d1f..fcba23cb8e 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -48,6 +48,7 @@ import ( "knative.dev/pkg/logging" "knative.dev/pkg/logging/logkey" "knative.dev/pkg/metrics" + "knative.dev/pkg/network" "knative.dev/pkg/profiling" "knative.dev/pkg/reconciler" "knative.dev/pkg/signals" @@ -207,6 +208,10 @@ func MainWithContext(ctx context.Context, component string, ctors ...injection.C ctx = WithHADisabled(ctx) } + if HasHealthProbesEnabled(ctx) { + network.ServeHealthProbes(ctx) + } + MainWithConfig(ctx, component, cfg, ctors...) } @@ -324,6 +329,18 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } } +type healthProbesEnabledKey struct{} + +// WithHealthProbesEnabled signals to MainWithContext that it should enable default probes (readiness and liveness). +func WithHealthProbesEnabled(ctx context.Context) context.Context { + return context.WithValue(ctx, healthProbesEnabledKey{}, struct{}{}) +} + +// HasHealthProbesEnabled checks if default health checks are enabled in the related context. +func HasHealthProbesEnabled(ctx context.Context) bool { + return ctx.Value(healthProbesEnabledKey{}) != nil +} + func flush(logger *zap.SugaredLogger) { logger.Sync() metrics.FlushExporter() diff --git a/network/health_check.go b/network/health_check.go new file mode 100644 index 0000000000..b9ba5fd982 --- /dev/null +++ b/network/health_check.go @@ -0,0 +1,90 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package network + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "sync" + "time" +) + +// ServeHealthProbes sets up liveness and readiness probes. +func ServeHealthProbes(ctx context.Context) { + port := os.Getenv("KNATIVE_HEALTH_PROBES_PORT") + if port == "" { + port = "8080" + } + handler := healthHandler{HealthCheck: newHealthCheck(ctx)} + mux := http.NewServeMux() + mux.HandleFunc("/", handler.handle) + mux.HandleFunc("/health", handler.handle) + mux.HandleFunc("/readiness", handler.handle) + + server := http.Server{ReadHeaderTimeout: time.Minute, Handler: mux, Addr: ":" + port} + + go func() { + go func() { + <-ctx.Done() + _ = server.Shutdown(ctx) + }() + + // start the web server on port and accept requests + log.Printf("Probes server listening on port %s", port) + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal(err) + } + }() +} + +func newHealthCheck(sigCtx context.Context) func() error { + once := sync.Once{} + return func() error { + select { + // When we get SIGTERM (sigCtx done), let readiness probes start failing. + case <-sigCtx.Done(): + once.Do(func() { + log.Println("Signal context canceled") + }) + return errors.New("received SIGTERM from kubelet") + default: + return nil + } + } +} + +// healthHandler handles responding to kubelet probes with a provided health check. +type healthHandler struct { + HealthCheck func() error +} + +func (h *healthHandler) handle(w http.ResponseWriter, r *http.Request) { + if IsKubeletProbe(r) { + if err := h.HealthCheck(); err != nil { + log.Println("Healthcheck failed: ", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + return + } + http.Error(w, "Unexpected request", http.StatusBadRequest) +} From 0da1d16d722074eecb97b177607d4f8fa27df0d8 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 3 Feb 2023 16:04:35 +0200 Subject: [PATCH 2/7] updates --- injection/sharedmain/main.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index fcba23cb8e..2cacb0f0a7 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -208,10 +208,6 @@ func MainWithContext(ctx context.Context, component string, ctors ...injection.C ctx = WithHADisabled(ctx) } - if HasHealthProbesEnabled(ctx) { - network.ServeHealthProbes(ctx) - } - MainWithConfig(ctx, component, cfg, ctors...) } @@ -318,6 +314,11 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto return controller.StartAll(ctx, controllers...) }) + // Setup default health checks to catch issues with cache sync etc. + if !HealthProbesDisabled(ctx) { + network.ServeHealthProbes(ctx) + } + // This will block until either a signal arrives or one of the grouped functions // returns an error. <-egCtx.Done() @@ -329,16 +330,16 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } } -type healthProbesEnabledKey struct{} +type healthProbesDisabledKey struct{} -// WithHealthProbesEnabled signals to MainWithContext that it should enable default probes (readiness and liveness). -func WithHealthProbesEnabled(ctx context.Context) context.Context { - return context.WithValue(ctx, healthProbesEnabledKey{}, struct{}{}) +// WithHealthProbesDisabled signals to MainWithContext that it should disable default probes (readiness and liveness). +func WithHealthProbesDisabled(ctx context.Context) context.Context { + return context.WithValue(ctx, healthProbesDisabledKey{}, struct{}{}) } -// HasHealthProbesEnabled checks if default health checks are enabled in the related context. -func HasHealthProbesEnabled(ctx context.Context) bool { - return ctx.Value(healthProbesEnabledKey{}) != nil +// HealthProbesDisabled checks if default health checks are disabled in the related context. +func HealthProbesDisabled(ctx context.Context) bool { + return ctx.Value(healthProbesDisabledKey{}) != nil } func flush(logger *zap.SugaredLogger) { From af5d2b965b7a726b196e63849b876e09fc409f67 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Tue, 7 Feb 2023 14:07:00 +0200 Subject: [PATCH 3/7] make probes more flexible --- network/health_check.go | 133 ++++++++++++++++++++++++++--------- network/health_check_test.go | 109 ++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+), 34 deletions(-) create mode 100644 network/health_check_test.go diff --git a/network/health_check.go b/network/health_check.go index b9ba5fd982..2b8359550b 100644 --- a/network/health_check.go +++ b/network/health_check.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Knative Authors +Copyright 2023 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,26 +19,24 @@ package network import ( "context" "errors" - "log" "net/http" "os" "sync" "time" + + "knative.dev/pkg/logging" ) // ServeHealthProbes sets up liveness and readiness probes. +// If user sets no probes explicitly via the context then defaults are added. func ServeHealthProbes(ctx context.Context) { + logger := logging.FromContext(ctx) port := os.Getenv("KNATIVE_HEALTH_PROBES_PORT") if port == "" { port = "8080" } - handler := healthHandler{HealthCheck: newHealthCheck(ctx)} - mux := http.NewServeMux() - mux.HandleFunc("/", handler.handle) - mux.HandleFunc("/health", handler.handle) - mux.HandleFunc("/readiness", handler.handle) - server := http.Server{ReadHeaderTimeout: time.Minute, Handler: mux, Addr: ":" + port} + server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + port} go func() { go func() { @@ -47,44 +45,111 @@ func ServeHealthProbes(ctx context.Context) { }() // start the web server on port and accept requests - log.Printf("Probes server listening on port %s", port) + logger.Infof("Probes server listening on port %s", port) if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal(err) + logger.Fatal(err) } }() } -func newHealthCheck(sigCtx context.Context) func() error { - once := sync.Once{} - return func() error { - select { - // When we get SIGTERM (sigCtx done), let readiness probes start failing. - case <-sigCtx.Done(): - once.Do(func() { - log.Println("Signal context canceled") - }) - return errors.New("received SIGTERM from kubelet") - default: - return nil +func muxWithHandles(ctx context.Context) *http.ServeMux { + var handler HealthCheckHandler + if h := getOverrideHealthCheckHandler(ctx); h != nil { + handler = *h + } else { + defaultHandle := newDefaultProbesHandle(ctx) + handler = HealthCheckHandler{ + ReadinessProbeRequestHandle: defaultHandle, + LivenessProbeRequestHandle: defaultHandle, } } + mux := http.NewServeMux() + if UserReadinessProbe(ctx) { + mux.HandleFunc("/readiness", handler.ReadinessProbeRequestHandle) + } + if UserLivenessProbe(ctx) { + mux.HandleFunc("/health", handler.LivenessProbeRequestHandle) + } + // Set both probes if user does not want to explicitly set only one. + if !UserReadinessProbe(ctx) && !UserLivenessProbe(ctx) { + mux.HandleFunc("/readiness", handler.ReadinessProbeRequestHandle) + mux.HandleFunc("/health", handler.LivenessProbeRequestHandle) + } + return mux } -// healthHandler handles responding to kubelet probes with a provided health check. -type healthHandler struct { - HealthCheck func() error +// HealthCheckHandler allows to set up a handler for probes. User can override the default handler +// with a custom one. +type HealthCheckHandler struct { + ReadinessProbeRequestHandle http.HandlerFunc + LivenessProbeRequestHandle http.HandlerFunc } -func (h *healthHandler) handle(w http.ResponseWriter, r *http.Request) { - if IsKubeletProbe(r) { - if err := h.HealthCheck(); err != nil { - log.Println("Healthcheck failed: ", err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) +func newDefaultProbesHandle(sigCtx context.Context) http.HandlerFunc { + logger := logging.FromContext(sigCtx) + once := sync.Once{} + return func(w http.ResponseWriter, r *http.Request) { + f := func() error { + select { + // When we get SIGTERM (sigCtx done), let readiness probes start failing. + case <-sigCtx.Done(): + once.Do(func() { + logger.Info("Signal context canceled") + }) + return errors.New("received SIGTERM from kubelet") + default: + return nil + } + } + + if IsKubeletProbe(r) { + if err := f(); err != nil { + logger.Errorf("Healthcheck failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + return } - return + http.Error(w, "Unexpected request", http.StatusBadRequest) + } +} + +type addUserReadinessProbeKey struct{} + +// WithUserReadinessProbe signals to ServeHealthProbes that it should set explicitly a readiness probe. +func WithUserReadinessProbe(ctx context.Context) context.Context { + return context.WithValue(ctx, addUserReadinessProbeKey{}, struct{}{}) +} + +// UserReadinessProbe checks if user has explicitly requested to set a readiness probe in the related context. +func UserReadinessProbe(ctx context.Context) bool { + return ctx.Value(addUserReadinessProbeKey{}) != nil +} + +type addUserLivenessProbeKey struct{} + +// WithUserLivenessProbe signals to ServeHealthProbes that it should set explicitly a liveness probe. +func WithUserLivenessProbe(ctx context.Context) context.Context { + return context.WithValue(ctx, addUserLivenessProbeKey{}, struct{}{}) +} + +// UserLivenessProbe checks if user has explicitly requested to set a liveness probe in the related context. +func UserLivenessProbe(ctx context.Context) bool { + return ctx.Value(addUserLivenessProbeKey{}) != nil +} + +type overrideHealthCheckHandlerKey struct{} + +// WithOverrideHealthCheckHandler signals to ServeHealthProbes that it should override the default handler with the one passed. +func WithOverrideHealthCheckHandler(ctx context.Context, handler *HealthCheckHandler) context.Context { + return context.WithValue(ctx, overrideHealthCheckHandlerKey{}, handler) +} + +func getOverrideHealthCheckHandler(ctx context.Context) *HealthCheckHandler { + if ctx.Value(overrideHealthCheckHandlerKey{}) != nil { + return ctx.Value(overrideHealthCheckHandlerKey{}).(*HealthCheckHandler) } - http.Error(w, "Unexpected request", http.StatusBadRequest) + return nil } diff --git a/network/health_check_test.go b/network/health_check_test.go new file mode 100644 index 0000000000..b7bbac109d --- /dev/null +++ b/network/health_check_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package network + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestHealthCheckHandler(t *testing.T) { + tests := []struct { + name string + ctx context.Context + header http.Header + expectedReadiness int + expectedLiveness int + }{{ + name: "default health check handler", + ctx: context.Background(), + header: http.Header{ + UserAgentKey: []string{KubeProbeUAPrefix}, + }, + expectedReadiness: http.StatusOK, + expectedLiveness: http.StatusOK, + }, { + name: "default health check handler, no kubelet probe", + ctx: context.Background(), + header: http.Header{}, + expectedReadiness: http.StatusBadRequest, + expectedLiveness: http.StatusBadRequest, + }, { + name: "serve only readiness probes", + ctx: WithUserReadinessProbe(context.Background()), + header: http.Header{ + UserAgentKey: []string{KubeProbeUAPrefix}, + }, + expectedReadiness: http.StatusOK, + expectedLiveness: http.StatusNotFound, + }, { + name: "serve only liveness probes", + ctx: WithUserLivenessProbe(context.Background()), + header: http.Header{ + UserAgentKey: []string{KubeProbeUAPrefix}, + }, + expectedReadiness: http.StatusNotFound, + expectedLiveness: http.StatusOK, + }, { + name: "user provided health check handler", + ctx: WithOverrideHealthCheckHandler(context.Background(), &HealthCheckHandler{ + ReadinessProbeRequestHandle: testHandler(), + LivenessProbeRequestHandle: testHandler(), + }), + header: http.Header{ + UserAgentKey: []string{KubeProbeUAPrefix}, + }, + expectedReadiness: http.StatusBadGateway, + expectedLiveness: http.StatusBadGateway, + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mux := muxWithHandles(tc.ctx) + reqReadiness := http.Request{ + URL: &url.URL{ + Path: "/readiness", + }, + Header: tc.header, + } + resp := httptest.NewRecorder() + mux.ServeHTTP(resp, &reqReadiness) + if got, want := resp.Code, tc.expectedReadiness; got != want { + t.Errorf("Probe status = %d, wanted %d", got, want) + } + reqLiveness := http.Request{ + URL: &url.URL{ + Path: "/health", + }, + Header: tc.header, + } + resp = httptest.NewRecorder() + mux.ServeHTTP(resp, &reqLiveness) + if got, want := resp.Code, tc.expectedLiveness; got != want { + t.Errorf("Probe status = %d, wanted %d", got, want) + } + }) + } +} + +func testHandler() http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + http.Error(w, "test", http.StatusBadGateway) + } +} From 8ad9c50c19afb2ab0ae22d6a2536ba3d111131cb Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 9 Feb 2023 14:03:52 +0200 Subject: [PATCH 4/7] simplify --- injection/health_check.go | 114 ++++++++++++++ {network => injection}/health_check_test.go | 50 ++----- injection/sharedmain/main.go | 3 +- network/health_check.go | 155 -------------------- 4 files changed, 128 insertions(+), 194 deletions(-) create mode 100644 injection/health_check.go rename {network => injection}/health_check_test.go (62%) delete mode 100644 network/health_check.go diff --git a/injection/health_check.go b/injection/health_check.go new file mode 100644 index 0000000000..57a26aa5c0 --- /dev/null +++ b/injection/health_check.go @@ -0,0 +1,114 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package injection + +import ( + "context" + "errors" + "net/http" + "os" + "time" + + "knative.dev/pkg/logging" +) + +// ServeHealthProbes sets up liveness and readiness probes. +// If user sets no probes explicitly via the context then defaults are added. +func ServeHealthProbes(ctx context.Context) { + logger := logging.FromContext(ctx) + port := os.Getenv("KNATIVE_HEALTH_PROBES_PORT") + if port == "" { + port = "8080" + } + + server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + port} + + go func() { + go func() { + <-ctx.Done() + _ = server.Shutdown(ctx) + }() + + // start the web server on port and accept requests + logger.Infof("Probes server listening on port %s", port) + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatal(err) + } + }() +} + +func muxWithHandles(ctx context.Context) *http.ServeMux { + mux := http.NewServeMux() + readiness := getReadinessHandleOrDefault(ctx) + liveness := getLivenessHandleOrDefault(ctx) + mux.HandleFunc("/readiness", *readiness) + mux.HandleFunc("/health", *liveness) + return mux +} + +func newDefaultProbesHandle(sigCtx context.Context) http.HandlerFunc { + logger := logging.FromContext(sigCtx) + return func(w http.ResponseWriter, r *http.Request) { + f := func() error { + select { + // When we get SIGTERM (sigCtx done), let readiness probes start failing. + case <-sigCtx.Done(): + logger.Info("Signal context canceled") + return errors.New("received SIGTERM from kubelet") + default: + return nil + } + } + if err := f(); err != nil { + logger.Errorf("Healthcheck failed: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + } +} + +type addReadinessKey struct{} + +// AddReadiness signals to probe setup logic to add a user provided probe handler +func AddReadiness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { + return context.WithValue(ctx, addReadinessKey{}, &handlerFunc) +} + +func getReadinessHandleOrDefault(ctx context.Context) *http.HandlerFunc { + if ctx.Value(addReadinessKey{}) != nil { + return ctx.Value(addReadinessKey{}).(*http.HandlerFunc) + } + defaultHandle := newDefaultProbesHandle(ctx) + return &defaultHandle +} + +type addLivenessKey struct{} + +func getLivenessHandleOrDefault(ctx context.Context) *http.HandlerFunc { + if ctx.Value(addLivenessKey{}) != nil { + return ctx.Value(addLivenessKey{}).(*http.HandlerFunc) + } + defaultHandle := newDefaultProbesHandle(ctx) + return &defaultHandle +} + +// AddLiveness signals to probe setup logic to add a user provided probe handler +func AddLiveness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { + return context.WithValue(ctx, addLivenessKey{}, &handlerFunc) +} diff --git a/network/health_check_test.go b/injection/health_check_test.go similarity index 62% rename from network/health_check_test.go rename to injection/health_check_test.go index b7bbac109d..b9d68eb9ae 100644 --- a/network/health_check_test.go +++ b/injection/health_check_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package network +package injection import ( "context" @@ -28,49 +28,27 @@ func TestHealthCheckHandler(t *testing.T) { tests := []struct { name string ctx context.Context - header http.Header expectedReadiness int expectedLiveness int }{{ - name: "default health check handler", - ctx: context.Background(), - header: http.Header{ - UserAgentKey: []string{KubeProbeUAPrefix}, - }, + name: "user provided no handlers, default health check handlers are used", + ctx: context.Background(), expectedReadiness: http.StatusOK, expectedLiveness: http.StatusOK, }, { - name: "default health check handler, no kubelet probe", - ctx: context.Background(), - header: http.Header{}, - expectedReadiness: http.StatusBadRequest, - expectedLiveness: http.StatusBadRequest, + name: "user provided readiness health check handler, liveness default handler is used", + ctx: AddReadiness(context.Background(), testHandler()), + expectedReadiness: http.StatusBadGateway, + expectedLiveness: http.StatusOK, }, { - name: "serve only readiness probes", - ctx: WithUserReadinessProbe(context.Background()), - header: http.Header{ - UserAgentKey: []string{KubeProbeUAPrefix}, - }, + name: "user provided liveness health check handler, readiness default handler is used", + ctx: AddLiveness(context.Background(), testHandler()), expectedReadiness: http.StatusOK, - expectedLiveness: http.StatusNotFound, - }, { - name: "serve only liveness probes", - ctx: WithUserLivenessProbe(context.Background()), - header: http.Header{ - UserAgentKey: []string{KubeProbeUAPrefix}, - }, - expectedReadiness: http.StatusNotFound, - expectedLiveness: http.StatusOK, + expectedLiveness: http.StatusBadGateway, }, { - name: "user provided health check handler", - ctx: WithOverrideHealthCheckHandler(context.Background(), &HealthCheckHandler{ - ReadinessProbeRequestHandle: testHandler(), - LivenessProbeRequestHandle: testHandler(), - }), - header: http.Header{ - UserAgentKey: []string{KubeProbeUAPrefix}, - }, - expectedReadiness: http.StatusBadGateway, + name: "user provided custom probes", + ctx: AddReadiness(AddLiveness(context.Background(), testHandler()), testHandler()), + expectedReadiness: http.StatusOK, expectedLiveness: http.StatusBadGateway, }} for _, tc := range tests { @@ -80,7 +58,6 @@ func TestHealthCheckHandler(t *testing.T) { URL: &url.URL{ Path: "/readiness", }, - Header: tc.header, } resp := httptest.NewRecorder() mux.ServeHTTP(resp, &reqReadiness) @@ -91,7 +68,6 @@ func TestHealthCheckHandler(t *testing.T) { URL: &url.URL{ Path: "/health", }, - Header: tc.header, } resp = httptest.NewRecorder() mux.ServeHTTP(resp, &reqLiveness) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 2cacb0f0a7..0f776e6b5f 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -48,7 +48,6 @@ import ( "knative.dev/pkg/logging" "knative.dev/pkg/logging/logkey" "knative.dev/pkg/metrics" - "knative.dev/pkg/network" "knative.dev/pkg/profiling" "knative.dev/pkg/reconciler" "knative.dev/pkg/signals" @@ -316,7 +315,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto // Setup default health checks to catch issues with cache sync etc. if !HealthProbesDisabled(ctx) { - network.ServeHealthProbes(ctx) + injection.ServeHealthProbes(ctx) } // This will block until either a signal arrives or one of the grouped functions diff --git a/network/health_check.go b/network/health_check.go deleted file mode 100644 index 2b8359550b..0000000000 --- a/network/health_check.go +++ /dev/null @@ -1,155 +0,0 @@ -/* -Copyright 2023 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package network - -import ( - "context" - "errors" - "net/http" - "os" - "sync" - "time" - - "knative.dev/pkg/logging" -) - -// ServeHealthProbes sets up liveness and readiness probes. -// If user sets no probes explicitly via the context then defaults are added. -func ServeHealthProbes(ctx context.Context) { - logger := logging.FromContext(ctx) - port := os.Getenv("KNATIVE_HEALTH_PROBES_PORT") - if port == "" { - port = "8080" - } - - server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + port} - - go func() { - go func() { - <-ctx.Done() - _ = server.Shutdown(ctx) - }() - - // start the web server on port and accept requests - logger.Infof("Probes server listening on port %s", port) - - if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Fatal(err) - } - }() -} - -func muxWithHandles(ctx context.Context) *http.ServeMux { - var handler HealthCheckHandler - if h := getOverrideHealthCheckHandler(ctx); h != nil { - handler = *h - } else { - defaultHandle := newDefaultProbesHandle(ctx) - handler = HealthCheckHandler{ - ReadinessProbeRequestHandle: defaultHandle, - LivenessProbeRequestHandle: defaultHandle, - } - } - mux := http.NewServeMux() - if UserReadinessProbe(ctx) { - mux.HandleFunc("/readiness", handler.ReadinessProbeRequestHandle) - } - if UserLivenessProbe(ctx) { - mux.HandleFunc("/health", handler.LivenessProbeRequestHandle) - } - // Set both probes if user does not want to explicitly set only one. - if !UserReadinessProbe(ctx) && !UserLivenessProbe(ctx) { - mux.HandleFunc("/readiness", handler.ReadinessProbeRequestHandle) - mux.HandleFunc("/health", handler.LivenessProbeRequestHandle) - } - return mux -} - -// HealthCheckHandler allows to set up a handler for probes. User can override the default handler -// with a custom one. -type HealthCheckHandler struct { - ReadinessProbeRequestHandle http.HandlerFunc - LivenessProbeRequestHandle http.HandlerFunc -} - -func newDefaultProbesHandle(sigCtx context.Context) http.HandlerFunc { - logger := logging.FromContext(sigCtx) - once := sync.Once{} - return func(w http.ResponseWriter, r *http.Request) { - f := func() error { - select { - // When we get SIGTERM (sigCtx done), let readiness probes start failing. - case <-sigCtx.Done(): - once.Do(func() { - logger.Info("Signal context canceled") - }) - return errors.New("received SIGTERM from kubelet") - default: - return nil - } - } - - if IsKubeletProbe(r) { - if err := f(); err != nil { - logger.Errorf("Healthcheck failed: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } - return - } - http.Error(w, "Unexpected request", http.StatusBadRequest) - } -} - -type addUserReadinessProbeKey struct{} - -// WithUserReadinessProbe signals to ServeHealthProbes that it should set explicitly a readiness probe. -func WithUserReadinessProbe(ctx context.Context) context.Context { - return context.WithValue(ctx, addUserReadinessProbeKey{}, struct{}{}) -} - -// UserReadinessProbe checks if user has explicitly requested to set a readiness probe in the related context. -func UserReadinessProbe(ctx context.Context) bool { - return ctx.Value(addUserReadinessProbeKey{}) != nil -} - -type addUserLivenessProbeKey struct{} - -// WithUserLivenessProbe signals to ServeHealthProbes that it should set explicitly a liveness probe. -func WithUserLivenessProbe(ctx context.Context) context.Context { - return context.WithValue(ctx, addUserLivenessProbeKey{}, struct{}{}) -} - -// UserLivenessProbe checks if user has explicitly requested to set a liveness probe in the related context. -func UserLivenessProbe(ctx context.Context) bool { - return ctx.Value(addUserLivenessProbeKey{}) != nil -} - -type overrideHealthCheckHandlerKey struct{} - -// WithOverrideHealthCheckHandler signals to ServeHealthProbes that it should override the default handler with the one passed. -func WithOverrideHealthCheckHandler(ctx context.Context, handler *HealthCheckHandler) context.Context { - return context.WithValue(ctx, overrideHealthCheckHandlerKey{}, handler) -} - -func getOverrideHealthCheckHandler(ctx context.Context) *HealthCheckHandler { - if ctx.Value(overrideHealthCheckHandlerKey{}) != nil { - return ctx.Value(overrideHealthCheckHandlerKey{}).(*HealthCheckHandler) - } - return nil -} From 3d90f13acb08e4ba1e2e5f1b2cab79b9954c0c67 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 9 Feb 2023 14:08:20 +0200 Subject: [PATCH 5/7] minor --- injection/health_check.go | 10 +++++----- injection/health_check_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/injection/health_check.go b/injection/health_check.go index 57a26aa5c0..72cb2c7cdb 100644 --- a/injection/health_check.go +++ b/injection/health_check.go @@ -100,6 +100,11 @@ func getReadinessHandleOrDefault(ctx context.Context) *http.HandlerFunc { type addLivenessKey struct{} +// AddLiveness signals to probe setup logic to add a user provided probe handler +func AddLiveness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { + return context.WithValue(ctx, addLivenessKey{}, &handlerFunc) +} + func getLivenessHandleOrDefault(ctx context.Context) *http.HandlerFunc { if ctx.Value(addLivenessKey{}) != nil { return ctx.Value(addLivenessKey{}).(*http.HandlerFunc) @@ -107,8 +112,3 @@ func getLivenessHandleOrDefault(ctx context.Context) *http.HandlerFunc { defaultHandle := newDefaultProbesHandle(ctx) return &defaultHandle } - -// AddLiveness signals to probe setup logic to add a user provided probe handler -func AddLiveness(ctx context.Context, handlerFunc http.HandlerFunc) context.Context { - return context.WithValue(ctx, addLivenessKey{}, &handlerFunc) -} diff --git a/injection/health_check_test.go b/injection/health_check_test.go index b9d68eb9ae..745ef39af1 100644 --- a/injection/health_check_test.go +++ b/injection/health_check_test.go @@ -36,19 +36,19 @@ func TestHealthCheckHandler(t *testing.T) { expectedReadiness: http.StatusOK, expectedLiveness: http.StatusOK, }, { - name: "user provided readiness health check handler, liveness default handler is used", + name: "user provided custom readiness health check handler, liveness default handler is used", ctx: AddReadiness(context.Background(), testHandler()), expectedReadiness: http.StatusBadGateway, expectedLiveness: http.StatusOK, }, { - name: "user provided liveness health check handler, readiness default handler is used", + name: "user provided custom liveness health check handler, readiness default handler is used", ctx: AddLiveness(context.Background(), testHandler()), expectedReadiness: http.StatusOK, expectedLiveness: http.StatusBadGateway, }, { - name: "user provided custom probes", + name: "user provided custom health check handlers", ctx: AddReadiness(AddLiveness(context.Background(), testHandler()), testHandler()), - expectedReadiness: http.StatusOK, + expectedReadiness: http.StatusBadGateway, expectedLiveness: http.StatusBadGateway, }} for _, tc := range tests { From a145416fffb746b42f3538ba6f299d09b25d9050 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 9 Feb 2023 14:13:25 +0200 Subject: [PATCH 6/7] internal only --- injection/sharedmain/main.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 0f776e6b5f..f604513008 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -314,7 +314,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto }) // Setup default health checks to catch issues with cache sync etc. - if !HealthProbesDisabled(ctx) { + if !healthProbesDisabled(ctx) { injection.ServeHealthProbes(ctx) } @@ -336,8 +336,7 @@ func WithHealthProbesDisabled(ctx context.Context) context.Context { return context.WithValue(ctx, healthProbesDisabledKey{}, struct{}{}) } -// HealthProbesDisabled checks if default health checks are disabled in the related context. -func HealthProbesDisabled(ctx context.Context) bool { +func healthProbesDisabled(ctx context.Context) bool { return ctx.Value(healthProbesDisabledKey{}) != nil } From d296136e01740d4d98d9bc9b356b1f18c9468e8b Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Wed, 15 Feb 2023 13:34:28 +0200 Subject: [PATCH 7/7] fixes --- injection/health_check.go | 35 +++++++++++++++-------------------- injection/sharedmain/main.go | 4 +++- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/injection/health_check.go b/injection/health_check.go index 72cb2c7cdb..2899c7e35b 100644 --- a/injection/health_check.go +++ b/injection/health_check.go @@ -20,36 +20,31 @@ import ( "context" "errors" "net/http" - "os" + "strconv" "time" "knative.dev/pkg/logging" ) +// HealthCheckDefaultPort defines the default port number for health probes +const HealthCheckDefaultPort = 8080 + // ServeHealthProbes sets up liveness and readiness probes. // If user sets no probes explicitly via the context then defaults are added. -func ServeHealthProbes(ctx context.Context) { +func ServeHealthProbes(ctx context.Context, port int) error { logger := logging.FromContext(ctx) - port := os.Getenv("KNATIVE_HEALTH_PROBES_PORT") - if port == "" { - port = "8080" - } - - server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + port} - + server := http.Server{ReadHeaderTimeout: time.Minute, Handler: muxWithHandles(ctx), Addr: ":" + strconv.Itoa(port)} go func() { - go func() { - <-ctx.Done() - _ = server.Shutdown(ctx) - }() - - // start the web server on port and accept requests - logger.Infof("Probes server listening on port %s", port) - - if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Fatal(err) - } + <-ctx.Done() + _ = server.Shutdown(ctx) }() + + // start the web server on port and accept requests + logger.Infof("Probes server listening on port %s", port) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil } func muxWithHandles(ctx context.Context) *http.ServeMux { diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index f604513008..25562e965e 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -315,7 +315,9 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto // Setup default health checks to catch issues with cache sync etc. if !healthProbesDisabled(ctx) { - injection.ServeHealthProbes(ctx) + eg.Go(func() error { + return injection.ServeHealthProbes(ctx, injection.HealthCheckDefaultPort) + }) } // This will block until either a signal arrives or one of the grouped functions