diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index 5e7a4ce51..556969841 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -19,10 +19,8 @@ package server import ( "context" "crypto/x509" - "encoding/json" "errors" "fmt" - "io" "net/http" "regexp" "strings" @@ -45,30 +43,13 @@ import ( func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - r.Context() - body, err := io.ReadAll(r.Body) - if err != nil { - s.logger.Error(err, "reading the request body failed") - w.WriteHeader(http.StatusBadRequest) - return - } - defer r.Body.Close() - - event := &eventv1.Event{} - err = json.Unmarshal(body, event) - if err != nil { - s.logger.Error(err, "decoding the request body failed") - w.WriteHeader(http.StatusBadRequest) - return - } - - cleanupMetadata(event) + event := r.Context().Value(eventContextKey{}).(*eventv1.Event) ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second) defer cancel() var allAlerts apiv1beta2.AlertList - err = s.kubeClient.List(ctx, &allAlerts) + err := s.kubeClient.List(ctx, &allAlerts) if err != nil { s.logger.Error(err, "listing alerts failed") w.WriteHeader(http.StatusBadRequest) @@ -342,28 +323,6 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even return false } -// cleanupMetadata removes metadata entries which are not used for alerting -func cleanupMetadata(event *eventv1.Event) { - group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group - excludeList := []string{ - fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey), - fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey), - } - - meta := make(map[string]string) - if event.Metadata != nil && len(event.Metadata) > 0 { - // Filter other meta based on group prefix, while filtering out excludes - for key, val := range event.Metadata { - if strings.HasPrefix(key, group) && !inList(excludeList, key) { - newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group)) - meta[newKey] = val - } - } - } - - event.Metadata = meta -} - func inList(l []string, i string) bool { for _, v := range l { if strings.EqualFold(v, i) { diff --git a/internal/server/event_server.go b/internal/server/event_server.go index 856da3907..76ae042b6 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -38,6 +38,8 @@ import ( eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" ) +type eventContextKey struct{} + // EventServer handles event POST requests type EventServer struct { port string @@ -63,8 +65,16 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid s.logger.Error(err, "Event server crashed") os.Exit(1) } + var handler http.Handler = http.HandlerFunc(s.handleEvent()) + for _, middleware := range []func(http.Handler) http.Handler{ + limitMiddleware.Handle, + s.logRateLimitMiddleware, + s.cleanupMetadataMiddleware, + } { + handler = middleware(handler) + } mux := http.NewServeMux() - mux.Handle("/", s.logRateLimitMiddleware(limitMiddleware.Handle(http.HandlerFunc(s.handleEvent())))) + mux.Handle("/", handler) h := std.Handler("", mdlw, mux) srv := &http.Server{ Addr: s.port, @@ -90,6 +100,59 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid } } +// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and +// adds the cleaned event in the request context which can then be queried and +// used directly by the other http handlers. +func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + s.logger.Error(err, "reading the request body failed") + w.WriteHeader(http.StatusBadRequest) + return + } + r.Body.Close() + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + event := &eventv1.Event{} + err = json.Unmarshal(body, event) + if err != nil { + s.logger.Error(err, "decoding the request body failed") + w.WriteHeader(http.StatusBadRequest) + return + } + + cleanupMetadata(event) + + ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event) + reqWithEvent := r.WithContext(ctxWithEvent) + + h.ServeHTTP(w, reqWithEvent) + }) +} + +// cleanupMetadata removes metadata entries which are not used for alerting. +func cleanupMetadata(event *eventv1.Event) { + group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group + excludeList := []string{ + fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey), + fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey), + } + + meta := make(map[string]string) + if event.Metadata != nil && len(event.Metadata) > 0 { + // Filter other meta based on group prefix, while filtering out excludes + for key, val := range event.Metadata { + if strings.HasPrefix(key, group) && !inList(excludeList, key) { + newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group)) + meta[newKey] = val + } + } + } + + event.Metadata = meta +} + type statusRecorder struct { http.ResponseWriter Status int @@ -109,23 +172,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler { h.ServeHTTP(recorder, r) if recorder.Status == http.StatusTooManyRequests { - body, err := io.ReadAll(r.Body) - if err != nil { - s.logger.Error(err, "reading the request body failed") - w.WriteHeader(http.StatusBadRequest) - return - } - - event := &eventv1.Event{} - err = json.Unmarshal(body, event) - if err != nil { - s.logger.Error(err, "decoding the request body failed") - w.WriteHeader(http.StatusBadRequest) - return - } - - r.Body = io.NopCloser(bytes.NewBuffer(body)) - + event := r.Context().Value(eventContextKey{}).(*eventv1.Event) s.logger.V(1).Info("Discarding event, rate limiting duplicate events", "reconciler kind", event.InvolvedObject.Kind, "name", event.InvolvedObject.Name, @@ -135,24 +182,21 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler { } func eventKeyFunc(r *http.Request) (string, error) { - body, err := io.ReadAll(r.Body) - if err != nil { - return "", err + event := r.Context().Value(eventContextKey{}).(*eventv1.Event) + + comps := []string{ + "event", + event.InvolvedObject.Name, + event.InvolvedObject.Namespace, + event.InvolvedObject.Kind, + event.Message, } - event := &eventv1.Event{} - err = json.Unmarshal(body, event) - if err != nil { - return "", err - } - - r.Body = io.NopCloser(bytes.NewBuffer(body)) - - comps := []string{"event", event.InvolvedObject.Name, event.InvolvedObject.Namespace, event.InvolvedObject.Kind, event.Message} revString, ok := event.Metadata[eventv1.MetaRevisionKey] if ok { comps = append(comps, revString) } + val := strings.Join(comps, "/") digest := sha256.Sum256([]byte(val)) return fmt.Sprintf("%x", digest), nil diff --git a/internal/server/event_server_test.go b/internal/server/event_server_test.go index 00a6e33a1..dc4daa429 100644 --- a/internal/server/event_server_test.go +++ b/internal/server/event_server_test.go @@ -18,6 +18,7 @@ package server import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -53,6 +54,7 @@ func TestEventKeyFunc(t *testing.T) { severity string message string rateLimit bool + metadata map[string]string }{ { involvedObject: corev1.ObjectReference{ @@ -120,21 +122,66 @@ func TestEventKeyFunc(t *testing.T) { message: "Health check passed", rateLimit: true, }, + { + involvedObject: corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + Kind: "Kustomization", + Name: "4", + Namespace: "4", + }, + severity: eventv1.EventSeverityInfo, + message: "Health check passed", + metadata: map[string]string{ + fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1", + }, + rateLimit: false, + }, + { + involvedObject: corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + Kind: "Kustomization", + Name: "4", + Namespace: "4", + }, + severity: eventv1.EventSeverityInfo, + message: "Health check passed", + metadata: map[string]string{ + fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1", + }, + rateLimit: true, + }, + { + involvedObject: corev1.ObjectReference{ + APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1", + Kind: "Kustomization", + Name: "4", + Namespace: "4", + }, + severity: eventv1.EventSeverityInfo, + message: "Health check passed", + metadata: map[string]string{ + fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev2", + }, + rateLimit: false, + }, } for i, tt := range tests { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - event := eventv1.Event{ + event := &eventv1.Event{ InvolvedObject: tt.involvedObject, Severity: tt.severity, Message: tt.message, + Metadata: tt.metadata, } + cleanupMetadata(event) eventData, err := json.Marshal(event) g.Expect(err).ShouldNot(gomega.HaveOccurred()) - req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData)) - g.Expect(err).ShouldNot(gomega.HaveOccurred()) res := httptest.NewRecorder() - handler.ServeHTTP(res, req) + req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData)) + ctxWithEvent := context.WithValue(req.Context(), eventContextKey{}, event) + reqWithEvent := req.WithContext(ctxWithEvent) + handler.ServeHTTP(res, reqWithEvent) if tt.rateLimit { g.Expect(res.Code).Should(gomega.Equal(429))