Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-909 fix servicemonitor & prom rule reconcile #290

Merged
merged 3 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
with:
files: ./cover.out
flags: unittests
fail_ci_if_error: true
fail_ci_if_error: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that the solution to flaky test coverage ? Will it skip if it fails to upload for example ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah well, IMO coverage upload is not something critical enough to make whole CI fail, given how unstable it is


bundle-check:
runs-on: ubuntu-latest
Expand Down
30 changes: 13 additions & 17 deletions controllers/consoleplugin/consoleplugin_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,9 @@ func NewReconciler(cl reconcilers.ClientHelper, ns, prevNS, imageName string, av
return CPReconciler{ClientHelper: cl, nobjMngr: nobjMngr, owned: owned, image: imageName, availableAPIs: availableAPIs}
}

// InitStaticResources inits some "static" / one-shot resources, usually not subject to reconciliation
func (r *CPReconciler) InitStaticResources(ctx context.Context) error {
cr := buildClusterRole()
return r.ReconcileClusterRole(ctx, cr)
}

// PrepareNamespaceChange cleans up old namespace and restore the relevant "static" resources
func (r *CPReconciler) PrepareNamespaceChange(ctx context.Context) error {
// Switching namespace => delete everything in the previous namespace
// CleanupNamespace cleans up old namespace
func (r *CPReconciler) CleanupNamespace(ctx context.Context) {
r.nobjMngr.CleanupPreviousNamespace(ctx)
cr := buildClusterRole()
return r.ReconcileClusterRole(ctx, cr)
}

// Reconcile is the reconciler entry point to reconcile the current plugin state with the desired configuration
Expand Down Expand Up @@ -147,6 +138,11 @@ func (r *CPReconciler) reconcilePermissions(ctx context.Context, builder *builde
return r.CreateOwned(ctx, builder.serviceAccount())
} // update not needed for now

cr := buildClusterRole()
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}

desired := builder.clusterRoleBinding()
if err := r.ReconcileClusterRoleBinding(ctx, desired); err != nil {
return err
Expand Down Expand Up @@ -227,18 +223,18 @@ func (r *CPReconciler) reconcileService(ctx context.Context, builder builder, de
if err := r.CreateOwned(ctx, newSVC); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := r.CreateOwned(ctx, serviceMonitor); err != nil {
return err
}
}
} else if serviceNeedsUpdate(r.owned.service, &desired.ConsolePlugin, &report) {
newSVC := builder.service(r.owned.service)
if err := r.UpdateOwned(ctx, r.owned.service, newSVC); err != nil {
return err
}
}
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
return nil
}

Expand Down
27 changes: 4 additions & 23 deletions controllers/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,31 +202,12 @@ func (r *FlowCollectorReconciler) handleNamespaceChanged(
cpReconciler *consoleplugin.CPReconciler,
) error {
log := log.FromContext(ctx)
if oldNS == "" {
// First install: create one-shot resources
log.Info("FlowCollector first install: creating initial resources")
err := flpReconciler.InitStaticResources(ctx)
if err != nil {
return err
}
if r.availableAPIs.HasConsolePlugin() {
err := cpReconciler.InitStaticResources(ctx)
if err != nil {
return err
}
}
} else {
if oldNS != "" {
// Namespace updated, clean up previous namespace
log.Info("FlowCollector namespace change detected: cleaning up previous namespace and preparing next one", "old namespace", oldNS, "new namepace", newNS)
err := flpReconciler.PrepareNamespaceChange(ctx)
if err != nil {
return err
}
log.Info("FlowCollector namespace change detected: cleaning up previous namespace", "old namespace", oldNS, "new namepace", newNS)
flpReconciler.CleanupNamespace(ctx)
if r.availableAPIs.HasConsolePlugin() {
err := cpReconciler.PrepareNamespaceChange(ctx)
if err != nil {
return err
}
cpReconciler.CleanupNamespace(ctx)
}
}

Expand Down
31 changes: 31 additions & 0 deletions controllers/flowcollector_controller_console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
operatorsv1 "github.com/openshift/api/operator/v1"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -172,6 +173,36 @@ func flowCollectorConsolePluginSpecs() {
return svc.Spec.Ports[0].Port
}, timeout, interval).Should(Equal(int32(9099)))
})

It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() {
sm := monitoringv1.ServiceMonitor{}

By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "netobserv-plugin",
Namespace: cpNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

// Manually delete ServiceMonitor
By("Deleting ServiceMonitor")
Eventually(func() error {
return k8sClient.Delete(ctx, &sm)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure SM is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "info"
})
By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "netobserv-plugin",
Namespace: cpNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())
})
})

Context("Configuring the Loki URL", func() {
Expand Down
68 changes: 67 additions & 1 deletion controllers/flowcollector_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -339,6 +340,71 @@ func flowCollectorControllerSpecs() {
return ofc.Data["sampling"]
}, timeout, interval).Should(Equal("1"))
})

It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() {
psvc := v1.Service{}
sm := monitoringv1.ServiceMonitor{}
pr := monitoringv1.PrometheusRule{}
By("Expecting prometheus service to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-prom",
Namespace: operatorNamespace,
}, &psvc)
}, timeout, interval).Should(Succeed())

By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-monitor",
Namespace: operatorNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

By("Expecting PrometheusRule to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-alert",
Namespace: operatorNamespace,
}, &pr)
}, timeout, interval).Should(Succeed())

// Manually delete ServiceMonitor
By("Deleting ServiceMonitor")
Eventually(func() error {
return k8sClient.Delete(ctx, &sm)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure SM is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "info"
})
By("Expecting ServiceMonitor to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-monitor",
Namespace: operatorNamespace,
}, &sm)
}, timeout, interval).Should(Succeed())

// Manually delete Rule
By("Deleting prom rule")
Eventually(func() error {
return k8sClient.Delete(ctx, &pr)
}, timeout, interval).Should(Succeed())

// Do a dummy change that will trigger reconcile, and make sure Rule is created again
UpdateCR(crKey, func(fc *flowslatest.FlowCollector) {
fc.Spec.Processor.LogLevel = "debug"
})
By("Expecting PrometheusRule to exist")
Eventually(func() interface{} {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "flowlogs-pipeline-alert",
Namespace: operatorNamespace,
}, &pr)
}, timeout, interval).Should(Succeed())
})
})

Context("With Kafka", func() {
Expand Down Expand Up @@ -739,7 +805,7 @@ func UpdateCR(key types.NamespacedName, updater func(*flowslatest.FlowCollector)
Eventually(func() error {
updater(cr)
return k8sClient.Update(ctx, cr)
}).Should(Succeed())
}, timeout, interval).Should(Succeed())
}

func checkDigestUpdate(oldDigest *string, annots map[string]string) error {
Expand Down
53 changes: 21 additions & 32 deletions controllers/flowlogspipeline/flp_ingest_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

flowslatest "github.com/netobserv/network-observability-operator/api/v1beta1"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
)

Expand Down Expand Up @@ -50,6 +51,8 @@ func newIngesterReconciler(info *reconcilersCommonInfo) *flpIngesterReconciler {
info.nobjMngr.AddManagedObject(configMapName(ConfKafkaIngester), owned.configMap)
if info.availableAPIs.HasSvcMonitor() {
info.nobjMngr.AddManagedObject(serviceMonitorName(ConfKafkaIngester), owned.serviceMonitor)
}
if info.availableAPIs.HasPromRule() {
info.nobjMngr.AddManagedObject(prometheusRuleName(ConfKafkaIngester), owned.prometheusRule)
}

Expand All @@ -64,18 +67,9 @@ func (r *flpIngesterReconciler) context(ctx context.Context) context.Context {
return log.IntoContext(ctx, l)
}

// initStaticResources inits some "static" / one-shot resources, usually not subject to reconciliation
func (r *flpIngesterReconciler) initStaticResources(ctx context.Context) error {
cr := buildClusterRoleIngester(r.useOpenShiftSCC)
return r.ReconcileClusterRole(ctx, cr)
}

// PrepareNamespaceChange cleans up old namespace and restore the relevant "static" resources
func (r *flpIngesterReconciler) prepareNamespaceChange(ctx context.Context) error {
// Switching namespace => delete everything in the previous namespace
// cleanupNamespace cleans up old namespace
func (r *flpIngesterReconciler) cleanupNamespace(ctx context.Context) {
r.nobjMngr.CleanupPreviousNamespace(ctx)
cr := buildClusterRoleIngester(r.useOpenShiftSCC)
return r.ReconcileClusterRole(ctx, cr)
}

func (r *flpIngesterReconciler) reconcile(ctx context.Context, desired *flowslatest.FlowCollector) error {
Expand Down Expand Up @@ -126,34 +120,24 @@ func (r *flpIngesterReconciler) reconcilePrometheusService(ctx context.Context,
if err := r.CreateOwned(ctx, builder.newPromService()); err != nil {
return err
}
if r.availableAPIs.HasSvcMonitor() {
if err := r.CreateOwned(ctx, builder.generic.serviceMonitor()); err != nil {
return err
}
if err := r.CreateOwned(ctx, builder.generic.prometheusRule()); err != nil {
} else {
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
return err
}
}
return nil
}
newSVC := builder.fromPromService(r.owned.promService)
if helper.ServiceChanged(r.owned.promService, newSVC, &report) {
if err := r.UpdateOwned(ctx, r.owned.promService, newSVC); err != nil {
if r.availableAPIs.HasSvcMonitor() {
serviceMonitor := builder.generic.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
if r.availableAPIs.HasSvcMonitor() {
newMonitorSvc := builder.generic.serviceMonitor()
if helper.ServiceMonitorChanged(r.owned.serviceMonitor, newMonitorSvc) {
if err := r.UpdateOwned(ctx, r.owned.serviceMonitor, newMonitorSvc); err != nil {
return err
}
}
newPromRules := builder.generic.prometheusRule()
if helper.PrometheusRuleChanged(r.owned.prometheusRule, newPromRules) {
if err := r.UpdateOwned(ctx, r.owned.prometheusRule, newPromRules); err != nil {
return err
}
if r.availableAPIs.HasPromRule() {
promRules := builder.generic.prometheusRule()
if err := reconcilers.GenericReconcile(ctx, r.nobjMngr, &r.ClientHelper, r.owned.prometheusRule, promRules, &report, helper.PrometheusRuleChanged); err != nil {
return err
}
}
return nil
Expand Down Expand Up @@ -183,6 +167,11 @@ func (r *flpIngesterReconciler) reconcilePermissions(ctx context.Context, builde
return r.CreateOwned(ctx, builder.serviceAccount())
} // We only configure name, update is not needed for now

cr := buildClusterRoleIngester(r.useOpenShiftSCC)
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}

desired := builder.clusterRoleBinding()
if err := r.ClientHelper.ReconcileClusterRoleBinding(ctx, desired); err != nil {
return err
Expand Down
Loading